大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-2
Kafka使用初体验
Kafka中Topic的操作
1 | kafka集群安装好了以后我们就想向kafka中添加一些数据 |
新增Topic
1 | 指定2个分区,2个副本,注意:副本数不能大于集群中Broker的数量 |
1 | [root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 2 |
查询Topic
查询Kafka中的所有Topic列表
1 | 查询Kafka中的所有Topic列表以及查看指定Topic的详细信息 |
查看指定Topic的详细信息
1 | 查看指定topic的详细信息 |

1 | 第一个行显示指定topic所有partitions的一个总结 |

1 | 这个图里面的hello这个topic有两个partition,其中partition1的leader所在的节点是broker1,partition2的leader所在的节点是broker2 |
修改Topic
1 | 修改一般是修改Topic的partition数量,只能增加 |
删除Topic
1 | 删除Kafka中的指定Topic |
1 | 如果不想开启删除功能,可以设置delete.topic.enable为false,这样删除topic的时候只会把它标记为删除状态,此时这个topic依然可以正常使用。 |
Kafka中的生产者和消费者
1 | 前面我们学习了Kafka中的topic的创建方式,下面我们可以向topic中生产数据以及消费数据了 |
生产者
1 | 先来看一下如何向里面生产数据 |
1 | 向这个topic中生产数据 |
消费者
1 | 下面来创建一个消费者消费topic中的数据 |
案例:QQ群聊天
1 | 通过kafka可以模拟QQ群聊天的功能,我们来看一下 |
1 | [root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost 2181 --partitions 5 --replication-factor 2 --topic 88888888 |
1 | [root@bigdata02 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 88888888 |
1 | [root@bigdata03 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 88888888 |
1 | 然后我在bigdata01上开启生产者发消息,这样bigdata02和bigdata03都是可以收到的。 |
Kafka核心扩展内容
Broker扩展
1 | Broker的参数可以配置在server.properties这个配置文件中,Broker中支持的完整参数在官方文档中有体现 |
Log Flush Policy
1 | 1:Log Flush Policy:设置数据flush到磁盘的时机 |

1 | 这个参数的默认值为9223372036854775807,long的最大值 |
1 | log.flush.interval.ms间隔指定时间 |
Log Retention Policy
1 | 设置数据保存周期,默认7天 |
1 | 默认根据时间就行了,后期在时间范围内还可以从kafka中恢复数据 |
Producer扩展
producer发送数据到partition的方式
1 | Producer默认是随机将数据发送到topic的不同分区中,也可以根据用户设置的算法来根据消息的key来计算输入到哪个partition里面 |
producer的数据通讯方式
1 | 在这里有一个需要注意的内容就是 |

1 | 我们在向hello这个topic生产数据的时候,可以在生产者中设置acks参数, |
面试
1 | 针对这块在面试的时候会有一个面试题:Kafka如何保证数据不丢? |
Consumer扩展
1 | 在消费者中还有一个消费者组的概念 |

1 | Kafka集群有两个节点,Broker1和Broker2 |
Topic、Partition扩展
1 | 每个partition在存储层面是append log文件。 |
Message扩展
1 | 每条Message包含了以下三个属性: |

1 | 这里也体现了kafka高吞吐量的原因:磁盘顺序读写由于内存随机访问 |
