大数据开发工程师-第十一周 Spark性能优化的道与术-1


第十一周 Spark性能优化的道与术-1

宽依赖和窄依赖

1
2
3
4
5
6
先看一下什么是窄依赖:
窄依赖(Narrow Dependency):指父RDD的每个分区只被子RDD的一个分区所使用,例如map、filter等这些算子
一个RDD,对它的父RDD只有简单的一对一的关系,也就是说,RDD的每个partition仅仅依赖于父RDD中的一个partition,父RDD和子RDD的partition之间的对应关系,是一对一的。

宽依赖(Shuffle Dependency):父RDD的每个分区都可能被子RDD的多个分区使用,例如groupByKey、reduceByKey,sortBykey等算子,这些算子其实都会产生shuffle操作
也就是说,每一个父RDD的partition中的数据都可能会传输一部分到下一个RDD的每个partition中。此时就会出现,父RDD和子RDD的partition之间,具有错综复杂的关系,那么,这种情况就叫做两个RDD之间是宽依赖,同时,他们之间会发生shuffle操作。
1
2
下面来看图具体分析一个案例
以单词计数案例来分析

image-20230327114152987

1
2
3
4
5
6
7
8
9
最左侧是linesRDD,这个表示我们通过textFile读取文件中的数据之后获取的RDD
接着是我们使用flatMap算子,对每一行数据按照空格切开,然后可以获取到第二个RDD,这个RDD中包含的是切开的每一个单词
在这里这两个RDD就属于一个窄依赖,因为父RDD的每个分区只被子RDD的一个分区所使用,也就是说他们的分区是一对一的,这样就不需要经过shuffle了。
接着是使用map算子,将每一个单词转换成(单词,1)这种形式,
此时这两个RDD也是一个窄依赖的关系,父RDD的分区和子RDD的分区也是一对一的
最后我们会调用reduceByKey算子,此时会对相同key的数据进行分组,分到一个分区里面,并且进行聚合操作,此时父RDD的每个分区都可能被子RDD的多个分区使用,那这两个RDD就属于宽依赖了。

这就是宽窄依赖的区别,那我们在这区分宽窄依赖有什么意义吗?
不要着急,往下面看

Stage

1
2
3
4
5
6
7
spark job是根据action算子触发的,遇到action算子就会起一个job
Spark Job会被划分为多个Stage,每一个Stage是由一组并行的Task组成的

注意:stage的划分依据就是看是否产生了shuflle(即宽依赖),遇到一个shuffle操作就划分为前后两个stage
stage是由一组并行的task组成,stage会将一批task用TaskSet来封装,提交给TaskScheduler进行分配,最后发送到Executor执行

下面来看一张图来具体分析一下

image-20230327114640278

1
2
3
4
5
6
7
8
9
10
为什么是从后往前呢?因为RDD之间是有血缘关系的,后面的RDD依赖前面的RDD,也就是说后面的RDD要等前面的RDD执行完,才会执行。
所以从后往前遇到宽依赖就划分为两个stage,shuffle前一个,shuffle后一个。如果整个过程没有产生shuffle那就只会有一个stage

看这个图
RDD G往前推,到RDD B的时候,是窄依赖,所以不切分Stage,再往前到RDD A,此时产生了宽依赖,所以RDD A属于一个Stage、RDD B和G属于一个Stage
再看下面,RDD G到RDD F,产生了宽依赖,所以RDD F属于一个Stage,因为RDD F和RDD C、D、E这几个RDD没有产生宽依赖,都是窄依赖,所以他们属于一个Stage。
所以这个图中,RDD A单独一个stage1,RDD C、D、E、F被划分在stage2中,
最后RDD B和RDD G划分在了stage3 里面。

注意:Stage划分是从后往前划分,但是stage执行时从前往后的,这就是为什么后面先切割的stage为什么编号是3.

Spark Job的三种提交模式

1
2
3
4
5
6
7
8
9
10
1. 第一种,standalone模式,基于Spark自己的standalone集群。
指定spark-submit –master spark://bigdata01:7077

2. 第二种,是基于YARN的client模式。
指定–master yarn --deploy-mode client
这种方式主要用于测试,查看日志方便一些,部分日志会直接打印到控制台上面,因为driver进程运行在本地客户端,就是提交Spark任务的那个客户端机器,driver负责调度job,会与yarn集群产生大量的通信,一般情况下Spark客户端机器和Hadoop集群的机器是无法内网通信,只能通过外网,这样在大量通信的情况下会影响通信效率,并且当我们执行一些action操作的时候数据也会返回给driver端,driver端机器的配置一般都不高,可能会导致内存溢出等问题。

3. 第三种,是基于YARN的cluster模式。【推荐】
指定–master yarn --deploy-mode cluster
这种方式driver进程运行在集群中的某一台机器上,这样集群内部节点之间通信是可以通过内网通信的,并且集群内的机器的配置也会比普通的客户端机器配置高,所以就不存在yarn-client模式的一些问题了,只不过这个时候查看日志只能到集群上面看了,这倒没什么影响。

image-20230327120631638

1
2
3
4
5
左边是standalone模式,现在我们使用的提交方式,driver进程是在客户端机器中的,其实针对standalone模式而言,这个Driver进程也是可以运行在集群中的
来看一下官网文档,standalone模式也是支持的,通过指定deploy-mode为cluster即可

中间的值yarn client模式,由于是on yarn模式,所以里面是yarn集群的进程,此时driver进程就在提交spark任务的客户端机器上了
最右边这个是yarn cluster模式,driver进程就会在集群中的某一个节点上面。

image-20230327121044013

Shuffle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。Spark也会有自己的shuffle实现过程。

我们首先来看一下
在Spark中,什么情况下,会发生shuffle?
reduceByKey、groupByKey、sortByKey、countByKey、join等操作都会产生shuffle。

那下面我们来详细分析一下Spark中的shuffle过程。
Spark的shuffle历经了几个过程
1. Spark 0.8及以前使用Hash Based Shuffle
2. Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
3. Spark1.6之后使用Sort-Base Shuffle,因为Hash Based Shuffle存在一些不足所以就把它替换掉了。

所以Spark Shuffle 一共经历了这几个过程:
1. 未优化的Hash Based Shuffle
2. 优化后的Hash Based Shuffle
3. Sort-Based Shuffle

未优化的Hash Based Shuffle

1
2
来看一个图,假设我们是在执行一个reduceByKey之类的操作,此时就会产生shuffle
shuffle里面会有两种task,一种是shuffleMapTask,负责拉取前一个RDD中的数据,还有一个ResultTask,负责把拉取到的数据按照规则汇总起来

image-20230327121800239

1
2
3
4
5
6
7
8
1:假设有1个节点,这个节点上有2个CPU,上面运行了4个ShuffleMapTask,这样的话其实同时只有2个ShuffleMapTask是并行执行的,因为一个cpu core同时只能执行一个ShuffleMapTask。
2:每个ShuffleMapTask都会为每个ResultTask创建一份Bucket缓存,以及对应的ShuffleBlockFile磁盘文件
这样的话,每一个ShuffleMapTask都会产生4份Bucket缓存和对应的4个ShuffleBlockFile文件,分别对应下面的4个ResultTask

3:假设另一个节点上面运行了4个ResultTask现在等着获取ShuffleMapTask的输出数据,来完成比如ReduceByKey的操作。
这是这个流程,注意了,如果有100个MapTask,100个ResultTask,那么会产生10000个本地磁盘文件,这样需要频繁的磁盘IO,是比较影响性能的。

注意,那个bucket缓存是非常重要的,ShuffleMapTask会把所有的数据都写入Bucket缓存之后,才会刷写到对应的磁盘文件中,但是这就有一个问题,如果map端数据过多,那么很容易造成内存溢出,所以spark在优化后的Hash Based Shuffle中对这个问题进行了优化,默认这个内存缓存是100kb,当Bucket中的数据达到了阈值之后,就会将数据一点一点地刷写到对应的ShuffleBlockFile磁盘中了。这种操作的优点,是不容易发生内存溢出。缺点在于,如果内存缓存过小的话,那么可能发生过多的磁盘io操作。所以,这里的内存缓存大小,是可以根据实际的业务情况进行优化的。

优化后的Hash Based Shuffle

image-20230327122626812

1
2
3
4
5
6
7
看这个优化后的shuffle流程
1:假设机器上有2个cpu,4个shuffleMaptask,这样同时只有2个在并行执行
2:在这个版本中,Spark引入了consolidation机制,一个ShuffleMapTask将数据写入ResultTask数量的本地文件中,这个是不变的,但是当下一个ShuffleMapTask运行的时候,可以直接将数据写入之前产生的本地文件中,相当于对多个ShuffleMapTask的输出进行了合并,从而大大减少了本地磁盘中文件的数量。
此时文件的数量变成了CPU core数量 * ResultTask数量,比如每个节点上有2个CPU,有100个ResultTask,那么每个节点上会产生200个文件
这个时候文件数量就变得少多了。

但是如果ResultTask端的并行任务过多的话则CPU core * Result Task依旧过大,也会产生很多小文件

Sort-Based Shuffle

1
2
3
4
5
引入Consolidation机制虽然在一定程度上减少了磁盘文件数量,但是不足以有效提高Shuffle的性能,这种情况只适合中小型数据规模的数据处理。
为了让Spark能在更大规模的集群上高性能处理大规模的数据,因此Spark引入了Sort-Based Shuffle。

该机制针对每一个ShuffleMapTask都只创建一个文件,将所有的ShuffleMapTask的数据都写入同一个文件,并且对应生成一个索引文件。
以前的数据是放在内存中,等到数据写完了再刷写到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将内存中的数据溢写到磁盘,结束的时候,再将这些溢写的文件联合内存中的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少缓存所占用的内存大小,而且同时避免GC的风险和频率。

image-20230327143450475


本文标题:大数据开发工程师-第十一周 Spark性能优化的道与术-1

文章作者:TTYONG

发布时间:2023年03月27日 - 11:03

最后更新:2023年05月28日 - 00:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%80%E5%91%A8-Spark%E6%80%A7%E8%83%BD%E4%BC%98%E5%8C%96%E7%9A%84%E9%81%93%E4%B8%8E%E6%9C%AF-1.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%