大数据开发工程师-第十一周 Spark性能优化的道与术-2


第十一周 Spark性能优化的道与术-2

checkpoint概述

1
2
3
4
checkpoint,是Spark提供的一个比较高级的功能。有时候,我们的Spark任务,比较复杂,从初始化RDD开始,到最后整个任务完成,有比较多的步骤,比如超过10个transformation算子。而且,整个任务运行的时间也特别长,比如通常要运行1~2个小时。在这种情况下,就比较适合使用checkpoint功能了。

因为对于特别复杂的Spark任务,有很高的风险会出现某个要反复使用的RDD因为节点的故障导致丢失,虽然之前持久化过,但是还是导致数据丢失了。那么也就是说,出现失败的时候,没有容错机制,所以当后面的transformation算子,又要使用到该RDD时,就会发现数据丢失了,此时如果没有进行容错处理的话,那么就需要再重新计算一次数据了。
所以针对这种Spark Job,如果我们担心某些关键的,在后面会反复使用的RDD,因为节点故障导致数据丢失,那么可以针对该RDD启动checkpoint机制,实现容错和高可用
1
2
3
4
5
那如何使用checkPoint呢?
首先要调用SparkContext的setCheckpointDir()方法,设置一个容错的文件系统的目录,比如HDFS;然后,对RDD调用checkpoint()方法。
最后,在RDD所在的job运行结束之后,会启动一个单独的job,将checkpoint设置过的RDD的数据写入之前设置的文件系统中。

这是checkpoint使用的基本步骤,很简单,那我们下面先从理论层面分析一下当我们设置好checkpoint之后,Spark底层都做了哪些事情

RDD之checkpoint流程

image-20230327144700422

1
2
3
4
5
6
1:SparkContext设置checkpoint目录,用于存放checkpoint的数据;
对RDD调用checkpoint方法,然后它就会被RDDCheckpointData对象进行管理,此时这个RDD的checkpoint状态会被设置为Initialized
2:待RDD所在的job运行结束,会调用job中最后一个RDD的doCheckpoint方法,该方法沿着RDD的血缘关系向上查找被checkpoint()方法标记过的RDD,并将其checkpoint状态从Initialized设置为CheckpointingInProgress
3:启动一个单独的job,来将血缘关系中标记为CheckpointInProgress的RDD执行checkpoint操作,也就是将其数据写入checkpoint目录
4:将RDD数据写入checkpoint目录之后,会将RDD状态改变Checkpointed;
并且还会改变RDD的血缘关系,即会清除掉RDD所有依赖的RDD;最后还会设置其父RDD为新创建的CheckpointRDD

checkpoint与持久化的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
那这里所说的checkpoint和我们之前讲的RDD持久化有什么区别吗?
lineage是否发生改变linage(血缘关系)说的就是RDD之间的依赖关系

持久化,只是将数据保存在内存中或者本地磁盘文件中,RDD的lineage(血缘关系)是不变的;

Checkpoint执行之后,RDD就没有依赖的RDD了,也就是它的lineage改变了
丢失数据的可能性持久化的数据丢失的可能性较大,如果采用persist把数据存在内存中的话,虽然速度最快但是也是最不可靠的,就算放在磁盘上也不是完全可靠的,因为磁盘也会损坏。Checkpoint的数据通常是保存在高可用文件系统中(HDFS),丢失的可能性很低

建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY)
为什么呢?

因为默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,那么这个时候,本来Spark任务已经执行结束了,但是由于中间的RDD没有持久化,在进行checkpoint的时候想要将这个RDD的数据写入外部存储系统的话,就需要重新计算这个RDD的数据,再将其checkpoint到外部存储系统中。
如果对需要checkpoint的rdd进行了基于磁盘的持久化,那么后面进行checkpoint操作时,就会直接从磁盘上读取rdd的数据了,就不需要重新再计算一次了,这样效率就高了。

那在这能不能使用基于内存的持久化呢?当然是可以的,不过没那个必要。

checkPoint的使用

scala

1
2
那我们来演示一下:将一个RDD的数据持久化到HDFS上面
scala代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:checkpoint的使用
* Created by xuwei
*/
object CheckPointOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("CheckPointOpScala")
val sc = new SparkContext(conf)
if(args.length==0){
System.exit(100)
}
val outputPath = args(0)
//1:设置checkpint目录
sc.setCheckpointDir("hdfs://bigdata01:9000/chk001")
val dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat")

//2:对rdd执行checkpoint操作
dataRDD.checkpoint()
dataRDD.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_ + _)
.saveAsTextFile(outputPath)
sc.stop()
}
}

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.imooc.java;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* 需求:checkpoint的使用
* Created by xuwei
*/
public class CheckPointOpJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("CheckPointOpJava");
JavaSparkContext sc = new JavaSparkContext(conf);
if(args.length==0){
System.exit(100);
}
String outputPath = args[0];

//1:设置checkpint目录
sc.setCheckpointDir("hdfs://bigdata01:9000/chk002");
JavaRDD<String> dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_100000.dat");
//2: 对rdd执行checkpoint操作
dataRDD.checkpoint();
dataRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
}).mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String word) throws Exception
return new Tuple2<String, Integer>(word,1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
}).saveAsTextFile(outputPath);
sc.stop();
}
}

提交到集群执行

1
2
3
4
5
下面我们把这个任务打包提交到集群上运行一下,看一下效果。
代码master部分注释掉

先确保hadoop集群是正常运行的,以及hadoop中的historyserver进程和spark的historyserver进程也是正常运行的。
测试数据之前已经上传到了hdfs上面,如果没有则需要上传

image-20230327151632675

1
2
3
4
5
6
7
8
将pom.xml中的spark-core的依赖设置为provided,然后编译打包

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
1
将打包的jar包上传到bigdata04的/data/soft/sparkjars目录,创建一个新的spark-submit脚本

image-20230327151903625

1
执行成功之后可以到setCheckpointDir指定的目录中查看一下,可以看到目录中会生成对应的文件保存rdd中的数据,只不过生成的文件不是普通文本文件,直接查看文件中的内容显示为乱码。

image-20230327152317696

image-20230327152338046

image-20230327152509680

1
2
接下来进到YARN的8088界面查看
点击Tracking UI进入spark的ui界面看第一个界面jobs

image-20230327152656935

1
2
3
4
5
在这可以看出来产生了2个job,
第一个job是我们正常的任务执行,执行了39s,一共产生了28个task任务
第二个job是checkpoint启动的job,执行了35s,一共产生了14个task任务

看第二个界面Stages,这里面的3个Stage是前面2个job产生的
1
具体想知道哪些Stage属于哪个job任务的话,可以在任务界面,点击Description中的链接就可以看到job对应的Stage

image-20230327152845211

1
2
3
第一个job其实就是我们实现的单词计数的功能,这个任务产生了两个stage,这两个stage具体是如何划分的呢?
咱们前面讲过,stage的划分是由宽依赖决定的,在这个任务中reduceByKey这个过程会产生宽依赖,所以会产生2个Stage
这里面显示的有这两个stage的一些基本信息

image-20230327153130946

1
2
3
4
5
6
7
stage id:stage的编号,从0开始
Duration:stage执行消耗的时间
Tasks:Successed/Total:task执行成功数量/task总量
Input:输入数据量
ouput:输出数据量
shuffle read/shuffle read:shuffle过程传输数据量
点击这个界面中的DAG Visualization可以看到当前这个任务stage的划分情况,可以看到每个Stage包含哪些算子

image-20230327153403130

1
进到Stage内部看一下

image-20230327154126374

image-20230327154248904

1
2
下面可以看到每个task的具体执行情况,执行状态,执行消耗的时间,GC消耗的时间,处理的数据量和数据条数、通过shuffle输出的数据量和数据条数
其实从这里也可以看出来文件的每一个block块会产生一个task

image-20230327154559437

1
2
这就是这个Stage执行的基本信息了。
加下来看一下第二个Job,这个job是checkpoint启动的任务,查看它的stage的信息

image-20230327154637511

1
这个job只会产生一个stage,因为我们只针对textFile的结果设置了checkpoint

image-20230327154722955

1
2
这个stage执行消耗了35s,说明这份数据是重新通过textFile读取过来的。
针对Storage这块,显示的其实就是持久化的数据,如果对RDD做了持久化,那么在任务执行过程中能看到,任务执行结束就看不到了。

image-20230327154816099

1
2
3
4
下面我们来验证一下在开启持久化的情况下执行checkpoint操作时的区别
在代码中针对RDD开启持久化
1:对比此时产生的两个job总的消耗的时间,以及job中的Stage消耗的时间
其实你会发现开启持久化之后,checkpoint的那个job消耗的时间就变少了

image-20230327155305626

1
2:查看DAG Visualization,你会发现stage里面也会有有一些不一样的地方

image-20230327155658220

checkpoint源码分析

1
2
3
前面我们通过理论层面分析了checkpoint的原理,以及演示了checkpoint的使用
下面我们通过源码层面来对我们前面分析的理论进行验证
先下载spark源码,下载流程和下载spark安装包的流程一样

image-20230327195356368

1
2
3
4
把下载的安装包解压到idea项目目录中

打开spark-2.4.3源码目录,进入core目录,这个是spark的核心代码,我们要查看的checkpoint的源码就在这个项目中
在idea中打开core这个子项目

image-20230327195430272

1
2
3
4
5
6
下面我们就来分析一下RDD的checkpoint功能:
checkpoint功能可以分为两块
1:checkpoint的写操作
将指定RDD的数据通过checkpoint存储到指定外部存储中
2:checkpoint的读操作
任务中RDD数据在使用过程中丢失了,正好这个RDD之前做过checkpoint,所以这时就需要通过checkpoint来恢复数据

checkpoint的写操作

1
2
1.1 :当我们在自己开发的spark任务中先调用sc.setCheckpointDir时,底层其实就会调用
SparkContext中的setCheckpointDir方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def setCheckpointDir(directory: String) {
// If we are running on a cluster, log a warning if the directory is local.
// Otherwise, the driver may attempt to reconstruct the checkpointed RDD fr
// its own local file system, which is incorrect because the checkpoint fil
// are actually on the executor machines.
if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
logWarning("Spark is not running in local mode, therefore the checkpoint
s"must not be on the local filesystem. Directory '$directory' " +
"appears to be on the local filesystem.")
}
//根据我们传过来的目录,后面再拼上一个子目录,子目录使用一个UUID随机字符串
//使用HDFS的javaAPI 在HDFS上创建目录
checkpointDir = Option(directory).map { dir =>
val path = new Path(dir, UUID.randomUUID().toString)
val fs = path.getFileSystem(hadoopConfiguration)
fs.mkdirs(path)
fs.getFileStatus(path).getPath.toString
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1.2:接着我们会调用RDD.checkpoint方法,此时会执行RDD这个class中的 checkpoint方法

//这里相当于是checkpoint的一个标记,并没有真正执行checkpoint
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensu
// children RDD partitions point to the correct parent partitions. In the f
// we should revisit this consideration.
//如果SparkContext没有设置checkpointDir,则抛出异常
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the Sp
} else if (checkpointData.isEmpty) {
//如果设置了,则创建RDDCheckpointData的子类,这个子类主要负责管理RDD的checkpoi
//并且会初始化checkpoint状态为Initialized
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}

这个checkpoint方法执行完成之后,这个流程就结束了。
1
2
3
1.3:剩下的就是在这个设置了checkpint的RDD所在的job执行结束之后,Spark会调用job中最后一个RDD的doCheckpoint方法
这个逻辑是在SparkContext这个class的runJob方法中,当执行到Spark中的action算子时,这个runJob方法会被触发,开始执行任务。
这个runJob的最后一行会调用rdd中的 doCheckpoint 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//在有action动作时,会触发sparkcontext对runJob的调用
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStageException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,
progressBar.foreach(_.finishAll())
//在这里会执行doCheckpoint()
rdd.doCheckpoint()
}
1
2
3
1.4:接着会进入到RDD中的 doCheckpoint 方法
这里面最终会调用 RDDCheckpointData 的 checkpoint 方法
checkpointData.get.checkpoint()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreP
//该rdd是否已经调用doCheckpoint,如果还没有,则开始处理
if (!doCheckpointCalled) {
doCheckpointCalled = true
//若已经被checkpoint()标记过,则checkpointData.isDefined为true
if (checkpointData.isDefined) {
//查看是否需要把该rdd的所有依赖全部checkpoint
//checkpointAllMarkedAncestors取自配置"spark.checkpoint.checkpointAllM
//默认不配时值为false
if (checkpointAllMarkedAncestors) {
// TODO We can collect all the RDDs that needs to be checkpointed,
// them in parallel.
// Checkpoint parents first because our lineage will be truncated af
// checkpoint ourselves
// 血缘上的每一个父rdd递归调用该方法
dependencies.foreach(_.rdd.doCheckpoint())
}
//调用RDDCheckpointData的checkpoint方法
checkpointData.get.checkpoint()
} else {
//沿着rdd的血缘关系向上查找被checkpoint()标记过的RDD
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1.5:接下来进入到 RDDCheckpointData 的 checkpoint 方法中
这里面会调用子类 ReliableCheckpointRDD 中的 doCheckpoint()方法

final def checkpoint(): Unit = {
// Guard against multiple threads checkpointing the same RDD by
// atomically flipping the Stage of this RDDCheckpointData
////将checkpoint的状态从Initialized置为CheckpointingInProgress
RDDCheckpointData.synchronized {
if (cpStage == Initialized) {
cpStage = CheckpointingInProgress
} else {
return
}
}
//调用子类的doCheckpoint,默认会使用ReliableCheckpointRDD子类,创建一个新的Chec
val newRDD = doCheckpoint()
// Update our Stage and truncate the RDD lineage
//将checkpoint状态置为Checkpointed状态,并且改变rdd之前的依赖,设置父rdd为新创建
RDDCheckpointData.synchronized {
cpRDD = Some(newRDD)
cpStage = Checkpointed
rdd.markCheckpointed()
}
}
1
2
3
1.6:接着来进入 ReliableCheckpointRDD 中的 doCheckpoint() 方法
这里面会调用 ReliableCheckpointRDD 中的 writeRDDToCheckpointDirectory 方法将rdd的数据写入HDFS
中的 checkpoint 目录,并且返回创建的 CheckpointRDD
1
2
3
4
5
6
7
8
9
10
11
12
protected override def doCheckpoint(): CheckpointRDD[T] = {
//将rdd的数据写入HDFS中的checkpoint目录,并且创建的CheckpointRDD
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir
// Optionally clean our checkpoint files if the reference is out of scope
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints",
rdd.context.cleaner.foreach { cleaner =>
cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
}
}
logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${n
newRDD
}
1
2
1.7:接下来进入 ReliableCheckpointRDD 的 writeRDDToCheckpointDirectory 方法
这里面最终会启动一个job,将checkpoint的数据写入到指定的HDFS目录中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//将rdd的数据写入HDFS中checkpoint目录,并且创建CheckpointRDD
def writeRDDToCheckpointDirectory[T: ClassTag](
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val checkpointStartTimeNs = System.nanoTime()
val sc = originalRDD.sparkContext
//Create the output path for the checkpoint
//创建checkpoint输出目录
val checkpointDirPath = new Path(checkpointDir)
//获取HDFS文件系统API接口
val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
//创建目录
if (!fs.mkdirs(checkpointDirPath)) {
throw new SparkException(s"Failed to create checkpoint path $checkpointDi
}
// Save to file, and reload it as an RDD
//将Hadoop配置文件信息广播到所有节点
val broadcastedConf = sc.broadcast(
new SerializableConfiguration(sc.hadoopConfiguration))
// TODO: This is expensive because it computes the RDD again unnecessarily
//这里强调了checkpoint是一个昂贵的操作,主要是说它昂贵在需要沿着血缘关系重新计算该
//重新启动一个job,将rdd的分区数据写入HDFS
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcasted
//如果rdd的partitioner不为空,则将partitioner写入checkpoint目录
if (originalRDD.partitioner.nonEmpty) {
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpoi
}
val checkpointDurationMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
logInfo(s"Checkpointing took $checkpointDurationMs ms.")
//创建一个CheckpointRDD,该RDD的分区数目和原始的rdd的分区数是一样的
val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
if (newRDD.partitions.length != originalRDD.partitions.length) {
throw new SparkException(
"Checkpoint RDD has a different number of partitions from original RDD.
s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partit
s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
s"${newRDD.partitions.length}].")
}
newRDD
}
1
执行到这,其实调用过checkpoint方法的RDD就被保存到HDFS上了。
1
2
注意:在这里通过checkpoint操作将RDD中的数据写入到HDFS中的时候,会调用RDD中的
iterator方法,遍历RDD中所有分区的数据。
1
2
那我们来分析一下这块的代码
此时我们没有对RDD进行持久化,所以走else中的代码
1
2
3
4
5
6
7
8
9
10
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
//如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘
if (storageLevel != StorageLevel.NONE) {
//直接从持久化存储中读取或者计算(如果数据丢失)
getOrCompute(split, context)
} else {
//进行rdd partition的计算或者从checkpoint中读取数据
computeOrReadCheckpoint(split, context)
}
}
1
2
进入 computeOrReadCheckpoint(split, context) 中
此时这个RDD是将要进行checkpoint,还没有完成checkpoint,所以走 else ,会执行 compute 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskCont
{
//当前rdd是否已经checkpoint并且物化,如果已经checkpoint并且物化
//则调用firstParent的iterator方法获取
if (isCheckpointedAndMaterialized) {
//注意:针对checpoint操作,它的血缘关系已经被切断了,那么它的firstParent就是前面
firstParent[T].iterator(split, context)
} else {
//如果没有,则表示持久化数据丢失,或者根本就没有持久化,
//需要调用rdd的compute方法开始重新计算,返回一个Iterator对象
compute(split, context)
}
}
1
2
在这会执行RDD的子类 HadoopRDD 中的 compute 方法
在这里会通过 RecordReader 获取RDD中指定分区的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
override def compute(theSplit: Partition, context: TaskContext): Interruptibl
val iter = new NextIterator[(K, V)] {
private val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
private val jobConf = getJobConf()
private val inputMetrics = context.taskMetrics().inputMetrics
private val existingBytesRead = inputMetrics.bytesRead
// Sets InputFileBlockHolder for the file block's information
split.inputSplit.value match {
case fs: FileSplit =>
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLengt
case _ =>
InputFileBlockHolder.unset()
}
// Find a function that will return the FileSystem bytes read by this thr
// creating RecordReader, because RecordReader's constructor might read s
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.v
case _: FileSplit | _: CombineFileSplit =>
Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}
// We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partit
// task and in the same thread, in which case we need to avoid override v
// previous partitions (SPARK-13071).
private def updateBytesRead(): Unit = {
getBytesReadCallback.foreach { getBytesRead =>
inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
}
}
private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(
new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
reader =
try {
inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
null
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split
finished = true
null
}
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener[Unit] { context =>
// Update the bytes read before closing is to make sure lingering bytes
// this thread get correctly added.
updateBytesRead()
closeIfNeeded()
}
private val key: K = if (reader == null) null.asInstanceOf[K] else reader
private val value: V = if (reader == null) null.asInstanceOf[V] else read
override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split
finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
}
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INT
updateBytesRead()
}
(key, value)
}
override def close(): Unit = {
if (reader != null) {
InputFileBlockHolder.unset()
try {
reader.close()
} catch {
case e: Exception =>
if (!ShutdownHookManager.inShutdown()) {
logWarning("Exception in RecordReader.close()", e)
}
} finally {
reader = null
}
if (getBytesReadCallback.isDefined) {
updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to t
// which may be inaccurate.
try {
inputMetrics.incBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for ta
}
}
}
}
}
new InterruptibleIterator[(K, V)](context, iter)
}
1
2
3
这样经过几次迭代之后就可以获取到RDD中所有分区的数据了,因为这个compute是一次获取一个分区
的数据。获取到之后checkpoint就可以把这个RDD的数据存储到HDFS上了。
这就是checkpoint的写操作

checkpoint的读操作

1
2
3
4
那接下来我们来分析一下checkpoint读数据这个操作
当RDD中的数据丢失了以后,需要通过checkpoint读取存储在hdfs上的数据,
2.1:这个时候还是会执行RDD中的iterator方法
由于我们没有做持久化,只做了checkpoint,所以还是会走 else
1
2
3
4
5
6
7
8
9
10
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
//如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘
if (storageLevel != StorageLevel.NONE) {
//直接从持久化存储中读取或者计算(如果数据丢失)
getOrCompute(split, context)
} else {
//进行rdd partition的计算或者从checkpoint中读取数据
computeOrReadCheckpoint(split, context)
}
}
1
2
3
4
5
6
7
8
进入computeOrReadCheckpoint方法
此时rdd已经checkpoint并且物化,所以 if 分支满足
执行 firstParent[T].iterator(split, context) 这行代码

这行代码的意思是会找到当前这个RDD的父RDD,其实这个RDD执行过checkpoint之后,血缘关系已经被切断了,它的父RDD就是我们前面创建的那个ReliableCheckpointRDD
这个ReliableCheckpointRDD中没有覆盖 iterator 方法,所以在调用 iterator 的时候还是执行RDD这个
父类中的 iterator ,重新进来之后再判断,这个 ReliableCheckpointRDD 再执行if判断的时候就不满足
了,因为它的 checkpoint 属性不满足,所以会走 else ,执行 compute
1
2
3
4
5
6
7
8
9
10
11
12
13
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskCont
{
//当前rdd是否已经checkpoint并且物化,如果已经checkpoint并且物化
//则调用firstParent的iterator方法获取
if (isCheckpointedAndMaterialized) {
//注意:针对checpoint操作,它的血缘关系已经被切断了,那么它的firstParent就是前面
firstParent[T].iterator(split, context)
} else {
//如果没有,则表示持久化数据丢失,或者根本就没有持久化,
//需要调用rdd的compute方法开始重新计算,返回一个Iterator对象
compute(split, context)
}
}
1
2
此时会执行 ReliableCheckpointRDD 这个子类中的 compute 方法
这里面就会找到之前checkpoint的文件,从HDFS上恢复RDD中的数据。
1
2
3
4
5
6
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
//获取checkpoint文件
val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileNam
//从HDFS上的checkpoint文件中读取checkpoint的数据
ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}
1
2
3
这是从checkpoint中读取数据的流程
咱们前面说过,建议对需要做checkpoint的数据先进行持久化,如果我们设置了持久化,针对
checkpoint的写操作,在执行iterator方法的时候会是什么现象呢?
1
2
3
此时在最后将RDD中的数据通过checkpoint存储到HDFS上的时候,会调用RDD的iterator方法,不过此
时 storageLevel 就不为 null 了,因为我们对这个RDD做了基于磁盘的持久化,所以会走 if 分支,执行
getOrCompute
1
2
3
4
5
6
7
8
9
10
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
//如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘
if (storageLevel != StorageLevel.NONE) {
//直接从持久化存储中读取或者计算(如果数据丢失)
getOrCompute(split, context)
} else {
//进行rdd partition的计算或者从checkpoint中读取数据
computeOrReadCheckpoint(split, context)
}
}
1
2
3
进入 getOrCompute 方法
由于这个RDD的数据已经做了持久化,所以在这就可以从 blockmanager 中读取数据了,就不需要重新从
源头计算或者拉取数据了,所以会提高 checkpoint 的效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private[spark] def getOrCompute(partition: Partition, context: TaskContext): 
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
// This method is called on executors, so we need call SparkEnv.get instead
SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementCla
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[It
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iter
}
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
}

本文标题:大数据开发工程师-第十一周 Spark性能优化的道与术-2

文章作者:TTYONG

发布时间:2023年03月27日 - 14:03

最后更新:2023年05月29日 - 13:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%80%E5%91%A8-Spark%E6%80%A7%E8%83%BD%E4%BC%98%E5%8C%96%E7%9A%84%E9%81%93%E4%B8%8E%E6%9C%AF-checkpoint-2.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%