大数据开发工程师-第七周 第3章 Flume出神入化篇


第七周 第3章 Flume出神入化篇

各种自定义组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
咱们前面讲了很多组件,有核心组件和高级组件
source、channel、sink以及Source Interceptors,Channel Selectors、Sink Processors
针对这些组件,Flume都内置提供了组件的很多具体实现,在实际工作中,95%以上的数据采集需求都是可以满足的,但是谁也不敢保证100%都能满足,因为什么奇葩的需求都会有,那针对系统内没有提供的一些组件怎么办呢?

假设我们想把flume采集到的数据输出到mysql中,那这个时候就需要有针对mysql的sink组件了,但是Flume中并没有,因为这种需求不常见,往mysql中写的都是结构化数据,数据的格式是固定的,但是flume采集的一般都是日志数据,这种属于非结构化数据,不支持也是正常的,但是我们在这里就是需要使用Flume往mysql中写数据,那怎么办?

要不我们考虑换一个采集工具把,当然这也是一种解决方案,如果有其他采集工具支持向mysql中写数据
的话那可以考虑换一个采集工具,如果所有的采集工具都不支持向mysql中写数据呢,也就是说你这个需求就是前无古人后无来者的,怎么破?

不用担心,天无绝人之路,其实咱们使用的Flume提供的那些内置组件也都是作者一行代码一行代码写出来的,那我们是不是也可以自己写一个自定义的组件呢?可以的,并且flume也很欢迎你这样去做,它把开发文档什么的东西都给你准备好了。

注意了,就算没有文档,我们也要想办法去自定义,没有文档的话就需要去抠Flume的源码了。在这里Flume针对自定义组件提供了详细的文档说明,我们来看一下
通过Flume User Guide可以看到,针对source、channle、sink、Source Interceptors,Channel Selectors、都是可以的,这里面都显示了针对自定义的组件如何配置使用

Sink Processors目前暂时不支持自定义。
那这些支持自定义的组件具体开发步骤是什么样的呢?代码该写成什么样的呢?
大家还记得Flume有两个文档链接吗?
Flume Developer Guide只不过开发者文档里面目前还不算太完善,但是基本source、sink组件的自定义过程在这里都是有的

image-20230319131119283

image-20230319131259941

1
2
3
自定义channel的内容目前还没完善,如果你确实想自定义这个组件,就需要到Flume源码中找到目前支持的那些channel的代码,参考着实现我们自定义的channel组件。
大家在这里知道可以自定义,并且知道自定义组件的文档在哪里就可以了,目前来说,需要我们自定义组件的场景实在是太少了,几乎和买彩票中奖的概率差不多。
前面我们掌握了Flume的基本使用和高级使用场景,下面我们来看一下针对Flume的一些企业级优化和监控手段

Flume优化

1
2
3
4
5
1. 调整Flume进程的内存大小,建议设置1G~2G,太小的话会导致频繁GC
因为Flume进程也是基于Java的,所以就涉及到进程的内存设置,一般建议启动的单个Flume进程(或者说单个Agent)内存设置为1G~2G,内存太小的话会频繁GC,影响Agent的执行效率。

那具体设置多少合适呢?
这个需求需要根据Agent读取的数据量的大小和速度有关系,所以需要具体情况具体分析,当Flume的Agent启动之后,对应就会启动一个进程,我们可以通过jstat -gcutil PID 1000来看看这个进程GC的信息,每一秒钟刷新一次,如果GC次数增长过快,说明内存不够用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
使用jps查看目前启动flume进程

[root@bigdata04 ~]# jps
2957 Jps
2799 Application

执行 jstat -gcutil PID 1000
[root@bigdata04 ~]# jstat -gcutil 2799 1000
S0 S1 E O M CCS YGC YGCT FGC FGCT GCT
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
1
2
3
4
5
6
7
8
在这里主要看YGC YGCT FGC FGCT GCT
YGC:表示新生代堆内存GC的次数,如果每隔几十秒产生一次,也还可以接受,如果每秒都会发生一次YGC,那说明需要增加内存了

YGCT:表示新生代堆内存GC消耗的总时间

FGC:FULL GC发生的次数,注意,如果发生FUCC GC,则Flume进程会进入暂停状态,FUCC GC执行完以后Flume才会继续工作,所以FUCC GC是非常影响效率的,这个指标的值越低越好,没有更好。

GCT:所有类型的GC消耗的总时间

修改Flume进程内存

1
2
3
4
5
6
如果需要调整Flume进程内存的话,需要调整 flume-env.s h脚本中的 JAVA_OPTS 参数
把 export JAVA_OPTS 参数前面的#号去掉才会生效。

export JAVA_OPTS="-Xms1024m -Xmx1024m -Dcom.sun.management.jmxremote"

建议这里的 Xms 和 Xmx 设置为一样大,避免进行内存交换,内存交换也比较消耗性能。

一台机器多个agent时

1
2
3
4
在一台服务器启动多个agent的时候,建议修改配置区分日志文件
因为在conf目录下有log4j.properties,在这里面指定了日志文件的名称和位置,所有使用conf目录下面配置启动的Agent产生的日志都会记录到同一个日志文件中,如果我们在一台机器上启动了10几个Agent,后期发现某一个Agent挂了,想要查看日志分析问题,这个时候就疯了,因为所有Agent产生的日志都混到一块了,压根都没法分析日志了。
所以建议拷贝多个conf目录,然后修改对应conf目录中log4j.properties日志的文件名称(可以保证多个agent的日志分别存储),并且把日志级别调整为warn(减少垃圾日志的产生),默认info级别会记录很多日志信息。
这样在启动Agent的时候分别通过–conf参数指定不同的conf目录,后期分析日志就方便了,每一个Agent都有一个单独的日志文件。
以bigdata04机器为例:
复制conf-failover目录,以后启动sink的failover任务的时候使用这个目录
修改 log4j.properties中的日志记录级别和日志文件名称,日志文件目录可以不用修改,统一使用logs目录即可。

[root@bigdata04 apache-flume-1.9.0-bin]# cp -r conf/ conf-failover
[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-failover/
[root@bigdata04 conf-failover]# vi log4j.properties
.....
flume.root.logger=WARN,LOGFILE
flume.log.dir=./logs
flume.log.file=flume-failover.log
1
2
3
4
[root@bigdata04 apache-flume-1.9.0-bin]# nohup bin/flume-ng agent --name a1 --conf conf-failover --conf-file xxxx


这样就会在flume的logs目录中产生 flume-failover.log 文件,并且文件中只记录WARN和ERROR级别的日志,这样后期排查日志就很清晰了。

image-20230319135521617

1
2
3
4
[root@bigdata04 apache-flume-1.9.0-bin]# cd logs/
[root@bigdata04 logs]# ll
total 4
-rw-r--r--. 1 root root 478 May 3 16:25 flume-failover.log

Flume进程监控

1
2
3
4
Flume的Agent服务是一个独立的进程,假设我们使用source->channel->sink实现了一个数据采集落盘的功能,如果这个采集进程被误操作干掉了,这个时候我们是发现不了的,什么时候会发现呢?
可能第二天,产品经理找到你了,说昨天的这个指标值有点偏低啊,你来看下怎么回事,然后你就一顿操作猛如虎,结果发现原始数据少了一半多,那是因为Flume的采集程序在昨天下午的时候被误操作干掉了。

找到问题之后,你就苦巴巴的手工去补数据,重跑计算程序,最后再找产品经理确认数据的准确性。类似的问题会有很多,这说明你现在是无法掌控你手下的这些程序,他们都是不受控的状态,说不定哪天哪个程序不高兴,他就自杀了,不干活了,过了好几天,需要用到这个数据的时候你才发现,发现的早的话还能补数据,发现晚的话数据可能都补不回来了,这样对公司来说就是属于比较严重的数据故障问题,这样你年终奖想拿18薪就不太现实了。
1
2
3
4
5
6
所以针对这些存在单点故障的进程,我们都需要添加监控告警机制,最起码出问题能及时知道,再好一点的呢,可以尝试自动修复重启。

那针对Flume中的Agent我们就来实现一个监控功能,并且尝试自动重启
大致思路是这样的,
1. 首先需要有一个配置文件,配置文件中指定你现在需要监控哪些Agent
2. 有一个脚本负责读取配置文件中的内容,定时挨个检查Agent对应的进程还在不在,如果发现对应的进程不在,则记录错误信息,然后告警(发短信或者发邮件) 并尝试重启

编写监控相关程序

配置文件

1
2
[root@bigdata04 myconfFile]# vim monlist.conf 
load-failover.conf=startExample.sh # 等号处空格不能要,monlist.sh要出错

进程启动脚本

1
2
3
4
[root@bigdata04 myconfFile]# vim startExample.sh 
#!/bin/bash
flume_path=/data/soft/apache-flume-1.9.0-bin # 等号处空格不能要
nohup ${flume_path}/bin/flume-ng --name a1 --conf ${flume_path}/conf --conf-file ${flume_path}/conf/myconfFile/load-failover.conf &

监控脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@bigdata04 myconfFile]# vim monlist.sh 
#!/bin/bash
monlist=`cat monlist.conf` #等号那里不能有空格
echo "start check"
for item in ${monlist}
do
# 设置字段分隔符
OLD_IFS=$IFS
IFS="="
# 把一行内容转成多列[数组]
arr=($item)
# 获取等号左边的内容
name=${arr[0]}
# 获取等号右边的内容
script=${arr[1]}
echo "time is:"`date +"%Y-%m-%d %H:%M:%S"`" check "$name
if [ `jps -m|grep $name | wc -l` -eq 0 ]
then
# 发短信或者邮件告警
echo `date +"%Y-%m-%d %H:%M:%S"`$name "is none"
sh -x ./${script}
fi
done

设置定时检查

1
2
3
注意:这个需要定时执行,所以可以使用crontab定时调度

* * * * * root /bin/bash /data/soft/monlist.sh

本文标题:大数据开发工程师-第七周 第3章 Flume出神入化篇

文章作者:TTYONG

发布时间:2022年02月16日 - 22:02

最后更新:2023年03月19日 - 14:03

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E4%B8%83%E5%91%A8-%E7%AC%AC3%E7%AB%A0-Flume%E5%87%BA%E7%A5%9E%E5%85%A5%E5%8C%96%E7%AF%87.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%