大数据开发工程师-Flink新版本1.12以上-1


Flink新版本1.12以上-1

Flink新版本新特性介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
下面针对Flink最近几个版本的更新的重要新特性进行汇总,帮助大家快速了解一下每个版本的差异。
1:Flink 1.12版本重要新特性

在DataStreamAPI 中支持批处理(批流一体化)。
DataSetAPI被标记为过时。
针对离线计算需求,建议使用Table API和SQL,或者使用DataStreamAPI 中的批处理模式。
增加新的DataSinkAPI。
扩展了KafkaSQL Connector,可以支持Upsert模式。

2:Flink 1.13版本重要新特性

正式弃用旧的Flink Planner。
相关SQL日期函数的修正,解决了时区的问题。
例如:增加了 timestamp_ltz类型。
正式弃用 Mesos 支持。
提升Hive SQL的兼容性。
增强DataStream API 和 Table API 的交互。
Flink SQL Client 的改进。

3:Flink 1.14版本重要新特性

移除旧的Flink Planner,将Blink Planner作为默认实现。
批流一体化功能的完善。
Checkpoint机制的改进。
性能优化与效率提升。
Table API & SQL的功能完善与升级。

4:Flink 1.15版本重要新特性

批流一体化功能的进一步完善。
针对KafkaConnector模块提供了KafkaSource和KafkaSink。
后续会移除FlinkKafkaConsumer和FlinkKafkaProducer。
Flink SQL中添加了对JSON相关函数的支持。
对 Flink 的运维操作进行了简化。

快速上手使用Flink1.15

1
2
3
4
5
6
7
首先创建一个新的maven项目:db_flink15
修改基础环境:
引入scala2.12,选择本地安装的java1.8、修改编译级别为8
在src/main中添加scala目录
在src/main/scala中创建package:com.imooc.scala

引入flink相关依赖,注释掉依赖中的<scope>provided</scope>

依赖

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.15.0</version>
<!-- <scope>provided</scope> -->
</dependency>
1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.15.0</version>
<!-- <scope>provided</scope> -->
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- log4j的依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
<!-- <scope>provided</scope> -->
</dependency>
1
2
3
4
5
6
7
在项目的resource目录下添加log4j.properties文件,文件内容如下:
log4j.rootLogger=warn,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

WordCountScala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.imooc.scala

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
* 单词计数案例
* 注意:基于最新Flink批流一体化的API开发
* Created by xuwei
*/
object WordCountScala {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//指定处理模式,默认支持流处理模式,也支持批处理模式。
/**
* STREAMING:流处理模式,默认。
* BATCH:批处理模式。
* AUTOMATIC:让系统根据数据源是否有界来自动判断是使用STREAMING还是BATCH。
*
* 建议在客户端中使用flink run提交任务的时候通过-Dexecution.runtime-mode=BATCH指定
*/
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

//指定DataSource
val text = env.socketTextStream("bigdata04", 9001)//当处理模式指定为AUTOMATIC时,会按照流处理模式执行
//val text = env.readTextFile("D:\\data\\hello.txt")//当处理模式指定为AUTOMATIC时,会按照批处理模式执行

//指定具体的业务逻辑
import org.apache.flink.api.scala._
val wordCount = text.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
//针对时间窗口,目前官方建议使用window,主要是为了让用户指定窗口触发是使用处理时间 or 事件时间
.window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
.sum(1)

//指定DataSink
wordCount.print().setParallelism(1)

//执行程序
env.execute("WordCountScala")
}
}
1
2
3
4
5
6
7
8
1:下载Flink1.15版本的安装包,上传到bigdata04机器的/data/soft目录中。
2:解压
3:在·/etc/profile·中配置·HADOOP_CLASSPATH·环境变量
如果之前在安装·Flink1.11·版本环境的时候配置过了,则忽略此步骤。
[root@bigdata04 soft]# vi /etc/profile
...
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
...
1
2
3
4
4:使用Flink中yarn-session的方式验证一下此Flink客户端是否正常
[root@bigdata04 flink-1.15.0]# bin/yarn-session.sh -jm 1024m -tm 1024m -d

提交之后到YARN上验证一下

image-20230531150430359

1
2
任务的web界面上显示的是Flink1.15版本,说明是没有问题的。
5:最后停止这个yarn-session
1
2
3
4
5
6
为了便于在YARN中同时提交Flink 1.11和1.15版本的代码,我们可以参考之前在讲spark3.x版本扩展内容时候的思路,对flink1.15版本中需要用到的脚本名称进行修改,增加版本名称后缀,便于分辨和使用。
提交代码时常用的是yarn-session.sh和flink这两个脚本,修改这两个脚本的名称

[root@bigdata04 ~]# cd /data/soft/flink-1.15.0/bin
[root@bigdata04 bin]# mv yarn-session.sh yarn-session-1-15.sh
[root@bigdata04 bin]# mv flink flink1-15
1
2
3
4
5
6
7
8
9
10
然后为了便于在任何目录中使用这些脚本,需要在/etc/profile中配置环境变量

注意:针对之前的Flink 1.11.1版本对应的环境变量是这样的
[root@bigdata04 bin]# vi /etc/profile
...
export FLINK_HOME=/data/soft/flink-1.11.1
export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$FLINK_HOME/bin:/data/so
ft/flink-1.15.0/bin:$SPARK_HOME/bin:$HIVE_HOME/bin:/data/soft/spark-3.
2.1-bin-hadoop3.2/bin:$PATH
...
1
2
3
4
5
6
7
现在还需要针对Flink 1.15.0版本配置对应的环境变量,可以直接在PATH中指定即可
[root@bigdata04 bin]# vi /etc/profile
...
export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$FLINK_HOME/bin:/data/so
ft/flink-1.15.0/bin:$SPARK_HOME/bin:$HIVE_HOME/bin:/data/soft/spark-3.
2.1-bin-hadoop3.2/bin:$PATH
...
1
[root@bigdata04 bin]# source /etc/profile
1
2
3
4
5
6
7
8
接下来验证这两个版本中yarn-session的使用是否正常
1:使用flink1.11.1版本启动一个yarn-session
[root@bigdata04 ~]# yarn-session.sh -jm 1024m -tm 1024m -d

在启动窗口可以看到如下日志
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/soft/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/soft/hadoop-3.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
1
2
3
这里面可以看到确实使用的是1.11.1版本。

启动之后查看对应的任务页面,发现显示的是1.11.1版本

image-20230531151053162

1
查看jobmanager中的信息也可以验证是使用的1.11.1版本

image-20230531151114768

image-20230531151148156

1
停止此任务

Flink1.15.0版本的代码

1
2
3
4
5
6
2:使用flink1.15.0版本启动一个yarn-session

在启动窗口可以看到如下日志
(按照上述步骤验证)

停止此任务
1
2
3
4
5
6
3:配置Flink 1.15的historyserver
目前任务停止后无法查看任务界面,需要配置并开启flink的historyserver
之前针对flink 1.11.1版本我们已经配置过historyserver了,在这里先把他的historyserver服务启动起来。
[root@bigdata04 flink-1.11.1]# bin/historyserver.sh start

注意:针对Flink的每个版本都需要启动一个对应的historyserver服务,如果一个客户端节点上部署了多个flink的版本,则需要启动多个historyserver服务。
1
2
3
4
5
6
7
8
9
10
11
12
可以选择在一个节点上启动多个Flink版本的historyserver服务,也可以选择把多个historyserver服务放到多个不同的节点上。
如果是在一个节点上启动多个historyserver服务,则需要修改historyserver服务的端口,每个historyserver服务使用一个独立的端口。

在这里需要修改flink 1.15版本的相关配置
修改flink 1.15中conf目录下的flink-conf.yaml配置文件,内容如下:
[root@bigdata04 conf]# vi flink-conf.yaml
...
jobmanager.archive.fs.dir: hdfs://bigdata01:9000/completed-jobs-1-15/
historyserver.web.address: bigdata04
historyserver.web.port: 8083
historyserver.archive.fs.dir: hdfs://bigdata01:9000/completed-jobs-1-15/
historyserver.archive.fs.refresh-interval: 10000
1
2
3
4
5
6
7
8
注意:在这里针对flink1.15版本使用的historyserver端口是8083,flink1.11.1使用的是8082。

还有就是flink1.15版本使用的hdfs路径也改了一下,使用的是completed-jobs-1-15,不要和flink1.11.1版本使用相同的路径,否则后期会出现错乱。

再启动flink1.15中的historyserver服务
[root@bigdata04 flink-1.15.0]# bin/historyserver.sh start
[INFO] 1 instance(s) of historyserver are already running on bigdata04.
Starting historyserver daemon on host bigdata04.
1
2
3
4
5
6
7
8
9
10
在启动的时候会提示在这个节点上已经成功启动了一个historyserver,不要紧, 通过jps验证一下这个是否启动成功。
[root@bigdata04 flink-1.15.0]# jps
2289 HistoryServer
3650 HistoryServer

发现有两个HistoryServer,具体哪个是Flink1.15启动的,可以使用jps -ml查看
[root@bigdata04 flink-1.15.0]# jps -ml
2289 org.apache.flink.runtime.webmonitor.history.HistoryServer --configDir /data/soft/flink-1.11.1/conf
3650 org.apache.flink.runtime.webmonitor.history.HistoryServer --configDir /data/soft/flink-1.15.0/conf
4067 sun.tools.jps.Jps -ml
1
2
3
4
5
6
7
8
9
还需要启动Hadoop集群的historyserver服务。
在bigdata01上执行。
[root@bigdata01 hadoop-3.2.0]# bin/mapred --daemon start historyserver

在bigdata02上执行。
[root@bigdata02 hadoop-3.2.0]# bin/mapred --daemon start historyserver

在bigdata03上执行。
[root@bigdata03 hadoop-3.2.0]# bin/mapred --daemon start historyserver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
4:向YARN集群中正式提交Flink1.15.0版本的代码
基于前面刚开发的代码进行打包,打包之前需要在pom.xml中先添加编译和打包的配置

<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
修改已有依赖的scope配置,全部设置为provided
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
<!-- log4j的依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>
1
2
3
4
5
6
7
执行打包操作

启动socket服务
[root@bigdata04 ~]# nc -l 9001

将jar包上传到flink1.15目录下,然后使用flink run命令提交任务
[root@bigdata04 flink-1.15.0]# flink1-15 run -m yarn-cluster -c com.imooc.scala.WordCountScala -yjm 1024 -ytm 1024 db_flink15-1.0-SNAPSHOT-jar-with-dependencies.jar
1
2
3
[root@bigdata04 ~]# nc -l 9001
a b
a

image-20230531152500101

1
2
3
停止后任务后再去查看任务界面发现也是可以的。

注意:任务停止后需要稍微等一会才可以在这里刷新出来任务列表。

Flink1.11.1版本的代码

1
2
3
4
5
6
5:向YARN集群中正式提交Flink1.11.1版本的代码

接下来验证一下Flink1.11.1版本的代码是否正常。
基于之前开发的代码重新打包,上传到bigdata04中的flink1.11.1目录中

注意:需要将里面的依赖的scope暂时都设置为provided。
1
2
3
4
5
6
启动socket服务

提交任务
[root@bigdata04 flink-1.11.1]# flink run -m yarn-cluster -c com.imooc.scala.stream.SocketWindowWordCountScala -yjm 1024 -ytm 1024 db_flink-1.0-SNAPSHOT-jar-with-dependencies.jar

模拟生成数据

image-20230531152920001

1
2
3
停止后任务后再去查看任务界面发现也是可以的。

注意:任务停止后需要稍微等一会才可以在这里刷新出来任务列表。

image-20230531153010268

image-20230531153022755

1
到这为止,bigdata04客户端节点中可以同时支持提交flink1.11和1.15版本的代码。

什么是State(状态)

1
2
3
4
5
6
7
8
9
10
State中文翻译为状态。

在Flink流计算中的某些业务场景下,状态是非常重要的。
那下面我们首先来分析一下到底什么是状态。

比较官方一点的定义是这样的:当前流计算任务执行过程中需要用到之前的数据,那么之前的数据就可以称之为状态。

所以说针对流计算任务中的状态其实可以理解为历史流数据。

举个生活中的例子:我们每个人的大脑中存储的昨天、前天的信息也可以认为是状态。
1
2
3
4
5
6
7
8
9
状态在代码层面的体现其实就是一种存储数据的数据结构。类似于Java中的list、map之类的数据结构。

它的出现主要是为了解决流(实时)计算中的两大问题:
第一个问题:解决流计算中需要使用历史流数据的问题
例如一些流计算的去重场景,当然也可以借助于外部存储系统来实现去重。
状态的引入可以实现不依赖外部存储系统来存储中间数据,最终实现去重操作。

第二个问题:解决流计算中数据一致性的问题(单纯使用状态是解决不了的,需要结合Checkpoint机制一起实现)
例如金融数据的流计算场景,对结果的准确度要求比较高,需要保证任务故障重启后结果数据依然是准确的,不能出现重复或者丢失数据的情况。
1
2
3
下面来通过两个图加深一下理解

第一个图

image-20230531153628847

1
2
3
4
5
6
7
8
9
10
11
这个图里面显示了Flink实现流计算去重的业务流程,source组件接入实时数据流,中间在具体的算子中对实时数据进行去重,这里借助于State实现去重,也就是将source传输过来的数据存储到State中,只保留不重复的数据,最后通过Sink组件将需要的结果数据写出去。

刚才我们说了,这里其实也可以通过外部存储系统来实现去重
例如:我们可以通过redis实现数据去重的效果,这样其实也是可以的,但是有点大材小用了,引入外部存储系统会增加Flink任务的复杂度,使用State这个轻量级解决方案是比较合适的。

大家可能还有一个疑问,针对这里实现的去重功能,我在Flink中直接new一个基于内存的set集合来存储历史接收到的数据,是不是也可以实现数据去重的效果?
是可以的,这样也可以实现数据去重的效果,但是如果任务发生了异常,重启之后基于内存的set集合中的数据就没了,这样会导致任务重启后数据无法恢复到之前的样子。

state中存储的数据默认也是存放在内存中的,不过state借助于checkpoint机制可以将内存中的数据持久化到HDFS中,这样可以实现任务重启后State数据的恢复。

接下来来看一下第二个图

image-20230531153924351

1
2
3
4
5
6
7
8
9
10
11
12
13
这个图里面显示了Flink在对金融数据实现实时累加求和时的业务场景,source组件对接的是kafka中的数据,中间通过算子实现数据累加求和,将聚合后的结果数据存储到State中,最终通过sink组件将聚合后的结果写出去。
为了保证数据累加求和计算任务失败重启后数据的准确性,除了需要在State中维护每次累加求和的中间结果,还需要维护消费者对应的offset偏移量信息。

在这个图里面
当Source组件消费到第1条数据1的时候,此时offset偏移量会等于1,后面的算子接收到第1条数据1的时候会进行累加求和,产生的结果还是1。此时Source组件会在State中写入offset=1,同时算子也会在State中写入累加后的结果1。假设这个时候触发了checkpoint操作,那么就会把State中存储的数据持久化到HDFS中,便于后期使用。

当Source组件消费到第2条数据3的时候,此时offset偏移量会等于2,后面的算子接收到第2条数据3的时候会和上次的结果累加求和,所以产生的结果是1+3=4。此时Source组件会在State中写入offset=2,同时算子也会在State中写入累加后的结果4。假设这个时候也触发了checkpoint操作,也会把State中存储的数据持久化到HDFS中。

当Source组件消费到第3条数据5的时候,此时offset偏移量会等于3,后面的算子接收到第3条数据5的时候会和上次的结果累加求和,所以产生的结果是4+5=9。此时Source组件会在State中写入offset=3,同时算子也会在State中写入累加后的结果9。假设这个时候也触发了checkpoint操作,也会把State中存储的数据持久化到HDFS中。

后面再来新数据的话也是按照这个流程去执行。

如果Flink任务将第4条数据消费出来了,并且发送到后面的算子中进行计算,但是在计算的时候却由于网络异常出现了问题,导致任务异常停止,当网络恢复正常后,我们重启任务,此时可以基于上一次checkpoint持久化的数据恢复任务中的State。这样就可以将消费者的offset重置为3,算子中累加的结果重置为9,接下来继续正常消费第4条数据进行计算,这样就可以保证数据的准确性了。
1
注意:如果我们使用的是Java中的map、list之类数据结构来存储offset偏移量和算子的中间结果,checkpoint的时候无法将这些数据持久化到HDFS中,只有State中的数据才可以,这是Flink框架默认实现的机制,并不是所有的数据都可以被checkpoint持久化的,正因为如此,我们才需要使用State。

离线计算是否需要State(状态)?

1
2
3
4
5
6
7
8
9
10
在离线计算的时候我们没有提到过状态,但是在实时计算中却经常提到状态,这是为什么?

状态主要是为了保存历史数据,并且保证结果数据的一致性。

针对离线计算,所有历史数据都已经到了,不需要单独保存,如果在离线计算中任务失败了,重跑一遍程序就可以了,基本上没有影响。所以可以认为离线计算是不需要维护状态的。

但是针对实时计算,数据是源源不断产生的,实时计算任务是7x24小时执行的,如果实时计算任务失败了,想要恢复数据,总不能从头再重新计算一遍把,这样肯定是不现实的。就算数据源是Kafka,支持重跑数据,但是这个任务可能已经运行了很多天了,重新把之前的数据再计算一遍也需要消耗很长时间,这样在数据时效层面上来说也是不合理的,因为这个数据恢复时间太长了。
如果数据源是基于socket的,任务失败的时候还会导致数据丢失。

所以说实时计算任务失败之后可能会导致源数据、以及中间结果数据丢失,这个时候想要在一定时效内保证数据的准确性就需要借助于状态实现了。

State相关概念整体概览

1
State涉及的相关概念比较多,所以在这里我们首先从全局层面分析一下这些概念,这样可以构建一个全局观,便于后面的深入理解。

image-20230531155313232

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
首先看图里面蓝色的方框,主要包括JobManager和TaskManager。
JobManager对应的是Flink集群的主节点,TaskManager对应的是Flink集群的从节点。

假设我们向Flink集群中提交了一个流计算任务,这个任务中包含了Source、Map、和Sink这三块。
其中Source是负责从Kafka中消费数据的,为了保证任务重启后的数据准确性,Source组件会把消费到的Offset偏移量数据保存到State中。
Map算子会把计算的中间结果也保存到State中,Sink也会把某些数据保存到State中,这样可以便于后期的数据恢复,保证数据的准确性。

默认情况下,Source、Map、和Sink写入到State中的数据会存储在TaskManager节点中的JVM堆内存中。
当然官方也可以支持将这些State数据存储到Rocksdb这个内嵌数据库中,Rocksdb数据库中的数据会存储在对应节点的本地磁盘文件中。

当满足一定时机的情况下,Flink任务会触发checkpoint操作,当执行checkpoint操作的时候,会将默认存储在TaskManager节点JVM堆内存中的State数据保存到另外一个地方,这个地方可以是JobManager的JVM堆内存中,也可以是分布式文件系统中(例如HDFS)。

初始的State数据的存储位置是由State Backend这个功能控制的。
checkpoint时State数据的存储位置是由checkpoint storage这个功能控制的。

这就是涉及到状态的Flink任务的整体执行流程以及checkpoint时的流程,在这大家先有一个初步的了解,这里面的细节内容后面我们会进一步详细分析。

State(状态)的类型

1
2
3
4
5
6
前面我们对状态有了基本的了解,其实状态对应的就是数据,这些数据会涉及到存储和恢复。
任务正常运行的时候会向状态中写入数据,在任务重启时会涉及到状态的恢复。

Flink从管理层面对这些状态进行了划分,大致划分出两种类型的状态:
一种是Raw State,中文翻译为原生状态
另外一种是Managed State,中文翻译为托管状态。

Raw State VS Managed State

1
2
这两种类型的状态有什么区别吗?
下面我们来详细分析一下,看这个表格:

image-20230531155439184

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
在这里会从3个角度进行分析:
1:从管理方式这个角度上进行分析
原生状态是需要开发者自己管理的,需要自己实现数据的序列化。

而托管状态是由Flink Runtime进行托管管理的,Flink已经实现好了存储和恢复这些功能,不需要我们自己实现序列化。
从这里可以看出来,原生状态需要自己实现相关的功能,使用起来比较复杂。而托管状态已经被封装好了,使用起来比较方便。

2:从数据结构这个角度上进行分析
原生状态只支持字节数组类型,而托管状态则支持多种数据结构,例如:MapState、ListState、ValueState等。
从这里可以看出来原生状态支持的是最基础的数据类型,比较灵活。而托管状态主要支持常见的数据类型。不过托管状态中支持的那些常见的数据类型在工作中其实已经足够使用了。

3:从使用场景这个角度上进行分析
原生状态主要在自定义Operator时使用,Operator是DataSource、Transform、以及DataSink这三块的统称。而托管状态在所有数据流场景中都是可以使用的,也就是说托管状态的应用场景是包含了原生状态的应用场景。

这就是原生状态和托管状态的区别。
在实际工作中,基本上只会用到托管状态,而不会用到原生状态。

所以后面我们主要分析托管状态的使用。

托管状态(Managed State)的类型

1
2
3
4
针对托管状态,从作用域层面进行划分,还可以再细分为两种类型:
一种是keyed State,还有一种是Operator State。

这两种类型的State中支持的数据结构是不一样的,来看一下这个表格

image-20230531155737405

1
2
3
4
keyed State可以支持ValueState、ListState、ReducingState、AggregatingState、MapState这5种数据结构,这些数据结构也可以称为原语,原语是一个官方名词,不太好理解,我还是喜欢将它称为是数据结构。
从名字上可以看出来,ListState其实就相当于是一个List列表了,MapState相当于是一个Map集合,具体这些数据类型的使用后面我们再具体分析,在这里先有个基本认识就可以了。

Operator State可以支持ListState、UnionListState和BroadcastState这3种数据结构。
Keyed State VS Operator State
1
Keyed State和Operator State除了在支持的数据结构层面有区别,还有一些其他的区别,看这个表格:

image-20230531155849233

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
在这里会从4个角度进行分析:
1:从State使用场景这个角度上进行分析
针对Keyed State,它只能应用在基于KeyedStream的数据流中,在Flink中,普通的数据流是DataStream,在DataStream后面调用keyBy算子之后,返回的就是KeyedStream数据流,那也就是说Keyed State只能应用在做过keyBy之后的数据流里面。

而Operator State可以应用在所有数据流中,包括keyedStream。

2:从State分配方式这个角度上进行分析
针对Keyed State,因为是基于key进行分组的数据流,相同key的数据会进入到同一个子任务中被处理,此时每个相同的Key共享一个State实例。

针对Operator State,需要兼容所有类型的数据流,所以此时算子的同一个子任务共享一个State实例,和key无关。

3:从State创建方式这个角度上进行分析
针对Keyed State,需要借助于getRuntimeContext这个对象来创建。
针对Operator State,需要借助于context这个对象来创建。

4:从State扩缩容模式这个角度上进行分析
什么是扩缩容模式?
通俗一点来说,其实就是在任务故障后恢复的时候,算子的并行度发生了变化,可能增加了并行度,或者减少了并行度。
针对无状态的算子,扩缩容很容易,没有什么影响。
但是针对有状态的算子,并行度发生改变之后,状态在恢复的时候会涉及到重新分组,需要将状态数据分配到和之前数量不相等的算子任务中。

针对Keyed State,它会以KeyGroup为单位重新分配状态数据,KeyGroup其实就是包含了多个key的一个分组。
针对Operator State,它会均匀分配状态数据,或者是广播分配,具体要看你使用的是哪种数据类型了。
Keyed State类型的状态的扩缩容模式
1
2
3
针对扩缩容模式,下面有几个图,我们来看一下,详细分析一下

首先是针对Keyed State类型的状态的扩缩容模式

image-20230531155938701

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
图中左边的task1和task2表示是keyBy后面连接的某个算子的2个并行实例,表示这个算子的并行度为2。
中间的 task1 、task2和task3表示任务重启后,将keyBy后面那个算子的并行度改为了3。
右边的task1表示是任务又一次重启后,将keyBy后面那个算子的并行度又改为了1。
这个图里面显示的是Flink任务中的同一个算子,在任务重启后,算子并行度发生变化时状态的分配情况。

基于Keyed State类型状态的算子在扩缩容时会根据新的算子并行度数量对状态重新分配,不过为了降低状态数据在不同任务之间的迁移成本,Flink对这些状态做了分组,会按照所有状态的key进行分组,划分成多个keyGroup,每个keyGroup内部包含一部分key的状态,以keyGroup为单位重新分配状态数据。

在这个图里面最开始算子的并行度为2,会产生2个task,task1和task2。其中task1里面的k1和k3这两个key的状态被划分到了一个keyGroup,k5和k7这两个key的状态被划分到了一个keyGroup,k9和k11这两个key的状态被划分到了一个keyGroup。
Task2里面也是类似这样的效果,也划分出了一些keyGroup。

此时当这个任务异常结束,重新启动的时候,我们对这个算子的并行度进行了调整,改成了3,相当于增加了并行度。此时这个算子在运行时会产生3个task,分别是这里面的task1 、task2和task3。
这个时候在对状态数据进行恢复的时候就需要将之前2个task产生的状态数据恢复到最新的3个task中,如图所示,这些状态数据会按照keyGroup为单位分配到这3个task中。

当任务重启运行了一段时间之后,由于某些异常情况导致任务又停止了,重启再启动的时候,我们将算子的并行度改为了1,相当于减少了并行度。此时这个算子在运行时只会产生1个task。
这个时候在对状态数据进行恢复的时候就需要将之前3个task产生的状态数据恢复到这1个task中,如图所示,这些状态数据会按照keyGroup为单位分配到这1个task中。

这就是Keyed State类型的状态在任务重启并行度发生了变化时状态的分配方式。
Operator State类型的状态的扩缩容模式
1
2
3
4
接下来我们来看一下Operator State类型的状态的扩缩容模式,
在这里需要针对Operator State中的不同数据结构进行单独分析:

首先是Operator State中的ListState,看这个图:

image-20230531160009335

1
2
3
4
5
6
如果算子中使用了Operator State类型中的ListState这种状态,那么算子在扩缩容时会对ListState中的数据重新分配。
大致流程是这样的:这个算子的所有并行运行的task中的ListState数据会被统一收集起来,然后均匀分配给更多的task或者更少的task。

最开始这些ListState中的状态数据在2个task中维护。
后面任务重启,并行度变成3以后,会将之前的状态数据分配给这3个task。
当后面任务重启,并行度变成1以后,会将之前的状态数据都分配这1个task。
1
接下来是针对Operator State中的UnionListState,看这个图:

1
2
3
4
5
6
UnionListState底层其实就是ListState,唯一的区别就是在扩缩容时状态数据的分配策略不一样。
UnionListState会在扩缩容时把里面的所有状态数据全部广播发送给新任务。
最开始这些状态数据在2个task中维护。
后面任务重启,并行度变成3以后,之前task1和task2维护的状态数据都会分配给这3个task。
当后面任务重启,并行度变成1以后,前面3个task都会将自己维护的数据分配给这1个task。
所以针对UnionListState这种方式,任务重启恢复状态数据的时候,每个子任务都会收到所有的数据,但是这个子任务可以根据一定的策略选择操作部分状态数据。
1
最后一个是Operator State中的BroadcastState,看这个图:

image-20230531160052830

1
2
3
4
5
BroadcastState在扩缩容时会把状态广播发送给所有的新任务。
那这种方式和UnionListState有什么区别?
针对UnionListState这种方式,假设算子A的并行度为2,那么会产生2个task,这2个task中维护的状态数据是不一样的,当任务重启之后,如果并行度发生了变化,那么算子A的每个子任务都可以接收到之前2个task中维护的状态数据。

针对BroadcastState这种方式,假设算子A的并行度为2,那么这2个task中的数据是完全一样的,当任务重启之后,如果并行度增加了,只需要基于某一个task中的状态数据复制到新的task中即可。如果任务重启后并行度减少了,只需要简单的去掉多余的task即可。
Keyed State详解
1
2
3
4
5
6
7
首先看一下针对Keyed State的解释
Keyed State是基于KeyedStream上的状态,在普通数据流后面调用keyBy之后可以获取到一个KeyedStream数据流。
此时状态是和特定的key绑定的。

针对KeyedStream上的每个Key,Flink都会维护一个状态实例。

看一下下面这个图,加深一下理解

image-20230531163721767

1
2
3
4
5
6
7
图中左边的Source表示是数据源,这个组件的并行度为2,所以Source产生了2个task。
右边的Stateful表示是有状态的算子,这个算子的并行度也是2,所以也产生了2个task。

假如数据源按照ID这个列作为Key进行了keyBy分组,形成了一个KeyedStream数据流,其中ID的值为A\B\C\D这种英文字母。
此时这个数据流中所有ID为A的key共享一个状态,可以访问和更新这个状态,以此类推,每个Key对应一个自己的状态。

在这个图里面Stateful 1这个task实例维护了A\B\Y这些key的状态数据。Stateful 2这个task实例维护了D\E\Z这些key的状态数据。
1
注意了:无论是Keyed State还是Operator State,Flink的状态都是基于本地的,也就是说每个算子子任务维护着这个子任务对应的状态存储,子任务之间的状态不支持互相访问。
1
在这个图里面,Stateful 1和Stateful 2这两个子任务虽然都属于同一个算子,但是他们是2个独立的子任务,所以这2个子任务之间的状态数据也是不支持互相访问的。
Keyed State中支持的数据结构
1
接下来我们来具体分析一下Keyed State中支持的常见数据结构,也可以称之为状态的原语。

image-20230531164250345

1
2
3
4
5
6
7
8
9
ValueState:存储类型为T的单值状态,T是一个泛型。这个状态与对应的key绑定,是最简单的状态了。它里面可以存储任意类型的值,这个值也可以是一个复杂数据结构。它可以通过update方法更新状态的值,通过value()方法获取状态值。

ListState:表示是一个列表状态,列表里面存储多个类型为T的元素。可以通过add方法往列表中添加数据;也可以通过get()方法返回一个Iterable列表来遍历状态数据。

ReducingState:存储一个聚合后类型为T的单值状态,这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值,有点类似于ValueState,都是存储单个数值的。

AggregatingState<IN , OUT>:存储一个聚合后类型为OUT的单值状态,它和ReducingState的区别是ReducingState在聚合时接收的数据类型和最终产生的聚合结果数据类型是一致的。但是AggregatingState在聚合时接收的数据类型和最终产生的聚合结果数据类型可以不一样,AggregatingState里面使用了两个泛型,IN代表聚合时传入的数据类型,OUT表示最终产生的结果数据类型。

MapState<UK, UV>:可以存储key-value类型的多个元素,key和value可以是任何类型。用户可以通过put或putAll方法添加元素。
1
2
3
4
5
6
7
在这需要注意一点:我们前面分析的这些State对象,只是用于和状态进行交互(例如:更新、删除、清空、查询等操作),真正的状态值,有可能是存储在内存中,也可能是rocksdb对应的本地磁盘中,相当于我们只是持有了状态的句柄。

类似于我们在Java中new了一个对象a1,这个a1只是一个引用,真正的对象是存储在JVM堆内存中的,a1只是持有了这个对象在堆内存中的内存地址值。

在实际工作中,其实最常用的State就是keyed State,keyed State中最常用的其实主要是ValueState、ListState和MapState这3个。

其它State很少用,包括后面我们要讲到的Operator State。
Keyed State的使用案例
1
前面我们针对Keyed State的原理有了一定的了解,下面我们来真正实操一下,掌握Keyed State在工作中的实际应用:
温度告警-ValueState
1
2
3
4
5
6
7
8
首先看第一个案例:温度告警
大致需求是这样的,某机房内的多个设备会实时上报温度信息,在Flink任务内部需要对设备最近两次的温度进行对比,如果温差超过了20度,则需要发送告警信息,说明设备出问题了。

在这里我们可以把设备的唯一标识ID字段作为keyby分组的key,这样的话在Flink内部就只需要维护设备的温度即可,温度值是一个数字,数字属于一个普通的单值,所以可以考虑使用ValueState。

下面开始开发代码,
创建package:com.imooc.scala.state
创建object:KeyedState_AlarmDemo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.imooc.scala.state

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.{MapFunction, RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
* 温度告警:ValueState
* Created by xuwei
*/
object KeyedState_AlarmDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

//数据格式为 设备ID,温度
val text = env.socketTextStream("bigdata04", 9001)

import org.apache.flink.api.scala._
text.map(line=>{
val tup = line.split(",")
(tup(0),tup(1).toInt)
}).keyBy(_._1)
.flatMap(new RichFlatMapFunction[(String,Int),String] {
//声明一个ValueState类型的状态变量,存储设备上一次收到的温度数据
private var lastDataState: ValueState[Int] = _

/**
* 任务初始化的时候这个方法执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//注册状态
val valueStateDesc = new ValueStateDescriptor[Int](
"lastDataState",//指定状态名称
classOf[Int]//指定状态中存储的数据类型
)
lastDataState = getRuntimeContext.getState(valueStateDesc)
}

override def flatMap(value: (String, Int), out: Collector[String]): Unit = {
//初始化
if(lastDataState.value() == null){
lastDataState.update(value._2)
}
//获取上次温度
val tmpLastData = lastDataState.value()
//如果某个设备的最近两次温差超过20度,则告警
if(Math.abs(value._2 - tmpLastData) >= 20){
out.collect(value._1+"_温度异常")
}
//更新状态
lastDataState.update(value._2)
}
}).print()

env.execute("KeyedState_AlarmDemo")
}
}
1
2
3
在这段代码中,确实没有设置状态存储位置或状态后端。默认情况下,Flink会使用内存作为状态后端,将状态存储在TaskManager的JVM堆上。如果要使用其他状态后端,例如RocksDB,需要在Flink配置文件中进行配置或在代码中显式设置。这段代码可能只是一个简单的示例,用于演示如何使用`ValueState`。

无论您使用哪种状态后端(RocksDB或文件系统),都需要启用checkpoint才能在发生故障时恢复状态。状态后端负责管理运行时状态的存储和访问,而checkpoint则负责将状态数据定期保存到持久化存储中,以便在发生故障时可以从最近的checkpoint恢复状态。因此,如果您希望您的应用程序能够在发生故障时恢复状态,那么建议启用checkpoint。
1
2
3
4
5
6
[root@bigdata04 ~]# nc -l 9001

[root@bigdata04 ~]# nc -l 9001
s1,50
s2,10
s3,10
1
2
3
4
5
6
一直到这,程序还是打印不出任何数据,说明没有触发温度异常的逻辑。
因为这里会针对每个key维护一个状态,现在输入的3个key都是不一样的,所以没有触发。

只有再输入一个s1的值,才可能会触发
[root@bigdata04 ~]# nc -l 9001
s1,10
1
2
3
此时程序触发打印操作了

s1_温度异常
1
2
3
4
5
为了加深大家的理解,我在代码里面增加一些调试代码,打印一些中间数据,便于我们观察分析:
1:首先将任务的并行度调整一下,默认我的windows是8核的,所以并行度为8,为了看起来清晰,我就在代码层面将任务的并行度设置为8。
2:在flatmap中增加一些打印操作,将当前的线程id、初始的状态值,以及上次的状态值打印一下。

修改后的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.imooc.scala.state

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.{MapFunction, RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
* 温度告警:ValueState
* Created by xuwei
*/
object KeyedState_AlarmDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)
//设置任务全局并行度为8
env.setParallelism(8)

//数据格式为 设备ID,温度
val text = env.socketTextStream("bigdata04", 9001)

import org.apache.flink.api.scala._
text.map(line=>{
val tup = line.split(",")
(tup(0),tup(1).toInt)
}).keyBy(_._1)
.flatMap(new RichFlatMapFunction[(String,Int),String] {
//声明一个ValueState类型的状态变量,存储设备上一次收到的温度数据
private var lastDataState: ValueState[Int] = _

/**
* 任务初始化的时候这个方法执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//注册状态
val valueStateDesc = new ValueStateDescriptor[Int](
"lastDataState",//指定状态名称
classOf[Int]//指定状态中存储的数据类型
)
lastDataState = getRuntimeContext.getState(valueStateDesc)
}

override def flatMap(value: (String, Int), out: Collector[String]): Unit = {
println("线程ID:"+Thread.currentThread().getId+",接收到数据:"+value)
//打印当前的线程ID和接收到的数据
//初始化
if(lastDataState.value() == null){
lastDataState.update(value._2)
println("lastDataState is null")//打印初始的状态为null
}
println("lastDataState is "+ lastDataState.value())//打印上次的状态值
//获取上次温度
val tmpLastData = lastDataState.value()
//如果某个设备的最近两次温差超过20度,则告警
if(Math.abs(value._2 - tmpLastData) >= 20){
out.collect(value._1+"_温度异常")
}
//更新状态
lastDataState.update(value._2)
}
}).print()

env.execute("KeyedState_AlarmDemo")
}

}
1
2
3
4
5
6
7
8
9
10
11
重启开启socket

运行代码。
在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
s1,50

控制台打印的数据如下:
线程ID:91,接收到数据:(s1,50)
lastDataState is null
lastDataState is 50
1
2
3
4
5
6
7
8
9
10
11
flatmap算子的并行度为8,同时会有8个线程处理,通过keyBy对数据流分组之后,相同规则的数据会进入到同一个线程中。
这里就说明s1,50这条数据被线程ID为91的线程处理了,s1这个key是第一次过来,对应的状态为null,所以这里打印出来了null,当状态为null时,程序会把当前的数据赋值给状态,所以下面打印的上次状态值就是50了。

在之前的socket中再模拟产生一条数据:
[root@bigdata04 ~]# nc -l 9001
s2,10

控制台打印的数据如下:
线程ID:84,接收到数据:(s2,10)
lastDataState is null
lastDataState is 10
1
2
3
4
5
6
7
8
9
10
此时说明这条数据被线程ID为84的线程处理了,s2这个key也是第一次过来的,所以初始也为null,后面给他赋值之后是10。

在之前的socket中再模拟产生一条数据:
[root@bigdata04 ~]# nc -l 9001
s3,10

控制台打印的数据如下:
线程ID:91,接收到数据:(s3,10)
lastDataState is null
lastDataState is 10
1
2
3
4
此时显示的线程ID也是91,说明s3这条数据和s1那条数据都是由同一个线程处理的,但是注意此时s3对应的状态默认依然是null,因为s3也是第一次过来,虽然ts3和s1都是由同一个线程处理的,但是状态是和key绑定的,所以s3对应的状态依然是null。
所以前面咱们说过,KeyedSate是和key绑定的,针对KeyedStream上的每个Key,Flink都会维护一个状态实例。

如果我们把ValueState换成一个普通的int变量,就不是这样的效果了,当s3这条数据过来的时候就可以获取到之前s1存储的数据50了,因为普通的变量在同一个线程里面是共享的。
直播间数据统计-MapState
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
接下来我们来看第二个案例:直播间数据统计
大致需求是这样的,需要统计平台中每个主播在直播间内收到的礼物信息、点赞、关注等指标,以直播间为单位进行统计。

由于用户每次开播都会生成一个新的直播间vid,并且我们需要基于这个直播间vid统计它里面的数据指标,这些数据指标包含多个维度,例如:礼物对应的数量、点赞对应的数据量、关注对应的数量,这种数据就适合使用MapState进行存储了,每一个直播间的数据指标存储到一个MapState中。

下面开始开发代码,
创建object:KeyedState_VideoDataDemo

由于数据格式是JSON格式的,所以引入fastjson依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
<!--<scope>provided</scope>-->
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.imooc.scala.state

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
* 直播间数据统计-MapState
* Created by xuwei
*/
object KeyedState_VideoDataDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

//数据格式
//送礼数据:{"type":"gift","uid":"1001","vid":"29901","value":100}
//关注数据:{"type":"follow","uid":"1001","vid":"29901"}
//点赞数据:{"type":"like","uid":"1001","vid":"29901"}
val text = env.socketTextStream("bigdata04", 9001)

import org.apache.flink.api.scala._
text.map(line=>{
val videoJsonData = JSON.parseObject(line)
val vid = videoJsonData.getString("vid")
val videoType = videoJsonData.getString("type")
//也可以使用if语句实现
videoType match {
case "gift" => {
val value = videoJsonData.getIntValue("value")
(vid,videoType,value)
}
case _ => (vid,videoType,1)
}
}).keyBy(_._1)//注意:后面也可以使用flatmap算子,在这里换一种形式,使用低级API process
.process(new KeyedProcessFunction[String,(String,String,Int),(String,String,Int)] {
//声明一个MapState类型的状态变量,存储用户的直播间数据指标
//MapState中key的值为gift\follow\like,value的值为累加后的结果
private var videoDataState: MapState[String,Int] = _

override def open(parameters: Configuration): Unit = {
//注册状态
val mapStateDesc = new MapStateDescriptor[String,Int](
"videoDataState",//指定状态名称
classOf[String],//指定key的类型
classOf[Int]//指定value的类型
)
videoDataState = getRuntimeContext.getMapState(mapStateDesc)
}

override def processElement(value: (String, String, Int),
ctx: KeyedProcessFunction[String, (String, String, Int), (String, String, Int)]#Context,
out: Collector[(String, String, Int)]): Unit = {
val videoType = value._2
var num = value._3
//判断状态中是否有这个数据
if(videoDataState.contains(videoType)){
num += videoDataState.get(videoType)
}
//更新状态
videoDataState.put(videoType,num)
out.collect((value._1,videoType,num))
}
}).print()

env.execute("KeyedState_VideoDataDemo")
}

}
1
2
3
4
5
6
7
8
9
在Flink程序中,要使用Keyed State,必须先对数据流进行keyBy操作。keyBy操作会将数据流分区,使得具有相同键的数据被发送到同一个算子实例进行处理。

在keyBy之后,可以使用flatMap或process函数来访问和操作Keyed State。这两种函数都可以访问Keyed State,但它们之间有一些区别。

`flatMap`函数允许您对输入数据进行一对多的转换。它接收一个输入元素,并可以生成零个、一个或多个输出元素。它通常用于对数据流进行扁平化处理。

process函数提供了更多的控制能力。它允许您访问时间戳、watermark和其他元数据,并且可以注册定时器来执行基于时间的操作。它通常用于更复杂的事件驱动应用程序。

因此,在选择使用flatMap还是process函数时,需要根据您的应用程序需求来决定。
1
2
3
4
5
6
7
除了`flatMap`和`process`函数之外,还可以使用`reduce`和`aggregate`函数来访问和操作Keyed State。

`reduce`函数允许您对数据流中的元素进行归约操作,以计算每个键的累积结果。它需要一个ReduceFunction,该函数指定如何将两个输入元素合并为一个输出元素。

`aggregate`函数与`reduce`函数类似,但提供了更多的灵活性。它需要一个AggregateFunction,该函数由三个方法组成:`createAccumulator`、`add`和`getResult`。这些方法分别用于创建累加器、将输入元素添加到累加器中并计算累积结果。

这些函数都可以访问Keyed State,并根据您的应用程序需求选择使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
开启一个新的socket

运行代码。
在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
{"type":"gift","uid":"1001","vid":"29901","value":100}
{"type":"follow","uid":"1001","vid":"29901"}
{"type":"like","uid":"1001","vid":"29902"}
{"type":"like","uid":"1001","vid":"29901"}
{"type":"gift","uid":"1001","vid":"29901","value":100}
{"type":"like","uid":"1001","vid":"29904"}

控制台打印的数据如下
7> (29901,gift,100)
7> (29901,follow,1)
6> (29902,like,1)
7> (29901,like,1)
7> (29901,gift,200)
6> (29904,like,1)
订单数据补全-ListState
1
2
3
4
5
6
7
8
接下来我们来看第三个案例:订单数据补全
大致需求是这样的,某外卖平台需要开发一个实时订单消息推送功能,当用户下单,并且成功支付后向商家推送一条消息。
其中下单数据是一个数据流,支付数据是另外一个数据流。
这个时候就需要对两个数据流进行关联了,当同一个订单相关的数据都到齐之后向外推送消息。
针对这个需求,我们计划使用ListState实现。

下面开始开发代码,
创建object:KeyedState_OrderDataDemo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.imooc.scala.state

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
* 订单数据补全-ListState (双流Join)
* 订单数据流 + 支付数据流
* Created by xuwei
*/
object KeyedState_OrderDataDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

//数据格式为
//订单数据流:{"pid":"1001","pname":"n1"}
//支付数据流:{"pid":"1001","pstatus":"success"}
val orderText = env.socketTextStream("bigdata04", 9001)
val payText = env.socketTextStream("bigdata04", 9002)

import org.apache.flink.api.scala._
//解析订单数据流
val orderTupleData = orderText.map(line => {
val orderJsonObj = JSON.parseObject(line)
val pid = orderJsonObj.getString("pid")
val pname = orderJsonObj.getString("pname")
(pid, pname)
})

//解析支付数据流
val payTupleData = payText.map(line => {
val payJsonObj = JSON.parseObject(line)
val pid = payJsonObj.getString("pid")
val pstatus = payJsonObj.getString("pstatus")
(pid, pstatus)
})


//针对两个流进行分组+connect连接(也可以先对两个流分别调用keyBy,再调用connect,效果一样)
orderTupleData.connect(payTupleData)
.keyBy("_1","_1")//field1表示第1个流里面的分组字段,field2表示第2个流里面的分组字段
.process(new KeyedCoProcessFunction[String,(String,String),(String,String),(String,String,String)] {
//声明两个ListState类型的状态变量,分别存储订单数据流和支付数据流
/**
* 注意:针对这个业务需求,pid在一个数据流中是不会重复的,其实使用ValueState也是可以的,
* 因为在这里已经通过keyBy基于pid对数据分组了,所以只需要在状态中存储pname或者pstatus即可。
* 但是如果pid数据在一个数据流里面会重复,那么就必须要使用ListState了,这样才能存储指定pid的多条数据
*/
private var orderDataState: ListState[(String,String)] = _
private var payDataState: ListState[(String,String)] = _

override def open(parameters: Configuration): Unit = {
//注册状态
val orderListStateDesc = new ListStateDescriptor[(String,String)](
"orderDataState",
classOf[(String, String)]
)
val payListStateDesc = new ListStateDescriptor[(String,String)](
"payDataState",
classOf[(String, String)]
)
orderDataState = getRuntimeContext.getListState(orderListStateDesc)
payDataState = getRuntimeContext.getListState(payListStateDesc)
}

//处理订单数据流
override def processElement1(orderTup: (String, String),
ctx: KeyedCoProcessFunction[String, (String, String), (String, String), (String, String, String)]#Context,
out: Collector[(String, String, String)]): Unit = {
//获取当前pid对应的支付数据流,关联之后输出数据,(可能是支付数据先到)
payDataState.get().forEach(payTup=>{
out.collect((orderTup._1,orderTup._2,payTup._2))
})
//将本次接收到的订单数据添加到状态中,便于和支付数据流中的数据关联
orderDataState.add(orderTup)
}
//处理支付数据流
override def processElement2(payTup: (String, String),
ctx: KeyedCoProcessFunction[String, (String, String), (String, String), (String, String, String)]#Context,
out: Collector[(String, String, String)]): Unit = {
//获取当前pid对应的订单数据流,关联之后输出数据,(可能是订单数据先到)
orderDataState.get().forEach(orderTup=>{
out.collect((orderTup._1,orderTup._2,payTup._2))
})
//将本次接收到的订单数据添加到状态中,便于和订单数据流中的数据关联
payDataState.add(payTup)
}
}).print()

env.execute("KeyedState_OrderDataDemo")
}

}
1
2
3
开启两个新的socket
[root@bigdata04 ~]# nc -l 9001
[root@bigdata04 ~]# nc -l 9002
1
2
3
4
5
6
7
8
9
运行代码。

在socket 9001中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
{"pid":"1001","pname":"n1"}

在socket 9002中模拟产生数据:
[root@bigdata04 ~]# nc -l 9002
{"pid":"1003","pstatus":"success"}
1
2
3
4
5
此时两条数据不是同一个订单的,所以程序没有任何输出

接下来继续在socket 9001中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
{"pid":"1003","pname":"n3"}
1
2
3
4
5
6
7
8
9
此时可以看到控制台输出一条数据
5> (1003,n3,success)

接下来继续在socket 9002中模拟产生数据:
[root@bigdata04 ~]# nc -l 9002
{"pid":"1001","pstatus":"success"}

此时可以看到控制台输出一条数据
4> (1001,n1,success)
1
2
3
4
5
6
7
8
9
10
这样就实现了订单数据流和支付数据流的数据关联,这就是Flink中的双流Join,双流Join还有很多中实现方式,后面我们会有一个单独的章节详细分析双流Join。

注意:这个案例中两个数据流的数据会一直存储在状态中,随着时间的增长,状态会越来越多,状态如果是存储在内存中的话,内存可能就扛不住了,最终导致内存溢出。

针对这个问题有两个解决方案

我们自己基于业务层面从状态中清理掉不用的数据,例如两份数据join到一起之后,就删除状态中的数据,这样可以保证状态中的数据不会一直无限递增。
状态设置一个失效机制,官方提供的有一个TTL机制,可以给状态设置一个生存时间 ,过期自动删除,这个TTL机制我们后面再具体分析。

在这里我们只是演示了状态的存储,针对状态的故障恢复我们还没有演示,因为我们使用状态的很大一个原因就是考虑到状态可以实现故障后的数据恢复。等后面讲到状态一致性的时候我们再来具体演示基于状态的任务故障后,状态数据是如何恢复的。
Keyed State的使用形式总结
1
2
3
4
5
6
7
8
9
10
11
12
13
14
在程序中想要使用Keyed State,大致可以通过下面这几种形式:

第一种:通过重写RichXXXFunction,在里面创建和操作状态。例如针对map算子可以使用RichMapFunction,针对flatmap算子可以使用RichFlatMapFunction等。
在这里使用对应的RichFunction主要是因为它里面提供了getRuntimeContext这个上下文,通过getRuntimeContext可以获取Keyed State。

第二种:通过process()这种低级API。
主要是因为process中也可以获取到getRuntimeContext这个上下文。

第三种:其实还通过mapWithState()、flatMapWithState()等直接带有状态的算子,这种算子里面针对Keyed State直接进行了封装,使用起来更加方便,但是它有一定的局限性。
首先是这些带有状态的算子只能应用在keyBy算子之后
还有就是这些带有状态的算子里面封装的其实都是ValueState这种状态,不能自己控制使用哪种状态。

下面我们来演示一下这种状态的使用:
以常用的单词计数这个需求为例

image-20230601174451755

1
2
3
Scala有一种特殊的数据类型,叫做Option。
Option有两种值,一种是Some,表示有值,一种是None,表示没有值
Option通常会用于模式匹配中,用于判断某个变量是有值还是没有值,这比null来的更加简洁明了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.imooc.scala.state

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* 使用MapWithState实现带有状态的单词计数案例
* 思考问题:有状态的单词计数代码 和 无状态的单词计数代码有什么区别?
* Created by xuwei
*/
object KeyedState_MapWithStateDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

val text = env.socketTextStream("bigdata04", 9001)

import org.apache.flink.api.scala._
val keyedStream = text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)

/**
* T:当前数据流中的数据类型
* R:返回数据类型
* S:State中存储的数据类型
* [(String,Int),Int] (String,Int):R Int:S
* 注意:在这里State默认是ValueState,ValueState中可以存储多种数据类型。
*/
keyedStream.mapWithState[(String,Int),Int]((in: (String,Int),count: Option[Int])=>{
count match {
//(t1,t2)
//t1: 表示这个map操作需要返回的数据
//t2: 表示State中目前的数据
case Some(c) => ((in._1,in._2+c),Some(in._2+c))//第2及以上次数,返回累加后的数据,更新状态;c是count简写,用count也行
case None => ((in._1,in._2),Some(in._2))//第1次接收到数据,直接返回数据,初始化状态
}
}).print()

env.execute("KeyedState_MapWithStateDemo")
}
}
1
分析一下mapWithState方法的底层实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def mapWithState[R: TypeInformation, S: TypeInformation](
fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Map function must not be null.")
}

val cleanFun = clean(fun)
val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(getExecutionConfig)

val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] {

override val stateSerializer: TypeSerializer[S] = serializer

override def map(in: T): R = {
applyWithState(in, cleanFun)
}
}

map(mapper)
}
1
以及它里面封装的状态:ValueState
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
trait StatefulFunction[I, O, S] extends RichFunction {

protected val stateSerializer: TypeSerializer[S]

private[this] var state: ValueState[S] = _


def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = {
val (o, s: Option[S]) = fun(in, Option(state.value()))
s match {
case Some(v) => state.update(v)
case None => state.update(null.asInstanceOf[S])
}
o
}

override def open(c: Configuration) = {
val info = new ValueStateDescriptor[S]("state", stateSerializer)
state = getRuntimeContext().getState(info)
}
}
1
2
3
4
5
6
7
8
9
10
11
开启一个新的socket
[root@bigdata04 ~]# nc -l 9001

启动程序。
在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a b

控制台打印的结果如下:
2> (b,1)
6> (a,1)
1
2
3
4
5
6
再模拟产生一条数据
[root@bigdata04 ~]# nc -l 9001
a

控制台打印的结果如下:
6> (a,2)
1
2
3
从这里可以看出来,整个代码的执行流程是没有问题的。

注意:这种方式了解即可,以后看到了这种写法知道是什么意思就行,个人不建议在工作中使用方式,因为这种方式不好理解,不属于通俗易懂的代码。
Operator State 详解
1
2
3
4
5
6
接下来我们来看一下针对Operator State的解释
Operator State表示是和算子绑定的状态,与Key无关。所以说Operator State可以应用在任何类型的数据流上。

此时算子的同一个子任务共享一个状态实例,流入这个算子子任务的数据可以访问和更新这个状态实例。

看一下下面这个图,加深一下理解

image-20230601181849199

1
2
3
4
5
6
7
8
9
10
11
12
13
14
图中左边的Source表示是数据源,这个组件的并行度为2,会产生2个task。
右边的Stateful这个有状态的算子的并行度也是2,对应也会产生2个task。

此时Source-1的数据都会进入到Stateful -1这个子任务中,Stateful -1会维护一个状态实例,他接收到的A\B\Y这几个数据会存储到同一个状态实例中,A\B\Y这些数据会共享同一个状态实例。

对应的Source-2的数据都会进入到Stateful -2这个子任务中,Stateful -2会维护一个状态实例,他接收到的D\E\Z这几个数据会存储到同一个状态实例中。D\E\Z这些数据会共享同一个状态实例。

从这个图里面可以清晰的看出来Operator State和Keyed State之间的区别。

在实际工作中Operator State的实际应用场景不如Keyed State多。Operator State经常被用在Source或Sink组件中,用来保存流入数据的偏移量或者对输出的数据做缓存,以保证Flink应用的Exactly-Once语义。

其中有一个典型的应用场景是Flink从Kafka中消费数据,这个时候会用到FlinkKafkaConsumerBase这个接口,我们之前在讲Flink中的DataSource的时候讲到过,针对kafka的DataSource可以提供仅一次语句,想要提供仅一次语句,那么这个DataSource中就需要维护状态了,通过状态来维护消费偏移量信息。

FlinkKafkaConsumerBase中实际上会维护消费者消费的topic名称、分区编号和offset偏移量这些信息,这些数据会使用Operator State类型的状态进行存储,类似图中显示的这样:

image-20230601182307162

1
2
3
4
5
6
7
8
9
10
实际上这里面会用到Operator State中的UnionListState这种状态,其实就是一个基于List列表的状态。

接下来我们来具体分析一下FlinkKafkaConsumerBase的源码,需要先引入flink-connector-kafka这个依赖。

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.0</version>
<!--<scope>provided</scope-->
</dependency>
1
2
在FlinkKafkaConsumerBase这个类的第201行有这么一行代码:
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
1
2
3
4
5
6
7
8
9
这里用到了ListState,咱们刚才说的他使用的是UnionListState,其实UnionListState的底层就是ListState,唯一的区别就是任务故障恢复时状态数据的传输方式不一样,咱们之前在讲Operator State的扩缩容模式的时候详细分析过。

这个ListState中存储的是Tuple2数据结构,Tuple2中的第1列是KafkaTopicPartition。
KafkaTopicPartition中存储的是Kafka中指定topic的名称和分区的编号。
查看KafkaTopicPartition的源码:
private final String topic;
private final int partition;

Tuple2中的第2列是Long类型,其实存储的就是指定Topic对应分区的消费偏移量。
Operator State中支持的数据结构
1
接下来我们来具体分析一下 Operator State中支持的常见数据结构:

image-20230601182743126

1
2
3
4
5
6
7
8
由于Operator State中存储的是算子的同一个子任务中的状态数据,所以提供的都是可以存储多条数据的状态,没有提供ValueState这种状态。

ListState:表示是一个列表类型的状态,存储类型为T的多个元素,T是一个泛型。
UnionListState:底层就是ListState。

ListState和UnionListState的区别在于任务故障后恢复数据时:ListState是将整个状态列表按照负载均衡算法均匀分布到各个算子子任务上,每个算子子任务得到的是整个列表的子集;而UnionListState会按照广播的方式,将整个列表发送给每个算子子任务。

ListState和UnionListState的区别在源码层面也可以看到,看一下OperatorStateStore这个接口的内容:
1
2
ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
1
2
3
4
通过这块代码可以看出来,ListState和UnionListState对应的数据结构是一样的。

BroadcastState:主要存储K-V类型的多个元素,存储的数据格式和MapState一样,但是它在恢复数据的时候会广播发送数据。
BroadcastState属于OperatorState的一种特殊类型,主要是为了实现同一个算子的多个子任务共享一个State。
Operator State的使用案例
1
前面我们针对Operator State的原理有了一定的了解,下面我们来实操一下,掌握Operator State在工作中的应用:
ListState的使用
Flink中自带的有状态的Source
1
2
3
4
首先我们来分析一个Flink内部已有的Source:FromElementsFunction
当我们使用env.fromElements或者env.fromCollection时,底层其实会用到FromElementsFunction这个数据源。
代码路径如下所示:
env.fromCollection() --> javaEnv.fromCollection(collection, typeInfo) --> SourceFunction<OUT> function = new FromElementsFunction<>(data) --> FromElementsFunction(Iterable<T> elements)
1
2
3
在FromElementsFunction这个类中有一个ListState:checkpointedState

private transient ListState<Integer> checkpointedState;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
checkpointedState这个状态主要在initializeState和snapshotState这两个方法里面用到了。

initializeState和snapshotState这两个方法有什么作用吗?

在分析这两个方法之前,我们需要再重新梳理一下状态这块的概念

从本质上来说,状态属于Flink算子子任务中的一种本地数据,为了保证这份数据的可恢复性,需要借助于Checkpoint机制来将状态数据持久化输出到分布式文件系统中。

状态相关的主要逻辑有2块:
1:在Checkpoint时将算子子任务中的状态数据写入到外部存储中,这个过程可以简称为:snapshot。
2:在任务初始化或重启时,以一定的逻辑从外部存储中读取状态数据,并且恢复为算子子任务的本地数据,这个过程可以简称为restore。

针对Keyed State来说,它对这2块内容做了完善的封装,我们程序员可以开箱即用。

但是对于Operator State来说,每个算子子任务管理自己的Operator State,或者说每个算子子任务上的数据流共享同一个状态实例,可以访问和修改该状态。算子子任务上的数据在程序重启、扩缩容等场景下不能保证百分百的一致性。简单来说,就是Flink应用重启后,某个数据流中的元素不一定会和上次一样,还能流入到对应的子任务上。
因此,我们需要根据自己的业务场景来设计snapshot和restore的逻辑。为了实现这两块的业务逻辑,Flink提供了最为基础的CheckpointedFunction接口。

所以在这我们可以发现其实FromElementsFunction这个类就实现了CheckpointedFunction这个接口。

CheckpointedFunction这个接口中提供了两个方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
其中snapshotState对应的就是刚才我们所分析的那个snapshot过程。
initializeState对应的就是刚才我们所分析的那个restore过程。

那我们来看一下FromElementsFunction里面针对这两个过程是怎么实现的:

首先看一下snapshotState这个方法:
这个方法会在checkpoint时触发执行,将checkpointedState中的数据持久化到外部存储中。

public void snapshotState(FunctionSnapshotContext context) throws Exception {
Preconditions.checkState(
this.checkpointedState != null,
"The " + getClass().getSimpleName() + " has not been properly initialized.");
//先清空一下checkpointedState中之前保存的数据
this.checkpointedState.clear();
//然后把目前正在处理的数据保存到checkpointedState中
this.checkpointedState.add(this.numElementsEmitted);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
接下来看一下initializeState这个方法:
这个方法会在任务第一次启动,以及后续重启时执行,主要是为了从外部存储中读取状态数据,并且把状态数据恢复成本地数据。

public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkState(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
//注册状态
this.checkpointedState =
context.getOperatorStateStore()
.getListState(
new ListStateDescriptor<>(
"from-elements-state", IntSerializer.INSTANCE));
//判断任务是否是重启
if (context.isRestored()) {
List<Integer> retrievedStates = new ArrayList<>();
//从外部存储中读取状态数据,存储为本地数据
for (Integer entry : this.checkpointedState.get()) {
retrievedStates.add(entry);
}

// given that the parallelism of the function is 1, we can only have 1 state
Preconditions.checkArgument(
retrievedStates.size() == 1,
getClass().getSimpleName() + " retrieved invalid state.");
//从本地数据中获取第一条数据赋值给numElementsToSkip
this.numElementsToSkip = retrievedStates.get(0);
}
}
1
2
3
注意:在initializeState里面注册状态时,首先会根据指定的状态名称到状态的外部存储中检查一下是否存在一个和当前状态名称相同的状态,如果存在,则尝试对状态进行恢复,如果不存在,则默认初始化状态。

这个是Flink中自带的有状态的Source。
自定义一个有状态的Sink
1
2
3
4
下面我们通过自定义一个有状态的Sink来感受一下CheckpointedFunction的具体使用:

需求是这样的:
我们想要实现一个批量输出的功能,此时可以考虑在Sink组件内部定义一个缓存,但是还要保证数据一定会输出到外部系统。这个时候就需要借助于状态实现了,通过snapshotState定期将批量缓存的数据保存到状态中,如果程序出现了故障,重启后还可以从状态中将未输出的数据读取到缓存中,继续输出到外部系统。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.imooc.scala.state

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* 通过自定义Sink实现批量输出
* Created by xuwei
*/
object OperatorState_MyBufferSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)
//设置并行度为2
env.setParallelism(2)

val text = env.socketTextStream("bigdata04", 9001)

import org.apache.flink.api.scala._
text.flatMap(_.split(" "))
.map((_, 1))
.addSink(new MyBufferSink())

env.execute("OperatorState_MyBufferSinkDemo")
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.imooc.scala.state

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import scala.collection.mutable.ListBuffer

/**
* 自定义批量输出Sink
* Created by xuwei
*/
class MyBufferSink extends SinkFunction[(String,Int)] with CheckpointedFunction{
//声明一个ListState类型的状态变量
private var checkpointedState: ListState[(String, Int)] = _

//定义一个本地缓存
private val bufferElements = ListBuffer[(String, Int)]()

/**
* Sink的核心处理逻辑,将接收到的数据输出到外部系统
* 接收到一条数据,这个方法就会执行一次
* @param value
* @param context
*/
override def invoke(value: (String, Int), context: SinkFunction.Context): Unit = {
//将接收到的数据保存到本地缓存中
bufferElements += value
//当本地缓存大小到达一定阈值时,将本地缓存中的数据一次性输出到外部系统
if (bufferElements.size == 2) {
println("======start======")
for (element <- bufferElements) {
println(element)
}
println("======end======")
//清空本地缓存中的数据
bufferElements.clear()
}
}

/**
* 将本地缓存中的数据保存到状态中,在执行checkpoint时,会将状态中的数据持久化到外部存储中
* @param context
*/
override def snapshotState(context: FunctionSnapshotContext): Unit = {
//将上次写入到状态中的数据清空
checkpointedState.clear()
//将最新的本地缓存中的数据写入到状态中
for (element <- bufferElements) {
checkpointedState.add(element)
}
}

/**
* 初始化或者恢复状态
* @param context
*/
override def initializeState(context: FunctionInitializationContext): Unit = {
//注册状态
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
classOf[(String,Int)]
)
//此时借助于context获取OperatorStateStore,进而获取ListState
checkpointedState = context.getOperatorStateStore.getListState(descriptor)

// 如果是重启任务,需要从外部存储中读取状态数据并写入到本地缓存中
if(context.isRestored) {
checkpointedState.get().forEach(e=>{
bufferElements += e
})
}
}
}
1
2
3
4
5
6
开启一个新的Socket

启动代码。
在socket中模拟产生数据:
[root@bigdata04 ~]# nc -l 9001
a
1
2
3
4
5
此时控制台没有输出数据。

再产生一条数据:
[root@bigdata04 ~]# nc -l 9001
b
1
2
3
4
5
6
7
8
9
10
11
12
13
此时控制台依然没有输出数据。
为什么产生两条数据了还是没有输出?这是因为此时我们给程序设置的并行度为2。所以sink组件会产生2个子任务,这两条数据现在被划分到了两个子任务中,每个子任务只收到了1条数据,所以不满足输出数据的条件。
如果把任务的并行度修改为1这个时候就会输出数据了。

再产生一条数据:
[root@bigdata04 ~]# nc -l 9001
a

此时发现控制台输出数据了:
======start======
(a,1)
(a,1)
======end======
1
这样就通过自定义Sink实现了有状态的批量输出功能。
UnionListState的使用
1
2
3
4
针对UnionListState的使用,在Kafka Connector中的FlinkKafkaConsumerBase里面有应用,我们来看一下对应的代码:
前面其实我们已经简单分析了一些内容了,这个时候再来查看FlinkKafkaConsumerBase这个类,理解会更深一些:
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {
1
2
3
4
5
在这里可以发现这个类其实也实现了CheckpointedFunction接口。
那我们来看一下FlinkKafkaConsumerBase中对应的snapshotState和initializeState这两个方法的实现

首先来看snapshotState方法:
snapshotState方法里面的逻辑大致就是先清空之前unionOffsetStates中的数据,然后写入最新的数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
//清空之前写入的状态数据
unionOffsetStates.clear();

final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
subscribedPartitionsToStartOffsets.entrySet()) {
//向状态中写入数据
unionOffsetStates.add(
Tuple2.of(
subscribedPartition.getKey(), subscribedPartition.getValue()));
}

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call
// can happen
// on this function at a time: either snapshotState() or
// notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
}
} else {
HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call
// can happen
// on this function at a time: either snapshotState() or
// notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}

for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
currentOffsets.entrySet()) {
//向状态中写入数据
unionOffsetStates.add(
Tuple2.of(
kafkaTopicPartitionLongEntry.getKey(),
kafkaTopicPartitionLongEntry.getValue()));
}
}

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// truncate the map of pending offsets to commit, to prevent infinite growth
while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingOffsetsToCommit.remove(0);
}
}
}
}
1
2
3
4
5
接下来再看一下initializeState方法
initializeState方法里面的逻辑大致就是初始化或者获取unionOffsetStates,然后再判断任务是不是重启,是的话就把数据给本地变量。
这里面比较特殊的就是在获取状态时使用的是getUnionListState这个方法,这个方法返回的就是UnionListState这种类型的状态了。这种状态咱们前面分析过,他的底层其实就是ListState、唯一的区别就是任务在故障恢复时状态数据的传输方式不一样。
这里面用到了UnionListState,说明FlinkKafkaConsumer在故障恢复的时候每个子任务都可以获取到之前所有子任务中维护的状态数据,这样做的目的是为了便于重新给每个子任务分配需要消费的topic分区信息。
此时Flink任务在重启的时候,每个FlinkKafkaConsumer子任务都可以获取到待消费的kafka中指定topic的所有分区信息和对应的消费偏移量信息,具体某一个FlinkKafkaConsumer子任务在运行的时候会按照一定的策略选择一个或者多个分区进行消费。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void initializeState(FunctionInitializationContext context) throws Exception {

OperatorStateStore stateStore = context.getOperatorStateStore();
//初始化或者获取unionOffsetStates,这里用到了getUnionListState
this.unionOffsetStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
createStateSerializer(getRuntimeContext().getExecutionConfig())));

if (context.isRestored()) {
restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

// populate actual holder for restored state
//从外部存储中获取状态数据,恢复到本地变量中
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
}

LOG.info(
"Consumer subtask {} restored state: {}.",
getRuntimeContext().getIndexOfThisSubtask(),
restoredState);
} else {
LOG.info(
"Consumer subtask {} has no restore state.",
getRuntimeContext().getIndexOfThisSubtask());
}
}
1
这就是UnionListState的使用。
BroadcastState的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
针对BroadcastState的使用,一个典型的应用案例就是两个流连接的场景。
假设其中一个数据流是 事件数据流,它属于普通的数据流,里面是一些用户行为数据。

另外一个数据流是 配置数据流,它不是普通的数据流,它是广播数据流,里面是一些映射关系数据。

需求是使用配置数据流中的映射关系数据去完善事件数据流中的用户行为数据。

这个需求来源于某直播平台,这个直播平台会在多个国家运营,如果每一个国家都使用一套运营策略,会比较麻烦,运营成本比较高,意义也不是特别大。为了方便运营管理,所以平台内部提出了大区这个概念,可以将一些国家划分到同一个大区里面,同一个大区使用相同的运营策略。
那么对应的就有国家和大区之间的映射关系,这个映射关系不是一成不变的,他会随着平台的发展而发生变化。
在用户行为数据中针对用户的基础数据里面只有用户所属的国家信息,没有包含大区信息,因为国家和大区的关系是可变的,但是在做报表统计的时候,是需要以大区维度进行统计的。
所以针对实时报表这种场景,就需要对用户行为数据中的国家信息进行实时关联转换了。

为了加深理解,来看一下下面这个图:

image-20230601184843285

1
2
3
4
5
6
7
8
这个图里面有两个实时数据流。
上面的事件数据流里面是用户的行为数据。
下面的配置数据流里面是国家和大区之间的最新映射关系。
在程序中需要将配置数据流广播出去,转换为BroadcastState,然后将两份数据流连接到一起,这样在处理事件数据流中的用户行为数据的时候,可以获取到BroadcastState,基于这份状态数据对用户行为数据中的国家信息进行转换。

下面我们来基于这个需求开发一下对应的代码。

针对事件数据流在实际工作中基本上是来源于kafka的,在这里为了演示方便,我们来开发一个自定义的Source模拟产生数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.imooc.scala.state

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

import scala.util.Random

/**
* 事件数据流-自定义Source
* Created by xuwei
*/
class MyStreamSource extends RichSourceFunction[String]{
var isRunning = true

/**
* 初始化方法,只执行一次
*
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
}

/**
* Source的核心方法,负责源源不断的产生数据
* @param ctx
*/
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
while (isRunning){
//{"dt":"2026-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}
val time = sdf.format(new Date)
val line_prefix = "{\"dt\":\""+time+"\",\"countryCode\":\""
val line_suffix = "\",\"data\":[{\"type\":\"s1\",\"score\":0.3,\"level\":\"A\"},{\"type\":\"s2\",\"score\":0.2,\"level\":\"B\"}]}"
val countryCodeArr = Array("US","PK","KW")
val num = Random.nextInt(3)
ctx.collect(line_prefix+countryCodeArr(num)+line_suffix)
Thread.sleep(1000)//每隔1秒产生一条数据,控制一下数据产生速度
}
}

/**
* 任务停止的时候执行一次
* 这里主要负责控制run方法中的循环
*/
override def cancel(): Unit = {
isRunning = false
}

/**
*任务停止的时候执行一次
* 这里主要负责关闭在open方法中创建的连接
*/
override def close(): Unit = {

}
}
1
针对配置数据流在实际工作中基本上是来源于Redis或者MySQL,在这里为了演示方便,我们来开发一个自定义的Source模拟从Redis中获取数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.imooc.scala.state

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

import scala.collection.mutable

/**
* 配置数据流-自定义Source
* Created by xuwei
*/
class MyRedisSource extends RichSourceFunction[mutable.Map[String,String]]{
var isRunning = true

/**
* 初始化方法,只执行一次
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//TODO 创建Redis数据库连接
}

/**
* Source的核心方法,负责源源不断的产生数据
* @param ctx
*/
override def run(ctx: SourceFunction.SourceContext[mutable.Map[String, String]]): Unit = {
while (isRunning){
//TODO 需要从Redis中获取这些映射关系
val resMap = mutable.Map("US"->"AREA_US","PK"->"AREA_AR","KW"->"AREA_AR")
ctx.collect(resMap)
Thread.sleep(5000)//每隔5秒更新一次配置数据
}
}

/**
* 任务停止的时候执行一次
* 这里主要负责控制run方法中的循环
*/
override def cancel(): Unit = {
isRunning = false
}

/**
*任务停止的时候执行一次
* 这里主要负责关闭在open方法中创建的连接
*/
override def close(): Unit = {
//TODO 关闭Redis数据库连接
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.imooc.scala.state

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

import scala.collection.JavaConverters.mapAsJavaMap
import scala.collection.mutable

/**
* BroadcastState在两个流连接中的应用(双流Join)
* 这个场景类似于:一个事实表(事件数据流) left join 一个维度表(配置数据流)
* Created by xuwei
*/
object OperatorState_BroadcastStateDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

import org.apache.flink.api.scala._
//构建第1个数据流:事件数据流
val eventStream = env.addSource(new MyStreamSource)

//构建第2个数据流:配置数据流
val confStream = env.addSource(new MyRedisSource)

//将配置数据流广播出去,变成广播数据流,并且注册一个MapState
val countryAreaMapStateDescriptor = new MapStateDescriptor[String,String](
"countryArea",
classOf[String],
classOf[String]
)
val broadcastStream = confStream.broadcast(countryAreaMapStateDescriptor)

//将两个流进行连接
val broadcastConnectStream = eventStream.connect(broadcastStream)

//处理连接后的流
broadcastConnectStream.process(new BroadcastProcessFunction[String,mutable.Map[String,String],String]{
//处理事件数据流中的数据
override def processElement(value: String, ctx: BroadcastProcessFunction[String, mutable.Map[String, String], String]#ReadOnlyContext, out: Collector[String]): Unit = {
val jsonObj = JSON.parseObject(value)
val countryCode = jsonObj.getString("countryCode")
//取出广播状态中的数据
val broadcastState = ctx.getBroadcastState(countryAreaMapStateDescriptor)
val area = broadcastState.get(countryCode)
//任务刚开始执行的时候broadcastState中的数据为空,所以获取不到数据
if(area!=null){
jsonObj.put("countryCode",area)
out.collect(jsonObj.toJSONString)
}
}

//处理广播后的配置数据流中的数据
override def processBroadcastElement(value: mutable.Map[String, String], ctx: BroadcastProcessFunction[String, mutable.Map[String, String], String]#Context, out: Collector[String]): Unit = {
//获取BroadcastState
val broadcastState = ctx.getBroadcastState(countryAreaMapStateDescriptor)
//清空BroadcastState中的数据
broadcastState.clear()
//重新写入最新的映射关系数据
broadcastState.putAll(mapAsJavaMap(value))
}
}).print()

env.execute("OperatorState_BroadcastStateDemo")
}

}
1
2
3
4
5
运行程序,结果发现程序报错,查看错误日志发现提示的是scala的问题:
Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler due to java.lang.NoClassDefFoundError: Could not initialize class scala.tools.nsc.Properties$
at org.apache.flink.shaded.guava30.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:566)
at org.apache.flink.shaded.guava30.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:527)
...
1
经过排查发现这个问题主要是因为我们代码里面用到了mapAsJavaMap这种功能,此时需要引入scala相关的依赖包:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.11</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.11</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.11</version>
<!-- <scope>provided</scope> -->
</dependency>
1
2
3
4
5
6
7
重新运行程序,发现可以正常运行,并且输出数据:

3> {"dt":"2022-05-29 18:11:07","data":[{"score":0.3,"level":"A","type":"s1"},{"score":0.2,"level":"B","type":"s2"}],"countryCode":"AREA_US"}
4> {"dt":"2022-05-29 18:11:08","data":[{"score":0.3,"level":"A","type":"s1"},{"score":0.2,"level":"B","type":"s2"}],"countryCode":"AREA_AR"}
5> {"dt":"2022-05-29 18:11:09","data":[{"score":0.3,"level":"A","type":"s1"},{"score":0.2,"level":"B","type":"s2"}],"countryCode":"AREA_AR"}
6> {"dt":"2022-05-29 18:11:10","data":[{"score":0.3,"level":"A","type":"s1"},{"score":0.2,"level":"B","type":"s2"}],"countryCode":"AREA_AR"}
7> {"dt":"2022-05-29 18:11:11","data":[{"score":0.3,"level":"A","type":"s1"},{"score":0.2,"level":"B","type":"s2"}],"countryCode":"AREA_AR"}
1
2
3
这就是BroadcastState的常用应用场景。

注意:针对这个需求,因为没有对数据流进行keyBy分组,如果不对配置数据流进行广播,那么在处理事件数据流中的数据的时候,每一个子任务无法获取到配置数据流中的所有映射关系,所以只能通过全量广播,这样才能让事件数据流的每一个子任务都可以获取到所有的配置数据流中的数据,最终实现数据关联转换。
Operator State的使用形式总结
1
2
3
4
5
6
7
在程序中想要使用Operator State,主要通过实现CheckpointedFunction这个接口,然后实现接口中的initializeState和snapshotState函数。

可以开发一些自定义的类去继承SourceFunction、SinkFunction或者MapFunction,同时实现CheckpointedFunction这个接口,这样也可以在里面使用状态了。

注意:CheckpointedFunction这种方式,既可以操作Operator State,也可以操作Keyed State,在代码中同时操作这两种类型的State也是可以的。

因为在initializeState中,通过context既可以获取KeyedState,又可以获取OperatorState。
1
2
context.getKeyedStateStore
context.getOperatorStateStore

本文标题:大数据开发工程师-Flink新版本1.12以上-1

文章作者:TTYONG

发布时间:2023年04月20日 - 16:04

最后更新:2023年06月02日 - 16:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%83%E5%91%A8-Flink%E6%9E%81%E9%80%9F%E4%B8%8A%E6%89%8B%E7%AF%87-Flink%E6%96%B0%E7%89%88%E6%9C%AC1.12%E4%BB%A5%E4%B8%8A-1.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%