大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-5


第十四周 消息队列之Kafka从入门到小牛-5

实战:Flume集成Kafka

1
2
3
4
5
6
7
8
在实际工作中flume和kafka会深度结合使用
1:flume采集数据,将数据实时写入kafka
2:flume从kafka中消费数据,保存到hdfs,做数据备份

下面我们就来看一个综合案例
使用flume采集日志文件中产生的实时数据,写入到kafka中,然后再使用flume从kafka中将数据消费出来,保存到hdfs上面
那为什么不直接使用flume将采集到的日志数据保存到hdfs上面呢?
因为中间使用kafka进行缓冲之后,后面既可以实现实时计算,又可以实现离线数据备份,最终实现离线计算,所以这一份数据就可以实现两种需求,使用起来很方便,所以在工作中一般都会这样做。

image-20230417140517700

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
下面我们来实现一下这个功能
其实在Flume中,针对Kafka提供的有KafkaSource和KafkaSink
KafkaSource是从kafka中读取数据
KafkaSink是向kafka中写入数据

所以针对我们目前这个架构,主要就是配置Flume的Agent。
需要配置两个Agent:
第一个Agent负责实时采集日志文件,将采集到的数据写入Kafka中
第二个Agent负责从Kafka中读取数据,将数据写入HDFS中进行备份(落盘)
针对第一个Agent:
source:ExecSource,使用tail -F监控日志文件即可
channel:MemoryChannel
sink:KafkaSink

针对第二个Agent
Source:KafkaSource
channel:MemoryChannel
sink:HdfsSink

这里面这些组件其实只有KafkaSource和KafkaSink我们没有使用过,其它的组件都已经用过了。

配置Agent

file-to-kafka.conf

1
2
下面来配置第一个Agent:
文件名为: file-to-kafka.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/test.log

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# 指定topic名称
a1.sinks.k1.kafka.topic = test_r2p5
# 指定kafka地址,多个节点地址使用逗号分割
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03
# 一次向kafka中写多少条数据,默认值为100,在这里为了演示方便,改为1
# 在实际工作中这个值具体设置多少需要在传输效率和数据延迟上进行取舍
# 如果kafka后面的实时计算程序对数据的要求是低延迟,那么这个值小一点比较好
# 如果kafka后面的实时计算程序对数据延迟没什么要求,那么就考虑传输性能,一次多传输一些
# 建议这个值的大小和ExecSource每秒钟采集的数据量大致相等,这样不会频繁向kafka中写数
a1.sinks.k1.kafka.flumeBatchSize = 1
a1.sinks.k1.kafka.producer.acks = 1
# 一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去
# linger.ms和flumeBatchSize(不积到设置的条数,则一直不写入到topic),哪个先满足先按哪个规则执行,这个值默认是0,在这设置为1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 指定数据传输时的压缩格式,对数据进行压缩,提高传输效率
a1.sinks.k1.kafka.producer.compression.type = snappy
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1
kafka的producer的相关参数,可以直接在这里设置:a1.sinks.k1.kafka.producer.+。。。

kafka-to-hdfs.conf

1
2
下面来配置第二个Agent:
文件名为: kafka-to-hdfs.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 一次性向channel中写入的最大数据量,在这为了演示方便,设置为1
# 这个参数的值不要大于MemoryChannel中transactionCapacity的值
a1.sources.r1.batchSize = 1
# 最大多长时间向channel写一次数据
a1.sources.r1.batchDurationMillis = 2000
# kafka地址
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata
# topic名称,可以指定一个或者多个,多个topic之间使用逗号隔开
# 也可以使用正则表达式指定一个topic名称规则
a1.sources.r1.kafka.topics = test_r2p5
# 指定消费者组id
a1.sources.r1.kafka.consumer.group.id = flume-con1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/kafkaout
a1.sinks.k1.hdfs.filePrefix = data-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
在bigdata04机器的flume目录下复制两个目录
[root@bigdata04 apache-flume-1.9.0-bin]# cd /data/soft/apache-flume-1.9.0-bin
[root@bigdata04 apache-flume-1.9.0-bin]# cp -r conf conf-file-to-kafka
[root@bigdata04 apache-flume-1.9.0-bin]# cp -r conf conf-kafka-to-hdfs

修改 conf_file_to_kafka和conf_kafka_to_hdfs中log4j的配置
[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-file-to-kafka
[root@bigdata04 conf_file_to_kafka]# vi log4j.properties
flume.root.logger=ERROR,LOGFILE
flume.log.file=flume-file-to-kafka.log

[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-kafka-to-hdfs
[root@bigdata04 conf_kafka_to_hdfs]# vi log4j.properties
flume.root.logger=ERROR,LOGFILE
flume.log.file=flume-kafka-to-hdfs.log
1
2
3
4
5
6
7
8
把刚才配置的两个Agent的配置文件复制到这两个目录下
[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-file-to-kafka
[root@bigdata04 conf-file-to-kafka]# vi file-to-kafka.conf
.....把file-to-kafka.conf文件中的内容复制进来即可

[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-kafka-to-hdfs/
[root@bigdata04 conf-kafka-to-hdfs]# vi kafka-to-hdfs.conf
.....把kafka-to-hdfs.conf文件中的内容复制进来即可

启动Flume Agent

1
2
3
4
5
6
启动这两个Flume Agent
确保zookeeper集群、kafka集群和Hadoop集群是正常运行的
以及Kafka中的topic需要提前创建好

创建topic
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 -partions 5 --replication-factor 2 --topic test_r2p5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
先启动第二个Agent,再启动第一个Agent
[root@bigdata04 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf-kafka-to-hdfs --conf-file conf-kafka-to-hdfs/kafka-to-hdfs.conf

[root@bigdata04 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf-file-to-kafka --conf-file conf-file-to-kafka/file-to-kafka.conf

模拟产生日志数据
[root@bigdata04 ~]# cd /data/log/
[root@bigdata04 log]# echo hello world >> /data/log/test.log

到HDFS上查看数据,验证结果:
[root@bigdata04 ~]# hdfs dfs -ls /kafkaout
Found 1 items
-rw-r--r-- 2 root supergroup 12 2020-06-09 22:59 /kafkaout/data-.15
[root@bigdata04 ~]# hdfs dfs -cat /kafkaout/data-.1591714755267.tmp
hello world

此时Flume可以通过tail -F命令实时监控文件中的新增数据,发现有新数据就写入kafka,然后kafka后面的flume落盘程序,以及kafka后面的实时计算程序就可以使用这份数据了。

实战:Kafka集群平滑升级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
之前我们在使用 Kafka 0.9.0.0版本的时候,遇到一个比较诡异的问题
(背景:这个版本他们遇到一个问题,官方通过升级kafka版本解决了,但他们之前的版本工作中运用于直播平台,所以不可能将集群停了重新部署一套)
针对消费者组增加消费者的时候可能会导致rebalance,进而导致部分consumer不能再消费分区数据
意思就是之前针对这个topic的5个分区只有2个消费者消费数据,后期我动态的把消费者调整为了5个,这样可能会导致部分消费者无法消费分区中的数据。

针对这个bug这里有一份详细描述:
https://issues.apache.org/jira/browse/KAFKA-2978
此bug官方在0.9.0.1版本中进行了修复
当时我们的线上集群使用的就是0.9.0.0的版本。

所以我们需要对线上集群在不影响线上业务的情况下进行升级,称为平滑升级(滚动升级),也就是升级的时候不影响线上的正常业务运行(但还是要选择在业务低峰期时进行升级)。

接下来我们就查看了官网文档(0.9.0.0),上面有针对集群平滑升级的一些信息
http://kafka.apache.org/090/documentation.html#upgrade
在验证这个升级流程的时候我们是在测试环境下,先模拟线上的集群环境,进行充分测试,可千万不能简单测试一下就直接搞到测试环境去做,这样是很危险的。
由于当时这个kafka集群我们还没有移交给运维负责,并且运维当时对这个框架也不是很熟悉,所以才由我们开发人员来进行平滑升级,否则这种框架升级的事情肯定是交给运维去做的。

那接下来看一下具体的平滑升级步骤
小版本之间集群升级不需要额外修改集群的配置文件。只需要按照下面步骤去执行即可。
假设kafka0.9.0.0集群在三台服务器上,需要把这三台服务器上的kafka集群升级到0.9.0.1版本
1
2
3
注意:提前在集群的三台机器上把0.9.0.1的安装包,解压、配置好。
主要是log.dirs这个参数,0.9.0.1中的这个参数和0.9.0.0的这个参数一定要保持一致,这样新版本的kafka才可以识别之前的kakfa中的数据。
在集群升级的过程当中建议通过CMAK(kafkamanager)查看集群的状态信息,比较方便

image-20230417152957587

1
1:先stop掉0.9.0.0集群中的第一个节点,然后去CMAK上查看集群的broker信息,确认节点确实已停掉。并且再查看一下,节点的副本下线状态。确认集群是否识别到副本下线状态。

image-20230417153133889

image-20230417153328093

1
然后在当前节点把kafka0.9.0.1启动起来。再回到CMAK中查看broker信息,确认刚启动的节点是否已正确显示,并且还要确认这个节点是否可以正常接收和发送数据。

image-20230417153455185

1
2
3
2:按照第一步的流程去依次操作剩余节点即可,就是先把0.9.0.0版本的kafka停掉,再把0.9.0.1版本的kafka启动即可。

注意:每操作一个节点,需要稍等一下,确认这个节点可以正常接收和发送数据之后,再处理下一个节点。

本文标题:大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-5

文章作者:TTYONG

发布时间:2023年04月16日 - 01:04

最后更新:2023年04月17日 - 15:04

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%9B%9B%E5%91%A8-%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E4%B9%8BKafka%E4%BB%8E%E5%85%A5%E9%97%A8%E5%88%B0%E5%B0%8F%E7%89%9B-5.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%