大数据开发工程师-第十七周 Flink新版本1.12以上-2


Flink新版本1.12以上-2

State(状态)的容错与一致性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
前面我们学习了State的原理和在代码中的使用。下面我们来深入分析一下State的容错和一致性。

4.1 State的容错与一致性
针对一个Flink流式任务,如何保证这个任务故障后恢复到之前的运行状态?
注意:这里所说的恢复到之前的运行状态是表示将算子中计算的中间结果恢复到任务停止之前的样子。

其实咱们前面也提到过,想要实现状态的这种容错效果,需要借助于checkpoint机制。

因为checkpoint可以将状态数据持久化保存到外部存储系统中,这样任务恢复时,可以基于之前存储到外部的状态数据进行恢复。

针对流式计算任务,在故障后恢复状态数据的时候会涉及到三种语义:

至少一次:At-least-once,这种语义可能会导致数据恢复时重复处理数据。
至多一次:At-most-once,这种语义可能会导致数据恢复时丢失数据。
仅一次:Exactly-once,这种语义可以保证数据只对结果影响一次,可以保证结果的准确性,不会出现重复或者丢失的情况。
1
2
3
注意:这里的仅一次语义,表示数据对最终结果的影响只有一次,并不是说数据只被处理一次。

如果想要实现流式计算任务中数据的一致性,其实就是想要在流式计算任务中实现这种仅一次语义。

流计算中Exactly-once语义的多种实现思路

1
针对流式计算任务中Exactly-once语义的实现思路其实是有多种的,下面我们来分析一下

image-20230602123745704

1
2
3
4
5
6
7
8
9
10
11
12
13
14
第一种思路:借助于At-least-once语义,再加上去重功能来实现。这种实现思路需要我们程序员自己来维护一个去重功能,因为At-least-once语义可以保证数据不丢,但是可能会出现数据重复,这样当任务故障后恢复的时候,对重复的数据进行去重,这样就可以间接实现Exactly-once这种仅一次语义了。

这种实现思路属于中等难度,因为需要我们自己开发一个去重功能,保证数据不重复,对应的这个功能也会有一些性能开销,所以性能开销这方面属于中等。

第二种思路:借助于At-least-once语义,再加上幂等操作,这种实现思路需要依赖于第三方存储系统的特性,也就是说这个第三方存储系统需要支持幂等操作,幂等操作表示一条命令重复执行多次,对最终的结果只有一次影响。
举个例子:针对redis中的set命令,我们执行set a 1 ,这条命令不管执行多少次,a的值始终都是1,那么这个命令我们就可以认为是幂等操作。
针对redis的incr命令,我们执行incr a,此时每执行一次,a的值就会加1,所以多次执行会导致结果发生变化,那么这个命令就不是幂等操作。
这样在使用At-least-once语义的时候,他可能会导致数据重复,借助于幂等操作也是可以实现Exactly-once这种仅一次语义的。

这种实现思路比较简单,对应的性能开销也比较低,不需要我们额外维护什么功能。

第三种思路:借助于状态和checkpoint机制来实现,这种实现思路其实我们前面已经分析过了,他是可以实现仅一次语义的。

这种实现思路也比较简单,对应的性能开销也比较低。
1
2
3
针对这三种实现思路,下面有几张图我们来看一下,加深一下理解:

思路1:

image-20230602124044160

1
相当于我们在中间的算子中需要借助于外部的存储系统实现一个去重功能,当任务故障后恢复的时候,遇到重复数据在处理的时候可以进行去重,这样可以保证实现仅一次语义的效果。
1
思路2:

image-20230602124129333

1
2
3
4
此时需要依赖于外部存储系统的幂等特性,咱们前面以redis数据库为例进行了分析,如果是使用hbase数据库呢?
hbase中rowkey是唯一的,我们只要保证重复数据的rowkey不变,那么执行多次put操作也是可以保证结果数据不变的。

思路3:

image-20230602124152921

1
此时是借助于状态和checkpoint机制实现,任务运行期间,定期对状态生成快照,任务故障恢复时基于之前的快照数据恢复状态,这样可以实现仅一次语义。

如何实现Flink任务的端到端一致性?

1
2
3
4
5
6
7
Flink可以借助于Checkpoint机制保证Flink系统内部状态的一致性。

但是一个Flink流计算任务需要有Source、系统内部(也就是算子),以及Sink这三部分组成。

那这个时候应该如何实现Flink流计算任务整个链条的一致性保证?(也可以称为是端到端的一致性)
端到端的一致性,意味着要保证从Source端到系统内部、再到最终的Sink端整个阶段的一致性。
每一个阶段负责保证自己的一致性,整个端到端的一致性级别就取决于所有阶段中一致性最弱的那个阶段了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
那下面我们来具体分析一下这三个阶段如何保证自己的一致性?

首先是Source端:这个时候外部数据源需要支持任务故障恢复时的数据重放机制,不能说数据取出来之后数据源中就没有了,如果数据取出来之后任务失败了,肯定还是要重新再取一次的,所以说,外部数据源需要支持数据重放机制,在流计算任务中常用的外部数据源是kafka,kafka是可以支持数据重放机制的,我们可以通过控制消费者的消费偏移量实现数据重新消费。

接下来是Flink系统内部,这块主要是依赖于Checkpoint机制实现。

最后是Sink端:此时目的地存储系统需要保证数据恢复时不会重复写入。

针对Sink端功能的具体实现包括两种方式,也就是说如何保证写入目的地存储系统时不会出现重复?
第一种方式是:幂等写入
幂等写入其实我们前面已经分析过了,就是说一个操作,可以重复执行很多次,但是只会导致一次结果更改,也就是说,后面再重复执行就不起作用了。

第二种方式是:事务写入
需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入目的地存储系统中。

针对事务写入这种机制大致有两种实现方式:

第一种实现方式:预写日志(WAL)的方式,这种方式无法100%保证Exactly-once语义,可能会出现数据重复。
怎么理解呢?
预写日志这种方式的通用性比较强,它会使用Operator State来存储数据,在任务发生故障时可以恢复,不会导致数据丢失,几乎适合所有外部系统,但是不能提供100%端到端的仅一次语义。
因为基于预写日志的写入方式在某些极端情况下可能会将数据写入多次。
例如:如果外部系统不支持原子性的写入多条数据,那么在向外部系统写入数据的时候可能就会出现部分数据已经写入,但是此时任务出现了故障,导致剩余一部分数据没有写入的情况。当下次恢复的时候会重写全部数据,这样数据就会出现部分重复。

第二种实现方式:两阶段提交(2PC)的方式,这种方式可以100%保证Exactly-once语义。
如果外部系统自身就支持事务(比如MySQL、Kafka),可以使用两阶段提交的方式,这样可以保证端到端的一致性。
两阶段提交这种方式可以提供端到端的一致性保证,但是它的代价也是非常明显的,就是牺牲了延迟。输出数据不再是实时写入到外部系统,而是分批次地提交。
目前来说,没有完美的故障恢复和仅一次语义保障机制,对于开发者来说,我们需要在不同需求之间权衡取舍。
1
2
3
4
针对WAL预写日志这种方式,他提供的有一个接口是GenericWriteAheadSink,我们来看一下这个接口的大致实现:
这是一个抽象类,目前还没有具体的实现。
他里面用到了ListState,在initializeState方法中可以看到这个ListState属于Operator State这种类型。
这个ListState负责在checkpoint的时候保存目前等待写出去的数据,便于任务故障时的数据恢复。
1
2
3
4
5
6
7
private transient ListState<PendingCheckpoint> checkpointedState;

checkpointedState =
context.getOperatorStateStore()
.getListState(
new ListStateDescriptor<>(
"pending-checkpoints", new JavaSerializer<>()));
1
2
3
4
5
6
7
8
9
10
两阶段提交这种方式会用到TwoPhaseCommitSinkFunction这个接口:
这个接口里面主要有几个比较重要的方法:
//开启一个新事务
protected abstract TXN beginTransaction() throws Exception;
//预提交-对应的就是第一阶段的提交
protected abstract void preCommit(TXN transaction) throws Exception;
//提交-对应的是第二阶段的提交,此时数据才会真正提交到外部存储系统中,数据才对外可见
protected abstract void commit(TXN transaction);
//回滚事务
protected abstract void abort(TXN transaction);
1
我们在使用Flink向kafka中写入数据想要保证数据一致性的时候就会用到这个接口,等后面我们再具体分析这个接口在kafka中的具体实现。

Checkpoint(快照)机制详解

1
2
3
4
5
6
7
8
9
10
Checkpoint机制是Flink中实现State容错和一致性最核心的功能。
它能够根据配置周期性地基于流计算中的状态数据生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外故障崩溃时,在重新运行程序的时候可以有选择地从这些快照进行恢复,从而修正因为故障带来的数据异常。

默认情况下Checkpoint机制是处于禁用状态的,如果想要开启需要在代码层面进行设置。

Checkpoint支持两种语义级别:Exactly-once和At-least-once,其中Exactly-once是默认的语义级别。

Exactly-once语义对于大多数应用来说是合适的。
At-least-once语义可能用在某些延迟超低的应用程序(始终延迟为几毫秒),相对而言,At-least-once语义不需要保证数据的强一致性,所以数据的传输延迟会比较低。
如果某个需求对数据的准确度要求不是特别高,但是需要计算和传输的数据量比较大,还需要低延迟,那么可以考虑使用At-least-once语义。
1
2
3
4
下面我们来具体看一下如何在代码中开启Checkpoint,以及Checkpoint相关的一些核心配置。
创建package:com.imooc.scala.checkpoint

核心代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.imooc.scala.checkpoint

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* Checkpoint相关配置
* Created by xuwei
*/
object CheckpointCoreConf {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

//开启Checkpoint,并且指定自动执行的间隔时间
env.enableCheckpointing(1000*60*2)//2分钟

//高级选项
//获取Checkpoint的配置对象
val cpConfig = env.getCheckpointConfig
//设置语义模式为EXACTLY_ONCE(默认就是EXACTLY_ONCE)
cpConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//设置两次Checkpoint之间的最小间隔时间(设置为5秒:表示Checkpoint完成后的5秒内不会开始生成新的Checkpoint)
cpConfig.setMinPauseBetweenCheckpoints(1000*5)//5秒
//设置最多允许同时运行几个Checkpoint(默认值为1,也建议使用默认值1,这样可以减少资源占用)
cpConfig.setMaxConcurrentCheckpoints(1)
//设置一次Checkpoint的执行超时时间,达到超时时间后会被取消执行,可以避免Checkpoint执行时间过长
cpConfig.setCheckpointTimeout(1000*60*6)//6分钟
//设置允许的连续Checkpoint失败次数(默认值为0,表示Checkpoint只要执行失败任务也会立刻失败)
//偶尔的Checkpoint失败不应该导致任务执行失败,可能是由于一些特殊情况(网络问题)导致Checkpoint失败
//应该设置一个容错值,如果连续多次Checkpoint失败说明确实是有问题了,此时可以让任务失败
cpConfig.setTolerableCheckpointFailureNumber(3)
//设置在手工停止任务时是否保留之前生成的Checkpoint数据(建议使用RETAIN_ON_CANCELLATION)
//RETAIN_ON_CANCELLATION:在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
//DELETE_ON_CANCELLATION:只有在任务故障时才会保留,如果手工停止任务会删除之前生成的Checkpoint数据
cpConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置Checkpoint后的状态数据的存储位置
//支持JobManagerCheckpointStorage(默认)和FileSystemCheckpointStorage
//JobManagerCheckpointStorage:表示Checkpoint后的状态数据存储在JobManager节点的JVM堆内存中
//FileSystemCheckpointStorage:表示Checkpoint后的状态数据存储在文件系统中(可以使用分布式文件系统HDFS)
//可以简写为:cpConfig.setCheckpointStorage("hdfs://bigdata01:9000/flink-chk001")
cpConfig.setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://bigdata01:9000/flink-chk001"))

}

}
1
注意:这些配置是针对单个任务有效的,并且需要放到具体的任务中才有意义。目前这个类里面的代码是不完整的,无法正常执行,等后面我们把这些代码拿到对应的任务中去使用。
1
2
3
好的,我来解释一下。在Flink中,Checkpoint是用来保存程序运行状态的一种机制。env.enableCheckpointing(1000*60*2)是用来设置Checkpoint的间隔时间,即每隔多长时间进行一次Checkpoint。cpConfig.setMinPauseBetweenCheckpoints(1000*5)是用来设置两次 Checkpoint之间的最小暂停时间,即如果上一次Checkpoint完成后,距离下一次 Checkpoint的间隔时间小于这个值,则会等待直到满足最小暂停时间后再进行下一次 Checkpoint。

这两个设置是相互独立的,可以根据实际情况进行调整。例如,如果你希望程序在出现故障时能够快速恢复,则可以缩短Checkpoint的间隔时间;如果你希望减少 Checkpoint 对程序性能的影响,则可以增加两次Checkpoint之间的最小暂停时间。

保存多个Checkpoint

1
2
3
4
5
6
7
8
默认情况下,如果在任务中开启了Checkpoint,则Flink只会保留最近成功生成的1份Checkpoint数据。
当Flink程序故障重启时,可以从最近的这份Checkpoint数据来进行恢复。

但是我们希望能够保留多份Checkpoint数据,并能够根据实际需要选择其中一份进行恢复,这样会更加灵活。
例如:我们发现最近2个小时的数据处理有问题,希望将整个状态还原到2小时之前,这样就需要找到2个小时之前的Checkpoint数据进行恢复了。

Flink可以支持保留多份Checkpoint数据,需要在Flink的配置文件flink-conf.yaml中,添加如下配置,指定最多需要保存最近多少份Checkpoint数据
这个配置针对整个客户端有效,只要是在这个客户端上提交的任务,都会使用这个配置。
1
2
3
4
[root@bigdata04 conf]# vi flink-conf.yaml 
...
state.checkpoints.num-retained: 20
...

从Checkpoint进行恢复

手工停止的任务,恢复数据

1
2
3
4
5
前面我们分析了Checkpoint的详细配置,下面我们就结合具体的案例来演示一下Checkpoint是如何对状态数据进行持久化保存的,并且当任务故障后,我们如何基于Checkpoint产生的数据进行恢复

首先我们重新开发一个有状态的单词计数案例,并且在代码中开启checkpoint。

代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.imooc.scala.checkpoint

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* 有状态的单词计数 + Checkpoint
* Created by xuwei
*/
object WordCountStateWithCheckpointDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

//开启Checkpoint
env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次
//获取Checkpoint的配置对象
val cpConfig = env.getCheckpointConfig
//在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
cpConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置Checkpoint后的状态数据的存储位置
cpConfig.setCheckpointStorage("hdfs://bigdata01:9000/flink-chk/wordcount")

val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)

keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
//声明一个ValueState类型的状态变量,存储单词出现的总次数
private var countState: ValueState[Int] = _

/**
* 任务初始化的时候这个方法执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//注册状态
val valueStateDesc = new ValueStateDescriptor[Int](
"countState",//指定状态名称
classOf[Int]//指定状态中存储的数据类型
)
countState = getRuntimeContext.getState(valueStateDesc)
}

override def map(value: (String, Int)): (String,Int) = {
//从状态中获取这个key之前出现的次数
var lastNum = countState.value()
val currNum = value._2
//如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
if(lastNum == null){
lastNum = 0
}
//汇总出现的次数
val sum = lastNum+currNum
//更新状态
countState.update(sum)
//返回单词及单词出现的总次数
(value._1,sum)
}

}).print()

env.execute("WordCountStateWithCheckpointDemo")
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
修改项目的依赖配置,将所有依赖的scope都设置为provided。

打jar包。

将生成的jar包上传到bigdata04机器中。

在bigdata04机器上开启Socket

向集群中提交此任务。
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -c com.imooc.scala.checkpoint.WordCountStateWithCheckpointDemo -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar

当任务正常启动之后,在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a b
a
1
2
3
4
到任务界面查看输出结果信息:
(a,1)
(b,1)
(a,2)
1
2
3
此时a出现2次,b出现了1次。

此时可以在任务界面中查看checkpoint的执行情况:

image-20230602151049919

1
2
3
4
5
6
7
8
9
10
解释:

Trriggered:表示截止到目前checkpoint触发的次数。
In Progress:表示目前正在执行的checkpoint数量。
Completed:表示成功执行结束的checkpoint次数。
Failed:表示执行失败的checkpoint次数,这里显示为2的意思是失败了2次,这是因为我们现在设置的checkpoint间隔时间太短了,只有10秒,任务提交上去之后很快就会触发checkpoint,此时Flink任务可能还没有初始化完成,所以会出现一些失败次数,等Flink任务正常运行起来之后就没问题了。实际工作中,我们会把checkpoint的间隔时间设置为分钟级别,一般是2分钟,5分钟之类的,这样就不会出现这种问题了。
Restored:0表示这个任务没有基于之前的checkpoint数据启动,显示为1表示基于之前的checkpoint数据启动。
Path:表示当前任务的checkpoint数据保存目录,注意:同一个任务每次启动生成的checkpoint数据目录都不一样,因为这个路径里面用到了flink的任务id,任务id是每次都会重新生成的。

此时到hdfs中查看一下具体生成的checkpoint目录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[root@bigdata01 ~]# hdfs dfs -ls /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/
Found 22 items
drwxr-xr-x - root supergroup 0 2027-11-14 17:23 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-27
drwxr-xr-x - root supergroup 0 2027-11-14 17:23 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-28
drwxr-xr-x - root supergroup 0 2027-11-14 17:23 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-29
drwxr-xr-x - root supergroup 0 2027-11-14 17:24 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-30
drwxr-xr-x - root supergroup 0 2027-11-14 17:24 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-31
drwxr-xr-x - root supergroup 0 2027-11-14 17:24 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-32
drwxr-xr-x - root supergroup 0 2027-11-14 17:24 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-33
drwxr-xr-x - root supergroup 0 2027-11-14 17:24 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-34
drwxr-xr-x - root supergroup 0 2027-11-14 17:24 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-35
drwxr-xr-x - root supergroup 0 2027-11-14 17:25 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-36
drwxr-xr-x - root supergroup 0 2027-11-14 17:25 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-37
drwxr-xr-x - root supergroup 0 2027-11-14 17:25 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-38
drwxr-xr-x - root supergroup 0 2027-11-14 17:25 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-39
drwxr-xr-x - root supergroup 0 2027-11-14 17:25 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-40
drwxr-xr-x - root supergroup 0 2027-11-14 17:25 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-41
drwxr-xr-x - root supergroup 0 2027-11-14 17:26 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-42
drwxr-xr-x - root supergroup 0 2027-11-14 17:26 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-43
drwxr-xr-x - root supergroup 0 2027-11-14 17:26 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-44
drwxr-xr-x - root supergroup 0 2027-11-14 17:26 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-45
drwxr-xr-x - root supergroup 0 2027-11-14 17:26 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-46
drwxr-xr-x - root supergroup 0 2027-11-14 17:18 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/shared
drwxr-xr-x - root supergroup 0 2027-11-14 17:18 /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/taskowned
1
2
3
这里会显示最近20次checkpoint生成的数据目录。

接下来我们来手工停止任务。

image-20230602151730020

1
注意:如果刚才是由于任务内部故障导致的任务停止,则Flink会基于默认的重启策略自动重启,在自动重启的时候会自动使用最新生成的那一份checkpoint数据进行恢复。
1
2
3
4
5
但是刚才是我们手工停止的任务,那么任务在重启的时候想要恢复数据就需要手工指定从哪一份checkpoint数据启动。
如果没有特殊情况就选择使用最新的那一份checkpoint数据进行恢复即可。

先开启socket
[root@bigdata04 ~]# nc -l 9001
1
2
选择一份checkpoint数据重启任务
[root@bigdata04 flink-1.15.0]#flink1-15 run -m yarn-cluster -s hdfs://bigdata01:9000/flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/chk-54/_metadata -c com.imooc.scala.checkpoint.WordCountStateWithCheckpointDemo -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar
1
2
3
注意:-s后面指定的是某一份checkpoint数据,这个时候任务启动的时候会基于这份checkpoint数据进行恢复,这个路径必须写hdfs的全路径,必须要有hdfs路径的前缀,否则会被识别成linux本地路径。

当任务正常启动之后,查看任务界面中的checkpoint信息:

image-20230602152033829

1
2
3
4
5
6
此时图中显示的Restored为1,说明这个任务是基于之前的checkpoint数据恢复启动的。
checkpoint ID目前为55,说明这个任务会延续之前的Checkpoint ID,继续递增增长。
最下面的Latest Restore:ID为54,这个编号是启动任务时使用的Checkpoint 数据的ID编号。
Restore Time表示恢复的时间。
Type:SavePoint,这是因为我们是手工重启的,会显示为Savepoint,如果是任务故障时自动重启的,这里会显示为checkpoint。具体Savepoint什么含义,后面我们会具体分析。
Path:这个表示启动任务时使用的checkpoint数据路径。
1
2
3
4
5
6
7
8
此时在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a
b

查看任务日志:
(a,3)
(b,2)
1
2
3
这样就说明此任务在启动的时候恢复到了之前的状态,因为上一次任务停止之前,a出现了2次,b出现了1次。这次任务重启后,可以累加之前状态中记录的次数。这样就实现了任务故障后的数据恢复,可以保证流计算中数据的准确性,如果没有使用状态和checkpoint,当任务重启后,所有的数据都会归0。

当重启后的程序正常运行后,他还会按照Checkpoint的配置进行运行,继续生成Checkpoint数据。

任务自动重启时checkpoint数据的自动恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
下面我们来演示一下任务在运行期间,任务内部故障导致的任务自动重启时checkpoint数据的自动恢复。
重新开启一个新的socket

向集群中提交此任务。
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -c com.imooc.scala.checkpoint.WordCountStateWithCheckpointDemo -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar


当任务正常启动之后,在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a b
a

到任务界面查看输出结果信息:
(a,1)
(b,1)
(a,2)

此时在任务界面中查看一下taskmanager节点信息

image-20230602152607843

1
2
3
4
5
6
7
8
9
10
11
12
从这个图里面可以看出来,taskmanager进程目前运行在bigdata02上。
我们模拟一个故障场景,由于集群节点异常导致bigdata02节点宕机了,那么运行在这个节点上的taskmanager进程肯定也就没了。

在这里我到bigdata02上使用kill命令直接把taskmanager进程杀掉就可以模拟这个场景了。
首先在bigdata02上执行jps命令查看目前的进程信息

[root@bigdata02 ~]# jps
1680 NodeManager
1570 DataNode
11815 YarnTaskExecutorRunner
11992 Jps
11674 YarnJobClusterEntrypoint
1
2
3
4
5
6
7
8
9
其实11815 YarnTaskExecutorRunner 这个就是taskmanager进程了。

想要进一步确认的话可以使用jps -ml命令:
[root@bigdata02 ~]# jps -ml
1680 org.apache.hadoop.yarn.server.nodemanager.NodeManager
12033 sun.tools.jps.Jps -ml
1570 org.apache.hadoop.hdfs.server.datanode.DataNode
11815 org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=1 -D taskmanager.memory.jvm-overhead.max=201326592b --configDir . -Dblob.server.port=32854 -Djobmanager.rpc.address=bigdata02 -Djobmanager.memory.jvm-overhead.min=201326592b -Dtaskmanager.resource-id=container_1826267805863_0011_01_000002 -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b -Dweb.tmpdir=/tmp/flink-web-95fcafd5-13cb-44f8-adbf-e3e42f09aa15 -Dinternal.taskmanager.resource-id.metadata=bigdata02:38033 -Djobmanager.rpc.port=36024 -Dr
11674 org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b
1
2
3
确定了是哪个进程之后,使用kill 命令杀掉进程。
直接强制杀进程
[root@bigdata02 ~]# kill -9 11815
1
这个时候回到任务界面,可以看到界面中已经不显示这个taskmanager了

image-20230602152917231

1
2
稍等一会任务会重启:
此时发现任务中有一些失败的task,这是因为刚才taskmanager进程挂掉之后,socket也停止了,任务重启后,连不上socket了。

image-20230602153006041

1
2
重新开启socket
[root@bigdata04 ~]# nc -l 9001
1
再看任务界面中的checkpoint相关的信息

image-20230602153045307

1
2
3
4
5
6
7
可以发现此时下面的restore部分有信息了,表示是基于之前的第36份数据进行启动的,此时type就是checkpoint了。

这个时候我们在socket中再模拟产生一条数据a

再验证一下输出结果,发现是没有问题的,是基于之前的状态进行累加的。

(a,3)
1
这就是针对checkpoint数据的自动恢复和手动恢复。

Savepoint详解

1
2
3
4
5
6
7
8
9
10
11
12
13
咱们前面讲到的Checkpoint是为了保证应用在出现故障时可以顺利重启恢复。

而Savepoint是为了有计划的备份任务,实现任务升级后可恢复。

任务升级主要包括:增减并行度、调整业务逻辑、以及升级Flink版本时的任务迁移。

Flink通过Savepoint功能可以做到程序升级后,继续从升级前的那个点开始执行计算,保证数据不中断。

Savepoint会生成全局,一致性快照,可以保存数据源offset,operator操作状态等信息,可以从应用在过去任意做了savepoint的时刻开始继续消费。

Savepoint的生成算法和Checkpoint是完全一样的,所以可以把Savepoint认为是包含了一些额外元数据的Checkpoint,所以Savepoint本质上是特殊的Checkpoint。

Savepoint和Checkpoint可以同时执行,互不影响,Flink不会因为正在执行Checkpoint而推迟Savepoint的执行。

Checkpoint VS Savepoint

1
针对Checkpoint 和 Savepoint的详细区别,在这里我整理了一个表格,我们来看一下

image-20230602153615972

1
2
3
4
5
6
7
8
中文翻译:Checkpoint可以翻译为检查点 或者快照。Savepoint可以翻译为保存点。

触发方式:Checkpoint是由JobManager定时触发快照并自动清理,不需要用户干预,当任务故障重启时自动恢复。Savepoint面向用户,完全根据用户的需要触发与清理,恢复的时候也是根据需求进行恢复。

作用:Checkpoint主要是为了实现任务故障恢复的,他的侧重点是容错,当Flink作业意外失败,重启时可以从之前生成的CheckPoint自动恢复运行,不影响作业逻辑的准确性。SavePoint侧重点是维护,当Flink作业需要在人工干预下手动重启、升级、或者迁移时,先将状态整体写入可靠存储,维护完毕之后再从SavePoint恢复,他属于有计划的备份。

特点:Checkpoint属于轻量级的快照,因为Checkpoint的频率往往比较高,所以Checkpoint的存储格式非常轻量级,但作为权衡牺牲了一切可移植的东西,例如:不保证改变并行度和升级的兼容性。Savepoint属于重量级的快照,他会以二进制的形式存储所有状态数据和元数据,执行起来比较慢而且贵,但是能够保证程序的可移植性 ,例如并行度改变或代码升级之后,仍然能正常恢复。
Checkpoint是支持增量快照的(如果状态数据存储在RocksDB里面),对于超大状态的 作业而言可以降低写入成本。Savepoint并不会连续自动触发,所以不支持增量,只支 持全量。

Savepoint保证程序可移植性的前提条件

1
2
3
4
5
6
7
8
咱们刚才说了,Savepoint能够保证程序的可移植性,可以保证在代码升级之后,仍然可以恢复数据。
但是他想要实现这个功能的前提条件是需要保证任务中所有有状态的算子都配置好下面这两个参数:
第一个参数是:算子唯一标识。
第二个参数是:算子最大并行度,这个参数只针对使用了keyed State的算子。

这两个参数会被固化到Savepoint数据中,不可更改,如果新任务中这两个参数发生了变化,就无法从之前生成的Savepoint数据中启动并恢复数据了,只能选择丢弃之前的状态从头开始运行。

下面我们来具体分析一下这两个参数
算子唯一标识
1
2
3
4
5
6
7
8
9
10
11
12
13
14
首先是算子唯一标识这个参数:

默认情况下Flink会给每个算子分配一个唯一标识。
但是这个标识是根据前面算子的标识并且结合一些规则生成的,这也就意味着任何一个前置算子发生改变都会导致该算子的标识发生变化 。

例如:我们添加或者删除一个算子,这样后面算子的唯一标识就变了,就不可控了。

只要任务中的算子唯一标识发生了变化,Savepoint保存的状态数据基本上就无法用来恢复了,因为之前保存的算子标识和现在最新的算子标识不一样了。

咱们前面说过,Savepoint会以二进制的形式存储所有状态数据和元数据,这里的算子唯一标识就属于元数据中的内容。当Flink任务从Savepoint启动时,会利用算子的唯一标识将Savepoint保存的状态映射到新任务对应的算子中,只有当新任务的算子唯一标识和Savepoint数据中保存的算子标识相同时,状态才能顺利恢复。

所以说如果我们没有给有状态的算子手工设置唯一标识,那么在任务升级时就会受到很多限制。

看下面这个图

image-20230602153912620

1
2
3
4
5
6
7
8
9
10
11
12
13
14
上面是最开始的任务版本,里面有Source、Map和Sink这三个组件,假设这三个组件都是有状态的,我们没有给这些组件手工设置唯一标识,使用的是默认的自动生成的。
这里的唯一标识其实就是uid。
假设source自动生成的唯一标识是uid-001,Map自动生成的唯一标识是uid-002,Sink自动生成的唯一标识是uid-003。

当这个任务运行了一段时间之后,我们的业务逻辑发生了变化,所以对代码做了一些修改,增加了一个Flatmap组件,看下面这个图:
此时还是使用的默认生成的唯一标识,那此时source自动生成的唯一标识可能还是uid-001,map自动生成的唯一标识也还是uid-002,但是flatmap自动生成的唯一标识可能是uid-003,最后sink自动生成的唯一标识就可能是uid-004了。

这样新任务中sink的唯一标识和之前任务中sink的唯一标识就不一样了,那么再基于之前任务生成的savepoint数据就会导致无法恢复了。

为了能够在任务的不同版本之间顺利升级,我们需要通过 uid(…) 方法手动的给算子设置uid。
类似这样的:
source.uid(...)
transform.uid(...)
sink.uid(...)
1
2
3
4
5
在DataSouce、中间的转换算子、DataSink上面都可以设置uid。

其实也没必要给所有的组件都设置uid,最重要的是给包含了状态的组件设置uid,没有状态的组件也不会涉及到数据恢复,就没必要设置了。

看这块代码中的设置

1
2
3
4
5
source和map中维护了状态,所以需要设置uid,其它的就不用设置了,让程序自动生成即可

注意:在同一个任务内部,uid不能重复。

此时我们手工指定uid之后,后期就算在任务中新增了一个组件,之前的组件的uid也不会变化了,任务基于之前生成的Savepoint数据启动的时候依然是可以恢复数据的。
算子最大并行度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
接下来我们来分析一下第二个参数:算子最大并行度

Flink中Keyed State类型的状态数据在恢复时,是按照KeyGroup为单位恢复的,每个KeyGroup 中包含一部分key的数据。
咱们前面在讲状态的扩缩容模式的时候也提到过KeyGroup,说的是一回事。
针对Keyed State,状态在扩缩容的时候会以KeyGroup为单位进行重新分配。

KeyGroup的个数等于算子的最大并行度。
注意:算子的最大并行度并不是算子的并行度,

算子的最大并行度是通过setMaxParallelism()方法设置的,
算子的并行度是通过setParallelism()方法设置的。

当我们设置的算子并行度大于算子最大并行度时,任务在重启的时候有些并行度就分配不到KeyGroup了,这样会导致Flink任务无法从Savepoint恢复数据。

注意:此时也是无法从Checkpoint中恢复数据的。

那这个算子最大并行度应该如何设置呢?
可以通过这两种方式,一种全局的,一种局部的

设置全局算子最大并行度:env.setMaxParallelism()
设置某个算子最大并行度:.map(..).setMaxParallelism()
那咱们之前如果没有设置过的话,这些算子的最大并行度默认是多少呢?

先说结果,算子最大并行度默认是128,最大是32768。env.setMaxParallelism()

我们来看一下这块逻辑的代码:
算子的最大并行度可以在算子上独立设置,也可以通过env全局设置。
通过这个方法作为入口查看底层源码:
env.setMaxParallelism()
1
2
3
4
5
6
7
8
9
10
11
12
13
往下面追踪,可以追踪到这里:
public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
Preconditions.checkArgument(
maxParallelism > 0
&& maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
"maxParallelism is out of bounds 0 < maxParallelism <= "
+ KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM
+ ". Found: "
+ maxParallelism);

config.setMaxParallelism(maxParallelism);
return this;
}
1
2
3
4
这是我们手工设置算子最大并行度时使用的方法,这里面会用到KeyGroupRangeAssignment这个类。

在KeyGroupRangeAssignment这个类中有一个方法:computeDefaultMaxParallelism
这个方法会计算默认的算子最大并行度
1
2
3
4
5
6
7
8
9
10
11
public static int computeDefaultMaxParallelism(int operatorParallelism) {

checkParallelismPreconditions(operatorParallelism);

return Math.min(
Math.max(
MathUtils.roundUpToPowerOfTwo(
operatorParallelism + (operatorParallelism / 2)),
DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
UPPER_BOUND_MAX_PARALLELISM);
}
1
2
3
这块代码我们可以拿出来运行一下,测试一下效果
创建一个Object:TestDefaultMaxParallelism
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.imooc.scala.checkpoint

import org.apache.flink.util.MathUtils

/**
* 测试默认算子最大并行度
* 结论:Flink生成的算子最大并行度介于128和32768之间
* Created by xuwei
*/
object TestDefaultMaxParallelism {
def main(args: Array[String]): Unit = {
//这个参数表示当前算子(任务)的并行度
val operatorParallelism = 1
//最小值:2的7次方=128
val DEFAULT_LOWER_BOUND_MAX_PARALLELISM:Int = 1 << 7
//最大值:2的15次方=32768
val UPPER_BOUND_MAX_PARALLELISM:Int = 1 << 15

//计算算子最大并行度
val res = Math.min(
Math.max(
MathUtils.roundUpToPowerOfTwo(
operatorParallelism + (operatorParallelism / 2)),
DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
UPPER_BOUND_MAX_PARALLELISM)
println(res)
}
}
1
2
3
4
5
运行代码可以发现,如果Flink任务中算子的并行度比较小的时候,则算子最大并行度默认就是128。

如果Flink任务中算子的并行度比较大,则会按照这个公式生成对应的值,这个值不是固定的。

所以说如果我们想要保证Savepoint的可移植性,那么最好是手工设置一个固定的算子最大并行度。
算子最大并行度-注意事项
1
2
3
4
5
6
7
8
9
在工作中还遇到过一个关于算子最大并行度的问题,当时我们在计算某一个业务指标的时候,这个任务中也用到了基于keyed state类型的状态,这个业务指标前期数据量比较小,所以最开始给这个任务设置的全局并行度为20,也没有单独给这个任务中的算子设置最大并行度,根据前面的分析,此时默认生成的算子最大并行度就是128了。
后期随着平台用户规模的增长,这个业务的数据量呈指数级增长,使用之前的并行度来处理增长后的数据规模就有点力不从心了,导致数据出现了积压,我们尝试调整任务的并行度,当把任务的并行度调整为128以上时,发现任务无法从checkpoint和savepoint进行恢复。
这就是所谓的任务并行度调不上去了。
这是因为我们前面分析的,当算子并行度大于算子最大并行度时,任务在重启的时候有些并行度就分配不到KeyGroup了,这样会导致Flink任务无法从Checkpoint或者Savepoint恢复数据。
此时想要提高计算能力,只有一种方法,那就是放弃状态中保存的数据,不从状态中恢复,直接单独启动这个任务,这样是可以提高并行度的。

其实合理的一点方案是这样的:我们在开发基于keyed state类型的有状态的任务的时候,需要提前预估一下这个任务后期可能处理的数据规模会达到哪种级别,提前设置一个合适的算子最大并行度。这样在前期数据量小的时候,我们可以给任务设置一个比较小的并行度,也不浪费资源,后期数据量上来之后,再调整为一个比较大的并行度。

所以针对一个keyed state类型的有状态的Flink任务,它未来能扩展到的最大并行度其实取决于这个任务第一次启动时设置的算子最大并行度。

手工触发Savepoint

1
2
3
4
5
前面对Savepoint的原理有了一定的理解,下面我们来看一下Savepoint的使用
Savepoint需要手工触发,需要使用这种形式的命令:
bin/flink savepoint jobId hdfs://IP:9000/flink/sap -yid yarnAppId

注意:针对flink on yarn模式一定要指定-yid参数。
1
2
3
4
5
6
下面我们分别来演示一下:
开启一个新的socket
nc -l 9001

向集群中提交任务
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -c com.imooc.scala.checkpoint.WordCountStateWithCheckpointDemo -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar
1
2
3
4
5
6
7
8
9
当任务正常启动之后,在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a b
a

到任务界面查看输出结果信息:
(a,1)
(b,1)
(a,2)
1
2
3
4
5
手工触发savepoint。

注意:需要获取flink任务id和对应的yarn applicationid

[root@bigdata04 flink-1.15.0]# flink1-15 savepoint 09ded8897f6a33eda7bdbb32e42046 hdfs://bigdata01:9000/flink/sap -yid application_1826267805863_0014
1
此时到任务界面查看,可以看到这里显示的savepoint信息

image-20230602155500749

1
手工停止这个任务。

从Savepoint进行恢复

正常恢复

1
2
3
4
5
6
7
8
9
从Savepoint进行恢复时使用的命令和前面我们讲的手动从checkpoint恢复的命令是一样的。
其实这个命令本来就是Savepoint提供的,只不过也是支持基于checkpoint的数据进行恢复。

接下来我们尝试使用之前生成的savepoint数据来重启恢复任务。
开启一个新的socket
[root@bigdata04 ~]# nc -l 9001

向集群中提交任务,需要通过-s参数指定savepoint的数据目录。
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -s hdfs://bigdata01:9000/flink/sap/savepoint-09ded8-e14f515ce7af/_metadata -c com.imooc.scala.checkpoint.WordCountStateWithCheckpointDemo -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar
1
2
3
当任务正常启动之后,在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a
1
2
3
4
到任务界面查看输出结果信息:
(a,3)

这样就说明任务正常基于savepoint的数据恢复到了之前的状态。

异常恢复

1
2
3
4
咱们前面讲到了在某些特殊情况下会导致任务无法从Savepoint中恢复。
下面来针对两个比较常见的故障场景进行分析:
故障情况1:未手工设置uid,重启时任务中增加了新的算子
故障情况2:未手工设置uid,重启时算子并行度发生了变化
故障情况1
1
2
3
4
首先演示一下第一种故障情况:
创建package:com.imooc.scala.savepoint

原始代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.imooc.scala.savepoint

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* 故障情况1:未手工设置uid,重启时任务中增加了新的算子
* Created by xuwei
*/
object WordCountStateForSavepoint1 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

//开启Checkpoint
env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次
//获取Checkpoint的配置对象
val cpConfig = env.getCheckpointConfig
//在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
cpConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置Checkpoint后的状态数据的存储位置
cpConfig.setCheckpointStorage("hdfs://bigdata01:9000/flink-chk/wordcount")

val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)

keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
//声明一个ValueState类型的状态变量,存储单词出现的总次数
private var countState: ValueState[Int] = _

/**
* 任务初始化的时候这个方法执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//注册状态
val valueStateDesc = new ValueStateDescriptor[Int](
"countState",//指定状态名称
classOf[Int]//指定状态中存储的数据类型
)
countState = getRuntimeContext.getState(valueStateDesc)
}

override def map(value: (String, Int)): (String,Int) = {
//从状态中获取这个key之前出现的次数
var lastNum = countState.value()
val currNum = value._2
//如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
if(lastNum == null){
lastNum = 0
}
//汇总出现的次数
val sum = lastNum+currNum
//更新状态
countState.update(sum)
//返回单词及单词出现的总次数
(value._1,sum)
}

}).print()

env.execute("WordCountStateForSavepoint1")
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
我们首先基于这份代码生成一份savepoint数据,然后再对这个代码进行修改,增加一个算子,看看还能不能基于之前的数据进行恢复。

编译打包。

上传jar包。

开启一个新的socket

向集群中提交任务
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -c com.imooc.scala.savepoint.WordCountStateForSavepoint1 -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar

当任务正常启动之后,在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a b
a

到任务界面查看输出结果信息:
(a,1)
(b,1)
(a,2)

手工触发savepoint。
[root@bigdata04 flink-1.15.0]# flink1-15 savepoint d552f822307d4b1ce2e991f4cbb0d065 hdfs://bigdata01:9000/flink/sap -yid application_1826267805863_0016
1
2
3
4
5
6
7
8
9
然后停止任务。

修改代码,在任务中增加一个算子,其他代码不变。
其实增加的这个算子对结果数据没有任何影响。

val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.map(tup=>(tup._1,tup._2))
.keyBy(_._1)
1
2
3
4
5
6
7
8
重新编译打包。

上传jar包

开启一个新的socket

基于之前生成的savepoint数据进行恢复。
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -s hdfs://bigdata01:9000/flink/sap/savepoint-d552f8-ec06489b0100/_metadata -c com.imooc.scala.savepoint.WordCountStateForSavepoint1 -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar
1
此时发现任务提交上去之后会自动失败。

1
2
3
4
5
6
查看日志中的错误信息。

注意:此时至少需要开启Hadoop的historyserver服务。

核心错误日志是这一行:
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://bigdata01:9000/flink/sap/savepoint-d552f8-ec06489b0100. Cannot map checkpoint/savepoint state for operator c27dcf7b54ef6bfd6cff02ca8870b681 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
1
2
3
4
5
6
日志中的意思是说不能把savepoint中的状态数据映射到uid发送了变化的算子中。
如果你想忽略这个问题,可以指定--allowNonRestoredState,这样会忽略掉无法映射的状态数据,强制启动。

想要解决这个问题,咱们之前分析了,需要手工设置算子的uid,至少是要指定有状态的算子的uid。
修改之前的代码,在有状态的map算子后面设置uid,并且删掉之前增加的map算子。
完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.imooc.scala.savepoint

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* 故障情况1:未手工设置uid,并且任务中增加了新的算子
* Created by xuwei
*/
object WordCountStateForSavepoint1 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

//开启Checkpoint
env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次
//获取Checkpoint的配置对象
val cpConfig = env.getCheckpointConfig
//在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
cpConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置Checkpoint后的状态数据的存储位置
cpConfig.setCheckpointStorage("hdfs://bigdata01:9000/flink-chk/wordcount")

val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)

keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
//声明一个ValueState类型的状态变量,存储单词出现的总次数
private var countState: ValueState[Int] = _

/**
* 任务初始化的时候这个方法执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//注册状态
val valueStateDesc = new ValueStateDescriptor[Int](
"countState",//指定状态名称
classOf[Int]//指定状态中存储的数据类型
)
countState = getRuntimeContext.getState(valueStateDesc)
}

override def map(value: (String, Int)): (String,Int) = {
//从状态中获取这个key之前出现的次数
var lastNum = countState.value()
val currNum = value._2
//如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
if(lastNum == null){
lastNum = 0
}
//汇总出现的次数
val sum = lastNum+currNum
//更新状态
countState.update(sum)
//返回单词及单词出现的总次数
(value._1,sum)
}

}).uid("vs_map001").print()

env.execute("WordCountStateForSavepoint1")
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
编译打包。

上传jar包。

开启一个新的socket

向集群中提交任务
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -c com.imooc.scala.savepoint.WordCountStateForSavepoint1 -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar

当任务正常启动之后,在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a b
a
到任务界面查看输出结果信息:
(a,1)
(b,1)
(a,2)
1
2
手工触发savepoint。
[root@bigdata04 flink-1.15.0]# flink1-15 savepoint ebb9f0efa183e6b3d57f497a4da86ec2 hdfs://bigdata01:9000/flink/sap -yid application_1826267805863_0020
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
然后停止任务。

再修改代码,增加一个map算子
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.map(tup=>(tup._1,tup._2))
.keyBy(_._1)


重新编译打包。

上传jar包。

开启一个新的socket

基于之前生成的savepoint数据进行恢复。
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -s hdfs://bigdata01:9000/flink/sap/savepoint-ebb9f0-ae46290b2f2c/_metadata -c com.imooc.scala.savepoint.WordCountStateForSavepoint1 -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar
1
2
此时发现任务可以正常启动。
查看任务界面中的信息,可以看到任务是基于之前的savepoint数据进行恢复的。

image-20230602161106105

1
2
3
4
5
在socket中模拟产生数据:

到任务界面查看输出结果信息:

看到这个结果就说明数据是正常恢复了。
故障情况2
1
2
3
下面我们来演示一下第二种故障情况:

代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.imooc.scala.savepoint

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* 未手工设置uid,重启时算子并行度发生了变化
* Created by xuwei
*/
object WordCountStateForSavepoint2 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)


//开启Checkpoint
env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次
//获取Checkpoint的配置对象
val cpConfig = env.getCheckpointConfig
//在任务故障和手工停止任务时都会保留之前生成的Checkpoint数据
cpConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置Checkpoint后的状态数据的存储位置
cpConfig.setCheckpointStorage("hdfs://bigdata01:9000/flink-chk/wordcount")

val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)

keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
//声明一个ValueState类型的状态变量,存储单词出现的总次数
private var countState: ValueState[Int] = _

/**
* 任务初始化的时候这个方法执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//注册状态
val valueStateDesc = new ValueStateDescriptor[Int](
"countState",//指定状态名称
classOf[Int]//指定状态中存储的数据类型
)
countState = getRuntimeContext.getState(valueStateDesc)
}

override def map(value: (String, Int)): (String,Int) = {
//从状态中获取这个key之前出现的次数
var lastNum = countState.value()
val currNum = value._2
//如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
if(lastNum == null){
lastNum = 0
}
//汇总出现的次数
val sum = lastNum+currNum
//更新状态
countState.update(sum)
//返回单词及单词出现的总次数
(value._1,sum)
}

}).print()

env.execute("WordCountStateForSavepoint2")
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
这个代码就是一个有状态的单词计数,没有给map这个有状态的算子设置uid。
我一会先基于默认并行度1启动任务,并且生成savepoint,然后在基于savepoint恢复的时候调整并行度,看看会出现什么情况

编译打包。

上传jar包。

开启一个新的socket

向集群中提交任务
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -c com.imooc.scala.savepoint.WordCountStateForSavepoint2 -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar

当任务正常启动之后,在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a b
a

到任务界面查看输出结果信息:
(a,1)
(b,1)
(a,2)
1
2
手工触发savepoint。
[root@bigdata04 flink-1.15.0]# flink1-15 savepoint 1a871d083497ae7ab775c168a5f75f6d hdfs://bigdata01:9000/flink/sap -yid application_1826267805863_0023
1
2
3
4
5
6
7
8
9
10
11
12
然后停止任务。

再开启一个新的socket

基于之前生成的savepoint数据进行恢复。
通过-p 指定全局并行度为2,
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -p 2 -s hdfs://bigdata01:9000/flink/sap/savepoint-1a871d-b2fce8500a75/_metadata -c com.imooc.scala.savepoint.WordCountStateForSavepoint2 -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar

结果发现任务执行失败
查看任务报错日志,发现和故障情况1里面的错误信息一样
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException:
Failed to rollback to checkpoint/savepoint hdfs://bigdata01:9000/flink/sap/savepoint-1a871d-b2fce8500a75. Cannot map checkpoint/savepoint state for operator c27dcf7b54ef6bfd6cff02ca8870b681 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
1
2
3
4
5
6
7
那也就意味着,改变并行度时,也会影响算子默认生成的uid。
所以想要支持在恢复状态的时候修改并行度,需要给有状态的算子手工设置uid。

修改代码,给这个有状态的map算子手工设置uid。
keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
......
}).uid("vs_map001").print()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
重新编译打包

上传jar包。

开启一个新的socket

向集群中提交任务
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -c com.imooc.scala.savepoint.WordCountStateForSavepoint2 -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar

当任务正常启动之后,在socket中模拟产生数据:

到任务界面查看输出结果信息:

手工触发savepoint。
[root@bigdata04 flink-1.15.0]# flink1-15 savepoint 5e63967dbad8dbc32be5dcd77edcae88 hdfs://bigdata01:9000/flink/sap -yid application_1826267805863_0027
1
2
3
4
5
6
7
8
9
10
然后停止任务。

再开启一个新的socket

基于之前生成的savepoint数据进行恢复。
通过-p指定全局并行度为2,
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -p 2 -s hdfs://bigdata01:9000/flink/sap/savepoint-5e6396-b23c7c08f013/_metadata -c com.imooc.scala.savepoint.WordCountStateForSavepoint2 -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar

此时任务可以正常启动并恢复数据。
查看任务界面,可以看到并行度变成2了。

image-20230602161722870

1
2
3
4
5
在socket中模拟产生数据
a
查看输出结果
2> (a,3)
能看到这个输出,说明任务成功恢复执行了。

State Backend(状态的存储方式)

1
2
3
4
5
6
7
8
咱们前面分析过,状态数据默认是存储在taskmanager节点的jvm堆内存中。
当然还有一种选择是存储在本地的rocksdb数据库中。

具体状态的存储位置,是由state backend来控制的:

目前Flink提供了两种State Backend:

看下面这个图:

image-20230602162512730

1
2
3
4
第一种是HashMapStateBackend,这是默认的State Backend,这种存储方式会将状态数据存储在TaskManager节点的JVM堆内存中。

第二种是EmbeddedRocksDBStateBackend,这种存储方式是借助于内嵌的RocksDB数据库。
Rocksdb这个数据库中的数据会存储在对应TaskManager节点的本地磁盘文件中,此时状态数据会作为本地磁盘上的序列化字节存在。

HashMapStateBackend

1
2
3
4
5
HashMapStateBackend这种存储方式,他的优点是,基于内存读写数据,效率高,不需要涉及序列化和反序列化。

但是它也有缺点,基于内存操作的话,内存大小是有限的,所以不适合存储超大状态的数据,适合用于常规任务,就是状态数据量中等的任务,例如:分钟级别的窗口聚合操作。

Flink在执行checkpoint的时候,每次都会从HashMapStateBackend中获取全量数据进行持久化。

EmbeddedRocksDBStateBackend

1
2
3
4
5
6
7
EmbeddedRocksDBStateBackend这种存储方式,他的优点是解决了内存受限的问题,所以可以适用于超大状态的任务,例如:天级别的窗口聚合操作。

但是它也有缺点,它在存储状态数据的时候使用的是内嵌的Rocksdb数据库,Rocksdb数据库的数据是存储在磁盘文件中的,所以需要操作磁盘,那么它的效率就不如内存高了,并且基于磁盘的读写操作还需要涉及到数据的序列化和反序列,也会影响性能,所以这种方式适合用于对状态读写性能要求不是特别高的任务。

针对EmbeddedRocksDBStateBackend这种存储方式,它可以支持增量checkpoint,这样可以进一步提高Checkpoint性能。当然了,他也可以支持全量checkpoint。

所以在工作中,针对常规的任务还是建议使用HashMapStateBackend,如果某个任务需要在状态里面维护大量的数据,可以考虑使用EmbeddedRocksDBStateBackend。

State Backend的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
在flink的执行环境中通过setStateBackend方法进行设置。
可以给这个方法传递new HashMapStateBackend()或者new EmbeddedRocksDBStateBackend(true)。

注意:new EmbeddedRocksDBStateBackend(true)中的true表示开启增量checkpoint。如果不传参数,则是全量checkpoint。

在代码中的具体使用是这样的:
创建package:com.imooc.scala.statebackend

想要使用rockdb数据库,需要引入对应的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.15.0</version>
<!--<scope>provided</scope>-->
</dependency>
1
2
3
4
5
基于WordCountStateWithCheckpointDemo代码复制一份进行修改

Object:WordCountStateWithStateBackendDemo

核心代码是这些:
1
2
3
4
5
6
7
8
9
10
11
//设置StateBackend
//默认使用HashMapStateBackend
//此时State数据保存在TaskManager节点的JVM堆内存中
//env.setStateBackend(new HashMapStateBackend())

//此时State数据保存在TaskManager节点内置的RocksDB数据库中(TaskManager节点的本地磁盘文件中)
//在EmbeddedRocksDBStateBackend的构造函数中指定参数true会开启增量Checkpoint【建议设置为true】
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))

//注意:具体Checkpoint后的状态数据存储在哪里是由setCheckpointStorage控制的
env.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://bigdata01:9000/flink-chk001"))
1
使用的时候和之前使用默认的HashMapStateBackend没什么区别。

State的生存时间(TTL)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
默认情况下State数据会一直存在,如果存储了过多状态数据,可能会导致内存溢出(针对HashMapStateBackend)。

因此从Flink 1.6版本开始引入了State TTL特性。类似于Redis 中的TTL机制,超时自动删除。

TTL特性可以支持对KeyedState中过期状态数据的自动清理,当状态中的某条数据到了过期时间,这条数据会被Flink自动删除,这样就有效解决了状态数据在无干预情况下无限增长导致内存溢出的问题。

例如:我们在实时统计一段时间内的数据指标的时候,需要在状态中做去重,但是过了这段时间之后,之前的状态数据就没有用了,这样就可以用状态的ttl机制实现自动清理,当然我们也可以通过代码逻辑清空状态中的历史数据。

针对OperatorState类型而言,基本上不需要自动清理,以我们之前开发的OperatorState_MyBufferSinkDemo代码为例。
在MyBufferSink中,平时处理的数据是存储在任务内部的本地缓存中。
只有在触发checkpoint的时候,才会把本地缓存中的数据写入到状态中,当下次触发checkpoint的时候我们会在代码中将之前的状态数据清空。所以状态中存储的数据不会一直增长,这样就没必要设置了状态的TTL了。

数据过期判断依据:上次修改的时间戳 + 我们设置的状态TTL > 当前时间戳
如果满足这个条件,那么这条状态数据就过期了。

本质上来讲,状态的TTL功能其实就是给每个Keyed State增加了一个“时间戳”,而 Flink 在状态创建、写入或读取的时候可以更新这个时间戳,并且判断状态是否过期。如果状态过期,还会根据可见性参数,来决定是否返回已过期但还未清理的状态。
状态的清理并不是即时的,而是使用了一种 Lazy 的算法来实现,从而减少状态清理对性能的影响。

举个例子,我要将一个 String 类型的数据存储到ValueState类型的状态中:
如果没有设置状态的TTL ,则直接将 String 类型的数据存储到ValueState中。
如果设置了状态的TTL,则Flink会将<String, Long>这种结构的数据存储到ValueState中,其中Long为时间戳,用于判断状态是否过期。

在代码层面进行分析的话,是这样的。
如果没有设置状态的TTL,我们存储 String 类型的数据使用的是ValueState。
当我们设置了状态的TTL,那么就需要使用ValueState对应的TtlValueState。

来看一下TtlValueState的源码:
他里面有一个update方法:
1
2
3
4
5
@Override
public void update(T value) throws IOException {
accessCallback.run();
original.update(wrapWithTs(value));
}
1
2
update方法中会调用wrapWithTs(value),看一下wrapWithTs这个方法的实现:
其实在这里可以发现,TtlUtils.wrapWithTs方法接收了两个参数,一个是value,另外一个是当前时间戳。
1
2
3
<V> TtlValue<V> wrapWithTs(V value) {
return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
继续往下面查看TtlUtils.wrapWithTs方法
static <V> TtlValue<V> wrapWithTs(V value, long ts) {
return new TtlValue<>(value, ts);
}

最终发现它返回了一个TtlValue数据类型,来看一下这个类的实现:
public class TtlValue<T> implements Serializable {
private static final long serialVersionUID = 5221129704201125020L;

@Nullable private final T userValue;
private final long lastAccessTimestamp;

public TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
checkArgument(!(userValue instanceof TtlValue));
this.userValue = userValue;
this.lastAccessTimestamp = lastAccessTimestamp;
}

@Nullable
public T getUserValue() {
return userValue;
}

public long getLastAccessTimestamp() {
return lastAccessTimestamp;
}
}
1
2
3
4
5
6
7
8
9
10
这个类里面封装了具体的数据和对应的时间戳。
这个类里面的lastAccessTimestamp就表示上次访问的时间戳,这样就可以基于这个时间戳判断这个数据是否过期了。

这个是TtlValueState的实现,对应的还有TtlMapState、TtlListState等。

下面我们来具体演示一下TTL的使用:

创建package:com.imooc.scala.ttl

完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package com.imooc.scala.ttl

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.StateTtlConfig.TtlTimeCharacteristic
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* State TTL的使用
* Created by xuwei
*/
object StateTTLDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

//开启Checkpoint
env.enableCheckpointing(1000*10)//为了观察方便,在这里设置为10秒执行一次

val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)

keyedStream.map(new RichMapFunction[(String,Int),(String,Int)] {
//声明一个ValueState类型的状态变量,存储单词出现的总次数
private var countState: ValueState[Int] = _

/**
* 任务初始化的时候这个方法执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//设置TTL机制相关配置
val ttlConfig = StateTtlConfig
//1:指定状态的生存时间
.newBuilder(Time.seconds(10))
//2:指定什么时候触发更新延长状态的TTL时间
//OnCreateAndWrite:仅在创建和写入时触发TTL时间更新延长。
//OnReadAndWrite:表示在读取的时候也会触发(包括创建和写入)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
//3:过期数据是否可访问
//NeverReturnExpired:表示永远不会返回过期数据(可能会存在数据已过期但是还没有被清理)
//ReturnExpiredIfNotCleanedUp:表示数据只要没有被删除,就算过期了也可以被访问
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
//4:TTL的时间语义
//判断数据是否过期时使用的时间语义,默认使用处理时间,目前只支持这一种
.setTtlTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime)
//5:过期数据删除策略
//cleanupFullSnapshot:全量删除
//此时只有当任务从checkpoint或者savepoint恢复时才会删除所有过期数据
//这种方式其实并不能真正解决使用HashMapStateBackend时的内存压力问题,只有定时重启恢复才可以解决
//注意:这种方式不适合Rocksdb中的增量Checkpoint方式
//.cleanupFullSnapshot()
//cleanupIncrementally:针对内存的增量删除方式
//增量删除策略只支持基于内存的HashMapStateBackend,不支持EmbeddedRocksDBStateBackend
//它的实现思路是在所有数据上维护一个全局迭代器。当遇到某些事件(例如状态访问)时会触发增量删除
//cleanupSize=100 和 runCleanupForEveryRecord=true 表示每访问一个状态数据就会向前迭代遍历100条数据并删除其中过期的数据
.cleanupIncrementally(100,true)
//针对Rocksdb的增量删除方式
//当Rocksdb在做Compact(合并)的时候删除过期数据
//每Compact(合并)1000个Entry之后,会从Flink中查询当前时间戳,用于判断这些数据是否过期
//.cleanupInRocksdbCompactFilter(1000)
.build
//注册状态
val valueStateDesc = new ValueStateDescriptor[Int](
"countState",//指定状态名称
classOf[Int]//指定状态中存储的数据类型
)
//开启TTL机制
//注意:开启TTL机制会增加状态的存储空间,因为在存储状态的时候还需要将状态的上次修改时间一起存储
valueStateDesc.enableTimeToLive(ttlConfig)
countState = getRuntimeContext.getState(valueStateDesc)
}

override def map(value: (String, Int)): (String,Int) = {
//从状态中获取这个key之前出现的次数
var lastNum = countState.value()
val currNum = value._2
//如果这个key的数据是第一次过来,则将之前出现的次数初始化为0
if(lastNum == null){
lastNum = 0
}
//汇总出现的次数
val sum = lastNum+currNum
//更新状态
countState.update(sum)
//返回单词及单词出现的总次数
(value._1,sum)
}

}).print()

env.execute("StateTTLDemo")
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
这个可以可以直接在idea中运行。
先开启socket:

启动代码。

在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a b
可以看到任务输出结果:
2> (b,1)
6> (a,1)
由于我们设置了状态的TTL时间是10秒,所以10秒之后,再模拟产生一条数据 a
[root@bigdata04 ~]# nc -l 9001
a
看一下输出的数据效果,发现此时a的次数还是1,说明之前在状态中存储的那个a已经过期了。
6> (a,1)
立刻再模拟产生一条数据 a
a
此时发现输出的a的次数就是2了,在数据过期前是可以使用的,并且使用之后,数据的过期时间会对应的往后顺延,因为我们重新向状态中写入了a的数据,那么a对应的时间戳就会被修改为当前最新时间。
6> (a,2)
这就是状态的TTL。

其实在Flink 的DataStream API 中,状态的TTL功能的应用场景还是比较少的。状态的TTL功能在Flink SQL中是被大规模应用的,除了窗口类操作和ETL之类的任务之外,其余的Flink SQL任务基本都需要用到状态的TTL机制。

注意:
Flink任务中设置了TTL 和不设置TTL 的状态是不兼容的,大家在使用的时候时一定要注意。避免出现任务从 Checkpoint /Savepoint无法恢复的情况。但是我们是可以去修改TTL的时间的,因为修改时长并不会改变状态的存储结构。
如果试图使用设置了TTL的状态恢复先前没有设置TTL时生成的状态,将导致任务恢复失败。

Window中的数据存在哪里?

1
2
3
4
5
6
7
8
9
10
简单来说是内存中,严格来说其实是存储在状态中

核心内容是在WindowOperator这个类里面。

下面我们来从源码层面来具体分析一下。
随便找一个类,在keyBy后面调用window(..)方法。
@PublicEvolving
def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = {
new WindowedStream(new WindowedJavaStream[T, K, W](javaStream, assigner))
}
1
2
进入new WindowedJavaStream()
这里面会创建WindowOperatorBuilder。
1
2
3
4
5
6
7
8
9
10
11
12
13
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {

this.input = input;

this.builder =
new WindowOperatorBuilder<>(
windowAssigner,
windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
input.getExecutionConfig(),
input.getType(),
input.getKeySelector(),
input.getKeyType());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
进入WindowOperatorBuilder这个类:
在这个类里面有一个buildWindowOperator方法,他会创建WindowOperator,
private <ACC, R> WindowOperator<K, T, ACC, R, W> buildWindowOperator(
StateDescriptor<? extends AppendingState<T, ACC>, ?> stateDesc,
InternalWindowFunction<ACC, R, K, W> function) {

return new WindowOperator<>(
windowAssigner,
windowAssigner.getWindowSerializer(config),
keySelector,
keyType.createSerializer(config),
stateDesc,
function,
trigger,
allowedLateness,
lateDataOutputTag);
}
1
2
3
4
5
进入WindowOperator
这个类中有一个核心方法是processElement,处理窗口中的数据
在这个方法中可以看到里面用到了windowState
windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
查看windowState的介绍:
这是一个聚合类型的状态。
从注释上可以发现,windowState负责存储窗口内的状态,每个窗口一个命名空间。
/** The state in which the window contents is stored. Each window is a namespace */
private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;
从这里可以看出来,窗口中的数据其实是在状态中维护的。

当窗口触发执行完成之后,窗口中维护的状态数据是会被删除的。
他里面有一个clearAllState方法,当窗口执行结束后这个方法会被触发
他里面会调用windowState.clear(); 清空状态中的数据。
private void clearAllState(
W window, AppendingState<IN, ACC> windowState, MergingWindowSet<W> mergingWindows)
throws Exception {
windowState.clear();
triggerContext.clear();
processContext.window = window;
processContext.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
mergingWindows.persist();
}
}

所以窗口中的状态不会一直增加,除非你的窗口非常大,例如:天级别的窗口,这种情况的话最好是使用rocksdb存储状态,避免内存溢出。

本文标题:大数据开发工程师-第十七周 Flink新版本1.12以上-2

文章作者:TTYONG

发布时间:2023年04月20日 - 16:04

最后更新:2023年06月02日 - 16:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%83%E5%91%A8-Flink%E6%9E%81%E9%80%9F%E4%B8%8A%E6%89%8B%E7%AF%87-Flink%E6%96%B0%E7%89%88%E6%9C%AC1.12%E4%BB%A5%E4%B8%8A-2.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%