大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-1


第十四周 消息队列之Kafka从入门到小牛-1

初识Kafka

什么是消息队列

1
2
3
4
5
6
7
在学习Kafka之前我们先来看一下什么是消息队列
消息队列(Message Queue):可以简称为MQ
例如:Java中的Queue队列,也可以认为是一个消息队列

消息队列:顾名思义,消息+队列,其实就是保存消息的队列,属于消息传输过程中的容器。

消息队列主要提供生产、消费接口供外部调用,做数据的存储和读取

消息队列分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
消息队列大致可以分为两种:点对点(P2P)、发布订阅(Pub/Sub)
共同点:
针对数据的处理流程是一样的
消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。

不同点:
点对点(p2p)模型包含:消息队列(Queue)、发送者(Sender)、接收者(Receiver)
一个生产者生产的消息只有一个消费者(Consumer)(消息一旦被消费,就不在消息队列中)消费。
例如QQ中的私聊,我发给你的消息只有你能看到,别人是看不到的

发布订阅(Pub/Sub)模型包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)
每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到,或者QQ中的群聊,我在群里面发一条消息,群里面所有人都能看到

这就是这两种消息队列的区别
我们接下来要学习的Kafka这个消息队列是属于发布订阅模型的

什么是Kafka

1
2
3
4
5
6
7
8
9
10
Kafka 是一个高吞吐量的、持久性的、分布式发布订阅消息系统
高吞吐量:可以满足每秒百万级别消息的生产和消费。

为什么这么快?
难道Kafka的数据是放在内存里面的吗?
不是的,Kafka的数据还是放在磁盘里面的
主要是Kafka利用了磁盘顺序读写速度超过内存随机读写速度这个特性。所以说它的吞吐量才这么高

持久性:有一套完善的消息存储机制,确保数据高效安全的持久化。
分布式:它是基于分布式的扩展、和容错机制;Kafka的数据都会复制到几台服务器上。当某一台机器故障失效时,生产者和消费者切换使用其它的机器。

面试题

1
2
Kafka的数据时存储是磁盘中的,为什么可以满足每秒百万级别消息的生产和消费?
这是一个面试题,其实就是我们刚才针对高吞吐量的解释:kafka利用了磁盘顺序读写速度超过内存随机读写速度这个特性
1
2
Kafka主要应用在实时计算领域,可以和Flume、Spark、Flink等框架结合在一块使用
例如:我们使用Flume采集网站产生的日志数据,将数据写入到Kafka中,然后通过Spark或者Flink从Kafka中消费数据进行计算,这其实是一个典型的实时计算案例的架构

Kafka组件介绍

1
2
接下来我们来分析一下Kafka中的组件,加深对kafka的理解
看这个图

image-20230415153737183

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
先看中间的Kafka Cluster
这个Kafka集群内有两个节点,这些节点在这里我们称之为Broker
Broker:消息的代理,Kafka集群中的一个节点称为一个broker

在Kafka中有Topic的概念
Topic:称为主题,Kafka处理的消息的不同分类(是一个逻辑概念)。
如果把Kafka认为是一个数据库的话,那么Kafka中的Topic就可以认为是一张表
不同的topic中存储不同业务类型的数据,方便使用

在Topic内部有partition的概念
Partition:是Topic物理上的分组,一个Topic会被分为1个或者多个partition(分区),分区个数是在创建topic的时候指定。每个topic都是有分区的,至少1个。

注意:这里面针对partition其实还有副本的概念,主要是为了提供数据的容错性,我们可以在创建Topic的时候指定partition的副本因子是几个。
在这里面副本因子其实就是2了,其中一个是Leader,另一个是真正的副本
Leader中的这个partition负责接收用户的读写请求,副本partition负责从Leader里面的partiton中同步数据,这样的话,如果后期leader对应的节点宕机了,副本可以切换为leader顶上来。

在partition内部还有一个message的概念
Message:我们称之为消息,代表的就是一条数据,它是通信的基本单位,每个消息都属于一个partition。

总结

1
2
3
4
5
6
7
8
9
在这里总结一下
Broker>Topic>Partition>Message

接下来还有两个组件,看图中的最左边和最右边
Producer:消息和数据的生产者,向Kafka的topic生产数据。
Consumer:消息和数据的消费者,从kafka的topic中消费数据。

这里的消费者可以有多个,每个消费者可以消费到相同的数据
最后还有一个Zookeeper服务,Kafka的运行是需要依赖于Zookeeper的,Zookeeper负责协调Kafka集群的正常运行。

Kafka集群安装部署

1
2
3
4
5
6
前面我们对Kafka有了一个基本的认识,下面我们就想使用一下Kafka

在使用之前,需要先把Kafka安装部署起来
Kafka是支持单机和集群模式的,建议大家在学习阶段使用单机模式即可,单机和集群在操作上没有任何区别

注意:由于Kafka需要依赖于Zookeeper,所以在这我们需要先把Zookeeper安装部署起来

zookeeper安装部署

1
2
3
针对Zookeeper前期不需要掌握太多,只需要掌握Zookeeper的安装部署以及它的基本操作即可。
Zookeeper也支持单机和集群安装,建议大家在学习阶段使用单机即可,单机和集群在操作上没有任何区别。
在这里我们会针对单机和集群这两种方式分别演示一下

zookeeper单机安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
zookeeper需要依赖于jdk,只要保证jdk已经正常安装即可。
具体安装步骤如下:
1:下载zookeeper的安装包
进入Zookeeper的官网

2:把安装包上传到bigdata01机器的/data/soft目录下
[root@bigdata01 soft]# ll
-rw-r--r--. 1 root root 9394700 Jun 2 21:33 apache-zookeeper-3.5.8-bin.t
3:解压安装包
[root@bigdata01 soft]# tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
4:修改配置文件
首先将zoo_sample.cfg重命名为zoo.cfg
然后修改 zoo.cfg中的dataDir参数的值,dataDir指向的目录存储的是zookeeper的核心数据,所以这个目录不能使用tmp目录
[root@bigdata01 soft]# cd apache-zookeeper-3.5.8-bin/conf
[root@bigdata01 conf]# mv zoo_sample.cfg zoo.cfg
[root@bigdata01 conf]# vi zoo.cfg
dataDir=/data/soft/apache-zookeeper-3.5.8-bin/data

5:启动zookeeper服务
[root@bigdata01 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

6:验证
如果能看到QuorumPeerMain进程就说明zookeeper启动成功
[root@bigdata01 apache-zookeeper-3.5.8-bin]# jps
1701 QuorumPeerMain

注意:如果执行jps命令发现没有QuorumPeerMain进程,则需要到logs目录下去查看zookeeper-*.out这个日志文件
也可以通过zkServer.sh脚本查看当前机器中zookeeper服务的状态
注意:使用zkServer.sh默认会连接本机2181端口的zookeeper服务,默认情况下zookeeper会监听2181端口,这个需要注意一下,因为后面我们在使用zookeeper的时候需要知道它监听的端口是哪个。
最下面显示的Mode信息,表示当前是一个单机独立集群

[root@bigdata01 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone

如果没有启动成功的话则会提示连不上服务not running
1
2
3
4
5
6
7
8
7:操作zookeeper
首先使用zookeeper的客户端工具连接到zookeeper里面,使用bin目录下面的zkCli.sh脚本,默认会连接本机的zookeeper服务
[root@bigdata01 apache-zookeeper-3.5.8-bin]# bin/zkCli.sh
Connecting to localhost:2181
.....
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]
1
2
3
4
5
6
7
8
这样就进入zookeeper的命令行了。
在这里面可以操作Zookeeper中的目录结构
zookeeper中的目录结构和Linux文件系统的目录结构类似
zookeeper里面的每一个目录我们称之为节点(ZNode)
正常情况下我们可以把ZNode认为和文件系统中的目录类似,但是有一点需要注意:ZNode节点本身是可以存储数据的。

zookeeper中提供了一些命令可以对它进行一些操作
在命令行下随便输入一个字符,按回车就会提示出zookeeper支持的所有命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[zk: localhost:2181(CONNECTED) 8] aa
ZooKeeper -server host:port cmd args
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path
delquota [-n|-b] path
get [-s] [-w] path
getAcl [-s] path
history
listquota path
ls [-s] [-w] [-R] path
ls2 path [watch]
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:po
redo cmdno
removewatches path [-c|-d|-a] [-l]
rmr path
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b val path
stat [-w] path
sync path
Command not found: Command not found aa
1
下面我们来具体看一些比较常用的功能:
查看根节点下面有什么内容(常用)
1
2
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
创建节点
1
2
3
在根节点下面创建一个test节点,在test节点上存储数据hello
[zk: localhost:2181(CONNECTED) 9] create /test hello
Created /test
查看节点中的信息(常用)
1
2
3
查看/test节点中的内容
[zk: localhost:2181(CONNECTED) 10] get /test
hello
删除节点
1
2
这个删除命令可以递归删除,这里面还有一个delete命令,也可以删除节点,但是只能删除空节点,如果节点下面还有子节点,想一次性全部删除建议使用deleteall(递归删除,适用于这个节点下还有节点;这里用delete也可以)
[zk: localhost:2181(CONNECTED) 6] deleteall /test
1
2
3
直接按 ctrl+c 就可以退出这个操作界面,想优雅一些的话可以输入quit退出

或者quit
1
2
8:停止zookeeper服务
[root@bigdata01 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh stop

zookeeper集群安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1:集群节点规划,使用三个节点搭建一个zookeeper集群
bigdata01
bigdata02
bigdata03

2:首先在bigdata01节点上配置zookeeper
解压
修改配置(2888:集群节点进行通信时的端口;3888:集群节点进行选举时用的端口)
[root@bigdata01 soft]# cd apache-zookeeper-3.5.8-bin/conf/
[root@bigdata01 conf]# mv zoo_sample.cfg zoo.cfg
dataDir=/data/soft/apache-zookeeper-3.5.8-bin/data
server.0=bigdata01:2888:3888
server.1=bigdata02:2888:3888
server.2=bigdata03:2888:3888
1
2
3
4
5
6
7
8
创建目录保存myid文件,并且向myid文件中写入内容
myid中的值其实是和zoo.cfg中server后面指定的编号是一一对应的
编号0对应的是bigdata01这台机器,所以在这里指定0
在这里使用echo和重定向实现数据写入
[root@bigdata01 conf]#cd /data/soft/apache-zookeeper-3.5.8-bin
[root@bigdata01 apache-zookeeper-3.5.8-bin]# mkdir data
[root@bigdata01 apache-zookeeper-3.5.8-bin]# cd data
[root@bigdata01 data]# echo 0 > myid
1
2
3
3:把修改好配置的zookeeper拷贝到其它两个节点
[root@bigdata01 soft]# scp -rq apache-zookeeper-3.5.8-bin bigdata02:/data/soft
[root@bigdata01 soft]# scp -rq apache-zookeeper-3.5.8-bin bigdata03:/data/soft
1
2
3
4
5
6
7
8
4:修改bigdata02和bigdata03上zookeeper中myid文件的内容
首先修改bigdata02节点上的myid文件
[root@bigdata02 ~]# cd /data/soft/apache-zookeeper-3.5.8-bin/data/
[root@bigdata02 data]# echo 1 > myid

然后修改bigdata03节点上的myid文件
[root@bigdata03 ~]# cd /data/soft/apache-zookeeper-3.5.8-bin/data/
[root@bigdata03 data]# echo 2 > myid
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
5:启动zookeeper集群
分别在 bigdata01、bigdata02、bigdata03 上启动zookeeper进程
在bigdata01上启动
[root@bigdata01 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

在bigdata02上启动
[root@bigdata02 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

在bigdata03上启动
[root@bigdata03 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
1
2
3
4
5
6
6:验证
分别在bigdata01、bigdata02、bigdata03上执行jps命令验证是否有QuorumPeerMain 进程
如果都有就说明zookeeper集群启动正常了

如果没有就到对应的节点的logs目录下查看zookeeper*-*.out日志文件
执行bin/zkServer.sh status命令会发现有一个节点显示为leader,其他两个节点为follower
1
2
3
4
7:操作zookeeper
和上面单机的操作方式一样
8:停止zookeeper集群
在bigdata01、bigdata02、bigdata03三台机器上分别执行bin/zkServer.sh stop命令

kafka安装部署

kafka单机安装

1
2
3
4
5
zookeeper集群安装好了以后就可以开始安装kafka了。

注意:在安装kafka之前需要先确保zookeeper集群是启动状态。

kafka还需要依赖于基础环境jdk,需要确保jdk已经安装到位。
1
2
3
4
5
6
7
8
9
10
11
12
1:下载kafka安装包

注意:kafka在启动的时候不需要安装scala环境,只有在编译源码的时候才需要,因为运行的时候是在jvm虚拟机上运行的,只需要有jdk环境就可以了

2:把kafka安装包上传到bigdata01的/data/soft目录下
3:解压
4:修改配置文件(config/server.properties)
主要参数:
broker.id:集群节点id编号,单机模式不用修改
listeners:默认监听9092端口
log.dirs:注意:这个目录不是存储日志的,是存储Kafka中核心数据的目录,这个目录默认是指向的tmp目录,所以建议修改一下
zookeeper.connect:kafka依赖的zookeeper
1
2
3
4
5
6
针对单机模式,如果kafka和zookeeper在同一台机器上,并且zookeeper监听的端口就是那个默认的2181端口,则zookeeper.connect这个参数就不需要修改了。

只需要修改一下log.dirs即可
[root@bigdata01 kafka_2.12-2.4.1]# cd kafka_2.12-2.4.1/config/
[root@bigdata01 config]# vi server.properties
log.dirs=/data/soft/kafka_2.12-2.4.1/kafka-logs
1
2
3
4
5:启动kafka
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties

-daemon的作用是让进程后台运行
1
2
3
4
5
6
6:验证
启动成功之后会产生一个kafka进程
[root@bigdata01 kafka_2.12-2.4.1]# jps
2230 QuorumPeerMain
3117 Kafka
3182 Jps
1
2
7:停止kafka
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-stop.sh

kafka集群安装

1
2
3
4
1:集群节点规划,使用三个节点搭建一个kafka集群
bigdata01
bigdata02
bigdata03
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
注意:针对Kafka集群而言,没有主从之分,所有节点都是一样的。
2:首先在bigdata01节点上配置kafka
解压:
[root@bigdata01 soft]# tar -zxvf kafka_2.12-2.4.1.tgz

修改配置文件

注意:此时针对集群模式需要修改broker.id、log.dirs、以及zookeeper.connect

broker.id的值默认是从0开始的,集群中所有节点的broker.id从0开始递增即可
所以bigdata01节点的broker.id值为0

log.dirs的值建议指定到一块存储空间比较大的磁盘上面,因为在实际工作中kafka中会存储很多数据,我这个虚拟机里面就一块磁盘,所以就指定到/data目录下面了

zookeeper.connect的值是zookeeper集群的地址,可以指定集群中的一个节点或者多个节点地址,多个节点地址之间使用逗号隔开即可(避免zookeeper当前这个节点挂了,kafka识别不出来,建议写多个)
[root@bigdata01 soft]# cd kafka_2.12-2.4.1/config/
broker.id=0
log.dirs=/data/kafka-logs
zookeeper.connect=bigdata01:2181,bigdata02:2181,bigdata03:2181
1
2
3
4
5
6
7
8
9
10
11
12
3:将修改好配置的kafka安装包拷贝到其它两个节点
[root@bigdata01 soft]# scp -rq kafka_2.12-2.4.1 bigdata02:/data/soft/
[root@bigdata01 soft]# scp -rq kafka_2.12-2.4.1 bigdata03:/data/soft/
4:修改bigdata02和bigdata03上kafka中broker.id的值
首先修改bigdata02节点上的broker.id的值为1
[root@bigdata02 ~]# cd /data/soft/kafka_2.12-2.4.1/config/
[root@bigdata02 config]# vi server.properties
broker.id=1
然后修改bigdata03节点上的broker.id的值为2
[root@bigdata03 ~]# cd /data/soft/kafka_2.12-2.4.1/config/
[root@bigdata03 config]# vi server.properties
broker.id=2
1
2
3
4
5
6
7
8
9
10
5:启动集群
分别在bigdata01、bigdata02、bigdata03上启动kafka进程
在bigdata01上启动
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties

-daemon的作用是让进程后台运行
在bigdata02上启动
[root@bigdata02 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties
在bigdata03上启动
[root@bigdata03 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties
1
2
3
6:验证
分别在bigdata01、bigdata02、bigdata03上执行jps命令验证是否有kafka进程
如果都有就说明kafka集群启动正常了

image-20230415165152051


本文标题:大数据开发工程师-第十四周 消息队列之Kafka从入门到小牛-1

文章作者:TTYONG

发布时间:2023年04月08日 - 00:04

最后更新:2023年05月20日 - 15:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%9B%9B%E5%91%A8-%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E4%B9%8BKafka%E4%BB%8E%E5%85%A5%E9%97%A8%E5%88%B0%E5%B0%8F%E7%89%9B-1.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%