大数据开发工程师-第十一周 Spark性能优化的道与术-4


第十一周 Spark性能优化的道与术-算子优化-4

map vs mapPartitions

1
2
3
4
5
6
7
8
map操作:对RDD 中的每个元素进行操作,一次处理一条数据

mapPartitions操作(transformation操作):对RDD中每个partition进行操作,一次处理一个分区的数据

所以:
map操作:执行1次map算子只处理1个元素,如果partition中元素较多,假设当前已经处理了1000个元素,在内存不足的情况下,Spark可以通过GC等方法回收内存(比如将已处理掉的1000个元素从内存中回收)。因此,map操作通常不会导致OOM异常;

mapPartitions操作:执行1次map算子需要接收该partition中的所有元素,因此一旦元素很多而内存不足,就容易导致OOM的异常,也不是说一定就会产生OOM异常,只是和map算子对比的话,相对来说容易产生OOM异常(OOM,java.lang.OutOfMemoryError 错误,也就是java内存溢出错误。)
1
2
3
4
不过一般情况下,mapPartitions的性能更高;初始化操作、数据库链接等操作适合使用mapPartitions操作

这是因为:
假设需要将RDD中的每个元素写入数据库中,这时候就应该把创建数据库链接的操作放置在mapPartitions中,创建数据库链接这个操作本身就是个比较耗时的,如果该操作放在map中执行,将会频繁执行,比较耗时且影响数据库的稳定性。

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
/**
* 需求:mapPartitons的使用
* Created by xuwei
*/
object MapPartitionsOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("MapPartitionsOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//map算子一次处理一条数据
/*val sum = dataRDD.map(item=>{
println("==============")
item * 2
}).reduce( _ + _)*/
//mapPartitions算子一次处理一个分区的数据
val sum = dataRDD.mapPartitions(it=>{
//建议针对初始化链接之类的操作,使用mapPartitions,放在mapPartitions内部
//例如:创建数据库链接,使用mapPartitions可以减少链接创建的次数,提高性能
//注意:创建数据库链接的代码建议放在次数,不要放在Driver端或者it.foreach内部
//数据库链接放在Driver端会导致链接无法序列化,无法传递到对应的task中执行,所以
//数据库链接放在it.foreach()内部还是会创建多个链接,和使用map算子的效果是一样
println("==================")
val result = new ArrayBuffer[Int]()
//这个foreach是调用的scala里面的函数
it.foreach(item=>{
result.+=(item * 2)
})
//关闭数据库链接
result.toIterator
}).reduce(_ + _)
println("sum:"+sum)
sc.stop()
}
}

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
/**
* 需求:mapPartitons的使用
* Created by xuwei
*/
public class MapPartitionsOpJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("MapPartitionsOpJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
Integer sum = dataRDD.mapPartitions(new FlatMapFunction<Iterator<Inte
@Override
public Iterator<Integer> call(Iterator<Integer> it) throws Except
//数据库链接的代码需要放在这个位置
ArrayList<Integer> list = new ArrayList<>();
while (it.hasNext()) {
list.add(it.next() * 2);
}
//关闭数据库链接
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
System.out.println("sum:"+sum);
}
}

foreach vs foreachPartition

1
2
3
foreach:一次处理一条数据
foreachPartition:一次处理一个分区的数据
foreachPartition的特性和mapPartitions的特性是一样的,唯一的区别就是mapPartitions是transformation操作(不会立即执行),foreachPartition是action操作(会立即执行)

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:foreachPartition的使用
* Created by xuwei
*/
object ForeachPartitionOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ForeachPartitionOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//foreachPartition:一次处理一个分区的数据,作用和mapPartitions类似
//唯一的区是mapPartitions是transformation算子,foreachPartition是action算子
dataRDD.foreachPartition(it=>{
//在此处获取数据库链接
println("===============")
it.foreach(item=>{ // 和对RDD处理的foreach不一样
//在这里使用数据库链接
println(item)
})
//关闭数据库链接
})
sc.stop()
}
}

image-20230328144113035

image-20230328144052865

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
import java.util.Iterator;
/**
* 需求:foreachPartition的使用
* Created by xuwei
*/
public class ForeachPartitionOpJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("ForeachPartitionOpJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5
dataRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
@Override
public void call(Iterator<Integer> it) throws Exception {
System.out.println("=============");
//在此处获取数据库链接
while (it.hasNext()){
//在这里使用数据库链接
System.out.println(it.next());
}
//关闭数据库链接
}
});
sc.stop();
}
}

repartition的使用

1
2
3
4
5
对RDD进行重分区,repartition主要有两个应用场景:
1. 可以调整RDD的并行度
针对个别RDD,如果感觉分区数量不合适,想要调整,可以通过repartition进行调整,分区调整了之后,对应的并行度也就可以调整了
2. 可以解决RDD中数据倾斜的问题
如果RDD中不同分区之间的数据出现了数据倾斜,可以通过repartition实现数据重新分发,可以均匀分发到不同分区中

Repartition和Coalesce 的关系与区别,能简单说说吗?

1
2
3
4
5
6
7
8
这道题就已经开始掺和有“源码”的味道了,为什么呢?
1)关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)

2)区别:
repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。

一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:repartition的使用
* Created by xuwei
*/
object RepartitionOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("RepartitionOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
//设置分区数量为2
val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)
//重新设置RDD的分区数量为3,这个操作会产生shuffle
//也可以解决RDD中数据倾斜的问题
dataRDD.repartition(3)
.foreachPartition(it=>{
println("=========") // 运行程序会输出3次
it.foreach(println(_))
})
//通过repartition可以控制输出数据产生的文件个数
dataRDD.saveAsTextFile("hdfs://bigdata01:9000/rep-001") // 生成3个文件
dataRDD.repartition(1).saveAsTextFile("hdfs://bigdata01:9000/rep-002") // 生成1个文件
sc.stop()
}
}

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
import java.util.Iterator;
/**
* 需求:repartition的使用
* Created by xuwei
*/
public class RepartitionOpJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("RepartitionOpJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5),2);
dataRDD.repartition(3)
.foreachPartition(new VoidFunction<Iterator<Integer>>() {
@Override
public void call(Iterator<Integer> it) throws Exception {
System.out.println("==============");
while (it.hasNext()){
System.out.println(it.next());
}
});
sc.stop();
}
}

reduceByKey和groupByKey的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
在实现分组聚合功能时这两个算子有什么区别?
看这两行代码
val counts = wordCountRDD.reduceByKey(_ + _)
val counts = wordCountRDD.groupByKey().map(wc => (wc._1, wc._2.sum))

这两行代码的最终效果是一样的,都是对wordCountRDD中每个单词出现的次数进行聚合统计
那这两种方式在原理层面有什么区别吗?
首先这两个算子在执行的时候都会产生shuffle
但是:
1:当采用reduceByKey时,数据在进行shuffle之前会先进行局部聚合
2:当使用groupByKey时,数据在shuffle之间不会进行局部聚合,会原样进行shuffle

这样的话reduceByKey就减少了shuffle的数据传送,所以效率会高一些。
如果能用reduceByKey,就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减少网络传输的开销
1
2
3
下面来看这个图,加深一下理解

从图中可以看出来reduceByKey在shuffle之前会先对数据进行局部聚合,而groupByKey不会,所以在实现分组聚合的需求中,reduceByKey性能略胜一筹。

image-20230328151345043


本文标题:大数据开发工程师-第十一周 Spark性能优化的道与术-4

文章作者:TTYONG

发布时间:2023年03月28日 - 01:03

最后更新:2023年05月29日 - 18:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%80%E5%91%A8-Spark%E6%80%A7%E8%83%BD%E4%BC%98%E5%8C%96%E7%9A%84%E9%81%93%E4%B8%8E%E6%9C%AF-%E7%AE%97%E5%AD%90%E4%BC%98%E5%8C%96-4.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%