第十一周 Spark性能优化的道与术-企业级最佳实践-3
性能优化分析
1 | 一个计算任务的执行主要依赖于CPU、内存、带宽 |
内存都去哪了
1 | 1. 每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上比对象自身还要大。 |
1 | 所以把原始文件中的数据转化为内存中的对象之后,占用的内存会比原始文件中的数据要大 |
scala
1 | package com.imooc.scala |


1 | 执行代码,访问localhost的4040端口界面 |
性能优化方案
1 | 下面我们通过这几个方式来实现对Spark程序的性能优化 |
高性能序列化类库
1 | 在任何分布式系统中,序列化都是扮演着一个重要的角色的。 |
1 | 注意了,其实遇到这种没有实现序列化的对象,解决方法有两种 |
1 | 1. 如果此对象可以支持序列化,则将其实现Serializable接口,让它支持序列化 |
1 | Spark实际上提供了两种序列化机制: |
1 | Java序列化机制:默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大,这是它的缺点 |
1 | Kryo序列化机制之所以不是默认序列化机制的原因: |
1 | 注册自定义的数据类型格式: |
1 | 什么场景下适合使用Kryo序列化? |
使用kryo实现序列化
scala
1 | package com.imooc.scala |
1 | 执行任务,然后访问localhost的4040界面 |

1 | 那我们把kryo序列化设置去掉,使用默认的java序列化看一下效果 |
1 | 运行任务,再访问4040界面 |

1 | 注意:如果我们只是将spark的序列化机制改为了kryo序列化,但是没有对使用到的自定义类型手工进行注册,那么此时内存的占用会介于前面两种情况之间 |
1 | 修改代码,只注释掉registerKryoClasses这一行代码 |
1 | 运行任务,再访问4040界面 |

java
1 | package com.imooc.java; |
持久化或者checkpoint
1 | 针对程序中多次被transformation或者action操作的RDD进行持久化操作,避免对一个RDD反复进行计算,再进一步优化,使用Kryo序列化的持久化级别,减少内存占用 |
JVM垃圾回收调优
1 | 由于Spark是基于内存的计算引擎,RDD缓存的数据,以及算子执行期间创建的对象都是放在内存中的,所以针对Spark任务如果内存设置不合理会导致大部分时间都消耗在垃圾回收上 |
1 | 因此,对于RDD持久化而言,完全可以使用Kryo序列化,加上降低其executor内存占比的方式,来减少其内存消耗。给task提供更多的内存,从而避免task在执行时频繁触发垃圾回收。 |
1 | 删除checkpoint任务的输出目录 |
1 | 点击生成的第一个job,再点击进去查看这个job的stage,进入第一个stage,查看task的执行情况,看这里面的GC time的数值会不会比较大,最直观的就是如果gc time这里标红了,则说明gc时间过长。 |

1 | 上面这个是分任务查看,其实还可以查看全局的,看Executor进程中整个任务执行总时间和gc的消耗时间。 |

java GC
1 | 既然说到了Java中的GC,那我们就需要说道说道了。 |

1 | 年轻代占堆内存的1/3,老年代占堆内存的2/3 |
1 | 其中年轻代又被划分了三块, Eden,Survivor1,Survivor2 的比例为 8:1:1 |
1 | 注意了,Full GC是一个重量级的垃圾回收,Full GC执行的时候,程序是处于暂停状态的,这样会非常影响性能。 |
spark GC调优方案
1 | Spark中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能呆在年轻代。不能因为某个Survivor区域空间不够,在Minor GC时,就进入了老年代,从而造成短时间存活的对象,长期呆在老年代中占据了空间,这样Full GC时要回收大量的短时间存活的对象,导致Full GC速度缓慢。 |
1 | 1:最直接的就是提高Executor的内存 |
1 | 其实最直接的就是增加Executor的内存,如果这个内存上不去,其它的修改都是徒劳。 |
提高并行度
1 | 实际上Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源,这样才能提高Spark程序的性能。 |
1 | 下面来举个例子 |
1 | 此时,如果我在代码中设置了默认并行度为5 |
1 | 但是注意了,我们前面在spark-submit脚本中设置了5个executor,每个executor 2个cpu core,所以这个时候spark其实会向yarn集群申请10个cpu core,但是我们在代码中设置了默认并行度为5,只会产生5个task,一个task使用一个cpu core,那也就意味着有5个cpu core是空闲的,这样申请的资源就浪费了一半。 |
1 | 为什么这样说呢? |
scala
1 | 下面我们来实际写个案例看一下效果 |
java
1 | package com.imooc.java; |
1 | 对代码编译打包 |

1 | 任务提交到集群运行之后,查看spark的任务界面(job) |

1 | 然后去看satges界面,两个Stage都是5个task并行执行,这5个task会使用5个cpu,但是我们给这个任务申请了10个cpu,所以就有5个是空闲的了(这里没考虑driver的占用)。 |

注意

1 | 当在sparkContext生成对象后,再设置默认并行度会出现问题 |

提高性能
1 | 如果想要最大限度利用CPU的性能,至少将spark.default.parallelism的值设置为10,这样可以实现一个cpu运行一个task,其实官方推荐是设置为20或者30。 |
1 | 为了看起来更清晰,在这我们使用 yarn-client 模式,这样driver就不会占用我们的分配的executor了 |
1 | 由于修改了代码,所以需要重新编译,打包,执行 |



1 | 这就是并行度相关的设置 |

1 | 这个图中描述的就是刚才我们演示的两种情况下Executor和Task之间的关系 |
spark-submit常用参数
1 | 最后我们来分析总结一下spark-submit脚本中经常配置的一些参数 |
1 | 最后注意一点:针对 --num-executors 和 --executor-cores 的设置 |
1 | 这两种设置最终都会向集群申请2个cpu core,可以并行运行两个task,但是这两种设置方式有什么区别呢? |
数据本地化
1 | 数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其它节点,会比移动数据到代码所在的节点,速度要得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。 |

1 | Spark倾向使用最好的本地化级别调度task,但这是不现实的 |
1 | spark.locality.wait(3000毫秒):默认等待3秒(通用的所有级别) |
