大数据开发工程师-第十周 第5章 RDD持久化


第十周 第5章 RDD持久化

RDD持久化原理

1
2
3
4
5
Spark中有一个非常重要的功能就是可以对RDD进行持久化。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition数据持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存中缓存的partition数据。

这样的话,针对一个RDD反复执行多个操作的场景,就只需要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。
因为正常情况下这个RDD的数据使用过后内存中是不会一直保存的。
例如这样的操作:针对mapRDD需要多次使用的
1
2
3
4
5
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
val mapRDD = dataRDD.map(...)
mapRDD.foreach(...)
mapRDD.saveAsTextFile(...)
mapRDD.collect()
1
2
3
4
5
6
巧妙使用RDD持久化,在某些场景下,对spark应用程序的性能有很大提升。特别是对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。要持久化一个RDD,只需要调用它的cache()或者persist()方法就可以了。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition数据丢失了,那么Spark会自动通过其源RDD,使用transformation算子重新计算该partition的数据。

cache()和persist()的区别在于:
cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,也就是调用persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,那么可以使用unpersist()方法。

RDD持久化策略

1
2
3
4
5
6
7
8
9
下面看一下目前Spark支持的一些持久化策略

策略 介绍
MEMORY_ONLY 以非序列化的方式持久化在JVM内存中
MEMORY_AND_DISK 同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中
MEMORY_ONLY_SER 同MEMORY_ONLY,但是会序列化
MEMORY_AND_DISK_SER 同MEMORY_AND_DSK,但是会序列化
DISK_ONLY 以非序列化的方式完全存储到磁盘上
MEMORY_ONLY_2、MEMORY_AND_DISK_2等 尾部加了2的持久化级别,表示会将持久化数据复制
1
2
3
4
5
6
7
8
9
10
11
12
补充说明:
MEMORY_ONLY:以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算。

MEMORY_AND_DISK:当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取,不需要重新计算

MEMORY_ONLY_SER:同MEMORY_ONLY,但是会使用Java的序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是在使用的时候需要进行反序列化,因此会增加CPU开销。

MEMORY_AND_DISK_SER:同MEMORY_AND_DSK。但是会使用序列化方式持久化Java对象。

DISK_ONLY:使用非序列化Java对象的方式持久化,完全存储到磁盘上。

MEMORY_ONLY_2、MEMORY_AND_DISK_2等:如果是尾部加了2的持久化级别,表示会将持久化数据复制一份,保存到其它节点,从而在数据丢失时,不需要重新计算,只需要使用备份数据即可。

如何选择RDD持久化策略

1
2
3
4
5
6
7
8
9
Spark提供了多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。
下面是一些通用的持久化级别的选择建议:
1. 优先使用MEMORY_ONLY,纯内存速度最快,而且没有序列化不需要消耗CPU进行反序列化操作,缺点就是比较耗内存
2. MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快,只是在使用的时候需要消耗CPU进行反序列化

注意:
如果需要进行数据的快速失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了

能不使用DISK相关的策略,就不要使用,因为有的时候,从磁盘读取数据,还不如重新计算一次。

案例:使用RDD的持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
下面来写一个案例验证一下RDD持久化的效果
scala代码如下:

package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:RDD持久化
* Created by xuwei
*/
object PersistRddScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("PersistRddScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRDD = sc.textFile("D:\\hello_10000000.dat").cache()
var start_time = System.currentTimeMillis()
var count = dataRDD.count()
println(count)
var end_time = System.currentTimeMillis()
println("第一次耗时:"+(end_time-start_time))
start_time = System.currentTimeMillis()
count = dataRDD.count()
println(count)
end_time = System.currentTimeMillis()
println("第二次耗时:"+(end_time-start_time))
sc.stop()
}
}
1
2
在没有添加cache之前,每一次都耗时很长
加上cache之后,第二次计算耗时就很少了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java代码如下:

package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* 需求:RDD持久化
* Created by xuwei
*/
public class PersistRddJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("PersistRddJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> dataRDD = sc.textFile("D:\\hello_10000000.dat").cache()
long start_time = System.currentTimeMillis();
long count = dataRDD.count();
System.out.println(count);
long end_time = System.currentTimeMillis();
System.out.println("第一次耗时:"+(end_time-start_time));
start_time = System.currentTimeMillis();
count = dataRDD.count();
System.out.println(count);
end_time = System.currentTimeMillis();
System.out.println("第二次耗时:"+(end_time-start_time));
sc.stop();
}
}

共享变量

共享变量的工作原理

1
2
3
4
5
6
Spark还有一个非常重要的特性就是共享变量
默认情况下,如果在一个算子函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量数据。如果多个task想要共享某个变量,那么这种方式是做不到的。

Spark为此提供了两种共享变量
一种是Broadcast Variable(广播变量)
另一种是Accumulator(累加变量)

Broadcast Variable

1
2
3
4
5
6
7
8
Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,而不会为每个task都拷贝一份副本,因此其最大的作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗
通过调用SparkContext的broadcast()方法,针对某个变量创建广播变量

注意:广播变量,是只读的

然后在算子函数内,使用到广播变量时,每个节点只会拷贝一份副本。可以使用广播变量的value()方法获取值。

接下来看一个图深入理解一下

image-20230326155804742

1
2
3
4
5
6
7
8
9
10
11
12
13
先看左边的代码
这个是一个咱们经常使用的map算子的代码,map算子中执行对每一个元素乘以一个固定变量的操作,此时这个固定的变量属于外部变量。

默认情况下,算子函数内,使用到的外部变量,会被拷贝到执行这个算子的每一个task中
看图中间的MapTask,这些都是map算子产生的task,也就是说这个外部变量会被拷贝到每一个task中。

如果这个外部变量是一个集合,集合中有上亿条数据,这个网络传输就会很耗时,而且在每个task上,占用的内存空间,也会很大

如果算子函数中使用的外部变量,是广播变量的话,那么每个变量只会拷贝一份到每个节点上。节点上所有的task都会共享这一份变量,就可以减少网络传输消耗的时间,以及减少内存占用了。

大家可以想象一个极端情况,如果map算子有10个task,恰好这10个task还都在一个worker节点上,那么这个时候,map算子使用的外部变量就会在这个worker节点上保存10份,这样就很占用内存了。

下面我们来具体使用一下这个广播变量
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:使用广播变量
* Created by xuwei
*/
object BroadcastOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("BroadcastOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
val varable = 2
//dataRDD.map(_ * varable)
//1:定义广播变量
val varableBroadcast = sc.broadcast(varable)
//2:使用广播变量,调用其value方法
dataRDD.map(_ * varableBroadcast.value).foreach(println(_))
sc.stop()
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import java.util.Arrays;
/**
* 需求:使用广播变量
* Created by xuwei
*/
public class BroadcastOpJava{
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("BroadcastOpJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
int varable = 2;
//1:定义广播变量
Broadcast<Integer> varableBroadcast = sc.broadcast(varable);
//2:使用广播变量
dataRDD.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer i1) throws Exception {
return i1 * varableBroadcast.value();
}
}).foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer i) throws Exception {
System.out.println(i);
}
});
sc.stop();
}
}

Accumulator

1
2
3
4
5
6
Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。
正常情况下在Spark的任务中,由于一个算子可能会产生多个task并行执行,所以在这个算子内部执行的聚合计算都是局部的,想要实现多个task进行全局聚合计算,此时需要使用到Accumulator这个共享的累加变量。

注意:Accumulator只提供了累加的功能。在task只能对Accumulator进行累加操作,不能读取它的值。只有在Driver进程中才可以读取Accumulator的值。

下面我们来写一个案例
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:使用累加变量
* Created by xuwei
*/
object AccumulatorOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("AccumulatorOpScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//这种写法是错误的,因为foreach代码是在worker节点上执行的
// var total = 0和println("total:"+total)是在Driver进程中执行的
//所以无法实现累加操作
//并且foreach算子可能会在多个task中执行,这样foreach内部实现的累加也不是最终全局
/*var total = 0
dataRDD.foreach(num=>total += num)
println("total:"+total)*/
//所以此时想要实现累加操作就需要使用累加变量了
//1:定义累加变量
val sumAccumulator = sc.longAccumulator
//2:使用累加变量
dataRDD.foreach(num=>sumAccumulator.add(num))
//注意:只能在Driver进程中获取累加变量的结果
println(sumAccumulator.value)
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.imooc.java;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.util.LongAccumulator;
import java.util.Arrays;
/**
* 需求:使用累加变量
* Created by xuwei
*/
public class AccumulatorOpJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("AccumulatorOpJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5);
//1:定义累加变量
LongAccumulator sumAccumulator = sc.sc().longAccumulator();
//2:使用累加变量
dataRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer i) throws Exception {
sumAccumulator.add(i);
}
});
//注意:只能在Driver进程中获取累加变量的值
System.out.println(sumAccumulator.value());
}
}

本文标题:大数据开发工程师-第十周 第5章 RDD持久化

文章作者:TTYONG

发布时间:2022年03月02日 - 10:03

最后更新:2023年05月27日 - 00:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%91%A8-%E7%AC%AC5%E7%AB%A0-RDD%E6%8C%81%E4%B9%85%E5%8C%96.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%