大数据开发工程师-第十一周 Spark性能优化的道与术-3


第十一周 Spark性能优化的道与术-企业级最佳实践-3

性能优化分析

1
2
3
一个计算任务的执行主要依赖于CPU、内存、带宽
Spark是一个基于内存的计算引擎,所以对它来说,影响最大的可能就是内存,一般我们的任务遇到了性能瓶颈大概率都是内存的问题,当然了CPU和带宽也可能会影响程序的性能,这个情况也不是没有的,只是比较少。
Spark性能优化,其实主要就是在于对内存的使用进行调优。通常情况下,如果你的Spark程序计算的数据量比较小,并且你的内存足够使用,那么只要网络不至于卡死,一般是不会有大的性能问题的。但是Spark程序的性能问题往往出现在针对大数据量进行计算(比如上亿条数的数据,或者上T规模的数据),这个时候如果内存分配不合理就会比较慢,所以,Spark性能优化,主要是对内存进行优化。

内存都去哪了

1
2
3
1. 每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上比对象自身还要大。
2. Java的String对象的对象头,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列,并且还要保存数组长度之类的信息。
3. Java中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,通常占用8个字节。
1
2
3
4
5
所以把原始文件中的数据转化为内存中的对象之后,占用的内存会比原始文件中的数据要大

那我如何预估程序会消耗多少内存呢?
通过cache方法,可以看到RDD中的数据cache到内存中之后占用多少内存,这样就能看出了
代码如下:这个测试代码就只写一个scala版本的了

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:测试内存占用情况
* Created by xuwei
*/
object TestMemoryScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("TestMemoryScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat").cache()
val count = dataRDD.count()
println(count)
//while循环是为了保证程序不结束,方便在本地查看4040页面(在本地运行时的spark web页面)中的storage信息
while(true){
;
}
}
}

image-20230327194520543

image-20230327194456852

1
2
3
4
5
6
7
执行代码,访问localhost的4040端口界面
这个界面其实就是spark的任务界面,在本地运行任务的话可以直接访问4040界面查看
点击stages可以看到任务的原始输入数据是多大

点击storage可以看到将数据加载到内存,生成RDD之后的大小

这样我们就能知道这一份数据在RDD中会占用多少内存了,这样在使用的时候,如果想要把数据全部都加载进内存,就需要给这个任务分配这么多内存了,当然了你分配少一些也可以,只不过这样计算效率会变低,因为RDD中的部分数据内存放不下就会放到磁盘了。

性能优化方案

1
2
3
4
5
6
7
下面我们通过这几个方式来实现对Spark程序的性能优化
高性能序列化类库
持久化或者checkpoint
JVM垃圾回收调优
提高并行度
数据本地化
算子优化

高性能序列化类库

1
2
3
4
5
6
7
8
在任何分布式系统中,序列化都是扮演着一个重要的角色的。
如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。

Spark默认会在一些地方对数据进行序列化,如果我们的算子函数使用到了外部的数据(比如Java中的自定义类型),那么也需要让其可序列化,否则程序在执行的时候是会报错的,提示没有实现序列化,这个一定要注意。

原因是这样的:
因为Spark的初始化工作是在Driver进程中进行的,但是实际执行是在Worker节点的Executor进程中进行的;当Executor端需要用到Driver端封装的对象时,就需要把Driver端的对象通过序列化传输到Executor端,这个对象就需要实现序列化。
否则会报错,提示对象没有实现序列化
1
注意了,其实遇到这种没有实现序列化的对象,解决方法有两种
1
2
3
1. 如果此对象可以支持序列化,则将其实现Serializable接口,让它支持序列化
2. 如果此对象不支持序列化,针对一些数据库连接之类的对象,这种对象是不支持序列化的,所以可以把这个代码放到算子内部,这样就不会通过driver端传过去了,它会直接在executor中执行。
Spark对于序列化的便捷性和性能进行了一个取舍和权衡。默认情况下,Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制——基于ObjectInputStream和 ObjectOutputStream的序列化机制,因为这种方式是Java原生提供的,使用起来比较方便,但是Java序列化机制的性能并不高。序列化的速度相对较慢,而且序列化以后的数据,相对来说还是比较大,比较占空间。所以,如果你的Spark应用程序对内存很敏感,那默认的Java序列化机制并不是最好的选择。
1
2
3
Spark实际上提供了两种序列化机制:
Java序列化机制和Kryo序列化机制
Spark只是默认使用了java这种序列化机制
1
2
3
Java序列化机制:默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大,这是它的缺点

Kryo序列化机制:Spark也支持使用Kryo序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍左右。
1
2
3
4
5
6
7
8
Kryo序列化机制之所以不是默认序列化机制的原因:

第一点:因为有些类型虽然实现了Seriralizable接口,但是它也不一定能够被Kryo进行序列化;
第二点:如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行手工注册,这样就比较麻烦了

如果要使用Kryo序列化机制
首先要用SparkConf设置spark.serializer的值为org.apache.spark.serializer.KryoSerializer,就是将Spark的序列化器设置为KryoSerializer。这样,Spark在进行序列化时,就会使用Kryo进行序列化了。使用Kryo时针对需要序列化的类,需要预先进行注册,这样才能获得最佳性能——如果不注册的话,Kryo也能正常工作,只是Kryo必须时刻保存类型的全类名,反而占用不少内存。
Spark默认对Scala中常用的类型在Kryo中做了注册,但是,如果在自己的算子中,使用了外部的自定义类型的对象,那么还是需要对其进行注册。
1
2
3
4
5
6
注册自定义的数据类型格式:
conf.registerKryoClasses(...)

注意:如果要序列化的自定义的类型,字段特别多,此时就需要对Kryo本身进行优化,因为Kryo内部的缓存可能不够存放那么大的class对象

需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大,默认值为2,单位是MB,也就是说最大能缓存2M的对象,然后进行序列化。可以在必要时将其调大。
1
2
3
4
5
6
7
8
什么场景下适合使用Kryo序列化?

一般是针对一些自定义的对象,例如我们自己定义了一个对象,这个对象里面包含了几十M,或者上百M的数据,然后在算子函数内部,使用到了这个外部的大对象
如果默认情况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会导致序列化速度比较慢,并且序列化以后的数据还是比较大。

所以,在这种情况下,比较适合使用Kryo序列化类库,来对外部的大对象进行序列化,提高序列化速度,减少序列化后的内存空间占用。
用代码实现一个案例:
scala代码如下:

使用kryo实现序列化

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.imooc.scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:Kryo序列化的使用
* Created by xuwei
*/
object KryoSerScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("KryoSerScala")
.setMaster("local")
//指定使用kryo序列化机制,注意:如果使用了registerKryoClasses,其实这一行设置可以省略的
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型(Array里也可以传多个),注册了性能高一些
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(Array("hello you","hello me"))
val wordsRDD = dataRDD.flatMap(_.split(" "))
val personRDD = wordsRDD.map(word=>Person(word,18)).persist(StorageLevel.MEMORY_ONLY_SER)
personRDD.foreach(println(_))
//while循环是为了保证程序不结束,方便在本地查看4040页面中的storage信息
while (true) {
;
}
}
}


case class Person(name: String,age: Int) extends Serializable
1
2
执行任务,然后访问localhost的4040界面
在界面中可以看到cache的数据大小是 31 字节。

image-20230327210029655

1
2
3
4
5
那我们把kryo序列化设置去掉,使用默认的java序列化看一下效果
修改代码,注释掉这两行代码即可

//.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
//.registerKryoClasses(Array(classOf[Person]))
1
2
3
运行任务,再访问4040界面
发现此时占用的内存空间是138字节,比使用kryo的方式内存空间多占用了将近5倍。
所以从这可以看出来,使用kryo序列化方式对内存的占用会降低很多。

image-20230327210118413

1
注意:如果我们只是将spark的序列化机制改为了kryo序列化,但是没有对使用到的自定义类型手工进行注册,那么此时内存的占用会介于前面两种情况之间
1
2
3
4
修改代码,只注释掉registerKryoClasses这一行代码

.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
//.registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型
1
2
3
运行任务,再访问4040界面
发现此时的内存占用为123字节,介于前面的31字节和138字节之间。
所以从这可以看出来,在使用kryo序列化的时候,针对自定义的类型最好是手工注册一下,否则就算开启了kryo序列化,性能的提升也是有限的。

image-20230327210309583

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.imooc.java;
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.serializer.KryoRegistrator;
import org.apache.spark.storage.StorageLevel;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
/**
* 需求:Kryo序列化的使用
* Created by xuwei
*/
public class KryoSerjava {
public static void main(String[] args) {
//创建SparkContext:
SparkConf conf = new SparkConf();
conf.setAppName("KryoSerjava")
.setMaster("local")
.set("spark.serializer", "org.apache.spark.serializer.KryoSer
.set("spark.kryo.classesToRegister", "com.imooc.java.Person");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hello you", "hello me"));
JavaRDD<String> wordsRDD = dataRDD.flatMap(new FlatMapFunction<String,String>(){
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});

JavaRDD<Person> personRDD = wordsRDD.map(new Function<String, Person>
@Override
public Person call(String word) throws Exception {
return new Person(word, 18);
}
}).persist(StorageLevel.MEMORY_ONLY_SER());

personRDD.foreach(new VoidFunction<Person>() {
@Override
public void call(Person person) throws Exception {
System.out.println(person);
}
});
while (true){
;
}
}
}

class Person implements Serializable{
private String name;
private int age;
Person(String name,int age){ // 这里讲可以通过什么自动生成,没听清
this.name = name;
this.age = age;
}
@Override
public String toString() { // alt+shift+s,右键generate->toString
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}

持久化或者checkpoint

1
2
3
针对程序中多次被transformation或者action操作的RDD进行持久化操作,避免对一个RDD反复进行计算,再进一步优化,使用Kryo序列化的持久化级别,减少内存占用
为了保证RDD持久化数据在可能丢失的情况下还能实现高可靠,则需要对RDD执行Checkpoint操作
这两个操作我们前面讲过了,在这就不再演示了

JVM垃圾回收调优

1
2
3
4
5
6
7
8
9
10
由于Spark是基于内存的计算引擎,RDD缓存的数据,以及算子执行期间创建的对象都是放在内存中的,所以针对Spark任务如果内存设置不合理会导致大部分时间都消耗在垃圾回收上

对于垃圾回收来说,最重要的就是调节RDD缓存占用的内存空间,和算子执行时创建的对象占用的内存空间的比例。

默认情况下,Spark使用每个executor 60%的内存空间来缓存RDD,那么只有40%的内存空间来存放算子执行期间创建的对象
在这种情况下,可能由于内存空间的不足,并且算子对应的task任务在运行时创建的对象过大,那么一旦发现40%的内存空间不够用了,就会触发Java虚拟机的垃圾回收操作。

因此在极端情况下,垃圾回收操作可能会被频繁触发。
在这种情况下,如果发现垃圾回收频繁发生。那么就需要对这个比例进行调优了,spark.storage.memoryFraction参数的值默认是0.6。
使用SparkConf().set("spark.storage.memoryFraction", "0.5")可以进行修改,就是将RDD缓存占用内存空间的比例降低为50% ,从而提供更多的内存空间来保存task运行时创建的对象。
1
2
3
4
5
因此,对于RDD持久化而言,完全可以使用Kryo序列化,加上降低其executor内存占比的方式,来减少其内存消耗。给task提供更多的内存,从而避免task在执行时频繁触发垃圾回收。
我们可以对task的垃圾回收进行监测,在spark的任务执行界面,可以查看每个task执行消耗的时间,以及task gc消耗的时间。

重新向集群中提交checkpoint的代码,查看spark任务的task指标信息
确保Hadoop集群、yarn的historyserver进程以及spark的historyserver进程是正常运行的
1
2
3
4
删除checkpoint任务的输出目录
[root@bigdata04 sparkjars]# hdfs dfs -rm -r /out-chk001

[root@bigdata04 sparkjars]# sh -x checkPointJob.sh
1
点击生成的第一个job,再点击进去查看这个job的stage,进入第一个stage,查看task的执行情况,看这里面的GC time的数值会不会比较大,最直观的就是如果gc time这里标红了,则说明gc时间过长。

image-20230327223033714

1
上面这个是分任务查看,其实还可以查看全局的,看Executor进程中整个任务执行总时间和gc的消耗时间。

image-20230327223148318

java GC

1
2
3
4
5
6
7
既然说到了Java中的GC,那我们就需要说道说道了。
Java堆空间被划分成了两块空间:一个是年轻代,一个是老年代。
年轻代放的是短时间存活的对象
老年代放的是长时间存活的对象。
年轻代又被划分了三块空间,Eden、Survivor1、Survivor2

来看一下这个内存划分比例图

image-20230327223408841

1
年轻代占堆内存的1/3,老年代占堆内存的2/3
1
2
3
4
5
6
7
8
9
10
11
12
其中年轻代又被划分了三块, Eden,Survivor1,Survivor2 的比例为 8:1:1
Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。

我们创建的对象,首先会放入Eden区域,如果Eden区域满了,那么就会触发一次Minor GC,进行年轻代的垃圾回收(其实就是回收Eden区域内没有人使用的对象),然后将存活的对象存入Survivor1区域,再创建对象的时候继续放入Eden区域。

第二次Eden区域满了,那么Eden和Survivor1区域中存活的对象,会一块被移动到Survivor2区域中。然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。

当第三次Eden区域再满了的时候,Eden和Survivor2区域中存活的对象,会一块被移动到Survivor1区域中,按照这个规律进行循环

如果一个对象,在年轻代中,撑过了多次垃圾回收(默认是15次),都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2放满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,也会进入老年代的问题。

如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作,如果执行Full GC也释放不了内存空间,就会报内存溢出的错误了。
1
注意了,Full GC是一个重量级的垃圾回收,Full GC执行的时候,程序是处于暂停状态的,这样会非常影响性能。

spark GC调优方案

1
2
3
Spark中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能呆在年轻代。不能因为某个Survivor区域空间不够,在Minor GC时,就进入了老年代,从而造成短时间存活的对象,长期呆在老年代中占据了空间,这样Full GC时要回收大量的短时间存活的对象,导致Full GC速度缓慢。

如果发现,在task执行期间,大量full gc发生了,那么说明,年轻代的Eden区域,给的空间不够大。此时可以执行一些操作来优化垃圾回收行为
1
2
3
4
5
6
7
8
9
1:最直接的就是提高Executor的内存
在spark-submit中通过参数指定executor的内存
--executor-memory 1G

2:调整Eden与s1和s2的比值【一般情况下不建议调整这块的比值】
-XX:NewRatio=4:设置年轻代(包括Eden和两个Survivor区)与年老代的比值(除去持久代).设置为4,则年轻代与年老代所占比值为1:4,年轻代占整个堆栈的1/5
-XX:SurvivorRatio=4:设置年轻代中Eden区与Survivor区的大小比值.设置为4,则两个Survivor区与一个Eden区的比值为2:4,一个Survivor区占整个年轻代的1/6
具体使用的时候在 spark-submit 脚本中通过 --conf 参数设置即可
--conf "spark.executor.extraJavaOptions= -XX:SurvivorRatio=4 -XX:NewRatio=4"
1
2
3
4
其实最直接的就是增加Executor的内存,如果这个内存上不去,其它的修改都是徒劳。
举个例子就是说,一个20岁的成年人和一个3岁的小孩
3岁的小孩掌握再多的格斗技巧都没有用,在绝对的实力面前一切都是花架子。
所以说我们一般很少需要去调整Eden、s1、s2的比值,一般都是直接增加Executor的内存比较靠谱。

提高并行度

1
2
3
4
5
实际上Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源,这样才能提高Spark程序的性能。

Spark会自动设置以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle操作的算子,会使用并行度最大的父RDD的并行度

可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度(只针对这一个RDD);也可以使用spark.default.parallelism参数(全局),来设置统一的并行度。Spark官方的推荐是,给集群中的每个cpu core设置2~3个task。
1
2
3
4
5
6
7
8
9
下面来举个例子
我在spark-submit脚本中给任务设置了5个executor,每个executor,设置了2个cpu core
spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 5 \ // 为job设置5个executor
--executor-cores 2 \ // 分配两个CPU
.....
1
2
3
4
此时,如果我在代码中设置了默认并行度为5
conf.set("spark.default.parallelism","5")

这个参数设置完了以后,也就意味着所有RDD的partition都被设置成了5个,针对RDD的每一个partition,spark会启动一个task来进行计算,所以对于所有的算子操作,都只会创建5个task来处理对应的RDD中的数据。
1
2
3
但是注意了,我们前面在spark-submit脚本中设置了5个executor,每个executor 2个cpu core,所以这个时候spark其实会向yarn集群申请10个cpu core,但是我们在代码中设置了默认并行度为5,只会产生5个task,一个task使用一个cpu core,那也就意味着有5个cpu core是空闲的,这样申请的资源就浪费了一半。

其实最好的情况,就是每个cpu core都不闲着,一直在运行,这样可以达到资源的最大使用率,其实让一个cpu core运行一个task都是有点浪费的,官方也建议让每个cpu core运行2~3个task,这样可以充分压榨CPU的性能
1
2
3
4
为什么这样说呢?

是这样的,因为每个task执行的顺序和执行结束的时间很大概率是不一样的,如果正好有10个cpu,运行10个taks,那么某个task可能很快就执行完了,那么这个CPU就空闲下来了,这样资源就浪费了。
所以说官方推荐,给每个cpu分配2~3个task是比较合理的,可以充分利用CPU资源,发挥它最大的价值。

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
下面我们来实际写个案例看一下效果
Scala代码如下:

package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:设置并行度
* 1:可以在textFile或者parallelize等方法的第二个参数中设置并行度
* 2:或者通过spark.default.parallelism参数统一设置并行度
* Created by xuwei
*/
object MoreParallelismScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("MoreParallelismScala")
//.setMaster("local")
//设置全局并行度
conf.set("spark.default.parallelism","5")
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(Array("hello","you","hello","me","hehe","hello","you","hello","me","hehe"))
dataRDD.map((_,1))
.reduceByKey(_ + _)
.foreach(println(_))
sc.stop()
}
}

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
* 需求:设置并行度
* Created by xuwei
*/
public class MoreParallelismJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("MoreParallelismJava");
//设置全局并行度
conf.set("spark.default.parallelism","5");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hello","you","hello","me","hehe","hello","you","hello","me","hehe"));
dataRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception
return new Tuple2<String, Integer>(word,1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tup) throws Exception {
System.out.println(tup);
}
});
sc.stop(); }
}
1
2
3
4
5
6
7
8
9
10
11
12
对代码编译打包
spark-submit脚本内容如下:

[root@bigdata04 sparkjars]# vi moreParallelismJob.sh
spark-submit \
--class com.imooc.scala.MoreParallelismScala \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 5 \
--executor-cores 2 \
db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar

image-20230328003937453

1
2
3
4
任务提交到集群运行之后,查看spark的任务界面(job)
先看executors,这里显示了4个executor和1个driver进程,为什么不是5个executor进程呢?

是因为我们现在使用的是yarn-cluster模式,driver进程运行在集群内部,所以它占了一个executor,如果使用的是yarn-client模式,就会产生5个executor和1个单独的driver进程。

image-20230328004230697

1
然后去看satges界面,两个Stage都是5个task并行执行,这5个task会使用5个cpu,但是我们给这个任务申请了10个cpu,所以就有5个是空闲的了(这里没考虑driver的占用)。

image-20230328005706629

注意

image-20230328005414569

1
当在sparkContext生成对象后,再设置默认并行度会出现问题

image-20230328005531042

提高性能

1
2
3
4
5
如果想要最大限度利用CPU的性能,至少将spark.default.parallelism的值设置为10,这样可以实现一个cpu运行一个task,其实官方推荐是设置为20或者30。
其实这个参数也可以在spark-submit脚本中动态设置,通过--conf参数设置,这样就比较灵活了。

注意:此时需要将代码中设置spark.default.parallelism的配置注释掉
//conf.set("spark.default.parallelism","5")
1
2
3
4
5
6
7
8
9
10
11
12
为了看起来更清晰,在这我们使用 yarn-client 模式,这样driver就不会占用我们的分配的executor了

[root@bigdata04 sparkjars]# vi moreParallelismJob2.sh
spark-submit \
--class com.imooc.scala.MoreParallelismScala \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--num-executors 5 \
--executor-cores 2 \
--conf "spark.default.parallelism=10" \
db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar
1
2
由于修改了代码,所以需要重新编译,打包,执行
执行结束后再来查看spark的任务界面,可以看到此时有10个task并行执行

image-20230328011738873

image-20230328011801559

image-20230328011600636

1
2
这就是并行度相关的设置
接下来我们来看一个图,加深一下理解

image-20230328012605704

1
这个图中描述的就是刚才我们演示的两种情况下Executor和Task之间的关系

spark-submit常用参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
最后我们来分析总结一下spark-submit脚本中经常配置的一些参数

--name mySparkJobName:指定任务名称(代码里也可以设置)
--class com.imooc.scala.xxxxx :指定入口类
--master yarn :指定集群地址(standalone),on yarn模式指定yarn
--deploy-mode cluster :client代表yarn-client,cluster代表yarn-cluster
--executor-memory 1G :executor进程的内存大小,实际工作中设置2~4G即可
--num-executors 2 :分配多少个executor进程
--executor-cores 2 : 一个executor进程分配多少个cpu core

--driver-cores 1 :(如果使用yarn-cluster模式,这里不用设置也行,直接按executor的配置来)driver进程分配多少cpu core,默认为1即可
--driver-memory 1G:(如果使用yarn-cluster模式,这里不用设置也行,直接按executor的配置来)driver进程的内存,如果需要使用类似于collect之类的action算子向Driver端拉取数据,则这里可以设置大一些
--jars fastjson.jar,abc.jar 在这里可以设置job依赖的第三方jar包(pom里spark-core没提供的)【不建议把第三方依赖打入程序的jar包中,一方面会导致jar变大;另一方面,同一个项目组同事用的第三方依赖版本问题;还有这里可以使用本地路径,或者hdfs路径(建议使用hdfs路径,因为使用本地路径依赖,还是会读取到hdfs上)】
--conf "spark.default.parallelism=10":可以动态指定一些spark任务的参数,指定多个参
1
2
3
4
5
6
7
8
最后注意一点:针对 --num-executors 和 --executor-cores 的设置
大家看这两种方式设置有什么区别:
第一种方式:
--num-executors 2
--executor-cores 1
第二种方式:
--num-executors 1
--executor-cores 2
1
2
3
4
5
6
7
8
9
这两种设置最终都会向集群申请2个cpu core,可以并行运行两个task,但是这两种设置方式有什么区别呢?

第一种方法:多executor模式
由于每个executor只分配了一个cpu core,我们将无法利用在同一个JVM中运行多个任务的优点。我们假设这两个executor是在两个节点中启动的,那么针对广播变量这种操作,将在两个节点的中都复制1份,最终会复制两份

第二种方法:多core模式
此时一个executor中会有2个cpu core,这样可以利用同一个JVM中运行多个任务的优点,并且针对广播变量的这种操作,只会在这个executor对应的节点中复制1份即可。

那是不是我可以给一个executor分配很多的cpu core,也不是的,因为一个executor的内存大小是固定的,如果在里面运行过多的task可能会导致内存不够用,所以这块一般在工作中我们会给一个executor分配2~4G内存,对应的分配 2~4个cpu core。

数据本地化

1
2
3
4
5
6
7
8
9
10
数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其它节点,会比移动数据到代码所在的节点,速度要得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。

数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有几种数据本地化级别:

数据本地化级别 解释
PROCESS_LOCAL 进程本地化,性能最好:数据和计算它的代码在同一个JVM进程中
NODE_LOCAL 节点本地化:数据和计算它的代码在一个节点上,但是不在一个JVM进程
NO_PREF 数据从哪里过来,性能都是一样的,比如从数据库中获取数据,对于task而言在哪个机器上都是一样的
RACK_LOCAL 数据和计算它的代码在一个机架上,数据需要通过网络在节点之间进行传输
ANY 数据可能在任意地方,比如其它网络环境内,或者其它机架上,性能最差

image-20230328105831703

1
2
3
4
5
6
7
8
Spark倾向使用最好的本地化级别调度task,但这是不现实的
如果目前我们要处理的数据所在的executor上目前没有空闲的CPU,那么Spark就会放低本地化级别。这时有两个选择:
第一,等待,直到executor上的cpu释放出来,那么就分配task过去;
第二,立即在任意一个其它executor上启动一个task。

Spark默认会等待指定时间,期望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去,只要超过了时间,那么Spark就会将task分配到其它任意一个空闲的executor上

可以设置参数, spark.locality 系列参数,来调节Spark等待task可以进行数据本地化的时间
1
2
3
4
5
6
spark.locality.wait(3000毫秒):默认等待3秒(通用的所有级别)
spark.locality.wait.process:等待指定的时间看能否达到数据和计算它的代码在同一个JVM
spark.locality.wait.node:等待指定的时间看能否达到数据和计算它的代码在一个节点上执行
spark.locality.wait.rack:等待指定的时间看能否达到数据和计算它的代码在一个机架上

看这个图里面的task,此时的数据本地化级别是最优的 PROCESS_LOCAL

image-20230328110840007


本文标题:大数据开发工程师-第十一周 Spark性能优化的道与术-3

文章作者:TTYONG

发布时间:2023年03月27日 - 19:03

最后更新:2023年05月29日 - 14:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%80%E5%91%A8-Spark%E6%80%A7%E8%83%BD%E4%BC%98%E5%8C%96%E7%9A%84%E9%81%93%E4%B8%8E%E6%9C%AF-%E4%BC%81%E4%B8%9A%E7%BA%A7%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5-3.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%