大数据开发工程师-第七周 第2章 极速上手Flume使用


第七周 第2章 极速上手Flume使用

Flume的Hello World!

1
2
3
4
5
6
7
8
9
10
下面我们就想上手操作Flume,具体该怎么做呢?
先来看一个入门级别的Hello World案例。
我们前面说了,启动Flume任务其实就是启动一个Agent,Agent是由source、channel、sink组成的,这些组件在使用的时候只需要写几行配置就可以了
那下面我们就看一下source、channel、sink该如何配置呢?

接下来带着大家看一下官网
找到左边的documentation,查看文档信息

其实Flume的操作文档是非常良心的,整理的非常详细,给flume的维护者们点个赞。
进入Flume User Guide

image-20230317132510977

H5YXR0.md.png

image-20230317132647786

flume配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
下面有一个Agent配置的例子:

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
这个例子中首先定义了source的名字、sink的名字还有channel的名字

下面配置source的相关参数
下面配置了sink的相关参数
接着配置了channel的相关参数

最后把这三个组件连接到了一起,就是告诉source需要向哪个channel写入数据,告诉sink需要从哪个
channel读取数据,这样source、channel、sink这三个组件就联通了。

总结下来,配置Flume agent的主要流程是这样的
1. 给每个组件起名字
2. 配置每个组件的相关参数
3. 把它们联通起来

注意了,在Agent中配置的三大组件为什么要这样写呢?如果我是第一次使用我也不会写啊。
三大组件的配置在文档中是有详细说明的,来看一下,在Flume Sources下面显示的都是已经内置支持的
Source组件

H5YOGq.png

source

NetCat Tcp
1
咱们刚才看的案例中使用的是source类型是netcat,其实就是NetCat TCP Source,看一下详细内容

H5YLin.png

1
2
3
4
5
6
7
8
9
10
11
12
13
这里面的粗体字体是必选的参数
第一个参数是为了指定source需要向哪个channel写数据,这个其实是通用的参数,
主要看下面这三个,type、bind、port
type:类型需要指定为natcat
bind:指定当前机器的ip,使用hostname也可以
port:指定当前机器中一个没有被使用的端口

指定bind和port表示开启监听模式,监听指定ip和端口中的数据,其实就是开启了一个socket的服务端,
等待客户端连接进来写入数据

在这里给agent起名为a1,所以netcat类型的配置如下,这里面还指定了source、channel的名字,并且把
source和channel连接到一起了,刨除这几个配置之外就剩下了三行配置,就是刚才我们分析的那三个必
填参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

注意了,bind参数后面指定的ip是四个0,这个当前机器的通用ip,因为一台机器可以有多个ip,例如:
内网ip、外网ip,如果通过bind参数指定某一个ip的话,表示就只监听通过这个ip发送过来的数据了,这
样会有局限性,所以可以指定0.0.0.0。

下面几个参数都是可选配置,默认可以不配置。

接着是channel,案例中channel使用的是memory

channel

memory channel

H5t5f1.png
H5t76K.md.png

1
这里面只有type是必填项,其他都是可选的

sink

logger sink
1
2
3
最后看一下sink,在案例中sink使用的是logger,对应的就是Logger Sink

logger sink中默认也只需要指定type即可

H5NajK.png
H5NUc6.md.png

1
2
3
4
5
后期我们如果想要使用其他的内置组件,直接到官网文档这里查找即可,这里面的配置有很多,没有必要去记,肯定记不住,只要知道到哪里去找就可以,工作的时候又不是闭卷考试,官网是可以随便使用的,

所以建议大家到官网找到配置之后直接拷贝,要不然自己手写很容易出错。

配置文件分析完了,可以把这些配置放到一个配置文件中,起名叫example.conf,把这个配置文件放到conf/ 目录下。

Flume配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 注意:这里没有s,真他妈操蛋
1
注意了,这个配置文件中的a1表示是agent的名称,还有就是port指定的端口必须是未被使用的,可以先查询一下当前机器使用了哪些端口,端口的可用范围是1-65535,如果懒得去查的话,就尽量使用偏大一些的端口,这样被占用的概率就非常低了

启动Agent

1
2
Agent配置好了以后就可以启动了,下面来看一下启动Agent的命令
可以使用命令:
1
2
3
4
5
6
7
8
9
10
bin/flume-ng agent --name a1 --conf conf --conf-file example.conf -Dflume.root.logger=INFO,console
(经过实验,必须在apche安装目录下执行; 在bin下执行有问题)

这里面使用flume-ng命令

后面指定agent,表示启动一个Flume的agent代理
--name:指定agent的名字
--conf:指定flume配置文件的根目录
--conf-file:指定Agent对应的配置文件(包含source、channel、sink配置的文件)
-D:动态添加一些参数,在这里是指定了flume的日志输出级别和输出位置,INFO表示日志级别,
1
2
3
4
5
6
7
8
注意了,其实agent的启动命令还可以这样写
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template


这里面的-n属于简写,完整的写法就是–name
-c完整写法的–conf
-f完整写法是–conf-file
所以以后看到这两种写法要知道他们都是正确的写法。
1
2
3
启动Agent
在这里我们使用完整的写法,看起来清晰一些
注意了,由于配置文件里面指定了agent的名称为a1,所以在–name后面也需要指定a1,还有就是通过–conf-file指定配置文件的时候需要指定conf目录下的example.conf配置文件
1
启动之后会看到如下信息,表示启动成功,启动成功之后,这个窗口会被一直占用,因为Agent服务一直在运行,现在属于一个前台进程

image-20230317134908448

1
如果看到提示的有ERROR级别的日志信息,就需要具体问题具体分析了,一般都是配置文件配置错误了。

开启的socket服务端

1
2
3
4
5
6
接下来我们需要连接到source中通过netcat开启的socket服务端
克隆一个bigdata04的会话,因为前面启动Agent之后,窗口就被占用了

使用telnet命令可以连接到指定socket服务,telnet后面的主机名和端口是根据example.conf配置文件中配置的

注意:如果提示找不到telnet命令,则需要使用yum在线安装

image-20230317135723781

1
回到Agent所在的窗口,可以看到下面多了一行日志,就是我们在telnet中输入的内容

image-20230317163259384

使其它机器也可以telnet连接

1
2
3
4
5
6
按 ctrl+c 断开telnet连接
重新使用telnet连接,此时不使用localhost,使用本机的内网ip可以吗? 192.168.182.103

[root@bigdata04 ~]# telnet 192.168.182.103 44444
Trying 192.168.182.103...
telnet: connect to address 192.168.182.103: Connection refused
1
2
3
4
所以此时Agent中source的配置在使用的时候就受限制了,在开启telnet客户端的时候就只能在本地开启了,无法在其他机器上使用,因为source中绑定的ip是localhost。

如果想要支持一个网络内其它机器上也可以使用telnet链接的话就需要修改bind参数指定的值了最直接的就是指定192.168.182.103这个内网ip,其实还有一种更加通用的方式是指定0.0.0.0,此时表示会监听每一个可用的ip地址,所以在绑定ip端口时,ip通常都会使用0.0.0.0
那在这里我们把example.conf中的localhost改为0.0.0.0
1
2
3
4
5
6
7
8
按ctrl+c停止刚才启动的agent

a1.sources.r1.bind = 0.0.0.0

启动Agent

在另一个会话窗口中使用telnet连接
[root@bigdata04 ~]# telnet 192.168.182.103 44444

image-20230317140821644

1
此时可以在其他机器上使用telnet连接也可以,在bigdata01机器上

image-20230317140840936

Flume Agent后台运行

1
2
3
4
5
6
7
但是注意了,此时Flume中Agent服务是在前台运行,这个服务实际工作中需要一直运行,所以需要放到后台运行。

Flume自身没有提供直接把进程放到后台执行的参数,所以就需要使用咱们前面学习的nohup和&了。

此时就不需要指定-Dflume.root.logger=INFO,console参数了,默认情况下flume的日志会记录到日志文件中。

停掉之前的Agent,重新执行
1
2
3
4
[root@bigdata04 apache-flume-1.9.0-bin]# nohup bin/flume-ng agent --name a1 --conf conf --conf-file xxx &

启动之后,通过jps命令可以查看到一个application进程,这个就是启动的Agent
[root@bigdata04 apache-flume-1.9.0-bin]# jps

image-20230317141323680

1
2
3
这样看起来不清晰,如果后期启动了多个Agent,都分不出来哪个是哪个了
可以在jps后面加上参数 -ml,这样可以看到启动时指定的一些参数信息
[root@bigdata04 apache-flume-1.9.0-bin]# jps -m

image-20230317141421264

1
2
或者使用ps命令也可以
[root@bigdata04 apache-flume-1.9.0-bin]# ps -ef|grep flume

image-20230317141259623

1
2
3
哪个都可以,条条道路通罗马,具体就看你个人喜好了,ps命令显示的内容更为详细。

此时需要想要停止这个Agent的话就需要使用kill命令了

日志

1
这个Agent中的sink组件把数据以日志的方式写出去了,所以这个数据默认就会进入到flume的日志文件中,那我们来看一下flume的日志文件在flume的logs目录中有一个flume.log

image-20230317141805134

1
再使用telnet向里面输入一些数据

image-20230317142036506

1
2
3
4
再回agent看
[root@bigdata04 logs]# tail -2 flume.log

如果配置文件里不是配置的logger,在这个日志文件里就看不见了

image-20230317142128502

1
此时需要想要停止这个Agent的话就需要使用kill命令了

image-20230317142209964

案例:采集文件内容上传至HDFS

1
2
3
4
5
6
7
8
9
10
接下来我们来看一个工作中的典型案例:
采集文件内容上传至HDFS

需求:采集目录中已有的文件内容,存储到HDFS
分析:source是要基于目录的,channel建议使用file,可以保证不丢数据,sink使用hdfs

下面要做的就是配置Agent了,可以把example.conf拿过来修改一下,新的文件名为file-to-hdfs.conf

首先是基于目录的source,咱们前面说过,Spooling Directory Source可以实现目录监控
来看一下这个Spooling Directory Source

Source

image-20230317142817723

image-20230317144229651

1
2
3
4
5
channels和type肯定是必填的,还有一个是spoolDir,就是指定一个监控的目录

看他下面的案例,里面还多指定了一个fileHeader,这个我们暂时也用不到,后面等我们讲了Event之后大家就知道这个fileHeader可以干什么了,先记着有这个事把。

那来配置一下source
1
2
3
4
5
6
7
8
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/log/studentDir

Channel

1
2
3
4
5
6
接下来是channel了

channel在这里使用基于文件的,可以保证数据的安全性
如果针对采集的数据,丢个一两条对整体结果影响不大,只要求采集效率,那么这个时候完全可以使用基于内存的channel

咱们前面的例子中使用的是基于内存的channel,下面我们到文档中找一下基于文件的channel

image-20230317144559907

image-20230317144652517

1
2
3
4
根据这里的例子可知,主要配置checkpointDir和dataDir,因为这两个目录默认会在用户家目录下生成,建议修改到其他地方

checkpointDir是存放检查点目录
data是存放数据的目录

image-20230317145305755

Sink

1
2
最后是sink
因为要向hdfs中输出数据,所以可以使用hdfssink

image-20230317145429105

image-20230317145448653

image-20230317150059393

image-20230317145512380

1
2
3
4
5
6
7
8
9
10
11
hdfs.path是必填项,指定hdfs上的存储目录
看这里例子中还指定了filePrefix参数,这个是一个文件前缀,会在hdfs上生成的文件前面加上这个前缀,这个属于可选项,有需求的话可以加上

一般在这我们需要设置writeFormat和fileType这两个参数
默认情况下writeFormat的值是Writable,建议改为Text,看后面的解释,如果后期想使用hive或者impala操作这份数据的话,必须在生成数据之前设置为Text,Text表示是普通文本数据

fileType默认是SequenceFile,还支持DataStream和 CompressedStream ,DataStream 不会对输出
数据进行压缩,CompressedStream 会对输出数据进行 压缩,在这里我们先不使用压缩格式的,所以选择DataStream

除了这些参数以外,还有三个也比较重要
hdfs.rollInterval、hdfs.rollSize和hdfs.rollCount
1
2
3
4
5
6
7
8
hdfs.rollInterval默认值是30,单位是秒,表示hdfs多长时间切分一个文件,因为这个采集程序是一直运行的,只要有新数据,就会被采集到hdfs上面,hdfs默认30秒钟切分出来一个文件,如果设置为0表示不按时间切文件

hdfs.rollSize默认是1024,单位是字节,最终hdfs上切出来的文件大小都是1024字节,如果设置为0表示不按大小切文件

hdfs.rollCount默认设置为10,表示每隔10条数据切出来一个文件,如果设置为0表示不按数据条数切文件

这三个参数,如果都设置的有值,哪个条件先满足就按照哪个条件都会执行。
在实际工作中一般会根据时间或者文件大小来切分文件,我们之前在工作中是设置的时间和文件大小相结合,时间设置的是一小时,文件大小设置128M,这两个哪个满足执行哪个

image-20230317150814090

1
把Agent的配置保存到flume的conf目录下的 file-to-hdfs.conf 文件中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[root@bigdata04 conf]# vi file-to-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/log/studentDir

# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/student
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/studentDir/d

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/flume/studentDir
a1.sinks.k1.hdfs.filePrefix = stua1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动Agent

1
2
3
4
5
6
7
8
9
下面就可以启动agent了,在启动agent之前,先初始化一下测试数据
创建/data/log/studentDir目录,然后在里面添加一个文件,class1.dat

class1.dat中存储的是学生信息,学生姓名、年龄、性别

[root@bigdata04 studentDir]# more class1.dat
jack 18 male
jessic 20 female
tom 17 male
1
2
启动Hadoop集群
start-all.sh

image-20230317151754428

image-20230317151821211

image-20230317151956938

1
启动Agent,使用在前台启动的方式,方便观察现象

让flume所在节点成为hadoop的客户端节点

1
2
3
4
5
6
7
8
9
10
11
12
但是发现在启动的时候报错,提示找不到SequenceFile,但是我们已经把fileType改为了DataStream,

但是Flume默认还是会加载这个类
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionT
ype

就算你把SequenceFile相关的jar包都拷贝到flume的lib目录下解决了这个问题,但是还是会遇到找不到HDFS这种文件类型,还是缺少hdfs相关的jar包
No FileSystem for scheme: hdfs

当然这个问题也可以通过拷贝jar包来解决这个问题,但是这样其实太费劲了,并且后期我们有很大可能需要在这个节点上操作HDFS,所以其实最简单直接的方法就是把这个节点设置为hadoop集群的一个客户端节点,这样操作hdfs就没有任何问题了。

咱们之前在讲Hadoop的时候讲了客户端节点的特性,其实很简单,我们直接把集群中修改好配置的hadoop目录远程拷贝到bigdata04上就可以了。

image-20230317153529320

1
2
3
由于bigdata01和bigdata04没有做免密码登录,也不认识它的主机名,所以就使用ip,并且输入密码了。

拷贝完成之后到bigdata04节点上验证一下

image-20230317153608211

1
注意:还需要修改环境变量,配置HADOOP_HOME,否则启动Agent的时候还是会提示找不到SequenceFile

image-20230317153723631

image-20230317154215124

1
2
这里报错的原因是:直接使用/,不使用hdfs://xxxx.xxxx.xxxx.xxxx:9000/
时,会去读core-site.xml中的值;而此时bigdata04的hosts中并没有配置bigdata01的相关信息
1
再重新启动Agent

image-20230317151956938

1
此时可以看到Agent正常启动

image-20230317155135112

image-20230317155226589

1
2
3
4
5
此时发现文件已经生成了,只不过默认情况下现在的文件是 .tmp 结尾的,表示它在被使用,因为Flume只要采集到数据就会向里面写,这个后缀默认是由 hdfs.inUseSuffix 参数来控制的。

文件名上还拼接了一个当前时间戳,这个是默认文件名的格式,当达到文件切割时机的时候会给文件改名字,去掉.tmp

这个文件现在也是可以查看的,里面的内容其实就是class1.dat文件中的内容

image-20230317155717356

image-20230317155344885

1
2
3
4
5
6
所以此时Flume就会监控linux中的/data/log/studentDir目录,当发现里面有新文件的时候就会把数据采集过来。
那Flume怎么知道哪些文件是新文件呢?它会不会重复读取同一个文件的数据呢?

不会的,我们到/data/log/studentDir目录看一下你就知道了

我们发现此时这个文件已经被加了一个后缀.COMPLETED,表示这个文件已经被读取过了,所以Flume在读取的时候会忽略后缀为.COMPLETED的文件。

image-20230317155537841

1
2
3
接着我们再看一下channel中的数据,因为数据是存在本地磁盘文件中的,所以是可以去看一下的,进入dataDir指定的目录

cd /data/soft/apache-flume-1.9.0-bin/data/studentDir/

image-20230317160415553

image-20230317160443780

1
2
3
4
发现里面有一个 log-1 的文件,这个文件中存储的其实就是读取到的内容,只不过在这无法直接查看。

现在我们想看一下Flume最终生成的文件是什么样子的,难道要根据配置等待1个小时或者弄一个128M的文件过来吗,其实也没必要,我们可以暴力操作一下
停止Agent就可以看到了,当Agent停止的时候就会去掉 .tmp 标志了

image-20230317160628611

1
2
3
4
5
那我再重启Agent之后,会不会再给加上.tmp呢,不会了,每次停止之前都会把所有的文件解除占用状态,下次启动的时候如果有新数据,则会产生新的文件,这其实就模拟了一下自动切文件之后的效果。

但是这个文件看起来比较别扭,连个后缀都没有,没有后缀倒不影响使用,就是看起来不好看

在这给大家留一个作业,下一次再生成新文件的时候我希望文件名有一个后缀是.log,大家下去之后自己查看官网文档资料,修改Agent配置,添加测试数据,验证效果。

image-20230317160819391

image-20230317160900163

1
答案:其实就是使用hdfs sink中的hdfs.fileSuffix参数

案例:采集网站日志上传至HDFS

文章标题(可选)

本文标题:大数据开发工程师-第七周 第2章 极速上手Flume使用

文章作者:TTYONG

发布时间:2022年02月16日 - 22:02

最后更新:2023年03月17日 - 17:03

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E4%B8%83%E5%91%A8-%E7%AC%AC2%E7%AB%A0-%E6%9E%81%E9%80%9F%E4%B8%8A%E6%89%8BFlume%E4%BD%BF%E7%94%A8.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%