Flink新版本1.12以上-2
State(状态)的容错与一致性
1 | 前面我们学习了State的原理和在代码中的使用。下面我们来深入分析一下State的容错和一致性。 |
1 | 注意:这里的仅一次语义,表示数据对最终结果的影响只有一次,并不是说数据只被处理一次。 |
流计算中Exactly-once语义的多种实现思路
1 | 针对流式计算任务中Exactly-once语义的实现思路其实是有多种的,下面我们来分析一下 |

1 | 第一种思路:借助于At-least-once语义,再加上去重功能来实现。这种实现思路需要我们程序员自己来维护一个去重功能,因为At-least-once语义可以保证数据不丢,但是可能会出现数据重复,这样当任务故障后恢复的时候,对重复的数据进行去重,这样就可以间接实现Exactly-once这种仅一次语义了。 |
1 | 针对这三种实现思路,下面有几张图我们来看一下,加深一下理解: |

1 | 相当于我们在中间的算子中需要借助于外部的存储系统实现一个去重功能,当任务故障后恢复的时候,遇到重复数据在处理的时候可以进行去重,这样可以保证实现仅一次语义的效果。 |
1 | 思路2: |

1 | 此时需要依赖于外部存储系统的幂等特性,咱们前面以redis数据库为例进行了分析,如果是使用hbase数据库呢? |

1 | 此时是借助于状态和checkpoint机制实现,任务运行期间,定期对状态生成快照,任务故障恢复时基于之前的快照数据恢复状态,这样可以实现仅一次语义。 |
如何实现Flink任务的端到端一致性?
1 | Flink可以借助于Checkpoint机制保证Flink系统内部状态的一致性。 |
1 | 那下面我们来具体分析一下这三个阶段如何保证自己的一致性? |
1 | 针对WAL预写日志这种方式,他提供的有一个接口是GenericWriteAheadSink,我们来看一下这个接口的大致实现: |
1 | private transient ListState<PendingCheckpoint> checkpointedState; |
1 | 两阶段提交这种方式会用到TwoPhaseCommitSinkFunction这个接口: |
1 | 我们在使用Flink向kafka中写入数据想要保证数据一致性的时候就会用到这个接口,等后面我们再具体分析这个接口在kafka中的具体实现。 |
Checkpoint(快照)机制详解
1 | Checkpoint机制是Flink中实现State容错和一致性最核心的功能。 |
1 | 下面我们来具体看一下如何在代码中开启Checkpoint,以及Checkpoint相关的一些核心配置。 |
1 | package com.imooc.scala.checkpoint |
1 | 注意:这些配置是针对单个任务有效的,并且需要放到具体的任务中才有意义。目前这个类里面的代码是不完整的,无法正常执行,等后面我们把这些代码拿到对应的任务中去使用。 |
1 | 好的,我来解释一下。在Flink中,Checkpoint是用来保存程序运行状态的一种机制。env.enableCheckpointing(1000*60*2)是用来设置Checkpoint的间隔时间,即每隔多长时间进行一次Checkpoint。cpConfig.setMinPauseBetweenCheckpoints(1000*5)是用来设置两次 Checkpoint之间的最小暂停时间,即如果上一次Checkpoint完成后,距离下一次 Checkpoint的间隔时间小于这个值,则会等待直到满足最小暂停时间后再进行下一次 Checkpoint。 |
保存多个Checkpoint
1 | 默认情况下,如果在任务中开启了Checkpoint,则Flink只会保留最近成功生成的1份Checkpoint数据。 |
1 | [root@bigdata04 conf]# vi flink-conf.yaml |
从Checkpoint进行恢复
手工停止的任务,恢复数据
1 | 前面我们分析了Checkpoint的详细配置,下面我们就结合具体的案例来演示一下Checkpoint是如何对状态数据进行持久化保存的,并且当任务故障后,我们如何基于Checkpoint产生的数据进行恢复 |
1 | package com.imooc.scala.checkpoint |
1 | 修改项目的依赖配置,将所有依赖的scope都设置为provided。 |
1 | 到任务界面查看输出结果信息: |
1 | 此时a出现2次,b出现了1次。 |

1 | 解释: |
1 | [root@bigdata01 ~]# hdfs dfs -ls /flink-chk/wordcount/10fbadc413d0ae5c1b91bb460e969117/ |
1 | 这里会显示最近20次checkpoint生成的数据目录。 |

1 | 注意:如果刚才是由于任务内部故障导致的任务停止,则Flink会基于默认的重启策略自动重启,在自动重启的时候会自动使用最新生成的那一份checkpoint数据进行恢复。 |
1 | 但是刚才是我们手工停止的任务,那么任务在重启的时候想要恢复数据就需要手工指定从哪一份checkpoint数据启动。 |
1 | 选择一份checkpoint数据重启任务 |
1 | 注意:-s后面指定的是某一份checkpoint数据,这个时候任务启动的时候会基于这份checkpoint数据进行恢复,这个路径必须写hdfs的全路径,必须要有hdfs路径的前缀,否则会被识别成linux本地路径。 |

1 | 此时图中显示的Restored为1,说明这个任务是基于之前的checkpoint数据恢复启动的。 |
1 | 此时在socket中模拟产生数据: |
1 | 这样就说明此任务在启动的时候恢复到了之前的状态,因为上一次任务停止之前,a出现了2次,b出现了1次。这次任务重启后,可以累加之前状态中记录的次数。这样就实现了任务故障后的数据恢复,可以保证流计算中数据的准确性,如果没有使用状态和checkpoint,当任务重启后,所有的数据都会归0。 |
任务自动重启时checkpoint数据的自动恢复
1 | 下面我们来演示一下任务在运行期间,任务内部故障导致的任务自动重启时checkpoint数据的自动恢复。 |

1 | 从这个图里面可以看出来,taskmanager进程目前运行在bigdata02上。 |
1 | 其实11815 YarnTaskExecutorRunner 这个就是taskmanager进程了。 |
1 | 确定了是哪个进程之后,使用kill 命令杀掉进程。 |
1 | 这个时候回到任务界面,可以看到界面中已经不显示这个taskmanager了 |

1 | 稍等一会任务会重启: |

1 | 重新开启socket |
1 | 再看任务界面中的checkpoint相关的信息 |

1 | 可以发现此时下面的restore部分有信息了,表示是基于之前的第36份数据进行启动的,此时type就是checkpoint了。 |
1 | 这就是针对checkpoint数据的自动恢复和手动恢复。 |
Savepoint详解
1 | 咱们前面讲到的Checkpoint是为了保证应用在出现故障时可以顺利重启恢复。 |
Checkpoint VS Savepoint
1 | 针对Checkpoint 和 Savepoint的详细区别,在这里我整理了一个表格,我们来看一下 |

1 | 中文翻译:Checkpoint可以翻译为检查点 或者快照。Savepoint可以翻译为保存点。 |
Savepoint保证程序可移植性的前提条件
1 | 咱们刚才说了,Savepoint能够保证程序的可移植性,可以保证在代码升级之后,仍然可以恢复数据。 |
算子唯一标识
1 | 首先是算子唯一标识这个参数: |

1 | 上面是最开始的任务版本,里面有Source、Map和Sink这三个组件,假设这三个组件都是有状态的,我们没有给这些组件手工设置唯一标识,使用的是默认的自动生成的。 |
1 | 在DataSouce、中间的转换算子、DataSink上面都可以设置uid。 |

1 | source和map中维护了状态,所以需要设置uid,其它的就不用设置了,让程序自动生成即可 |
算子最大并行度
1 | 接下来我们来分析一下第二个参数:算子最大并行度 |
1 | 往下面追踪,可以追踪到这里: |
1 | 这是我们手工设置算子最大并行度时使用的方法,这里面会用到KeyGroupRangeAssignment这个类。 |
1 | public static int computeDefaultMaxParallelism(int operatorParallelism) { |
1 | 这块代码我们可以拿出来运行一下,测试一下效果 |
1 | package com.imooc.scala.checkpoint |
1 | 运行代码可以发现,如果Flink任务中算子的并行度比较小的时候,则算子最大并行度默认就是128。 |
算子最大并行度-注意事项
1 | 在工作中还遇到过一个关于算子最大并行度的问题,当时我们在计算某一个业务指标的时候,这个任务中也用到了基于keyed state类型的状态,这个业务指标前期数据量比较小,所以最开始给这个任务设置的全局并行度为20,也没有单独给这个任务中的算子设置最大并行度,根据前面的分析,此时默认生成的算子最大并行度就是128了。 |
手工触发Savepoint
1 | 前面对Savepoint的原理有了一定的理解,下面我们来看一下Savepoint的使用 |
1 | 下面我们分别来演示一下: |
1 | 当任务正常启动之后,在socket中模拟产生数据: |
1 | 手工触发savepoint。 |
1 | 此时到任务界面查看,可以看到这里显示的savepoint信息 |

1 | 手工停止这个任务。 |
从Savepoint进行恢复
正常恢复
1 | 从Savepoint进行恢复时使用的命令和前面我们讲的手动从checkpoint恢复的命令是一样的。 |
1 | 当任务正常启动之后,在socket中模拟产生数据: |
1 | 到任务界面查看输出结果信息: |
异常恢复
1 | 咱们前面讲到了在某些特殊情况下会导致任务无法从Savepoint中恢复。 |
故障情况1
1 | 首先演示一下第一种故障情况: |
1 | package com.imooc.scala.savepoint |
1 | 我们首先基于这份代码生成一份savepoint数据,然后再对这个代码进行修改,增加一个算子,看看还能不能基于之前的数据进行恢复。 |
1 | 然后停止任务。 |
1 | 重新编译打包。 |
1 | 此时发现任务提交上去之后会自动失败。 |

1 | 查看日志中的错误信息。 |
1 | 日志中的意思是说不能把savepoint中的状态数据映射到uid发送了变化的算子中。 |
1 | package com.imooc.scala.savepoint |
1 | 编译打包。 |
1 | 手工触发savepoint。 |
1 | 然后停止任务。 |
1 | 此时发现任务可以正常启动。 |

1 | 在socket中模拟产生数据: |
故障情况2
1 | 下面我们来演示一下第二种故障情况: |
1 | package com.imooc.scala.savepoint |
1 | 这个代码就是一个有状态的单词计数,没有给map这个有状态的算子设置uid。 |
1 | 手工触发savepoint。 |
1 | 然后停止任务。 |
1 | 那也就意味着,改变并行度时,也会影响算子默认生成的uid。 |
1 | 重新编译打包 |
1 | 然后停止任务。 |

1 | 在socket中模拟产生数据 |
State Backend(状态的存储方式)
1 | 咱们前面分析过,状态数据默认是存储在taskmanager节点的jvm堆内存中。 |

1 | 第一种是HashMapStateBackend,这是默认的State Backend,这种存储方式会将状态数据存储在TaskManager节点的JVM堆内存中。 |
HashMapStateBackend
1 | HashMapStateBackend这种存储方式,他的优点是,基于内存读写数据,效率高,不需要涉及序列化和反序列化。 |
EmbeddedRocksDBStateBackend
1 | EmbeddedRocksDBStateBackend这种存储方式,他的优点是解决了内存受限的问题,所以可以适用于超大状态的任务,例如:天级别的窗口聚合操作。 |
State Backend的配置
1 | 在flink的执行环境中通过setStateBackend方法进行设置。 |
1 | 基于WordCountStateWithCheckpointDemo代码复制一份进行修改 |
1 | //设置StateBackend |
1 | 使用的时候和之前使用默认的HashMapStateBackend没什么区别。 |
State的生存时间(TTL)
1 | 默认情况下State数据会一直存在,如果存储了过多状态数据,可能会导致内存溢出(针对HashMapStateBackend)。 |
1 | @Override |
1 | update方法中会调用wrapWithTs(value),看一下wrapWithTs这个方法的实现: |
1 | <V> TtlValue<V> wrapWithTs(V value) { |
1 | 继续往下面查看TtlUtils.wrapWithTs方法 |
1 | 这个类里面封装了具体的数据和对应的时间戳。 |
1 | package com.imooc.scala.ttl |
1 | 这个可以可以直接在idea中运行。 |
Window中的数据存在哪里?
1 | 简单来说是内存中,严格来说其实是存储在状态中 |
1 | 进入new WindowedJavaStream() |
1 | public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) { |
1 | 进入WindowOperatorBuilder这个类: |
1 | 进入WindowOperator |
1 | 查看windowState的介绍: |