Flink新版本1.12以上-1
Flink新版本新特性介绍
1 | 下面针对Flink最近几个版本的更新的重要新特性进行汇总,帮助大家快速了解一下每个版本的差异。 |
快速上手使用Flink1.15
开发Flink 1.15代码
1 | 首先创建一个新的maven项目:db_flink15 |
依赖
1 | <dependency> |
1 | <dependency> |
1 | <!-- log4j的依赖 --> |
1 | 在项目的resource目录下添加log4j.properties文件,文件内容如下: |
WordCountScala
1 | package com.imooc.scala |
在已有的大数据集群中集成Flink 1.15环境
1 | 1:下载Flink1.15版本的安装包,上传到bigdata04机器的/data/soft目录中。 |
1 | 4:使用Flink中yarn-session的方式验证一下此Flink客户端是否正常 |

1 | 任务的web界面上显示的是Flink1.15版本,说明是没有问题的。 |
向YARN中同时提交Flink 1.11 和1.15版本的代码
1 | 为了便于在YARN中同时提交Flink 1.11和1.15版本的代码,我们可以参考之前在讲spark3.x版本扩展内容时候的思路,对flink1.15版本中需要用到的脚本名称进行修改,增加版本名称后缀,便于分辨和使用。 |
1 | 然后为了便于在任何目录中使用这些脚本,需要在/etc/profile中配置环境变量 |
1 | 现在还需要针对Flink 1.15.0版本配置对应的环境变量,可以直接在PATH中指定即可 |
1 | [root@bigdata04 bin]# source /etc/profile |
1 | 接下来验证这两个版本中yarn-session的使用是否正常 |
1 | 这里面可以看到确实使用的是1.11.1版本。 |

1 | 查看jobmanager中的信息也可以验证是使用的1.11.1版本 |


1 | 停止此任务 |
Flink1.15.0版本的代码
1 | 2:使用flink1.15.0版本启动一个yarn-session |
1 | 3:配置Flink 1.15的historyserver |
1 | 可以选择在一个节点上启动多个Flink版本的historyserver服务,也可以选择把多个historyserver服务放到多个不同的节点上。 |
1 | 注意:在这里针对flink1.15版本使用的historyserver端口是8083,flink1.11.1使用的是8082。 |
1 | 在启动的时候会提示在这个节点上已经成功启动了一个historyserver,不要紧, 通过jps验证一下这个是否启动成功。 |
1 | 还需要启动Hadoop集群的historyserver服务。 |
1 | 4:向YARN集群中正式提交Flink1.15.0版本的代码 |
1 | 修改已有依赖的scope配置,全部设置为provided |
1 | 执行打包操作 |
1 | [root@bigdata04 ~]# nc -l 9001 |

1 | 停止后任务后再去查看任务界面发现也是可以的。 |
Flink1.11.1版本的代码
1 | 5:向YARN集群中正式提交Flink1.11.1版本的代码 |
1 | 启动socket服务 |

1 | 停止后任务后再去查看任务界面发现也是可以的。 |


1 | 到这为止,bigdata04客户端节点中可以同时支持提交flink1.11和1.15版本的代码。 |
Flink 1.15之State(状态)的容错与一致性
什么是State(状态)
1 | State中文翻译为状态。 |
1 | 状态在代码层面的体现其实就是一种存储数据的数据结构。类似于Java中的list、map之类的数据结构。 |
1 | 下面来通过两个图加深一下理解 |

1 | 这个图里面显示了Flink实现流计算去重的业务流程,source组件接入实时数据流,中间在具体的算子中对实时数据进行去重,这里借助于State实现去重,也就是将source传输过来的数据存储到State中,只保留不重复的数据,最后通过Sink组件将需要的结果数据写出去。 |

1 | 这个图里面显示了Flink在对金融数据实现实时累加求和时的业务场景,source组件对接的是kafka中的数据,中间通过算子实现数据累加求和,将聚合后的结果数据存储到State中,最终通过sink组件将聚合后的结果写出去。 |
1 | 注意:如果我们使用的是Java中的map、list之类数据结构来存储offset偏移量和算子的中间结果,checkpoint的时候无法将这些数据持久化到HDFS中,只有State中的数据才可以,这是Flink框架默认实现的机制,并不是所有的数据都可以被checkpoint持久化的,正因为如此,我们才需要使用State。 |
离线计算是否需要State(状态)?
1 | 在离线计算的时候我们没有提到过状态,但是在实时计算中却经常提到状态,这是为什么? |
State相关概念整体概览
1 | State涉及的相关概念比较多,所以在这里我们首先从全局层面分析一下这些概念,这样可以构建一个全局观,便于后面的深入理解。 |

1 | 首先看图里面蓝色的方框,主要包括JobManager和TaskManager。 |
State(状态)的类型
1 | 前面我们对状态有了基本的了解,其实状态对应的就是数据,这些数据会涉及到存储和恢复。 |
Raw State VS Managed State
1 | 这两种类型的状态有什么区别吗? |

1 | 在这里会从3个角度进行分析: |
托管状态(Managed State)的类型
1 | 针对托管状态,从作用域层面进行划分,还可以再细分为两种类型: |

1 | keyed State可以支持ValueState、ListState、ReducingState、AggregatingState、MapState这5种数据结构,这些数据结构也可以称为原语,原语是一个官方名词,不太好理解,我还是喜欢将它称为是数据结构。 |
Keyed State VS Operator State
1 | Keyed State和Operator State除了在支持的数据结构层面有区别,还有一些其他的区别,看这个表格: |

1 | 在这里会从4个角度进行分析: |
Keyed State类型的状态的扩缩容模式
1 | 针对扩缩容模式,下面有几个图,我们来看一下,详细分析一下 |

1 | 图中左边的task1和task2表示是keyBy后面连接的某个算子的2个并行实例,表示这个算子的并行度为2。 |
Operator State类型的状态的扩缩容模式
1 | 接下来我们来看一下Operator State类型的状态的扩缩容模式, |

1 | 如果算子中使用了Operator State类型中的ListState这种状态,那么算子在扩缩容时会对ListState中的数据重新分配。 |
1 | 接下来是针对Operator State中的UnionListState,看这个图: |

1 | UnionListState底层其实就是ListState,唯一的区别就是在扩缩容时状态数据的分配策略不一样。 |
1 | 最后一个是Operator State中的BroadcastState,看这个图: |

1 | BroadcastState在扩缩容时会把状态广播发送给所有的新任务。 |
Keyed State详解
1 | 首先看一下针对Keyed State的解释 |

1 | 图中左边的Source表示是数据源,这个组件的并行度为2,所以Source产生了2个task。 |
1 | 注意了:无论是Keyed State还是Operator State,Flink的状态都是基于本地的,也就是说每个算子子任务维护着这个子任务对应的状态存储,子任务之间的状态不支持互相访问。 |
1 | 在这个图里面,Stateful 1和Stateful 2这两个子任务虽然都属于同一个算子,但是他们是2个独立的子任务,所以这2个子任务之间的状态数据也是不支持互相访问的。 |
Keyed State中支持的数据结构
1 | 接下来我们来具体分析一下Keyed State中支持的常见数据结构,也可以称之为状态的原语。 |

1 | ValueState:存储类型为T的单值状态,T是一个泛型。这个状态与对应的key绑定,是最简单的状态了。它里面可以存储任意类型的值,这个值也可以是一个复杂数据结构。它可以通过update方法更新状态的值,通过value()方法获取状态值。 |
1 | 在这需要注意一点:我们前面分析的这些State对象,只是用于和状态进行交互(例如:更新、删除、清空、查询等操作),真正的状态值,有可能是存储在内存中,也可能是rocksdb对应的本地磁盘中,相当于我们只是持有了状态的句柄。 |
Keyed State的使用案例
1 | 前面我们针对Keyed State的原理有了一定的了解,下面我们来真正实操一下,掌握Keyed State在工作中的实际应用: |
温度告警-ValueState
1 | 首先看第一个案例:温度告警 |
1 | package com.imooc.scala.state |
1 | 在这段代码中,确实没有设置状态存储位置或状态后端。默认情况下,Flink会使用内存作为状态后端,将状态存储在TaskManager的JVM堆上。如果要使用其他状态后端,例如RocksDB,需要在Flink配置文件中进行配置或在代码中显式设置。这段代码可能只是一个简单的示例,用于演示如何使用`ValueState`。 |
1 | [root@bigdata04 ~]# nc -l 9001 |
1 | 一直到这,程序还是打印不出任何数据,说明没有触发温度异常的逻辑。 |
1 | 此时程序触发打印操作了 |
1 | 为了加深大家的理解,我在代码里面增加一些调试代码,打印一些中间数据,便于我们观察分析: |
1 | package com.imooc.scala.state |
1 | 重启开启socket |
1 | flatmap算子的并行度为8,同时会有8个线程处理,通过keyBy对数据流分组之后,相同规则的数据会进入到同一个线程中。 |
1 | 此时说明这条数据被线程ID为84的线程处理了,s2这个key也是第一次过来的,所以初始也为null,后面给他赋值之后是10。 |
1 | 此时显示的线程ID也是91,说明s3这条数据和s1那条数据都是由同一个线程处理的,但是注意此时s3对应的状态默认依然是null,因为s3也是第一次过来,虽然ts3和s1都是由同一个线程处理的,但是状态是和key绑定的,所以s3对应的状态依然是null。 |
直播间数据统计-MapState
1 | 接下来我们来看第二个案例:直播间数据统计 |
1 | package com.imooc.scala.state |
1 | 在Flink程序中,要使用Keyed State,必须先对数据流进行keyBy操作。keyBy操作会将数据流分区,使得具有相同键的数据被发送到同一个算子实例进行处理。 |
1 | 除了`flatMap`和`process`函数之外,还可以使用`reduce`和`aggregate`函数来访问和操作Keyed State。 |
1 | 开启一个新的socket |
订单数据补全-ListState
1 | 接下来我们来看第三个案例:订单数据补全 |
1 | package com.imooc.scala.state |
1 | 开启两个新的socket |
1 | 运行代码。 |
1 | 此时两条数据不是同一个订单的,所以程序没有任何输出 |
1 | 此时可以看到控制台输出一条数据 |
1 | 这样就实现了订单数据流和支付数据流的数据关联,这就是Flink中的双流Join,双流Join还有很多中实现方式,后面我们会有一个单独的章节详细分析双流Join。 |
Keyed State的使用形式总结
1 | 在程序中想要使用Keyed State,大致可以通过下面这几种形式: |

1 | Scala有一种特殊的数据类型,叫做Option。 |
1 | package com.imooc.scala.state |
1 | 分析一下mapWithState方法的底层实现 |
1 | def mapWithState[R: TypeInformation, S: TypeInformation]( |
1 | 以及它里面封装的状态:ValueState |
1 | trait StatefulFunction[I, O, S] extends RichFunction { |
1 | 开启一个新的socket |
1 | 再模拟产生一条数据 |
1 | 从这里可以看出来,整个代码的执行流程是没有问题的。 |
Operator State 详解
1 | 接下来我们来看一下针对Operator State的解释 |

1 | 图中左边的Source表示是数据源,这个组件的并行度为2,会产生2个task。 |

1 | 实际上这里面会用到Operator State中的UnionListState这种状态,其实就是一个基于List列表的状态。 |
1 | 在FlinkKafkaConsumerBase这个类的第201行有这么一行代码: |
1 | 这里用到了ListState,咱们刚才说的他使用的是UnionListState,其实UnionListState的底层就是ListState,唯一的区别就是任务故障恢复时状态数据的传输方式不一样,咱们之前在讲Operator State的扩缩容模式的时候详细分析过。 |
Operator State中支持的数据结构
1 | 接下来我们来具体分析一下 Operator State中支持的常见数据结构: |

1 | 由于Operator State中存储的是算子的同一个子任务中的状态数据,所以提供的都是可以存储多条数据的状态,没有提供ValueState这种状态。 |
1 | ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception; |
1 | 通过这块代码可以看出来,ListState和UnionListState对应的数据结构是一样的。 |
Operator State的使用案例
1 | 前面我们针对Operator State的原理有了一定的了解,下面我们来实操一下,掌握Operator State在工作中的应用: |
ListState的使用
Flink中自带的有状态的Source
1 | 首先我们来分析一个Flink内部已有的Source:FromElementsFunction |
1 | 在FromElementsFunction这个类中有一个ListState:checkpointedState |
1 | checkpointedState这个状态主要在initializeState和snapshotState这两个方法里面用到了。 |
1 | 其中snapshotState对应的就是刚才我们所分析的那个snapshot过程。 |
1 | 接下来看一下initializeState这个方法: |
1 | 注意:在initializeState里面注册状态时,首先会根据指定的状态名称到状态的外部存储中检查一下是否存在一个和当前状态名称相同的状态,如果存在,则尝试对状态进行恢复,如果不存在,则默认初始化状态。 |
自定义一个有状态的Sink
1 | 下面我们通过自定义一个有状态的Sink来感受一下CheckpointedFunction的具体使用: |
1 | package com.imooc.scala.state |
1 | package com.imooc.scala.state |
1 | 开启一个新的Socket |
1 | 此时控制台没有输出数据。 |
1 | 此时控制台依然没有输出数据。 |
1 | 这样就通过自定义Sink实现了有状态的批量输出功能。 |
UnionListState的使用
1 | 针对UnionListState的使用,在Kafka Connector中的FlinkKafkaConsumerBase里面有应用,我们来看一下对应的代码: |
1 | 在这里可以发现这个类其实也实现了CheckpointedFunction接口。 |
1 | public final void snapshotState(FunctionSnapshotContext context) throws Exception { |
1 | 接下来再看一下initializeState方法 |
1 | public final void initializeState(FunctionInitializationContext context) throws Exception { |
1 | 这就是UnionListState的使用。 |
BroadcastState的使用
1 | 针对BroadcastState的使用,一个典型的应用案例就是两个流连接的场景。 |

1 | 这个图里面有两个实时数据流。 |
1 | package com.imooc.scala.state |
1 | 针对配置数据流在实际工作中基本上是来源于Redis或者MySQL,在这里为了演示方便,我们来开发一个自定义的Source模拟从Redis中获取数据。 |
1 | package com.imooc.scala.state |
1 | package com.imooc.scala.state |
1 | 运行程序,结果发现程序报错,查看错误日志发现提示的是scala的问题: |
1 | 经过排查发现这个问题主要是因为我们代码里面用到了mapAsJavaMap这种功能,此时需要引入scala相关的依赖包: |
1 | <dependency> |
1 | 重新运行程序,发现可以正常运行,并且输出数据: |
1 | 这就是BroadcastState的常用应用场景。 |
Operator State的使用形式总结
1 | 在程序中想要使用Operator State,主要通过实现CheckpointedFunction这个接口,然后实现接口中的initializeState和snapshotState函数。 |
1 | context.getKeyedStateStore |