大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-3


第十四周 消息队列之Kafka从入门到小牛-3

Kafka核心之存储和容错机制

存储策略

1
2
3
4
5
6
7
8
9
10
11
12
在kafka中每个topic包含1到多个partition,每个partition存储一部分Message。每条Message包含三个属性,其中有一个是offset。

问题来了:offset相当于partition中这个message的唯一id,那么如何通过id高效的找到message?
两大法宝:分段+索引(分段表示一个partition会存储多个文件)

kafak中数据的存储方式是这样的:
1、每个partition由多个segment【片段】组成,每个segment文件中存储多条消息,
2、每个partition在内存中对应一个index,记录每个segment文件中的第一条消息偏移量。

Kafka中数据的存储流程是这样的:
生产者生产的消息会被发送到topic的多个partition上,topic收到消息后往对应partition的最后一个segment上添加该消息,segment达到一定的大小后会创建新的segment。
来看这个图,可以认为是针对topic中某个partition的描述

image-20230416003228787

1
2
图中左侧就是索引,右边是segment文件,左边的索引里面会存储每一个segment文件中第一条消息的偏移量,由于消息的偏移量都是递增的,这样后期查找起来就方便了,先到索引中判断数据在哪个
segment文件中,然后就可以直接定位到具体的segment文件了,这样再找具体的那一条数据就很快了,因为都是有序的。

容错机制

Broker节点宕机

1
2
3
4
5
6
7
8
9
当Kafka集群中的一个Broker节点宕机,会出现什么现象?

下面来演示一下
使用kill -9 杀掉bigdata01中的broker进程测试
[root@bigdata01 kafka_2.12-2.4.1]# jps
7522 Jps
2054 Kafka
1679 QuorumPeerMain
[root@bigdata01 kafka_2.12-2.4.1]# kill 2054
1
2
3
4
5
6
7
8
我们可以先通过zookeeper来查看一下,因为当kafka集群中的broker节点启动之后,会自动向zookeeper中进行注册,保存当前节点信息
....]# bin/zkCli.sh
Connecting to localhost:2181

[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1, 2]

image-20230416004647924

1
2
3
4
此时发现zookeeper的/brokers/ids下面只有2个节点信息
可以通过get命令查看节点信息,这里面会显示对应的主机名和端口号
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLA

image-20230416005045245

1
2
3
4
5
然后再使用describe查询topic的详细信息,会发现此时的分区的leader全部变成了目前存活的另外两个节点

此时可以发现Isr中的内容和Replicas中的不一样了,因为Isr中显示的是目前正常运行的节点

所以当Kafka集群中的一个Broker节点宕机之后,对整个集群而言没有什么特别的大影响,此时集群会给partition重新选出来一些新的Leader节点

image-20230416005338215

新增一个Broker节点

1
2
3
4
5
当Kafka集群中新增一个Broker节点,会出现什么现象?
新加入一个broker节点,zookeeper会自动识别并在适当的机会选择此节点提供服务

再次启动bigdata01节点中的broker进程测试
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties
1
2
3
4
5
6
7
8
此时到zookeeper中查看一下
[root@bigdata01 apache-zookeeper-3.5.8-bin]# bin/zkCli.sh
Connecting to localhost:2181
.....
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0, 1, 2]

image-20230416005822131

1
2
3
4
发现broker.id为0的这个节点信息也有了

在通过describe查看topic的描述信息,Isr中的信息和Replicas中的内容是一样的了
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello

image-20230416005947958

1
2
3
4
但是启动后有个问题:发现新启动的这个节点不会是任何分区的leader?怎么重新均匀分配呢?
1、Broker中的自动均衡策略(默认已经有)
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds 默认值:300
1
2
3
4
5
6
2、手动执行:
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type pareferred --all-topic-partitions

Successfully completed leader election (PREFERRED) for partitions hello-4, he

执行后的效果如下,这样就实现了均匀分配

image-20230416010500020

Kafka生产消费者实战

1
2
3
前面我们使用基于console的生产者和消费者对topic实现了数据的生产和消费,,这个基于控制台的生产者和消费者主要是让我们做测试用的
在实际工作中,我们有时候需要将生产者和消费者功能集成到我们已有的系统中,此时就需要写代码实现生产者和消费者的逻辑了。
在这我们使用java代码来实现生产者和消费者的功能

Kafka Java代码编程

Java代码实现生产者代码

image-20230416011434393

1
2
3
4
5
6
7
8
先创建maven项目, db_kafka

添加kafka的maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
1
开发生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.imooc.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
/**
* 需求:Java代码实现生产者代码
*/
public class ProducerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
//指定key-value数据的序列化格式(key就是之前讲的,如果指定了数据有key,则可以根据它来将数据放入哪一个partition,一般用不到;但这里要知道不然要报错)
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());

//指定topic
String topic = "hello";
//创建kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
//向topic中生产数据(这里也没有传入key,只传入了value)
producer.send(new ProducerRecord<String, String>(topic, "hello kafka"))
//关闭链接
producer.close();
}
}

Java代码实现消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.imooc.kafka;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
* 需求:Java代码实现消费者代码
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址(之前控制台那里server没s)
prop.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
//指定key-value的反序列化类型
prop.put("key.deserializer", StringDeserializer.class.getName());
prop.put("value.deserializer", StringDeserializer.class.getName());

//指定消费者组(之前控制台那里,会自动生成)
prop.put("group.id", "con-1");

//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
Collection<String> topics = new ArrayList<String>();
topics.add("hello");
//订阅指定的topic
consumer.subscribe(topics);
while(true) {
//消费数据【注意:需要修改jdk编译级别为1.8,否则Duration.ofSeconds(1)会语法报错
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> consumerRecord : poll) {
System.out.println(consumerRecord);
}
}
}
}

启动

1
2
3
4
5
6
注意:
1. 关闭kafka服务器的防火墙
2. 配置windows的hosts文件 添加kafka节点的hostname和ip的映射关系。[如果我们的hosts文件中没有对kafka节点的hostnam和ip的映射关系做配置,在这经过多次尝试连接不上就会报错]

先开启消费者。
发现没有消费到数据,这个topic中是有数据的,为什么之前的数据没有消费出来呢?(就是前面讲的,默认会从consumer生成后生成的数据读取)不要着急,先带着这个问题往下面看

image-20230416014022148

1
再开启生产者,生产者会生产一条数据,然后就结束

image-20230416014143241

1
此时回到kafka的消费者端就可以看到消费出来的数据了

image-20230416014214263

1
所以这个时候我们发现,新产生的数据我们是可以消费到的,但是之前的数据我们就无法消费了,那下面我们来分析一下这个问题

消费者代码扩展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//==================================================
//开启消费者自动提交offset功能,默认就是开启的
prop.put("enable.auto.commit","true");
//自动提交offset的时间间隔,单位是毫秒(在开启自动提交时,它默认开启,且默认值是5000)
prop.put("auto.commit.interval.ms","5000");

/*
注意:正常情况下,kafka消费数据的流程是这样的
先根据group.id指定的消费者组到kafka中查找之前保存的offset信息

如果查找到了,说明之前使用这个消费者组消费过数据,则根据之前保存的offset继续进行消费

如果没查找到(说明第一次消费),或者查找到了,但是查找到的那个offset对应的数据已经不存

这个时候消费者该如何消费数据?
(因为kafka默认只会保存7天的数据,超过时间数据会被删除)

此时会根据auto.offset.reset的值执行不同的消费逻辑

这个参数的值有三种:[earliest,latest,none]
earliest:表示从最早的数据开始消费(从头消费)
latest【默认】:表示从最新的数据开始消费
none:如果根据指定的group.id没有找到之前消费的offset信息,就会抛异常

(工作中earliest和latest常用)

解释:【查找到了,但是查找到的那个offset对应的数据已经不存在了】
假设你第一天使用一个消费者去消费了一条数据,然后就把消费者停掉了,等了7天之后,你又使用这个消费者去消费数据
这个时候,这个消费者启动的时候会到kafka里面查询它之前保存的offset信息
但是那个offset对应的数据已经被删了,所以此时再根据这个offset去消费是消费不到数据的

总结,一般在实时计算的场景下,这个参数的值建议设置为latest,消费最新的数据

这个参数只有在消费者第一次消费数据,或者之前保存的offset信息已过期的情况下才会生效
*/

prop.put("auto.offset.reset","latest");
//==================================================
1
2
此时我们来验证一下,
先启动一次生产者,再启动一次消费者,看看消费者能不能消费到这条数据,如果能消费到,就说明此时是根据上次保存的offset信息进行消费了。结果发现是可以消费到的。
1
2
3
注意:消费者消费到数据之后,不要立刻关闭程序,要至少等5秒,因为自动提交offset的时机是5秒提交一次

ConsumerRecord(topic = hello, partition = 4, leaderEpoch = 5, offset = 0, Cre
1
2
3
4
5
将auto.offset.reset置为earliest,修改一下group.id的值,相当于使用一个新的消费者,验证一下,看是否能把这个topic中的所有数据都取出来,因为新的消费者第一次肯定是获取不到offset信息的,
所以就会根据auto.offset.reset的值来消费数据

prop.put("group.id", "con-2");
prop.put("auto.offset.reset","earliest");
1
2
3
ConsumerRecord(topic = hello, partition = 2, leaderEpoch = 0, offset = 0, Cre
ConsumerRecord(topic = hello, partition = 3, leaderEpoch = 3, offset = 0, Cre
ConsumerRecord(topic = hello, partition = 4, leaderEpoch = 5, offset = 0, Cre

image-20230416213026828

1
2
此时,关闭消费者(需要等待5秒,这样才会提交offset),再重新启动,发现没有消费到数据,说明此时就
根据上次保存的offset来消费数据了,因为没有新数据产生,所以就消费不到了。
1
2
最后来处理一下程序输出的日志警告信息,这里其实示因为缺少依赖日志依赖
在pom文件中添加log4j的依赖,然后将 log4j.properties 添加到 resources目录中
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
</dependency>
1
2
3
4
5
6
log4j.rootLogger=info,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

Consumer消费offset查询

1
2
3
4
kafka0.9版本以前,消费者的offset信息保存在zookeeper中
从kafka0.9开始,使用了新的消费API,消费者的信息会保存在kafka里面的__consumer_offsets这个topic中

因为频繁操作zookeeper性能不高,所以kafka在自己的topic中负责维护消费者的offset信息。

image-20230416213511507

1
2
3
4
5
6
7
如何查询保存在kafka中的Consumer的offset信息呢?
使用kafka-consumer-groups.sh这个脚本可以查看目前所有的consumer group

[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

con-1
con-2 (前面视频里修改过)
1
2
3
4
5
6
7
具体查看某一个consumer group的信息
GROUP:当前消费者组,通过group.id指定的值
TOPIC:当前消费的topic
PARTITION:消费的分区
CURRENT-OFFSET:消费者消费到这个分区的offset
LOG-END-OFFSET:当前分区中数据的最大offset
LAG:当前分区未消费数据量
1
2
3
4
5
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group con-1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
con-1 hello 4 1 1 0
con-1 hello 2 1 1 0
con-1 hello 3 1 1 0

image-20230416214127550

1
2
3
4
partition:是指消费者消费了哪些分区
current-offset:当前消费了的数据的offset
log-end-offset:最新数据的offset
lag:还有多少条数据没消费
1
此时再执行一次生产者代码,生产一条数据,重新查看一下这个消费者的offset情况

image-20230416214839816

1
2
如何分析生产的数据能不能及时消费掉:查看lag
如果lag值比较大:就需要增加消费者个数,同一个代码执行多次(但group.id不能变)

Consumer消费顺序

1
2
3
4
5
6
7
当一个消费者消费一个partition时候,消费的数据顺序和此partition数据的生产顺序是一致的

当一个消费者消费多个partition时候,消费者按照partition的顺序,首先消费一个partition,当消费完一个partition最新的数据后再消费其它partition中的数据

总之:如果一个消费者消费多个partiton,只能保证消费的数据顺序在一个partition内是有序的

也就是说消费kafka中的数据只能保证消费partition内的数据是有序的,多个partition之间是无序的。

image-20230416220156464

Kafka的三种语义

1
kafka可以实现以下三种语义,这三种语义是针对消费者而言的:

至少一次:at-least-once

1
2
3
4
5
这种语义有可能会对数据重复处理
实现至少一次消费语义的消费者也很简单。
1: 设置enable.auto.commit为false,禁用自动提交offset
2: 消息处理完之后手动调用consumer.commitSync()提交offset
这种方式是在消费数据之后,手动调用函数consumer.commitSync()异步提交offset,有可能处理多次的场景是消费者的消息处理完并输出到结果库,但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息,所以至少会处理一次

image-20230416221328144

至多一次:at-most-once

1
2
3
4
5
6
7
这种语义有可能会丢失数据
至多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是
1: enable.auto.commit设置为true。
2: auto.commit.interval.ms设置为一个较低的时间范围。
由于上面的配置,此时kafka会有一个独立的线程负责按照指定间隔提交offset。

消费者的offset已经提交,但是消息还在处理中(还没有处理完),这个时候程序挂了,导致数据没有被成功处理,再重启的时候会从上次提交的offset处消费,导致上次没有被成功处理的消息就丢失了。

仅一次:exactly-once

1
2
3
4
5
6
这种语义可以保证数据只被消费处理一次。
实现仅一次语义的思路如下:
1: 将enable.auto.commit设置为false,禁用自动提交offset
2: 使用consumer.seek(topicPartition,offset)来指定offset
3: 在处理消息的时候,要同时保存住每个消息的offset。以原子事务的方式保存offset和处理的消息结果,这个时候相当于自己保存offset信息了,把offset和具体的数据绑定到一块,数据真正处理成功的时候才会保存offset信息
这样就可以保证数据仅被处理一次了。

本文标题:大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-3

文章作者:TTYONG

发布时间:2023年04月15日 - 22:04

最后更新:2023年04月19日 - 23:04

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%9B%9B%E5%91%A8-%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E4%B9%8BKafka%E4%BB%8E%E5%85%A5%E9%97%A8%E5%88%B0%E5%B0%8F%E7%89%9B-3.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%