第十一周 Spark性能优化的道与术-2
checkpoint概述
1 | checkpoint,是Spark提供的一个比较高级的功能。有时候,我们的Spark任务,比较复杂,从初始化RDD开始,到最后整个任务完成,有比较多的步骤,比如超过10个transformation算子。而且,整个任务运行的时间也特别长,比如通常要运行1~2个小时。在这种情况下,就比较适合使用checkpoint功能了。 |
1 | 那如何使用checkPoint呢? |
RDD之checkpoint流程

1 | 1:SparkContext设置checkpoint目录,用于存放checkpoint的数据; |
checkpoint与持久化的区别
1 | 那这里所说的checkpoint和我们之前讲的RDD持久化有什么区别吗? |
checkPoint的使用
scala
1 | 那我们来演示一下:将一个RDD的数据持久化到HDFS上面 |
1 | package com.imooc.scala |
java
1 | package com.imooc.java; |
提交到集群执行
1 | 下面我们把这个任务打包提交到集群上运行一下,看一下效果。 |

1 | 将pom.xml中的spark-core的依赖设置为provided,然后编译打包 |
1 | 将打包的jar包上传到bigdata04的/data/soft/sparkjars目录,创建一个新的spark-submit脚本 |

1 | 执行成功之后可以到setCheckpointDir指定的目录中查看一下,可以看到目录中会生成对应的文件保存rdd中的数据,只不过生成的文件不是普通文本文件,直接查看文件中的内容显示为乱码。 |



1 | 接下来进到YARN的8088界面查看 |

1 | 在这可以看出来产生了2个job, |
1 | 具体想知道哪些Stage属于哪个job任务的话,可以在任务界面,点击Description中的链接就可以看到job对应的Stage |

1 | 第一个job其实就是我们实现的单词计数的功能,这个任务产生了两个stage,这两个stage具体是如何划分的呢? |

1 | stage id:stage的编号,从0开始 |

1 | 进到Stage内部看一下 |


1 | 下面可以看到每个task的具体执行情况,执行状态,执行消耗的时间,GC消耗的时间,处理的数据量和数据条数、通过shuffle输出的数据量和数据条数 |

1 | 这就是这个Stage执行的基本信息了。 |

1 | 这个job只会产生一个stage,因为我们只针对textFile的结果设置了checkpoint |

1 | 这个stage执行消耗了35s,说明这份数据是重新通过textFile读取过来的。 |

1 | 下面我们来验证一下在开启持久化的情况下执行checkpoint操作时的区别 |

1 | 2:查看DAG Visualization,你会发现stage里面也会有有一些不一样的地方 |

checkpoint源码分析
1 | 前面我们通过理论层面分析了checkpoint的原理,以及演示了checkpoint的使用 |

1 | 把下载的安装包解压到idea项目目录中 |

1 | 下面我们就来分析一下RDD的checkpoint功能: |
checkpoint的写操作
1 | 1.1 :当我们在自己开发的spark任务中先调用sc.setCheckpointDir时,底层其实就会调用 |
1 | def setCheckpointDir(directory: String) { |
1 | 1.2:接着我们会调用RDD.checkpoint方法,此时会执行RDD这个class中的 checkpoint方法 |
1 | 1.3:剩下的就是在这个设置了checkpint的RDD所在的job执行结束之后,Spark会调用job中最后一个RDD的doCheckpoint方法 |
1 | //在有action动作时,会触发sparkcontext对runJob的调用 |
1 | 1.4:接着会进入到RDD中的 doCheckpoint 方法 |
1 | private[spark] def doCheckpoint(): Unit = { |
1 | 1.5:接下来进入到 RDDCheckpointData 的 checkpoint 方法中 |
1 | 1.6:接着来进入 ReliableCheckpointRDD 中的 doCheckpoint() 方法 |
1 | protected override def doCheckpoint(): CheckpointRDD[T] = { |
1 | 1.7:接下来进入 ReliableCheckpointRDD 的 writeRDDToCheckpointDirectory 方法 |
1 | //将rdd的数据写入HDFS中checkpoint目录,并且创建CheckpointRDD |
1 | 执行到这,其实调用过checkpoint方法的RDD就被保存到HDFS上了。 |
1 | 注意:在这里通过checkpoint操作将RDD中的数据写入到HDFS中的时候,会调用RDD中的 |
1 | 那我们来分析一下这块的代码 |
1 | final def iterator(split: Partition, context: TaskContext): Iterator[T] = { |
1 | 进入 computeOrReadCheckpoint(split, context) 中 |
1 | private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskCont |
1 | 在这会执行RDD的子类 HadoopRDD 中的 compute 方法 |
1 | override def compute(theSplit: Partition, context: TaskContext): Interruptibl |
1 | 这样经过几次迭代之后就可以获取到RDD中所有分区的数据了,因为这个compute是一次获取一个分区 |
checkpoint的读操作
1 | 那接下来我们来分析一下checkpoint读数据这个操作 |
1 | final def iterator(split: Partition, context: TaskContext): Iterator[T] = { |
1 | 进入computeOrReadCheckpoint方法 |
1 | private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskCont |
1 | 此时会执行 ReliableCheckpointRDD 这个子类中的 compute 方法 |
1 | override def compute(split: Partition, context: TaskContext): Iterator[T] = { |
1 | 这是从checkpoint中读取数据的流程 |
1 | 此时在最后将RDD中的数据通过checkpoint存储到HDFS上的时候,会调用RDD的iterator方法,不过此 |
1 | final def iterator(split: Partition, context: TaskContext): Iterator[T] = { |
1 | 进入 getOrCompute 方法 |
1 | private[spark] def getOrCompute(partition: Partition, context: TaskContext): |