大数据开发工程师-第十七周 Flink极速上手篇-Flink高级进阶之路-2


第十七周 Flink极速上手篇-Flink高级进阶之路-2

Kafka-Connector

1
大家好,下面呢,我们来看一下flink中针对kafka connect的专题,提供了很多的connect组件,其中应用比较广泛的就是kafka这个connect。我们就针对kafka在flink的应用做详细的分析。针对flink流处里啊,最常用的组件就是kafka。原始日志数据产生后,会被日志采集工具采集到kafka中,让flink去处理。处理之后的数据可能也会继续写入到kafka中。kafka可以作为flink的datasource和datasink来使用。并且kafka中的partition机制和flink的并行度机制可以深度结合,提高数据的读取效率和写入效率。那我们想要在flink中使用kafka,需要添加对应的依赖。(先在flink官网中找到依赖的名字,再到maven中去找符合的版本)
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.1</version>
</dependency>

Kafka Consumer的使用

1
那在具体执行这个代码之前啊,我们先需要把那个zookeeper集群,还有kafka集群给他起来。我这些相关的服务呢,已经起来了。这是入K班了,这是卡不卡都已经起来啊。好,那下面注意,我们还需要做一件事情。因为我们在这里面呢,用到了一个T1这个topic,所以说在这我们需要去创建这个topic。找一下之前的命令。嗯。其一。这个分区设置为五,后面因子是为二。发现一个T。好,创建成功。那下面呢,我们就可以去启动代码。启动电板之后,那我们需要往那个卡夫卡里面模拟产生数据。这个时候呢,我们可以启动一个基于控制台的一个生产者来模拟产生数据。嗯。使用这个卡不卡console producer。把这个复制一下。好,这个套背上就是T1。那这个时候呢,我们接着就来模拟产生数据。hello。看到没有消费到。再加一个。hello。没问题吧,是可以的,这样的话我们就可以消费卡夫卡中的数据了。好,这个是代码实践,接下来我们使用这个Java代码来实现一下。先创建一个package。嗯。stream。搞不搞?SS。把这个复制过来。嗯。嗯嗯。嗯。好,首先呢,还是获取一个连环应。execution。因为第2GET。因为。下面的env.S。嗯。在这儿,我们需要去利用这个。Li。卡不卡?three。那这里面啊,传一个topic。嗯。第一,嗯。第二个,你有一个simple。视频,game。第三个pop。嗯嗯。有一个薄。嗯。嗯嗯嗯。首先呢,我们在里面set property。我可以把这个呢直接拿过来。嗯。好,下面呢,said。格布利。卡不卡星本。把它拿过来。嗯。感注释,这个就是指定卡夫卡作为S。嗯。这是指令。普林格卡夫卡consumer的相关配置。接下来呢,我们将读取到的数据啊,一到控制台。嗯。嗯。嗯嗯。嗯。嗯。包的异常。嗯。来把这个启动起来。好,那我们在这边呢,再模拟产生的数据。哈哈哈。没问题吧,是可以的。好,这就是Java代码的一个实现。

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.imooc.scala.kafkaconnector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
* Flink从kafka中消费数据
* Created by xuwei
*/
object StreamKafkaSourceScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//每隔5000 ms执行一次checkpoint(设置checkpoint的周期)
env.enableCheckpointing(5000)

//针对checkpoint的相关配置
//设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//确保两次Checkpoint之间有至少多少 ms的间隔(checkpoint最小间隔)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//Checkpoint必须在一分钟内完成,或者被丢弃(checkpoint的超时时间)
env.getCheckpointConfig.setCheckpointTimeout(60000)
//同一时间只允许执行一个Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置状态数据存储的位置
env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata01:9000/flink/checkpoints",true))


//指定FlinkKafkaConsumer相关配置
val topic = "t1"
val prop = new Properties() prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092")
prop.setProperty("group.id","con1")
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)

//kafka consumer的消费策略设置
//默认策略,读取group.id对应保存的offset开始消费数据,读取不到则根据kafka中auto.offset.reset参数的值开始消费数据
kafkaConsumer.setStartFromGroupOffsets()
//从最早的记录开始消费数据,忽略已提交的offset信息
//kafkaConsumer.setStartFromEarliest()
//从最新的记录开始消费数据,忽略已提交的offset信息
//kafkaConsumer.setStartFromLatest()
//从指定的时间戳开始消费数据,对于每个分区,其时间戳大于或等于指定时间戳的记录将被作为起始位置
//kafkaConsumer.setStartFromTimestamp(176288819)

//指定kafka作为source
import org.apache.flink.api.scala._
val text = env.addSource(kafkaConsumer)

//将读取到的数据打印到控制台上
text.print()
env.execute("StreamKafkaSourceScala")
}
}
1
2
3
在运行代码之前,需要先启动zookeeper集群和kafka集群
在kafka中创建topic:t1
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic t1
1
2
然后启动代码
再启动一个Kafka 的 console生产者模拟产生数据,验证效果。

image-20230423105004127

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.imooc.java.kafkaconnector;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
* Flink从kafka中消费数据
* Created by xuwei
*/
public class StreamKafkaSourceJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//指定FlinkKafkaConsumer相关配置
String topic = "t1";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092");
prop.setProperty("group.id","con1");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);

//指定kafka作为source
DataStreamSource<String> text = env.addSource(kafkaConsumer);

//将读取到的数据打印到控制台
text.print();

env.execute("StreamKafkaSourceJava");
}
}

KafkaConsumer消费策略设置

1
针对卡夫卡康消费数据的时候会有一些策略,我们来看一下。首先这个是默认的消费策略。下面还有一个ear,从最早的开始消费,latest,从最新的开始消费。已经呢,现在要出来一个C呢。按照指定的时间戳往后面开始消费。下面呢,我们来演示一下。我直接在这里面来设置一下。嗯。卡夫卡consumer。的消费策略设置。这个其实我们在讲卡不卡的时候也详细分解过啊,其实是类似的。首先我们看下这个默认策略。嗯。直接使用它来设置点带。start from group of,注意这个呢,其实就默认你不设置它默就类。它是什么意思呢?它会读取。group ID。对应。保存的outside。开始消费数据。那读取不到的话呢。则根据卡夫卡中。这个参数auto点。reite。参数的值开始。消费数据。因为如果说你是第一次使用这个消费者,那么他之前肯定是没有保存这个对应的office的信息,那这样的话呢,他就会根据这个参数的值来开始进行消费。那这个值的话,它那要么是early latest对吧,要么是从最新的,要么是从最近的。那下一次的话呢,他就会根据你之前指定的这个global ID对应的保存的那个开始往下面继续消费数据。那既然下面这个呢,是从那个最早的记录开始,消费主义啊,不搞consumer there that。from earliest。从最早的记录。开始消费。独具。忽略。你提交。信息。这样的话,他就不管你有没有提交,都会每次都从那个最早的数据开始消费。那对了,还有一个从。最新的记录开始消费。也是忽略这个已提交的奥赛的信息。嗯。start。home latest。嗯。那还有一个是从指定的时间戳开始消费数据。对于每个分区。其时间戳大于或等于指定时间戳的记录。江北。作为70位。嗯。that。大的from。这里面你给他传一个时间戳就行了啊,我这边随便写一个行吗?这就是这几种测量啊。其实我们在讲卡不卡的时候,也详细分析过这几种词,那在那就把这个默认的给它打开吧。就你这个呢,你在这儿设置不设置,它其实都是一个默认的策略。这个呢,就是针对这里面这个卡夫卡抗性板,它这个消费策略的一个设置。其实咱们在工作中啊,一般最常见的,那其实就是一种默认的技术。

image-20230423105719815

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.imooc.scala.kafkaconnector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
* Flink从kafka中消费数据
* Created by xuwei
*/
object StreamKafkaSourceScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

//每隔5000 ms执行一次checkpoint(设置checkpoint的周期)
env.enableCheckpointing(5000)

//针对checkpoint的相关配置
//设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//确保两次Checkpoint之间有至少多少 ms的间隔(checkpoint最小间隔)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//Checkpoint必须在一分钟内完成,或者被丢弃(checkpoint的超时时间)
env.getCheckpointConfig.setCheckpointTimeout(60000)
//同一时间只允许执行一个Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置状态数据存储的位置
env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata01:9000/flink/checkpoints",true))


//指定FlinkKafkaConsumer相关配置
val topic = "t1"
val prop = new Properties()
prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092")
prop.setProperty("group.id","con1")
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)

//kafka consumer的消费策略设置
//默认策略,读取group.id对应保存的offset开始消费数据,读取不到则根据kafka中auto.offset.reset参数的值开始消费数据
kafkaConsumer.setStartFromGroupOffsets()
//从最早的记录开始消费数据,忽略已提交的offset信息
//kafkaConsumer.setStartFromEarliest()
//从最新的记录开始消费数据,忽略已提交的offset信息
//kafkaConsumer.setStartFromLatest()
//从指定的时间戳开始消费数据,对于每个分区,其时间戳大于或等于指定时间戳的记录将被作为起始位置
//kafkaConsumer.setStartFromTimestamp(176288819)


//指定kafka作为source
import org.apache.flink.api.scala._
val text = env.addSource(kafkaConsumer)

//将读取到的数据打印到控制台上
text.print()

env.execute("StreamKafkaSourceScala")
}
}

KafkaConsumer的容错

1
2
3
4
5
6
7
下Flink中也有checkpoint机制,Checkpoint是Flink实现容错机制的核心功能,它能够根据配置周期性地基于流中各个算子任务的State来生成快照,从而将这些State数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

当CheckPoint机制开启的时候,Consumer会定期把Kafka的offset信息还有其它算子任务的State信息一块保存起来
当Job失败重启的时候,Flink会从最近一次的CheckPoint中进行恢复数据,重新消费Kafka中的数据

为了能够使用支持容错的Consumer,需要开启checkpoint
那如何开启呢?
1
2
//每隔5000 ms执行一次Checkpoint(设置Checkpoint的周期)
env.enableCheckpointing(5000)
1
那针对这个呢,它还有一些相关的配置。那我接着呢,把这个配置拿过来。把这个复制一下。搞一下包。这个是。针对checkpoint的相关配置。下面这个参数的意思呢?表示设置一下checkpoint的一个语义,它可以提供这种锦一词的语义。下面这个呢,表示两次切之间它的一个时间间隔。这个呢,表示呢,必须要在指定时间之内完成one。其实就是给这个check呢,设置一个超时时间,超过这个时间了就被丢弃了。下面这个呢,表示呢,同一时间只允许执行一个checkpoint。下面这个三注意。他呢表示呀,当我们对这个link程序执行一个cancel之后,就是把这个link程序停掉之后,我们呢,会保留这个这个波段数据,这样的话,我们可以根据实际需要,后期呢来恢复这些数据。这是它相关的一些配置啊,
1
2
3
4
5
6
7
8
9
10
//设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//确保两次Checkpoint之间有至少多少 ms的间隔(checkpoint最小间隔)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//Checkpoint必须在一分钟内完成,或者被丢弃(checkpoint的超时时间)
env.getCheckpointConfig.setCheckpointTimeout(60000)
//同一时间只允许执行一个Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
1
2
3
env.enableCheckpointing(5000)用于设置checkpoint的时间间隔,即每5000毫秒触发一次checkpoint。而env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500)用于设置两次checkpoint之间的最短时间间隔,即两次checkpoint之间至少要间隔500毫秒。

这意味着,如果某一次checkpoint花费了超过4500毫秒的时间,那么下一次checkpoint将不会立即开始,而是会等待至少500毫秒后才开始。

State数据存储的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
最后还有一个配置,设置State数据存储的位置
默认情况下,State数据会保存在TaskManager的内存中,Checkpoint执行时,会将State数据存储在JobManager的内存中。

具体的存储位置取决于State Backend的配置,Flink一共提供了3种存储方式

MemoryStateBackend
State数据保存在Java堆内存中,执行Checkpoint的时候,会把State的快照数据保存到JobManager的内存中,基于内存的State Backend在生产环境下不建议使用。

FsStateBackend
State数据保存在TaskManager的内存中,执行Checkpoint的时候,会把State的快照数据保存到配置的文件系统中,可以使用HDFS等分布式文件系统。

RocksDBStateBackend
RocksDB跟上面的都略有不同,它会在本地文件系统中维护State,State会直接写入本地RocksDB中。同时它需要配置一个远端的文件系统(一般是HDFS),在做Checkpoint的时候,会把本地的数据直接复制到远端的文件系统中。故障切换的时候直接从远端的文件系统中恢复数据到本地。RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产环境中使用。
RocksDBStateBackend
scala
1
2
3
4
5
6
7
8
9
10
11
12
所以在这里我们使用第三种:RocksDBStateBackend
针对RocksDBStateBackend需要引入依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.11.1</version>
</dependency>


//设置状态数据存储的位置
env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata01:9000/flink/checkpoints",true))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.imooc.scala.kafkaconnector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
* Flink从kafka中消费数据
* Created by xuwei
*/
object StreamKafkaSourceScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

//每隔5000 ms执行一次checkpoint(设置checkpoint的周期)
env.enableCheckpointing(5000)

//针对checkpoint的相关配置
//设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//确保两次Checkpoint之间有至少多少 ms的间隔(checkpoint最小间隔)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//Checkpoint必须在一分钟内完成,或者被丢弃(checkpoint的超时时间)
env.getCheckpointConfig.setCheckpointTimeout(60000)
//同一时间只允许执行一个Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置状态数据存储的位置
env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata01:9000/flink/checkpoints",true))


//指定FlinkKafkaConsumer相关配置
val topic = "t1"
val prop = new Properties()
prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092")
prop.setProperty("group.id","con1")
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)

//kafka consumer的消费策略设置
//默认策略,读取group.id对应保存的offset开始消费数据,读取不到则根据kafka中auto.offset.reset参数的值开始消费数据
kafkaConsumer.setStartFromGroupOffsets()
//从最早的记录开始消费数据,忽略已提交的offset信息
//kafkaConsumer.setStartFromEarliest()
//从最新的记录开始消费数据,忽略已提交的offset信息
//kafkaConsumer.setStartFromLatest()
//从指定的时间戳开始消费数据,对于每个分区,其时间戳大于或等于指定时间戳的记录将被作为起始位置
//kafkaConsumer.setStartFromTimestamp(176288819)


//指定kafka作为source
import org.apache.flink.api.scala._
val text = env.addSource(kafkaConsumer)

//将读取到的数据打印到控制台上
text.print()

env.execute("StreamKafkaSourceScala")
}
}

image-20230423210612299

image-20230423210628262

image-20230423210654942

image-20230423210749485

Kafka Consumers Offset自动提交

1
2
3
Kafka Consumers Offset自动提交机制需要根据Job是否开启Checkpoint来区分。
CheckPoint关闭时:通过参数enable.auto.commit和auto.commit.interval.ms控制
CheckPoint开启时:执行CheckPoint的时候才会提交offset,此时kafka中的自动提交机制就会被忽略

image-20230423211307520

KafkaProducer的使用

1
2
3
下面我们来看一下在flink中如何向kafka中写数据,此时需要用到kafka producer。

所有的数据都写入指定topic的一个分区里面。注意,他会把所有数据写到这个topic的一个分区。那这样的话,其实呢,在我们实习当中,这样是不合适的啊。我们使用操作的肯定是要使用多个分区,你要把数据分别写到不同的分区里面,这样的话后期我们去消费也可以并行消费,提高消费能力,对吧?那你如果都搞一个分区里面,那其实相当于我这个topic卡就一个分区。这样后期我这个处理能力是有限制的,如果不想自定义分区器,也不想使用默认的,可以直接使用null即可

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.imooc.scala.kafkaconnector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner

/**
* Flink向Kafka中生产数据
* Created by xuwei
*/
object StreamKafkaSinkScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

//开启checkpoint
//env.enableCheckpointing(5000)

val text = env.socketTextStream("bigdata04", 9001)

//指定FlinkKafkaProducer的相关配置
val topic = "t2"
val prop = new Properties()
prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092")

//指定kafka作为sink
/*
KafkaSerializationSchemaWrapper的几个参数
1:topic:指定需要写入的topic名称即可
2:partitioner,通过自定义分区器实现将数据写入到指定topic的具体分区中
默认会使用FlinkFixedPartitioner,它表示会将所有的数据都写入指定topic的一个分区里面
如果不想自定义分区器,也不想使用默认的,可以直接使用null即可
3:writeTimeStamp,向topic中写入数据的时候,是否写入时间戳
如果写入了,那么在watermark的案例中,使用extractTimestamp()提起时间戳的时候
就可以直接使用previousElementTimestamp即可,它表示的就是我们在这里写入的数据对应的timestamp
*/
val kafkaProducer = new FlinkKafkaProducer[String](topic, new KafkaSerializationSchemaWrapper[String](topic, null, false, new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
text.addSink(kafkaProducer)

env.execute("StreamKafkaSinkScala")
}
}
1
2
3
所以,如果我们不需要自定义分区器的时候,直接传递为null即可,不要使用FlinkFixedPartitioner,它会将数据都写入到topic的一个分区中。

将FlinkFixedPartitioner设置为null

image-20230423213547295

image-20230423213734278

image-20230423213834818

1
cmak里查看

image-20230423214449177

1
2
3
如果指定的默认FlinkFixedPartitioner

new FlinkFixedPartitioner[String]()

image-20230531123033483

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.imooc.java.kafkaconnector;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;

import java.util.Properties;

/**
* Flink向Kafka中生产数据
* Created by xuwei
*/
public class StreamKafkaSinkJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> text = env.socketTextStream("bigdata04", 9001);

//指定FlinkKafkaProducer相关配置
String topic = "t2";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092");

//指定kafak作为sink
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(topic, new KafkaSerializationSchemaWrapper<String>(topic, null, false, new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
text.addSink(kafkaProducer);

env.execute("StreamKafkaSinkJava");

}
}

KafkaProducer的容错

1
2
3
4
5
6
7
8
如果Flink开启了CheckPoint,针对FlinkKafkaProducer可以提供EXACTLY_ONCE的语义保证

可以通过semantic参数来选择三种不同的语义:
Semantic.NONE、Semantic.AT_LEAST_ONCE【默认】、Semantic.EXACTLY_ONCE

来看一下。注意看到没有,我们刚才指的就是一个锦一四啊。但是这个时候注意你还要开启这个table。开启这个了。下面那些参数我暂时就先不指定了,行吗?好,那下面呢,我们来执行一下。来开启这个稍的。好注意这时候呢,给大家看一个比较神奇的现象。你看刚才我们把这个搜下打开啊,结果它停了,你再把它打开。他还会请。看没有?什么原因呢?那时候我这块也没有报错呀。注意这个呢,是因为这个原因。我们之前啊,在这加了一个logo的配置文件,对吧。注意这个日级别,我之前给它改成error。我们把这个调一下。调成那个警告级别。因为这个时候有一些日他没有打出来警告信息看不到啊。来再启动把这个打开。对,他这个其实应该是error级别的,但是他写的什么写的不太好,他把这个日志写成那种warning级别,警告级别的,所以说呢,我们之前使用那个error级别的,监控不到这些日志信息啊。啊,停一下吧。

来分析一下啊。不要往后面看这。还有什么呀,这个事物时间比这个博客里面配置的这个时间还要大。就是说,生产者中设置的事物超时时间大于卡夫卡博客中设置的事物超时时间。因为卡夫卡服务中默认事物的超时时间是15分钟,但是呢,弗林格卡夫卡保留它里面设置的事物超时间默认是一小时,这个仅一次语义啊,它需要依赖这个事物。如果从Li应用程序崩溃到完全重启的时间超过了卡夫卡的事物超时时间,那么将会有数据丢失,所以我们需要合理的配置事物超时时间。因此,在使用这个仅一次语义之前,建议增加卡夫卡博克中这个transaction.max.timeout.ms的值。把这个值啊给它调大。那下面呢,我们就来修改一下卡夫卡里面这个配置,这个配置在哪啊,其实就那个server.properties里面啊。买那个可以试一下。它里面是没有这个参数的,你直接在这把它拿过来。给它做个值。我们也给它改成一小时吧。这个你转换成毫秒是3600000。那么是五个零啊,这样的话就一小时。把这个复制一下。对,这个集群里面所有机器都要改啊。嗯。好,可以了,注意改完之后我们需要重写。那你先把这个卡夫卡集群停掉。好停掉之后再去启动,启动的话,我们使用它这个命令啊。前面加了一个GMX,这样的话我们可以使那个CMA来减轻它里面一些信息啊。嗯。好,这个起来了。嗯。嗯。这个呢也可以啊。好,这个也可以了。那接下来我们重新再执行这个样本,对吧,把这个再看一下。嗯。看到没有,此时他就不报错了啊,我们可以在这来验证一下,先确一下里面的数据对吧。是这样。5211。这个停了,因为刚才我们把那个卡夫卡停掉之后啊,这个c map哎,就停掉了。把它起来。所以这个没不对。嗯。嗯。第三。对吧,这里面是这了来。我们输点作业。好变了吧,对吧。说明这个数据写进来了,并且这块呢也没报错啊。OK,这样就可以了。

image-20230423215255115

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.imooc.scala.kafkaconnector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner

/**
* Flink向Kafka中生产数据
* Created by xuwei
*/
object StreamKafkaSinkScala{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//开启checkpoint
env.enableCheckpointing(5000)

val text = env.socketTextStream("bigdata04", 9001)

//指定FlinkKafkaProducer相关配置
val topic = "t3"
val prop = new Properties()
prop.setProperty("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092")

//指定kafka作为sink
/*
KafkaSerializationSchemaWrapper的几个参数
1:topic,指定需要写入的topic名称即可
2:partitioner,通过自定义分区器实现将数据写入到指定topic的具体分区中,
默认会使用FlinkFixedPartitioner,它表示会将所有的数据都写入指定topic的一个分区里面
如果不想自定义分区器,也不想使用默认的,可以直接使用null即可
3:writeTimeStamp,向topic中写入数据的时候,是否写入时间戳,
如果写入了,那么在watermark的常见中,使用extractTimestamp()提取时间戳的时候,
就可以直接使用previousElementTimestamp即可,它表示的就是我们在这里写入的数据对应的timestamp
4:SerializationSchema,数据解析规则,默认使用string类型的数据解析规则即可
*/
val kafkaProducer = new FlinkKafkaProducer[String](topic, new KafkaSerializationSchemaWrapper[String](topic, null,false, new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
text.addSink(kafkaProducer)

env.execute("StreamKafkaSinkScala")
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
注意:此时执行代码会发现无法正常执行,socket打开之后,启动代码,会发现socket监听会自动断开,表示代码执行断开了

log4j.rootLogger=warn,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

但是此时在idea中看不到任何报错信息,主要是因为我们之前把日志级别改为error级别了,把日志级别调整为warn之后就可以看到报错信息了

2020-08-12 19:21:59,759 [Sink: Unnamed (3/8)] [org.apache.flink.runtime.taskmanager.Task] [WARN] - Sink: Unnamed (3/8) (1b621d88e460877995ad37d34379c166) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1151)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.lang.Thread.run(Thread.java:748)
1
2
3
提示生产者中设置的事务超时时间大于broker中设置的事务超时时间。

因为Kafka服务中默认事务的超时时间是15min,但是FlinkKafkaProducer里面设置的事务超时时间默认是1h。EXACTLY_ONCE模式依赖于事务,如果从Flink应用程序崩溃到完全重启的时间超过了Kafka的事务超时时间,那么将会有数据丢失,所以我们需要合理地配置事务超时时间,因此在使用EXACTLY_ONCE模式之前建议增加Kafka broker中transaction.max.timeout.ms 的值。
1
2
3
4
5
6
7
8
9
10
11
12
下面我们需要修改kafka中的server.properties配置文件
bigdata01、bigdata02、bigdata03都需要修改

[root@bigdata01 kafka_2.12-2.4.1]# vi config/server.properties
...
transaction.max.timeout.ms=3600000
[root@bigdata02 kafka_2.12-2.4.1]# vi config/server.properties
...
transaction.max.timeout.ms=3600000
[root@bigdata03 kafka_2.12-2.4.1]# vi config/server.properties
...
transaction.max.timeout.ms=3600000
1
2
3
4
5
6
改完配置文件之后,重启kafka
[root@bigdata01 kafka_2.12-2.4.1]# JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties
[root@bigdata02 kafka_2.12-2.4.1]# JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties
[root@bigdata03 kafka_2.12-2.4.1]# JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties

重新执行Flink代码,此时就不报错了。

本文标题:大数据开发工程师-第十七周 Flink极速上手篇-Flink高级进阶之路-2

文章作者:TTYONG

发布时间:2023年04月20日 - 16:04

最后更新:2023年06月01日 - 12:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%83%E5%91%A8-Flink%E6%9E%81%E9%80%9F%E4%B8%8A%E6%89%8B%E7%AF%87-Flink%E9%AB%98%E7%BA%A7%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF-2.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%