大数据开发工程师-第十七周-Flink极速上手篇-Flink新版本1.12以上-3


Flink新版本1.12以上-3

Checkpoint与State剖析

1
2
3
前面我们已经掌握了Checkpoint和State使用,下面我们来从底层原理层面深度分析一下Checkpoint和State的细节流程。

首先是checkpoint的生成过程

Checkpoint的生成过程

image-20230602174046655

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
我们先整体看一下这个图:

首先看图中左边的内容,这块内容表示是输入数据流中的数据以及对应的偏移量。
其中里面的send、follow是具体的数据,下面的1、2、3、4、5、6是数据对应的偏移量。
这个输入数据流表示是直播间内用户产生的实时行为数据,send表示送礼,follow表示关注
在这里我们需要对这些用户行为数据实时统计,统计出每种行为出现的总次数。

中间的[send ,4]表示目前消费者消费到的那条数据及对应的偏移量,这个信息会存储在基于内存的状态中。

右边的[count(send), 3]、[count(follow) ,1],这些是实时汇总的结果,这些数据也会存储在基于内存的状态中。

Flink触发执行Checkpoint之后会把内存中存储的状态数据写入到下面的持久化存储中。

下面我们来详细看一下checkpoint的执行流程。
1:当消费到[send,4]这条数据时,正好达到了checkpoint的执行时机,此时JobManager中的checkpoint coordinator会触发checkpoint开始执行。
此时状态中存储的消费偏移量是4

2-1:checkpoint真正开始执行的时候,他会先把状态中维护的消费偏移量写入到持久化存储中。

2-2:写入结束后,DataSource组件会把状态的存储路径信息反馈给JobManager中的checkpoint coordinator。

3-1、3-2、4-1、4-2:接着后面算子中的状态数据:[count(send), 3]、[count(follow) ,1]也会进行同样的步骤

5:等所有的算子都完成了状态数据的持久化存储,也就是说 checkpoint coordinator 收集到了所有任务反馈给他的状态存储路径,这个时候就认为这一次的checkpoint真正执行成功了,最后他会向持久化存储中再备份一个 checkpoint metadata元数据文件,那么本次整个checkpoint流程就完成了。如果中间有任何一个阶段不成功,那么本次checkpoint就宣告失败。

当达到下一次checkpoint执行时机的时候,会继续重复前面的执行流程。

这就是checkpoint的生成过程。

Checkpoint的恢复过程

1
下面我们来分析一下Checkpoint的恢复过程。

image-20230602174549554

1
2
继续接着前面的业务流程,前面我们在消费完第4条数据的时候触发了一次checkpoint。
checkpoint执行结束后,紧接着消费者开始消费第5条数据,当把第5条数据follow消费出来之后,在计算的时候由于资源问题导致出现了故障,此时任务异常结束了。

image-20230602174608348

1
2
任务结束后,Flink尝试重启任务,并恢复数据到之前的状态。
在最开始重启任务的时候,任务中基于内存的状态都是空的。

image-20230602174632536

1
当任务重新启动之后,会根据指定的快照数据进行恢复,此时上一次在快照时保存的偏移量4、[count(send), 3]、[count(follow) ,1]这些数据对应的都恢复到了正确的位置。

image-20230602174650649

1
2
3
恢复成功之后,任务会基于之前的偏移量4继续往后面消费,所以又把[follow,5]这条数据消费出来了。
此时算子中计算的结果,count(follow)就变成了2。
这就是正常的数据处理流程了。

Checkpoint Barrier

1
2
3
4
5
6
7
8
9
在Checkpoint执行过程中,还有一个重要的角色,Barrier。
可以将Barrier认为是一个标记或者说是一条特殊的数据。

Flink任务在每次触发Checkpoint的时候都会由JobManager的Checkpoint Coordinator向Source端插入一个Barrier。

Barrier中包含有一个Checkpoint ID,用于标识它属于哪一个Checkpoint。

Barrier中的Checkpoint ID是严格单调递增的(自增ID)。
看下面这个图

image-20230602174724039

1
2
3
4
5
6
图里面的A\B\C\D这些属于数据流中的正常数据。
其中1和2是checkpoint ID,这是特殊的Barrier标记数据。
每次触发checkpoint的时候,JobManager的Checkpoint Coordinator都会向数据流中插入一个Barrier。

这个Barrier标记在源码中对应的类是CheckpointBarrier
可以大致看一下他的代码:
1
2
3
4
5
6
7
8
9
10
11
12
public class CheckpointBarrier extends RuntimeEvent {

private final long id;
private final long timestamp;
private final CheckpointOptions checkpointOptions;

public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
this.id = id;
this.timestamp = timestamp;
this.checkpointOptions = checkNotNull(checkpointOptions);
}
}
1
2
3
从这可以看出来他里面除了包含一个自增的id字段,还有一个时间戳字段,以及保存checkpoint参数的字段。

Barrier中的Checkpoint ID其实就是之前我们在任务界面中查看Checkpoint相关信息时显示的那个ID。

image-20230602174822221

1
2
3
4
5
6
7
8
9
Checkpoint在执行的时候,Source在接收到Barrier,会把Barrier广播发送到下游算子,当下游算子A接收到了所有输入数据流的Barrier时,意味着算子A已经处理完了截止到当前Checkpoint的数据。

然后算子A就可以执行Checkpoint,并将Barrier广播发送至下游算子。

Barrier的最大作用是用于算子各个子任务之间对齐检查点,Barrier对齐之后才会保存状态数据,最终保持一致性语义。

怎么理解呢?

看下面这个图:

image-20230602174840536

1
2
3
4
5
6
7
8
9
10
11
12
13
Source会把Barrier广播发送到下游算子A。

Source的并行度为2,所以Source的每个子任务都会给下游的算子广播发送Barrier。

算子A在这里也产生了2个子任务,对应的是算子A-1和算子A-2,每个子任务负责处理一部分数据

在这里Source-1这个子任务会给下游的算子A-1和算子A-2分别发送Barrier。

Source-2这个子任务也会给下游的算子A-1和算子A-2分别发送Barrier。

当算子A-1收到Source-1和Source-2这两个子任务发给他的Barrier标记时,他才会真正执行checkpoint,因为这个时候才实现了Barrier对齐,也就意味着这个子任务把本次checkpoint之前需要处理的数据都处理完了,这个时候执行checkpoint操作,才能保证数据的一致性。

算子A-2也是这样的。

Kafka连接器新API的使用


本文标题:大数据开发工程师-第十七周-Flink极速上手篇-Flink新版本1.12以上-3

文章作者:TTYONG

发布时间:2023年06月02日 - 17:06

最后更新:2023年06月02日 - 18:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%83%E5%91%A8-Flink%E6%9E%81%E9%80%9F%E4%B8%8A%E6%89%8B%E7%AF%87-Flink%E6%96%B0%E7%89%88%E6%9C%AC1-12%E4%BB%A5%E4%B8%8A-3.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%