第十七周 Flink极速上手篇-Flink高级进阶之路-2
Kafka-Connector
1 | 大家好,下面呢,我们来看一下flink中针对kafka connect的专题,提供了很多的connect组件,其中应用比较广泛的就是kafka这个connect。我们就针对kafka在flink的应用做详细的分析。针对flink流处里啊,最常用的组件就是kafka。原始日志数据产生后,会被日志采集工具采集到kafka中,让flink去处理。处理之后的数据可能也会继续写入到kafka中。kafka可以作为flink的datasource和datasink来使用。并且kafka中的partition机制和flink的并行度机制可以深度结合,提高数据的读取效率和写入效率。那我们想要在flink中使用kafka,需要添加对应的依赖。(先在flink官网中找到依赖的名字,再到maven中去找符合的版本) |
1 | <dependency> |
Kafka Consumer的使用
1 | 那在具体执行这个代码之前啊,我们先需要把那个zookeeper集群,还有kafka集群给他起来。我这些相关的服务呢,已经起来了。这是入K班了,这是卡不卡都已经起来啊。好,那下面注意,我们还需要做一件事情。因为我们在这里面呢,用到了一个T1这个topic,所以说在这我们需要去创建这个topic。找一下之前的命令。嗯。其一。这个分区设置为五,后面因子是为二。发现一个T。好,创建成功。那下面呢,我们就可以去启动代码。启动电板之后,那我们需要往那个卡夫卡里面模拟产生数据。这个时候呢,我们可以启动一个基于控制台的一个生产者来模拟产生数据。嗯。使用这个卡不卡console producer。把这个复制一下。好,这个套背上就是T1。那这个时候呢,我们接着就来模拟产生数据。hello。看到没有消费到。再加一个。hello。没问题吧,是可以的,这样的话我们就可以消费卡夫卡中的数据了。好,这个是代码实践,接下来我们使用这个Java代码来实现一下。先创建一个package。嗯。stream。搞不搞?SS。把这个复制过来。嗯。嗯嗯。嗯。好,首先呢,还是获取一个连环应。execution。因为第2GET。因为。下面的env.S。嗯。在这儿,我们需要去利用这个。Li。卡不卡?three。那这里面啊,传一个topic。嗯。第一,嗯。第二个,你有一个simple。视频,game。第三个pop。嗯嗯。有一个薄。嗯。嗯嗯嗯。首先呢,我们在里面set property。我可以把这个呢直接拿过来。嗯。好,下面呢,said。格布利。卡不卡星本。把它拿过来。嗯。感注释,这个就是指定卡夫卡作为S。嗯。这是指令。普林格卡夫卡consumer的相关配置。接下来呢,我们将读取到的数据啊,一到控制台。嗯。嗯。嗯嗯。嗯。嗯。包的异常。嗯。来把这个启动起来。好,那我们在这边呢,再模拟产生的数据。哈哈哈。没问题吧,是可以的。好,这就是Java代码的一个实现。 |
scala
1 | package com.imooc.scala.kafkaconnector |
1 | 在运行代码之前,需要先启动zookeeper集群和kafka集群 |
1 | 然后启动代码 |

java
1 | package com.imooc.java.kafkaconnector; |
KafkaConsumer消费策略设置
1 | 针对卡夫卡康消费数据的时候会有一些策略,我们来看一下。首先这个是默认的消费策略。下面还有一个ear,从最早的开始消费,latest,从最新的开始消费。已经呢,现在要出来一个C呢。按照指定的时间戳往后面开始消费。下面呢,我们来演示一下。我直接在这里面来设置一下。嗯。卡夫卡consumer。的消费策略设置。这个其实我们在讲卡不卡的时候也详细分解过啊,其实是类似的。首先我们看下这个默认策略。嗯。直接使用它来设置点带。start from group of,注意这个呢,其实就默认你不设置它默就类。它是什么意思呢?它会读取。group ID。对应。保存的outside。开始消费数据。那读取不到的话呢。则根据卡夫卡中。这个参数auto点。reite。参数的值开始。消费数据。因为如果说你是第一次使用这个消费者,那么他之前肯定是没有保存这个对应的office的信息,那这样的话呢,他就会根据这个参数的值来开始进行消费。那这个值的话,它那要么是early latest对吧,要么是从最新的,要么是从最近的。那下一次的话呢,他就会根据你之前指定的这个global ID对应的保存的那个开始往下面继续消费数据。那既然下面这个呢,是从那个最早的记录开始,消费主义啊,不搞consumer there that。from earliest。从最早的记录。开始消费。独具。忽略。你提交。信息。这样的话,他就不管你有没有提交,都会每次都从那个最早的数据开始消费。那对了,还有一个从。最新的记录开始消费。也是忽略这个已提交的奥赛的信息。嗯。start。home latest。嗯。那还有一个是从指定的时间戳开始消费数据。对于每个分区。其时间戳大于或等于指定时间戳的记录。江北。作为70位。嗯。that。大的from。这里面你给他传一个时间戳就行了啊,我这边随便写一个行吗?这就是这几种测量啊。其实我们在讲卡不卡的时候,也详细分析过这几种词,那在那就把这个默认的给它打开吧。就你这个呢,你在这儿设置不设置,它其实都是一个默认的策略。这个呢,就是针对这里面这个卡夫卡抗性板,它这个消费策略的一个设置。其实咱们在工作中啊,一般最常见的,那其实就是一种默认的技术。 |

scala
1 | package com.imooc.scala.kafkaconnector |
KafkaConsumer的容错
Flink Checkpoint
1 | 下Flink中也有checkpoint机制,Checkpoint是Flink实现容错机制的核心功能,它能够根据配置周期性地基于流中各个算子任务的State来生成快照,从而将这些State数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。 |
1 | //每隔5000 ms执行一次Checkpoint(设置Checkpoint的周期) |
1 | 那针对这个呢,它还有一些相关的配置。那我接着呢,把这个配置拿过来。把这个复制一下。搞一下包。这个是。针对checkpoint的相关配置。下面这个参数的意思呢?表示设置一下checkpoint的一个语义,它可以提供这种锦一词的语义。下面这个呢,表示两次切之间它的一个时间间隔。这个呢,表示呢,必须要在指定时间之内完成one。其实就是给这个check呢,设置一个超时时间,超过这个时间了就被丢弃了。下面这个呢,表示呢,同一时间只允许执行一个checkpoint。下面这个三注意。他呢表示呀,当我们对这个link程序执行一个cancel之后,就是把这个link程序停掉之后,我们呢,会保留这个这个波段数据,这样的话,我们可以根据实际需要,后期呢来恢复这些数据。这是它相关的一些配置啊, |
1 | //设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE |
1 | env.enableCheckpointing(5000)用于设置checkpoint的时间间隔,即每5000毫秒触发一次checkpoint。而env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500)用于设置两次checkpoint之间的最短时间间隔,即两次checkpoint之间至少要间隔500毫秒。 |
State数据存储的位置
1 | 最后还有一个配置,设置State数据存储的位置 |
RocksDBStateBackend
scala
1 | 所以在这里我们使用第三种:RocksDBStateBackend |
1 | package com.imooc.scala.kafkaconnector |




Kafka Consumers Offset自动提交
1 | Kafka Consumers Offset自动提交机制需要根据Job是否开启Checkpoint来区分。 |

KafkaProducer的使用
1 | 下面我们来看一下在flink中如何向kafka中写数据,此时需要用到kafka producer。 |
scala
1 | package com.imooc.scala.kafkaconnector |
1 | 所以,如果我们不需要自定义分区器的时候,直接传递为null即可,不要使用FlinkFixedPartitioner,它会将数据都写入到topic的一个分区中。 |



1 | cmak里查看 |

1 | 如果指定的默认FlinkFixedPartitioner |

java
1 | package com.imooc.java.kafkaconnector; |
KafkaProducer的容错
1 | 如果Flink开启了CheckPoint,针对FlinkKafkaProducer可以提供EXACTLY_ONCE的语义保证 |

1 | package com.imooc.scala.kafkaconnector |
1 | 注意:此时执行代码会发现无法正常执行,socket打开之后,启动代码,会发现socket监听会自动断开,表示代码执行断开了 |
1 | 提示生产者中设置的事务超时时间大于broker中设置的事务超时时间。 |
1 | 下面我们需要修改kafka中的server.properties配置文件 |
1 | 改完配置文件之后,重启kafka |