大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-2


大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-2

Kafka使用初体验

Kafka中Topic的操作

1
2
3
kafka集群安装好了以后我们就想向kafka中添加一些数据
想要添加数据首先需要创建topic
那接下来看一下针对topic的一些操作

新增Topic

1
2
3
4
5
6
7
指定2个分区,2个副本,注意:副本数不能大于集群中Broker的数量

因为每个partition的副本必须保存在不同的broker,否则没有意义,如果partition的副本都保存在同一个broker,那么这个broker挂了,则partition数据依然会丢失

在这里我使用的是3个节点的kafka集群,所以副本数我就暂时设置为2,最大可以设置为3

如果你们用的是单机kafka的话,这里的副本数就只能设置为1了,这个需要注意一下
1
2
3
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 2
--topic hello
Created topic hello.

查询Topic

查询Kafka中的所有Topic列表
1
2
3
4
5
6
查询Kafka中的所有Topic列表以及查看指定Topic的详细信息
查询kafka中所有的topic列表
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
hello

topic数据是存在zookeeper中,所以直接指定zookeeper地址就可以了(有些地方需要指定kafka地址)
查看指定Topic的详细信息
1
2
3
4
5
查看指定topic的详细信息
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello
Topic: hello PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: hello Partition: 0 Leader: 2 Replicas: 2,0 Is
Topic: hello Partition: 1 Leader: 0 Replicas: 0,1 Is

image-20230415223043384

1
2
3
4
5
6
7
8
9
10
第一个行显示指定topic所有partitions的一个总结
PartitionCount:表示这个Topic一共有多少个partition
ReplicationFactor:表示这个topic中partition的副本因子是几个
Config:这个表示创建Topic时动态指定的配置信息,在这我们没有额外指定配置信息

下面每一行给出的是一个partition的信息,如果只有一个partition,则只显示一行。
Topic:显示当前的topic名称
Partition:显示当前topic的partition编号
Leader:Leader partition所在的节点编号,这个编号其实就是broker.id的值,
来看这个图:

image-20230415223420514

1
2
3
4
5
6
7
8
这个图里面的hello这个topic有两个partition,其中partition1的leader所在的节点是broker1,partition2的leader所在的节点是broker2

Replicas:当前partition所有副本所在的节点编号【包含Leader所在的节点】,如果设置多个副本的话,这里会显示多个,不管该节点是否是Leader以及是否存活。

Isr:当前partition处于同步状态的所有节点,这里显示的所有节点都是存活状态的,并且跟Leader同步的(包含Leader所在的节点)

所以说Replicas和Isr的区别就是
如果某个partition的副本所在的节点宕机了,在Replicas中还是会显示那个节点,但是在Isr中就不会显示了,Isr中显示的都是处于正常状态的节点。

修改Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
修改一般是修改Topic的partition数量,只能增加

为什么partition只能增加?
因为数据是存储在partition中的,如果可以减少partition的话,那么partition中的数据就丢了

[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 5 --topic hello
WARNING: If partitions are increased for a topic that has a key, the partitio
Adding partitions succeeded!

修改之后再来查看一下topic的详细信息
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --describe --zookeeper
Topic: hello PartitionCount: 5 ReplicationFactor: 2 Configs:
Topic: hello Partition: 0 Leader: 2 Replicas: 2,0 I
Topic: hello Partition: 1 Leader: 0 Replicas: 0,1 I
Topic: hello Partition: 2 Leader: 1 Replicas: 1,2 I
Topic: hello Partition: 3 Leader: 2 Replicas: 2,1 I
Topic: hello Partition: 4 Leader: 0 Replicas: 0,2

删除Topic

1
2
3
4
5
6
7
8
删除Kafka中的指定Topic
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --delete --zookeeper localhost 2181 --topic hello
Topic hello is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

删除操作是不可逆的,删除Topic会删除它里面的所有数据

注意:Kafka从1.0.0开始默认开启了删除操作,之前的版本只会把Topic标记为删除状态,需要设置delete.topic.enable为true才可以真正删除
1
2
如果不想开启删除功能,可以设置delete.topic.enable为false,这样删除topic的时候只会把它标记为删除状态,此时这个topic依然可以正常使用。
delete.topic.enable可以配置在server.properties文件中

Kafka中的生产者和消费者

1
2
3
4
5
6
7
前面我们学习了Kafka中的topic的创建方式,下面我们可以向topic中生产数据以及消费数据了
生产数据需要用到生产者
消费数据需要用到消费者

kafka默认提供了基于控制台的生产者和消费者,方便测试使用
生产者: bin/kafka-console-producer.sh
消费者: bin/kafka-console-consumer.sh

生产者

1
2
3
4
5
先来看一下如何向里面生产数据
直接使用kafka提供的基于控制台的生产者
先创建一个topic【5个分区,2个副本】:
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost 2181 --partitions 5 --replication-factor 2 --topic hello
Created topic hello.
1
2
3
4
5
向这个topic中生产数据
broker-list:kafka的服务地址[多个用逗号隔开](这里需要用到kafka地址,上面创建topic时指定的是zookeeper,这里用本地地址和使用bigdata01:9092,bigdata02:9092,bigdata03:9092是一样的)
topic:topic名称
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello
>hehe

消费者

1
2
3
4
5
6
7
8
9
10
11
12
下面来创建一个消费者消费topic中的数据
bootstrap-server:kafka的服务地址
topic:具体的topic下面来创建一个消费者消费topic中的数据
1 [root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

发现消费不到刚才生产的数据,为什么呢?
因为kafka的消费者默认是消费最新生产的数据,如果想消费之前生产的数据需要添加一个参数--from-beginning,表示从头消费的意思
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning

hehe

这里创建消费者的机器01、02、03都可以

案例:QQ群聊天

1
2
通过kafka可以模拟QQ群聊天的功能,我们来看一下
首先在kafka中创建一个新的topic,可以认为是我们在QQ里面创建了一个群,群号是88888888
1
2
3
4
5
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost 2181 --partitions 5 --replication-factor 2 --topic 88888888
Created topic 88888888.

然后我把你们都拉到这个群里面,这样我在群里面发消息你们就都能收到了
在bigdata02和bigdata03上开启消费者,可以认为是把这两个人拉到群里面了
1
[root@bigdata02 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 88888888
1
[root@bigdata03 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 88888888
1
2
3
4
5
然后我在bigdata01上开启生产者发消息,这样bigdata02和bigdata03都是可以收到的。
这样就可以认为在群里的人都能收到我发的消息,类似于发广播。
这个其实主要利用了kafka中的多消费者的特性,每个消费者都可以消费到相同的数据
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 88888888
>hello everyone

Kafka核心扩展内容

Broker扩展

1
2
3
4
Broker的参数可以配置在server.properties这个配置文件中,Broker中支持的完整参数在官方文档中有体现
具体链接为:
http://kafka.apache.org/24/documentation.html#brokerconfigs
针对Broker的参数,我们主要分析两块

Log Flush Policy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1:Log Flush Policy:设置数据flush到磁盘的时机
为了减少磁盘写入的次数,broker会将消息暂时缓存起来,当消息的个数达到一定阀值或者过了一定的时间间隔后,再flush到磁盘,这样可以减少磁盘IO调用的次数。

这块主要通过两个参数控制
log.flush.interval.messages 一个分区的消息数阀值,达到该阈值则将该分区的数据flush到磁盘,注意这里是针对分区,因为topic是一个逻辑概念,分区是真实存在的,每个分区会在磁盘上产生一个目录
[root@bigdata01 kafka-logs]# ll
total 20
drwxr-xr-x. 2 root root 141 Jun 8 17:12 88888888-0
drwxr-xr-x. 2 root root 141 Jun 8 17:12 88888888-3
-rw-r--r--. 1 root root 4 Jun 8 15:23 cleaner-offset-checkpoint
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-0
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-12
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-15
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-18
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-21
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-24
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-27
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-3
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-30
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-33
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-36
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-39
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-42
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-45
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-48
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-6
drwxr-xr-x. 2 root root 141 Jun 8 17:08 __consumer_offsets-9
drwxr-xr-x. 2 root root 141 Jun 8 17:04 hello-0
drwxr-xr-x. 2 root root 141 Jun 8 17:04 hello-1
drwxr-xr-x. 2 root root 141 Jun 8 17:04 hello-4
hello topic有5个分区,但这里只有2个目录的原因是:没写几条数据

image-20230416002248398

1
2
这个参数的默认值为9223372036854775807,long的最大值
默认值太大了,所以建议修改,可以使用server.properties中针对这个参数指定的值10000,需要去掉注释之后这个参数才生效。
1
2
3
4
log.flush.interval.ms间隔指定时间
默认间隔指定的时间将内存中缓存的数据flush到磁盘中,由文档可知,这个参数的默认值为null,此时会使用log.flush.scheduler.interval.ms参数的值,log.flush.scheduler.interval.ms参数的值默认是 9223372036854775807,long的最大值

所以这个值也建议修改,可以使用server.properties中针对这个参数指定的值1000,单位是毫秒,表示每1秒写一次磁盘,这个参数也需要去掉注释之后才生效

Log Retention Policy

1
2
3
4
5
6
7
设置数据保存周期,默认7天
kafka中的数据默认会保存7天,如果kafka每天接收的数据量过大,这样是很占磁盘空间的,建议修改数据保存周期,我们之前在实际工作中是将数据保存周期改为了1天。

数据保存周期主要通过这几个参数控制
log.retention.hours,这个参数默认值为168,单位是小时,就是7天,可以在这调整数据保存的时间,超过这个时间数据会被自动删除
log.retention.bytes,这个参数表示当分区的文件达到一定大小的时候会删除它,如果设置了按照指定周期删除数据文件,这个参数不设置也可以,这个参数默认是没有开启的
log.retention.check.interval.ms,这个参数表示检测的间隔时间,单位是毫秒,默认值是300000,就是5分钟,表示每5分钟检测一次文件看是否满足删除的时机
1
默认根据时间就行了,后期在时间范围内还可以从kafka中恢复数据

Producer扩展

producer发送数据到partition的方式

1
2
3
Producer默认是随机将数据发送到topic的不同分区中,也可以根据用户设置的算法来根据消息的key来计算输入到哪个partition里面

此时需要通过partitioner来控制,这个知道就行了,因为在实际工作中一般在向kafka中生产数据的都是不带key的,只有数据内容,所以一般都是使用随机的方式发送数据

producer的数据通讯方式

1
2
3
4
5
6
7
8
9
10
11
在这里有一个需要注意的内容就是
针对producer的数据通讯方式:同步发送和异步发送

同步是指:生产者发出数据后,等接收方发回响应以后再发送下个数据的通讯方式。
异步是指:生产者发出数据后,不等接收方发回响应,接着发送下个数据的通讯方式。

具体的数据通讯策略是由acks参数控制的
acks默认为1,表示需要Leader节点回复收到消息,这样生产者才会发送下一条数据
acks:all,表示需要所有Leader+副本节点回复收到消息(acks=-1),这样生产者才会发送下一条数据
acks:0,表示不需要任何节点回复,生产者会继续发送下一条数据
再来看一下这个图:

image-20230415235025189

1
2
3
4
5
6
我们在向hello这个topic生产数据的时候,可以在生产者中设置acks参数,
acks设置为1,表示我们在向hello这个topic的partition1这个分区写数据的时候,只需要让leader所在的broker1这个节点回复确认收到的消息就可以了,这样生产者就可以发送下一条数据了

如果acks设置为all,则需要partition1的这两个副本所在的节点(包含Leader)都回复收到消息,生产者才会发送下一条数据

如果acks设置为0,表示生产者不会等待任何partition所在节点的回复,它只管发送数据,不管你有没有收到,所以这种情况丢失数据的概率比较高。
面试
1
2
3
4
5
6
7
针对这块在面试的时候会有一个面试题:Kafka如何保证数据不丢?
其实就是通过acks机制保证的,如果设置acks为all,则可以保证数据不丢,因为此时把数据发送给kafka之后,会等待对应partition所在的所有leader和副本节点都确认收到消息之后才会认为数据发送成功了,所以在这种策略下,只要把数据发送给kafka之后就不会丢了。

如果acks设置为1,则当我们把数据发送给partition之后,partition的leader节点也确认收到了,但是leader回复完确认消息之后,leader对应的节点就宕机了,副本partition还没来得及将数据同步过去,所以会存在丢失的可能性。
不过如果宕机的是副本partition所在的节点,则数据是不会丢的

如果acks设置为0的话就表示是顺其自然了,只管发送,不管kafka有没有收到,这种情况表示对数据丢不丢都无所谓了。

Consumer扩展

1
2
3
4
5
6
7
8
9
10
11
12
13
在消费者中还有一个消费者组的概念
每个consumer属于一个消费者组,通过group.id指定消费者组

那组内消费和组间消费有什么区别吗?
组内:消费者组内的所有消费者消费同一份数据;

注意:在同一个消费者组中,一个partition同时只能有一个消费者消费数据
如果消费者的个数小于分区的个数,一个消费者会消费多个分区的数据。
如果消费者的个数大于分区的个数,则多余的消费者不消费数据
所以,对于一个topic,同一个消费者组中推荐不能有多于分区个数的消费者,否则将意味着某些消费者将无法获得消息。

组间:多个消费者组消费相同的数据,互不影响。
来看下面这个图,加深一下理解

image-20230416000001364

1
2
3
4
5
6
7
8
Kafka集群有两个节点,Broker1和Broker2
集群内有一个topic,这个topic有4个分区,P0,P1,P2,P3

下面有两个消费者组
Consumer Group A和Consumer Group B
其中Consumer Group A中有两个消费者C1和C2,由于这个topic有4个分区,所以,C1负责消费两个分区的数据,C2负责消费两个分区的数据,这个属于组内消费
Consumer Group B有5个消费者,C3~C7,其中C3,C4,C5,C6分别消费一个分区的数据,而C7就是多余出来的了,因为现在这个消费者组内的消费者的数量比对应的topic的分区数量还多,但是一个分区同时只能被一个消费者消费,所以就会有一个消费者处于空闲状态。这个也属于组内消费
Consumer Group A和Consumer Group B这两个消费者组属于组间消费,互不影响。

Topic、Partition扩展

1
2
3
4
5
6
7
每个partition在存储层面是append log文件。
新消息都会被直接追加到log文件的尾部,每条消息在log文件中的位置称为offset(偏移量)。
越多partitions可以容纳更多的consumer,有效提升并发消费的能力。

具体什么时候增加topic的数量?什么时候增加partition的数量呢?

业务类型增加需要增加topic、数据量大需要增加partition

Message扩展

1
2
3
4
5
每条Message包含了以下三个属性:
1. offset对应类型:long,表示此消息在一个partition中的起始的位置。可以认为offset是partition中Message的id,自增的
2. MessageSize 对应类型:int32 此消息的字节大小。
3. data,类型为bytes,是message的具体内容。
看这个图,加深对Topic、Partition、Message的理解

image-20230416001618719

1
这里也体现了kafka高吞吐量的原因:磁盘顺序读写由于内存随机访问


本文标题:大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-2

文章作者:TTYONG

发布时间:2023年04月15日 - 22:04

最后更新:2023年04月16日 - 11:04

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%9B%9B%E5%91%A8-%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E4%B9%8BKafka%E4%BB%8E%E5%85%A5%E9%97%A8%E5%88%B0%E5%B0%8F%E7%89%9B-2.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%