大数据开发工程师-第七周 第3章 精讲Flume高级组件


第七周 第3章 精讲Flume高级组件

1
前面我们掌握了Flume中的核心组件 source、channel、sink的使用,下面我们来学习一下Flume中的一些高级组件的使用

高级组件

1
2
3
4
5
6
7
Source Interceptors:Source可以指定一个或者多个拦截器按先后顺序依次对采集到的数据进行处
理。

Channel Selectors:Source发往多个Channel的策略设置,如果source后面接了多个channel,到底是给所有的channel都发,还是根据规则发送到不同channel,这些是由Channel Selectors来控制的

Sink Processors:Sink发送数据的策略设置,一个channel后面可以接多个sink,channel中的数据
是被哪个sink获取,这个是由Sink Processors控制的
1
在具体分析这些高级组件之前,我们先插播一个小知识点,这个知识点在高级组件中会用到
1
2
3
4
5
6
7
Event是Flume传输数据的基本单位,也是事务的基本单位,在文本文件中,通常一行记录就是一个Event

Event中包含header和body:
body是采集到的那一行记录的原始内容
header类型为Map<String, String>,里面可以存储一些属性信息,方便后面使用

我们可以在Source中给每一条数据的header中增加key-value,在Channel和Sink中使用header中的值了。

Source Interceptors

1
2
3
4
接下里我们看一下第一个高级组件,Source Interceptors
系统中已经内置提供了很多Source Interceptors

常见的Source Interceptors类型:Timestamp Interceptor、Host Interceptor、Search and Replace Interceptor 、Static Interceptor、Regex Extractor Interceptor 等
1
2
3
4
5
6
7
8
9
Timestamp Interceptor:向event中的header里面添加timestamp 时间戳信息

Host Interceptor:向event中的header里面添加host属性,host的值为当前机器的主机名或者ip

Search and Replace Interceptor:根据指定的规则查询Event中body里面的数据,然后进行替换,这个拦截器会修改event中body的值,也就是会修改原始采集到的数据内容

Static Interceptor:向event中的header里面添加固定的key和value

Regex Extractor Interceptor:根据指定的规则从Event中的body里面抽取数据,生成key和value,再把key和value添加到header中
1
2
3
4
5
6
根据刚才的分析,总结一下:
Timestamp Interceptor、Host Interceptor、Static Interceptor、Regex Extractor Interceptor是向event中的header里面添加key-value类型的数据,方便后面的channel和sink组件使用,对采集到的原始数据内容没有任何影响

Search and Replace Interceptor是会根据规则修改event中body里面的原始数据内容,对header没有任何影响,使用这个拦截器需要特别小心,因为他会修改原始数据内容。

这里面这几个拦截器其中Search and Replace Interceptor和Regex Extractor Interceptor 我们在工作中使用的比较多一些

案例

1
2
3
下面呢,我们来看一个案例:对采集到的数据按天按类型分目录存储

我们的原始数据是这样的,看这个文件,Flume测试数据格式.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
video_info

{"id":"14943445328940974601","uid":"840717325115457536","lat":"53.530598","lnt":"-2.5620373","hots":0,"title":"0","status":"1","topicId":"0","end_time":"1494344570","watch_num":0,"share_num":"1","replay_url":null,"replay_num":0,"start_time":"1494344544","timestamp":1494344571,"type":"video_info"}


user_info

{"uid":"861848974414839801","nickname":"mick","usign":"","sex":1,"birthday":"","face":"","big_face":"","email":"abc@qq.com","mobile":"","reg_type":"102","last_login_time":"1494344580","reg_time":"1494344580","last_update_time":"1494344580","status":"5","is_verified":"0","verified_info":"","is_seller":"0","level":1,"exp":0,"anchor_level":0,"anchor_exp":0,"os":"android","timestamp":1494344580,"type":"user_info"}


gift_record

{"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"}
1
2
3
4
5
6
7
  这份数据中有三种类型的数据,视频信息、用户信息、送礼信息,数据都是json格式的,这些数据还有一个共性就是里面都有一个type字段,type字段的值代表数据类型
当我们的直播平台正常运行的时候,会实时产生这些日志数据,我们希望把这些数据采集到hdfs上进行存储,并且要按照数据类型进行分目录存储,视频数据放一块、用户数据放一块、送礼数据放一块
针对这个需求配置agent的话,source使用基于文件的execsource、channle使用基于文件的channle,我们希望保证数据的完整性和准确性,sink使用hdfssink
但是注意了,hdfssink中的path不能写死,首先是按天 就是需要动态获取日期,然后是因为不同类型的数据要存储到不同的目录中
那也就意味着path路径中肯定要是有变量,除了日期变量还要有数据类型变量,
这里的数据类型的格式都是单词中间有一个下划线,但是我们的要求是目录中的单词不要出现下划线,使用驼峰的命名格式。
所以最终在hdfs中需要生成的目录大致是这样的

H7Ry2q.png

1
2
3
4
5
6
7
这里的日期变量好获取,但是数据类型如何获取呢?
注意了,咱们前面分析了,通过source的拦截器可以向event的header中添加key-value,然后在后面的channel或者sink中获取key-value的值
那我们在这就可以通过Regex Extractor Interceptor获取原始数据中的type字段的值,获取出来以后存储到header中,这样在sink阶段就可以获取到了。

但是这个时候直接获取到的type的值是不满足要求的,需要对type的值进行转换,去掉下划线,转化为驼峰形式

所以可以先使用Search and Replace Interceptor对原始数据中type的值进行转换,然后使用Regex Extractor Interceptor指定规则获取type字段的值,添加到header中。
流程
1
所以整体的流程是这样的

H7WlLT.md.png

配置
1
那下面我们来配置Agent,在bigdata04机器上创建 file-to-hdfs-moreType.conf

image-20230317212249291

image-20230317214719430

image-20230317214800364

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#给三个组件起名
a1.sources=r1
a1.channels=c1
a1.sinks=k1


#配置sources
a1.sources.r1.type=exec
a1.sources.r1.command = tail -F /data/log/moreTypeData/moreType.log


#配置拦截器[多个拦截器按照顺序依次执行]
a1.sources.r1.interceptors = i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = "type":"video_info"
a1.sources.r1.interceptors.i1.replaceString = "type":"videoInfo"

a1.sources.r1.interceptors.i2.type = search_replace
a1.sources.r1.interceptors.i2.searchPattern = "type":"user_info"
a1.sources.r1.interceptors.i2.replaceString = "type":"userInfo"

a1.sources.r1.interceptors.i3.type = search_replace
a1.sources.r1.interceptors.i3.searchPattern = "type":"gift_record"
a1.sources.r1.interceptors.i3.replaceString = "type":"giftRecord"

a1.sources.r1.interceptors.i4.type=regex_extractor
a1.sources.r1.interceptors.i4.regex = "type":"(\\w+)"
a1.sources.r1.interceptors.i4.serializers = s1
a1.sources.r1.interceptors.i4.serializers.s1.name = logType


#配置channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/moreType/checkpoint
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/moreType/data


#配置sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/flume/moreTypeData/%Y%m%d/%{logType}
a1.sinks.k1.hdfs.filePrefix = data-
a1.sinks.k1.hdfs.fileSuffix= .log
a1.sinks.k1.hdfs.fileType= DataStream
a1.sinks.k1.hdfs.rollInterval= 360 0
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.writeFormat= Text
a1.sinks.k1.hdfs.useLocalTimeStamp=true


#
a1.sources.r1.channels=c1
a1.sinks.k1.channel= c1
1
注意:这里面的拦截器,拦截器可以设置一个或者多个,source采集的每一条数据都会经过所有的拦截器进行处理,多个拦截器按照顺序执行。
1
2
后面也统一设置了hdfs文件的前缀和后缀
先生成测试数据,提前手工把数据添加到 moreType.log 文件中

image-20230317215919595

启动
1
bin/flume-ng agent --name a1 --conf conf-file conf/file-to-hdfs -Dflume.root.logger=INFO

image-20230317220402387

结果查看

H7LO1O.md.png

1
2
3
看一下HDFS中的文件内容,发现type字段的值确实被拦截器修改了

这就实现了按天,按类型分目录存储。

Channel Selectors

1
2
3
4
5
6
7
接下来看一下Channel Selectors
Channel Selectors类型包括:Replicating Channel Selector 和Multiplexing Channel Selector

其中Replicating Channel Selector是默认的channel选择器,它会将Source采集过来的Event发往所有
Channel

查看官方文档中针对这个默认channel选择器的解释

HHUhBq.md.png

1
2
3
4
5
6
在这个例子的配置中,c3是可选channel。对c3的写入失败将被忽略。由于c1和c2未标记为可选,因此未能写入这些channel将导致事务失败

针对这个配置,通俗一点来说就是,source的数据会发往c1、c2、c3这三个channle中,可以保证c1、c2一定能接收到所有数据,但是c3就无法保证了

这个selector.optional参数是一个可选项,可以不用配置就行。
如果是多个channel的话,直接在channels参数后面指定多个channel的名称就可以了,多个channel名称中间使用空格隔开,其实你看这个名称是channels带有s,从名字上看就能看出来他支持多个channel
1
2
3
4
还有一个 channel选择器是Multiplexing Channel Selector,它表示会根据Event中header里面的值将
Event发往不同的Channel

看下官网中的介绍

HHaEDI.md.png

1
2
3
4
5
6
7
8
在这个例子的配置中,指定了4个channel,c1、c2、c3、c4
source采集到的数据具体会发送到哪个channel中,会根据event中header里面的state属性的值,这个
是通过selector.header控制的
如果state属性的值是CZ,则发送给c1
如果state属性的值是US,则发送给c2 c3
如果state属性的值是其它值,则发送给c4
这些规则是通过selector.mapping和selector.default控制的
这样就可以实现根据一定规则把数据分发给不同的channel了。

案例一:多Channel之Replicating Channel Selector

HHdMo6.md.png

1
2
3
4
5
6
在这个案例中我们使用Replicating选择器,将source采集到的数据重复发送给两个channle,最后每个channel后面接一个sink,负责把数据存储到不同存储介质中,方便后期使用。

在实际工作中这种需求还是比较常见的,就是我们希望把一份数据采集过来以后,分别存储到不同的存储介质中,不同存储介质的特点和应用场景是不一样的,典型的就是hdfssink 和kafkasink,通过hdfssink实现离线数据落盘存储,方便后面进行离线数据计算

通过kafkasink实现实时数据存储,方便后面进行实时计算,
由于我们还没有学kafka,所以在这里先使用loggersink代理。
配置
1
2
下面根据图中列出来的source、channel和sink来配置agent
在bigdata04中flume的conf目录中创建 tcp-to-replicatingchannel.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
[root@bigdata04 conf]# vim tcp-to-replicatingchannel.conf 

# 指定组件名称
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2

# 配置source组件
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55555

# 配置channel选择器[默认就是replicating,可以省略]
a1.sources.r1.selector.type = replicating
#a1.sources.r1.channels = c1 c2
#a1.sources.r1.selector.optional = c2

# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# 配置sink组件
a1.sinks.k1.type=logger

a1.sinks.k2.type=hdfs
a1.sinks.k2.hdfs.path = hdfs://bigdata01:9000/flume/replicatingSelector
a1.sinks.k2.hdfs.filePrefix = data-
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.fileSuffiix = .log
a1.sinks.k2.hdfs.writeFormat = Text

#
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
启动,输入数据,查看

image-20230317224043915

1
生成测试数据,通过telnet连接到socket

HHyp1f.png

HHy9c8.md.png

HHyucT.md.png

案例二:多channel之Multiplexing Channel Selector案例

HHyq2V.md.png

1
2
最终再把不同channel中的数据存储到不同介质中。
在这里面我们需要用到正则抽取拦截器在Event的header中生成key-value作为Multiplexing选择器的规则
测试数据格式
1
2
{"name":"jack","age":19,"city":"bj"}
{"name":"tom","age":26,"city":"sh"}
配置
1
2
下面来配置Agent,复制tcp-to-replicatingchannel.conf的内容,
主要增加source拦截器和修改channel选择器,以及hdfsink中的path路径
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
[root@bigdata04 conf]# vim tcp-to-multiplexingchannel.conf 
#
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2

#
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55555

# 配置source正则抽取拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_extractor
a1.sources.r1.interceptors.i1.regex = "city":"(\\w+)"
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = city

# 配置channel选择器
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = city
a1.sources.r1.selector.mapping.bj = c1
#a1.sources.r1.selector.mapping.sh = c2
a1.sources.r1.selector.default = c2

#
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

#
a1.sinks.k1.type=logger

a1.sinks.k2.type=hdfs
a1.sinks.k2.hdfs.path = hdfs://bigdata01:9000/flume/multiplexingSelector
a1.sinks.k2.hdfs.filePrefix = data-
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.fileSuffiix = .log
a1.sinks.k2.hdfs.writeFormat = Text


#
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
启动,输入测试数据,查看

image-20230317225601537

image-20230317225640999
HHWavt.md.png
HHWUgI.md.png

Sink Processors

1
2
3
4
5
6
7
8
接下来看一下Sink处理器
Sink Processors类型包括这三种:Default Sink Processor、Load balancing Sink Processor和Failover Sink Processor

Default Sink Processor是默认的,不用配置sinkgroup,就是咱们现在使用的这种最普通的形式,一个channel后面接一个sink的形式

Load balancing Sink Processor是负载均衡处理器,一个channle后面可以接多个sink,这多个sink属于一个sink group,根据指定的算法进行轮询或者随机发送,减轻单个sink的压力

Failover Sink Processor是故障转移处理器,一个channle后面可以接多个sink,这多个sink属于一个sink group,按照sink的优先级,默认先让优先级高的sink来处理数据,如果这个sink出现了故障,则用优先级低一点的sink处理数据,可以保证数据不丢失。

负载均衡案例:Load balancing Sink Processor

HbsWad.md.png

HbsfIA.md.png

1
2
3
4
5
6
7
8
看中间的参数信息,
processor.sinks:指定这个sink groups中有哪些sink,指定sink的名称,多个的话中间使用空格隔开即可【注意,这里写的是processor.sinks,但是在下面的example中使用的是sinks,实际上就是sinks,所以文档也是有一些瑕疵的,不过Flume的文档已经算是写的非常好的了】
processor.type:针对负载均衡的sink处理器,这里需要指定load_balance
processor.selector:此参数的值内置支持两个,round_robin和random,round_robin表示轮询,按照sink的顺序,轮流处理数据,random表示随机。
processor.backoff:默认为false,设置为true后,故障的节点会列入黑名单,过一定时间才会再次发送数据,如果还失败,则等待时间是指数级增长;一直到达到最大的时间。

如果不开启,故障的节点每次还会被重试发送,如果真有故障节点的话就会影响效率。
processor.selector.maxTimeOut:最大的黑名单时间,默认是30秒
配置
1
这个负载均衡案例可以解决之前单节点输出能力有限的问题,可以通过多个sink后面连接多个Agent实现负载均衡,如果后面的Agent挂掉1个,也不会影响整体流程,只是处理效率又恢复到了之前的状态。
bigdata04
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2

#
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

#
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink组件,[为了方便演示效果,把batch-size设置为1]
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 45454
a1.sinks.k1.batch-size=1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 41414
a1.sinks.k1.batch-size=1

# 配置sink策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

#
a1.sources.r1.channels = c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1
1
a1.sinks.k1.port = 45454和a1.sinks.k2.port = 41414的端口一致也没事,因为是在不同的节点上,只要bigdata02,bigdata03上写的一致就行
bigdata02
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 45454

#
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/flume/sinkProcessor_load_balance
a1.sinks.k1.hdfs.filePrefix = data130
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.rollInterval=3600
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.fileSuffix=.log

#
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
bigdata03
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

#
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/flume/sinkProcessor_load_balance
a1.sinks.k1.hdfs.filePrefix = data131
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.rollInterval=3600
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.fileSuffix=.log

#
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动
1
2
3
还是先启动后面,再启动前面

注意:02,03在启动之前需要到/etc/profile中先配置HADOOP_HOME环境变量,因为这个Agent中使用到了hdfs

image-20230317235022827

image-20230317235122655

image-20230317235202917

image-20230317235240574

image-20230317235313603

异常
1
2
3
java.lang.NoSuchMethodError:com.google.common.base.Preconditions.checkArgument(...)(已解决)

hadoop和flume的guava版本不同

URL

结果查看

HbsWad.png
HbsfIA.pngHbvJbD.png
HbvGDO.png
HbvtVe.png

故障转移案例:Failover Sink Processor

Hbz23n.png

1
2
3
4
5
6
7
接下来我们来看一下故障转移
在这个图中,也是一个channel后面接了两个sink,但是这里和负载均衡架构不一样的是,这两个sink正常情况下只有一个干活,另一个是不干活的

这个故障转移案例可以解决sink组件单点故障的问题,如果某一个sink输出功能失效,另一个还可以顶上
来,同时只会存在一个真正输出数据的sink。

来看一下Failover Sink Processor的文档介绍

HbzWj0.png

1
2
3
4
5
processor.type:针对故障转移的sink处理器,使用failover
processor.priority.:指定sink group中每一个sink组件的优先级,默认情况下channel中的数据会被优先级比较高的sink取走
processor.maxpenalty:sink发生故障之后,最大等待时间

只有一个输出数据
配置
bigdata04
1
就sink配置策略改变
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2

#
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

#
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink组件,[为了方便演示效果,把batch-size设置为1]
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 45454
a1.sinks.k1.batch-size=1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 41414
a1.sinks.k1.batch-size=1

#配置sink策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

#
a1.sources.r1.channels = c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1
bigdata02
1
与上个案例一样,就hdfs地址改变
bigdata03
1
与上个案例一样,就hdfs地址改变
结果查看

HqC2IU.png
HqCWiF.png
HqCgaT.png

1
2
然后到hdfs上验证数据,发现现在数据是通过bigdata03这台机器写出去的,因为对应bigdata03这台机
器的sink组件的优先级比较高
1
此时将优先级高的agent停了,在向端口发送数据,会用另一台机子发送数据,再启动优先级高的,又会恢复原样
模拟Agent挂掉
1
2
3
接下来模拟bigdata03这台机器上的的Agent挂掉,也就意味着k2这个sink写不出去数据了,此时,我们再通过socket发送一条数据,看看会怎么样
直接在bigdata03窗口中按 ctrl+c 停止agent即可
然后再生成一条数据

HqACcQ.png
HqA91g.png

1
2
3
此时如果把bigdata03上的Agent再启动的话,会发现新采集的数据会通过bigdata03上的Agent写出去,这是因为它的优先级比较高。

这就是Sink故障转移的应用

本文标题:大数据开发工程师-第七周 第3章 精讲Flume高级组件

文章作者:TTYONG

发布时间:2022年02月16日 - 22:02

最后更新:2023年03月31日 - 00:03

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E4%B8%83%E5%91%A8-%E7%AC%AC3%E7%AB%A0-%E7%B2%BE%E8%AE%B2Flume%E9%AB%98%E7%BA%A7%E7%BB%84%E4%BB%B6.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%