大数据开发工程师-第五周 第四章 精讲Shuffle执行过程及源码分析输入输出


第五周 第四章 精讲Shuffle执行过程及源码分析输入输出

Shuffle过程详解

1
咱们前面简单说过,shuffer是一个网络拷贝的过程,是指通过网络把数据从map端拷贝到reduce端的过程,下面我们来详细分析一下这个过程

HwABhq.png

1
2
3
接下来我们来根据这张图分析一下shuffle的一些细节信息,
首先看map阶段,最左边有一个input split,最终会产生一个map任务,map任务在执行的时候会把k1,v1转化为k2,v2,这些数据会先临时存储到一个内存缓冲区中,这个内存缓冲区的大小默认是100M(io.sort.mb属性),当达到内存缓冲区大小的80%(io.sort.spill.percent)也就是80M的时候,会把内存中的数据溢写到本地磁盘中(mapred.local.dir),一直到map把所有的数据都计算完,最后会把内存缓冲区中的数据一次性全部刷新到本地磁盘文件中,在这个图里面表示产生了3个临时文件,每个临时文件中有3个分区,这是由于map阶段中对数据做了分区,所以数据在存储的时候,在每个临时文件中也划分为了3块,最后需要对这些临时文件进行合并,合并为一个大文件,因为一个map任务最终只会产生一个文件,这个合并之后的文件也是有3个分区的,这3个分区的数据会被shuffle线程分别拷贝到三个不同的reduce节点,图里面只显示了一个reduce节点,下面还有两个没有显示。不同map任务中的相同分区的数据会在同一个reduce节点进行合并,合并以后会执行reduce的功能,最终产生结果数据。
在这里shuffle其实是横跨map端和reduce端的,它主要是负责把map端产生的数据通过网络拷贝到reduce阶段进行统一聚合计算

Hadoop中序列化机制

HwA0Nn.png

1
2
3
4
5
6
  我们的map阶段在读取数据的是需要从hdfs中读取的,这里面需要经过磁盘IO和网络IO,不过正常情况下map任务会执行本地计算,也就是map任务会被分发到数据所在的节点进行计算,这个时候,网络io几乎就没有了,就剩下了磁盘io,再往后面看,map阶段执行完了以后,数据会被写入到本地磁盘文件,这个时候也需要经过磁盘io,后面的shuffle拷贝数据其实也需要先经过磁盘io把数据从本地磁盘读出来再通过网络发送到reduce节点,再写入reduce节点的本地磁盘,然后reduce阶段在执行的时候会经过磁盘io读取本地文件中的数据,计算完成以后还会经过磁盘io和网络io把数据写入到hdfs中。
经过我们刚才的分析,其实在这里面占得比重最高的是磁盘io,所以说影响mapreduce任务执行效率的主要原因就是磁盘io,如果想要提高任务执行效率,就需要从这方面着手分析。

当程序在向磁盘中写数据以及从磁盘中读取数据的时候会对数据进行序列化和反序列化,磁盘io这些步骤我们省略不了,但是我们可以从序列化和反序列化这一块来着手做一些优化,首先我们分析一下序列化和反序列化,看这个图,当我们想把内存中的数据写入到文件中的时候,会对数据序列化,然后再写入,这个序列化其实就是把内存中的对象信息转成二进制的形式,方便存储到文件中,默认java中的序列化会把对象及其父类、超类的整个继承体系信息都保存下来,这样存储的信息太大了,就会导致写入文件的信息过大,这样写入是会额外消耗性能的。
反序列化也是一样,reduce端想把文件中的对象信息加载到内存中,如果文件很大,在加载的时候也会额外消耗很多性能,所以如果我们把对象存储的信息尽量精简,那么就可以提高数据写入和读取消耗的性能。
基于此,hadoop官方实现了自己的序列化和反序列化机制,没有使用java中的序列化机制,所以hadoop中的数据类型没有沿用java中的数据类型,而是自己单独设计了一些writable的实现了,例如、longwritable、text等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
那我们来看一下Hadoop中提供的常用的基本数据类型的序列化类

Java基本类型 Writable 序列化大小(字节)
布尔型(boolean) BooleanWritable 1
字节型(byte) ByteWritable 1
整型(int) IntWritable 4
VIntWritable 1~5
浮点型(float) FloatWritable 4
长整型(long) LongWritable 8
VLongWritable 1~9
双精度浮点型(double) DoubleWritable 8

在这需要注意一下
Text等价于java.lang.String的Writable,针对UTF-8序列
NullWritable是单例,获取实例使用NullWritable.get()

hadoop自己实现的序列化的特点

1
2
3
4
1. 紧凑: 高效使用存储空间
2. 快速: 读写数据的额外开销小
3. 可扩展: 可透明地读取老格式的数据
4. 互操作: 支持多语言的交互

java中序列化的不足

1
2
1. 不精简,附加信息多,不太适合随机访问
2. 存储空间大,递归地输出类的超类描述直到不再有超类

实战(序列化)

1
前面我们分析了Java中的序列化和Hadoop中的序列化,其实最主要的区别就是针对相同的数据,Java中的序列化会占用较大的存储空间,而Hadoop中的序列化可以节省很多存储空间,这样在海量数据计算的场景下,可以减少数据传输的大小,极大的提高计算效率

java序列化对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.imooc.mc;

import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
* java中的序列化
*/
public class javaSerialize {
public static void main(String[] args) throws Exception{
// 创建student对象,并设置id和name属性
StudentJava studentJava = new StudentJava();
studentJava.setId(1L);
studentJava.setName("Hadoop");

// 将student当前状态写入本地文件中
FileOutputStream fos =new FileOutputStream("D:\\student_java.txt");
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(studentJava);
oos.close();
fos.close();
}
}

class StudentJava implements Serializable{ // 一个文件只能有一个主类
private long Id;
private String name;
private static final long serialVersionUID = 1L;

public long getId() {
return Id;
}

public void setId(long id) {
Id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

hadoop序列化对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.imooc.mc;
import org.apache.hadoop.io.Writable;
import java.io.*;

/**
* Hadoop序列化机制
*/
public class HadoopSerialize {
public static void main(String[] args) throws Exception{
//创建student对象并设置name,id属性
StudentWritable studentWritable = new StudentWritable();
studentWritable.setId(1L);
studentWritable.setName("Hadoop");

//将student对象当前状态写入到本地文件中
FileOutputStream fos = new FileOutputStream("D:\\student_hadoop.txt");
ObjectOutputStream oos = new ObjectOutputStream(fos);
studentWritable.write(oos); // 与java序列化不同的地方
oos.close();
fos.close();
}
}

class StudentWritable implements Writable{
private long Id;
private String name;

public long getId() {
return Id;
}

public void setId(long id) {
Id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override // 多的部分
public void write(DataOutput out) throws IOException {
out.writeLong(this.Id);
out.writeUTF(this.name);
}

@Override // 多的部分
public void readFields(DataInput in) throws IOException {
this.Id=in.readLong();
this.name=in.readUTF();
}
}
1
执行这两个代码,最终会在D盘下产生两个文件,查看这两个文件的大小,最终发现Java序列化的文件大小是Hadoop序列化文件大小的10倍左右。

HwGZge.png

HwGV3D.png

小结

1
2
1.IDEA快速编写属性对应的方法
2.IDEA快速覆盖继承类的方法的方法

InputFormat分析

HwhRpT.md.png

InputFormat

1
2
3
Hadoop中有一个抽象类是InputFormat,InputFormat抽象类是MapReduce输入数据的顶层基类,这个抽象类中只定义了两个方法:
一个是getSplits方法
另一个是createRecordReader方法

InputFormat的子类

1
2
3
4
5
6
7
这个抽象类下面有三个子继承类,
DBInputFormat是操作数据库的,
FileInputFormat是操作文件类型数据的,
DelegatingInputFormat是用在处理多个输入时使用的
这里面比较常见的也就是FileInputFormat了,FileInputFormat是所有以文件作为数据源的基类,
FileInputFormat保存job输入的所有文件,并实现了对输入文件计算splits的方法,至于获得文件中数据
的方法是由子类实现的。

FileInputFormat的子类

1
2
3
4
5
6
FileInputFormat下面还有一些子类:
CombineFileInputFormat:处理小文件问题的,后面我们再详细分析
TextInputFormat:是默认的处理类,处理普通文本文件,他会把文件中每一行作为一个记录,将每一行的起始偏移量作为key,每一行的内容作为value,这里的key和value就是我们之前所说的k1,v1 它默认以换行符或回车键作为一行记录
NLineInputFormat:可以动态指定一次读取多少行数据
这 里 面 的 TextInputFormat 是 我 们 处 理 文 本 数 据 的 默 认 处 理 类 , TextInputFormat 的 顶 层 基 类 是
InputFormat,下面我们先来看一下这个抽象类的源码

详细分析一下getSplits方法的具体实现代码

下载源码 hadoop-3.2.2-src.tar.gz
解压
1
将D:\code\IDEA\hadoop-3.2.2-src\hadoop-mapreduce-project\hadoop-mapreduce-client\hadoop-mapreduce-client-core作为IDEA的项目
下载依赖
1
2
3
4
5
1.将maven的配置文件里加上镜像,这样下载依赖会快些
2.可以就用idea自动下载,但容易卡死,且过程可视性差;建议用cmd(打开路径D:\code\IDEA\hadoop-3.2.2-src\hadoop-mapreduce-project\hadoop-mapreduce-client\hadoop-mapreduce-client-core)
3.执行mvn clean package/compile -DskipTests
4.由于用国内镜像下载依赖可能会漏下依赖(已遇到);改回maven的配置文件里的镜像设置
5.可能会遇到IDEA Cannot Resolve Symbol 问题的解决方法汇总(博客里有解决方法)
getsplits源码剖析
1
2
3
需要将项目里的java文件夹设置为root source(选中后右键mark as),这样设置后查看源码ctrl 左键 才会快速关联
1.找到项目里的InputFormat
2.选中getsplits方法后 ctrl 左 ,选继承类FileInputFormat,再看方法的具体实现细节
小结
1
2
3
4
5
6
7
8
9
10
11
12
13
SPLIT_SLOP=1.1
文件剩余字节大小/1134217728[128M] > 1.1
意思就是当文件剩余大小bytesRemaining与splitSize的比值大于1.1的时候,就继续切分
否则,剩下的直接作为一个InputSplit

敲黑板,划重点:只要bytesRemaining/splitSize<=1.1就会停止划分,将剩下的作为一个InputSplit

140.8

也就是说当默认splitsize=128M时,inputsplit可能:
<128M
=128M
>128M
面试题
1
2
3
4
5
6
7
8
9
10
11
12
13
1. 一个1G的文件,会产生多少个map任务? 8
Block块默认是128M,所以1G的文件会产生8个Block块
默认情况下InputSplit的大小和Block块的大小一致,每一个InputSplit会产生一个map任务
所以:1024/128=8个map任务
2. 1000个文件,每个文件100KB,会产生多少个map任务?
一个文件,不管再小,都会占用一个block,所以这1000个小文件会产生1000个Block,那最终会产生1000个InputSplit,也就对应着会产生1000个map任务
3. 一个140M的文件,会产生多少个map任务?
根据前面的分析
140M的文件会产生2个Block,那对应的就会产生2个InputSplit了?
注意:这个有点特殊,140M/128M=1.09375<1.1
所以,这个文件只会产生一个InputSplit,也最终也就只会产生1个map 任务。

这个文件其实再稍微大1M就可以产生2个map 任务了
实战140 141
生成140 141大小的文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.imooc.mc;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class GenerateDat {
public static void main(String[] args) throws Exception{
generate_140M();
generate_141M();
}

private static void generate_140M() throws IOException {
String fileName = "D:\\s_name_140.dat";
System.out.println("start: 开始生成140M文件->" + fileName);
BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
int num = 0;
while (num < 8201592) {
bfw.write("zhangsan beijing");
bfw.newLine();
num++;
if (num % 10000 == 0) {
bfw.flush();
}

}
System.out.println("end: 140M文件已生成");
}


private static void generate_141M() throws IOException {
String fileName = "D:\\s_name_141.dat";
System.out.println("start: 开始生成141M文件->" + fileName);
BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
int num = 0;
while (num < 8221592) {
bfw.write("zhangsan beijing");
bfw.newLine();
num++;
if (num % 10000 == 0) {
bfw.flush();
}
}
}
}
打jar包,上传数据到hdfs,提交任务到集群
结果查看

141

HBnBCQ.md.png

140

HBnD3j.md.png

createRecordReader方法
看TextInputFormat对这个方法的实现
1
2
3
注意:如果这个InputSplit不是第一个InputSplit,我们将会丢掉读取出来的第一行
因为我们总是通过next()方法多读取一行(会多读取下一个InputSplit的第一行)
这就解释了这个问题:如果一行数据被拆分到了两个InputSplit中,会不会有问题?PPT中通过一个例子详细分析了这个问题

HD1cmd.md.png

OutputFormat分析

1
2
3
4
5
前面我们分析了InputFormat,下面我们来分析一下OutputFormat,顾名思义,这个是控制MapReduce输出的

OutputFormat是输出数据的顶层基类
FileOutputFormat:文件数据处理基类
TextOutputFormat:默认文本文件处理类
1
2
3
4
5
6
这几个其实和InputFormat中的那几个文本处理类是对应着的,当然了针对输出数据还有其它类型的处理
类,我们在这先分析最常见的文本文件处理类,其他类型的等我们遇到具体场景再具体分析。
我们来看一下OutputFormat的源码,这个类主要由三个方法:
getRecordWriter
checkOutputSpecs //检测输出路径是否存在
getOutputCommitter

本文标题:大数据开发工程师-第五周 第四章 精讲Shuffle执行过程及源码分析输入输出

文章作者:TTYONG

发布时间:2022年02月09日 - 18:02

最后更新:2023年07月03日 - 18:07

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E4%BA%94%E5%91%A8-%E7%AC%AC%E5%9B%9B%E7%AB%A0-%E7%B2%BE%E8%AE%B2Shuffle%E6%89%A7%E8%A1%8C%E8%BF%87%E7%A8%8B%E5%8F%8A%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E8%BE%93%E5%85%A5%E8%BE%93%E5%87%BA.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%