大数据开发工程师-第六周 第二章 剖析数据倾向问题与企业级解决方案


第六周 第二章 剖析数据倾向问题与企业级解决方案

1
2
3
4
5
6
  在实际工作中,如果我们想提高MapReduce的执行效率,最直接的方法是什么呢?
我们知道MapReduce是分为Map阶段和Reduce阶段,其实提高执行效率就是提高这两个阶段的执行效率默认情况下Map阶段中Map任务的个数是和数据的InputSplit相关的,InputSplit的个数一般是和Block块是有关联的,所以可以认为Map任务的个数和数据的block块个数有关系,针对Map任务的个数我们一般是不需要干预的,除非是前面我们说的海量小文件,那个时候可以考虑把小文件合并成大文件。其他情况是不需要调整的,那就剩下Reduce阶段了,咱们前面说过,默认情况下reduce的个数是1个,所以现在MapReduce任务的压力就集中在Reduce阶段了,如果说数据量比较大的时候,一个reduce任务处理起来肯定是比较慢的,所以我们可以考虑增加reduce任务的个数,这样就可以实现数据分流了,提高计算效率了。
但是注意了,如果增加Reduce的个数,那肯定是要对数据进行分区的,分区之后,每一个分区的数据会被一个reduce任务处理。

那如何增加分区呢?
我们来看一下代码,进入WordCountJob中,其实我们可以通过job.setPartitionerClass 来设置分区类,不过目前我们是没有设置的,那框架中是不是有默认值啊,是有的,我们可以通过 job.getPartitionerClass方法看到默认情况下会使用HashPartitioner 这个分区类

源码解析

1
2
3
4
5
6
7
8
9
10
11
12
那我们来看一下HashPartitioner的实现是什么样子的

/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
1
2
3
4
5
6
7
8
9
HashPartitioner继承了Partitioner,这里面其实就一个方法,getPartition,其实map里面每一条数据都会进入这个方法来获取他们所在的分区信息,这里的key就是k2,value就是v2
主要看里面的实现

(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks

其实起决定性的因素就是 numReduceTasks 的值,这个值默认是1,通过 job.getNumReduceTasks() 可知。
所以最终任何值%1 都返回0,那也就意味着他们都在0号分区,也就只有这一个分区。
如果想要多个分区,很简单,只需要把 numReduceTasks 的数目调大即可(当为5时,取余后的值可能为0,1,2,3,4,也就是五个分区),这个其实就reduce任务的数量,那也就意味着,只要redcue任务数量变大了,对应的分区数也就变多了,有多少个分区就会有多少个reduce任务,那我们就不需要单独增加分区的数量了,只需要控制好Redcue任务的数量即可。
增加redcue任务个数在一定场景下是可以提高效率的,但是在一些特殊场景下单纯增加reduce任务个数是无法达到质的提升的。

案例分析

1
2
3
4
5
下面我们来分析一个场景:
假设我们有一个文件,有1000W条数据,这里面的值主要都是数字,1,2,3,4,5,6,7,8,9,10,我们希望统计出来每个数字出现的次数
其实在私底下我们是知道这份数据的大致情况的,这里面这1000w条数据,值为5的数据有910w条左右,剩下的9个数字一共只有90w条,那也就意味着,这份数据中,值为5的数据比较集中,或者说值为5的数据属于倾斜的数据,在这一整份数据中,它占得比重比其他的数据多得多。
下面我们画图来具体分析一下:
假设这1000W条数据的文件有3个block,会产生3个InputSplt,最终会产生3个Map任务,默认情况下只有一个reduce任务,所以所有的数据都会让这一个reduce任务处理,这样这个Reduce压力肯定很大,大量的时间都消耗在了这里

H660gK.png

1
2
那根据我们前面的分析,我们可以增加reduce任务的数量,看下面这张图,我们把reduce任务的数量调
整到10个,这个时候就会把1000w条数据让这10 个reduce任务并行处理了,这个时候效率肯定会有一定的提升,但是最后我们会发现,性能提升是有限的,并没有达到质的提升,那这是为什么呢?

H66w36.png

1
2
3
4
5
6
7
8
9
10
11
12
  我们来分析一下,刚才我们说了我们这份数据中,值为5的数据有910w条,这就占了整份数据的90%了,那这90%的数据会被一个reduce任务处理,在这里假设是让reduce5处理了,reduce5这个任务执行的是比较慢的,其他reduce任务都执行结束很长时间了,它还没执行结束,因为reduce5中处理的数据量和其他reduce中处理的数据量规模相差太大了,所以最终reduce5拖了后腿。咱们mapreduce任务执行消耗的时间是一直统计到最后一个执行结束的reduce任务,所以就算其他reduce任务早都执行结束了也没有用,整个mapreduce任务是没有执行结束的。

那针对这种情况怎么办?
这个时候单纯的增加reduce任务的个数已经不起多大作用了,如果启动太多可能还会适得其反。
其实这个时候最好的办法是把这个值为5的数据尽量打散,把这个倾斜的数据分配到其他reduce任务中去计算,这样才能从根本上解决问题。

这就是我们要分析的一个数据倾斜的问题
MapReduce程序执行时,Reduce节点大部分执行完毕,但是有一个或者几个Reduce节点运行很慢,导致整个程序处理时间变得很长
具体表现为:Ruduce阶段一直卡着不动
根据刚才的分析,有两种方案
1. 增加reduce任务个数,这个属于治标不治本,针对倾斜不是太严重的数据是可以解决问题的,针对倾斜严重的数据,这样是解决不了根本问题的
2. 把倾斜的数据打散这种可以根治倾斜严重的数据。

案例实操

1
2
3
4
5
6
7
8
现在呢我们通过理论层面分析完了,那接下来我们来具体进入一个实际案例上手操作一下
还使用我们刚才说的那一份数据,1000w条的,其中值为5的大致有910w条左右
其他的加起来一共90万条左右。
这个数据文件我已经生成好了,直接上传到linux服务器上就可以,上传到/data/soft目录下

这个文件有点大,在windows本地无法打开,在这里我们去一条数据看一下数据格式,前面是一个数字,后面是一行日志,这个数据是我自己造的,我们主要是使用前面的这个数字,后面的内容主要是为了充数的,要不然文件太小,测试不出来效果。后面我们解析数据的时候只获取前面这个数字即可,前面这个数字是1-10之间的数字
接下来把这个文件上传到hdfs上
[root@bigdata01 soft]# ll

image-20230316010644262

1
[root@bigdata01 soft]# tail -1 hello_10000000.dat

image-20230316010829424

1
2
3
4
接下来把这个文件上传到hdfs上
[root@bigdata01 soft]# hdfs dfs -put hello_10000000.dat /
[root@bigdata01 soft]# hdfs dfs -ls /
-rw-r--r-- 2 root supergroup 1860100000 2020-04-27 22:01 /hello_10000000.d
1
下面我们来具体跑一个这份数据,首先复制一份WordCountJob的代码,新的类名为 WordCountJobSkew
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.imooc.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* 数据倾斜-增加Reduce任务个数
*
* Created by xuwei
*/
public class WordCountJobSkew {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text,Text,LongW
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//输出k1,v1的值
//System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//把单词封装成<k2,v2>的形式
Text k2 = new Text(words[0]);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 
/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongW
Logger logger = LoggerFactory.getLogger(MyReducer.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context co
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for(LongWritable v2: v2s){
//输出k2,v2的值
//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+"
//logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum += v2.get();
//模拟Reduce的复杂计算消耗的时间
if(sum % 200 ==0){
Thread.sleep(1);
}
}
//组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
if(args.length!=3){
//如果传递的参数不够,程序直接退出
System.exit(100);
}
//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个
job.setJarByClass(WordCountJobSkew.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3的类型
job.setOutputKeyClass(Text.class);
//指定v3的类型
job.setOutputValueClass(LongWritable.class);
//设置reduce任务个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
//提交job
job.waitForCompletion(true);
}catch(Exception e){
e.printStackTrace();
}
}
}
1
2
3
对项目代码进行重新编译、打包,提交到集群去执行
第一次先使用一个reduce任务执行
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-depencies.jar com.imoooc.mr.WordCountJobSkew /hello_10000000 /out10000000 1

image-20230316013044840

image-20230316013213544

image-20230316013304301

image-20230316013408168

image-20230316013625018

1
kill有两个的原因是:当一个某一map任务执行很慢时,会再启动一个map任务去计算同一份数据,其中一个计算执行完成后,会kill掉未执行完那个(map推测执行)

image-20230316014001592

1
查看结果

image-20230316014045366

image-20230316014302669

1
2
接下来增加reduce任务的数量,增加到10个
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-depencies.jar com.imoooc.mr.WordCountJobSkew /hello_10000000 /out10000000 10

image-20230316014638500

1
这里的这两个reduce任务也是推测执行,处理的同一份数据
1
推测执行可以关掉

image-20230316014826952

image-20230316014950983

image-20230316015046078

1
这里显示比第一次1个reduce任务的执行时间还长,虽然和机器当前状态有关,但至少得出这种情况下(数据倾斜)单纯加大reduce任务数量,并不会产生明显的速度提升

把倾斜数据打散

1
2
3
怎么打散呢?其实就是给他加上一些有规律的随机数字就可以了
在这里我们这样处理,我把5这个数值的数据再分成10份,所以我就在这个数值5后面拼上一个0~9的随机数即可。
针对这个操作我们需要去修改代码,在这里我们再重新复制一个类,基于WordCountJobSkew复制,新的类名是 WordCountJobSkewRandKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package com.imooc.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Random;
/**
* 数据倾斜-把倾斜的数据打散
*
* Created by xuwei
*/
public class WordCountJobSkewRandKey {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text,Text,LongW
Logger logger = LoggerFactory.getLogger(MyMapper.class);
Random random = new Random();
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//输出k1,v1的值
//System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//把单词封装成<k2,v2>的形式
String key = words[0];
if("5".equals(key)){
//把倾斜的key打散,分成10份
key = "5"+"_"+random.nextInt(10);
}
Text k2 = new Text(key);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongW
Logger logger = LoggerFactory.getLogger(MyReducer.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context co
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for(LongWritable v2: v2s){
//输出k2,v2的值
//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+"
//logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum += v2.get();
//模拟Reduce的复杂计算消耗的时间
if(sum % 200 ==0){
Thread.sleep(1);
}
}
//组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
if(args.length!=3){
//如果传递的参数不够,程序直接退出
System.exit(100);
}
//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个
job.setJarByClass(WordCountJobSkewRandKey.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3的类型
job.setOutputKeyClass(Text.class);
//指定v3的类型
job.setOutputValueClass(LongWritable.class);
//设置reduce任务个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
//提交job
job.waitForCompletion(true);
}catch(Exception e){
e.printStackTrace();
}
}
1
2
3
4
只需要在map中把k2的值修改一下就可以了,这样就可以把值为5的数据打散了。
编译打包,提交到集群
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -rm -r /out10000000
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-depencies.jar com.imoooc.mr.WordCountJobSkewRandKey /hello_10000000 /out10000000 10
1
注意,这个时候获取到的并不是最终的结果,因为我们把值为5的数据随机分成多份了,最多分成10份

image-20230316021315233

1
2
任务总的执行消耗时间为: Elapsed: 1mins, 39sec
这次任务执行时间节省了1分钟多的左右,在这就属于质的提升了,相当于节省了将近一半的时间了

image-20230316021422004

1
2
但是这个时候我们获取到的最终结果是一个半成品,还需要进行一次加工,其实我们前面把这个倾斜的数据打散之后相当于做了一个局部聚合,现在还需要再开发一个mapreduce任务再做一次全局聚合,其实也很简单,获取到上一个map任务的输出,在map端读取到数据之后,对数据先使用空格分割,然后对第一列的数据再使用下划线分割,分割之后总是取第一列,这样就可以把值为5的数据还原出来了,这个时候数据一共就这么十几条,怎么处理都很快了,这个代码就给大家留成作业了,我们刚才已经把详细的过程都分析过了,大家下去之后自己写一下,如果遇到了问题,可以在咱们的问答区一块讨论,或者直接找我都是可以的。
这就是针对数据倾斜问题的处理方法,面试的时候经常问到,大家一定要能够把这个思路说明白。

编写代码

1
2
3
4
主要设置:
1.map对k1的处理,
2.reduce阶段模拟复杂数据处理,
3.job.setNumReduceTasks(num);

自己创建的测试数据

1
2
3
一共5000000行
60% 5
5% 1 2 3 4 6 7 8 9

map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// k1代表每一行数据的行首偏移量,v1代表的是每一行的数据
// 需要做的是:把每一行数据的单词切割出来
//logger.info("<k1, v1>: <"+k1.get()+", "+v1.toString()+">");
//System.out.println("<k1, v1>: <"+k1.get()+", "+v1.toString()+">");
String[] words = v1.toString().split(" ");

// 把切割出来的单词,封装成<k2, 1>
Text k2 = new Text(words[0]);
LongWritable v2 = new LongWritable(1L);
context.write(k2,v2);

//super.map(key, value, context);
}

reduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static class myReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
//Logger logger = LoggerFactory.getLogger(myReducer.class);
/**
* 对<k2, {v2...}>的数据进行累加求和,生成<k3,v3>
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
long sum = 0;
for(LongWritable v2:v2s){
//logger.info("<k2, v2>: <"+k2.toString()+", "+v2.get()+">");
//System.out.println("<k2, v2>: <"+k2.toString()+", "+v2.get()+">");
sum += v2.get();
//模拟Reduce的复杂计算消耗的时间
if(sum%200==0){
Thread.sleep((1));
}
}
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//logger.info("<k3, v3>: <"+k3.toString()+", "+v3.get()+">");
//System.out.println("<k3, v3>: <"+k3.toString()+", "+v3.get()+">");
context.write(k3, v3);
// super.reduce(key, values, context);
}
}

main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public static void main(String[] args){
try {
if(args.length!=3){
System.exit(100);
System.out.println("缺少路径参数!!!");
}
// 指定Job需要的配置参数
Configuration conf = new Configuration();
// 创建一个Job
Job job = Job.getInstance(conf);
// 注意:这一行必须设置,否者在集群中执行时找不到WordCountJob这个类
job.setJarByClass(WordCountJobSkew.class);

// 指定输入路径,可以是文件也可以是目录(目录里只有一个文件时可以); 注意FileInputFormat别选成hadoop1.x的了
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 指定输出路径(只能是hdfs上一个不存在的目录); 注意FileOutFormat别选成hadoop1.x的了
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 指定map相关代码
job.setMapperClass(myMapper.class);
// 指定k2类型
job.setMapOutputKeyClass(Text.class);
// 指定v2类型
job.setMapOutputValueClass(LongWritable.class);

//指定reduce相关代码
job.setReducerClass(myReducer.class);
// 指定k3类型
job.setOutputKeyClass(Text.class);
// 指定v3类型
job.setOutputValueClass(LongWritable.class);
//
job.setNumReduceTasks(Integer.parseInt(args[2]));
// 提交job
job.waitForCompletion(true);

}catch(Exception e){
e.printStackTrace();
}

}

结果查看

H6xkFA.md.png

1个reduceTask

1
hadoop jar bigdata_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mc.WordCountJobSkew /test/shujuqingxie/QingXieData.txt /test/shujuqingxie/outcome1 1

H6x4kd.md.png

1
具体分析页面打不开

9个reduceTask

1
hadoop jar bigdata_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mc.WordCountJobSkew /test/shujuqingxie/QingXieData.txt /test/shujuqingxie/outcome1 9

分析

HczLlt.md.png

1
2
3
4
所以从这可以看出来,性能提升并不大

具体分析Reduce任务的执行时间
这里由于有10个reduce,所以一共有10行,在这我们截取了一部分,其中这里面有一个reduce任务消耗的时间比较长,其他reduce任务的执行时间都是4~5秒,这个reduce任务的执行时间是1分26秒,那就意味着值为5的那910w数据进入到这个reduce了,所以它执行的比较慢。

Hcpypn.md.png

1
map任务14个里面,kill 2个是因为“推测执行”,当map任务里某些maptask任务相比其它maptask明显慢很多时(会认为是有异常),会自动产生新的maptask任务,两个一起执行,最后杀掉慢的 下面这是reduce阶段某个reducetask的推测执行

HgCe78.md.png

把倾斜的数据打散(面试)

1
2
3
4
  那我们再把reduce任务的个数提高一下,会不会提高性能呢?不会了,刚才从1个reduce任务提高到10个reduce任务时间也就减少了三四秒钟,所以再增加reduce任务的个数就没有多大意义了。
那接下来就需要使用我们的绝招了,把倾斜的数据打散,在这里就是把5这个数字打散,
怎么打散呢?其实就是给他加上一些有规律的随机数字就可以了
在这里我们这样处理,我把5这个数值的数据再分成10份,所以我就在这个数值5后面拼上一个0~9的随机数即可。

map

1
2
3
4
5
6
7
8
9
10
11
只需要在map中把k2的值修改一下就可以了,这样就可以把值为5的数据打散了。
编译打包,提交到集群
String[] words = v1.toString().split(" ");
String key = words[0];
if("5".equals(key)){
key = "5_"+random.nextInt(10);
}
// 把切割出来的单词,封装成<k2, 1>
Text k2 = new Text(key);
LongWritable v2 = new LongWritable(1L);
context.write(k2,v2);

reduce

1
没变

分析

1
2
3
4
5
6
注意,这个时候获取到的并不是最终的结果,因为我们把值为5的数据随机分成多份了,最多分成10份

任务总的执行消耗时间为: Elapsed: 1mins, 39sec
这次任务执行时间节省了1分钟多的左右,在这就属于质的提升了,相当于节省了将近一半的时间了

查看一下reduce任务执行情况,在这里就没有发现特别耗时的reduce任务了,消耗的时间几乎都差不多

HgM8TH.png
HgMJkd.md.png
HgMYtA.md.png

1
2
 但是这个时候我们获取到的最终结果是一个半成品,还需要进行一次加工,其实我们前面把这个倾斜的数据打散之后相当于做了一个局部聚合,现在还需要再开发一个mapreduce任务再做一次全局聚合,其实也很简单,获取到上一个map任务的输出,在map端读取到数据之后,对数据先使用空格分割,然后对第一列的数据再使用下划线分割,分割之后总是取第一列,这样就可以把值为5的数据还原出来了,这个时候数据一共就这么十几条,怎么处理都很快了,这个代码就给大家留成作业了,我们刚才已经把详细的过程都分析过了,大家下去之后自己写一下,如果遇到了问题,可以在咱们的问答区一块讨论,或者直接找我都是可以的。
这就是针对数据倾斜问题的处理方法,面试的时候经常问到,大家一定要能够把这个思路说明白。

HWm7uT.png


本文标题:大数据开发工程师-第六周 第二章 剖析数据倾向问题与企业级解决方案

文章作者:TTYONG

发布时间:2022年02月13日 - 14:02

最后更新:2023年06月06日 - 20:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%85%AD%E5%91%A8-%E7%AC%AC%E4%BA%8C%E7%AB%A0-%E5%89%96%E6%9E%90%E6%95%B0%E6%8D%AE%E5%80%BE%E5%90%91%E9%97%AE%E9%A2%98%E4%B8%8E%E4%BC%81%E4%B8%9A%E7%BA%A7%E8%A7%A3%E5%86%B3%E6%96%B9%E6%A1%88.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%