一、Flume安装

1)下载
官网下载地址:http://flume.apache.org/download.html
这里我们下载1.9版本的:

2)上传至/usr/flume目录,然后执行如下指令解压:

 tar -zxvf apache-flume-1.9.0-bin.tar.gz 

3)配置Flume的环境变量,在/etc/profile中新增如下:

 #配置新增 export FLUME_HOME=/usr/flume/apache-flume-1.9.0-bin export PATH=$PATH:$FLUME_HOME/bin #执行生效指令 source /etc/profile 

到这里,Flume就安装完成了。

二、Flume典型应用

1、本地数据读取

首先我们来看一个从本地目录下搜集数据的实例。具体操作步骤如下:
1)在flume目录下新建名为flume1.conf的配置文件,内容如下:

 #指定sources、sinks、channels a1.sources=r1 a1.sinks=k1 a1.channels=c1 #配置Source a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/usr/flume/source #配置Sink a1.sinks.k1.type=file_roll a1.sinks.k1.sink.directory=/usr/flume/sink #将Channel类型设置为memory a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 #把Source和Sink绑到Channel上 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 

此配置定义了一个名为a1的代理。a1具有侦听端口/usr/flume/source目录中的数据的源,在内存中缓冲事件数据的通道以及将事件数据记录到/usr/flume/sink目录。配置文件为各个组件命名,然后描述它们的类型和配置参数。给定的配置文件可能会定义几个命名的代理。当启动给定的Flume进程时,会传递一个标志,告诉它要显示哪个命名的代理。
2)创建相应的文件夹
/usr/flume/下分别创建source(作为数据源)和sink(作为输出目录)文件夹,

 mkdir source mkdir sink 

3)启动Flume代理
使用flume-ng命令启动Flume代理,同时在console控制台打印日志,命令如下:

 flume-ng agent --conf conf --conf-file /usr/flume/flume1.conf --name a1 -Dflume.root.logger=INFO,console 

4)测试数据
/usr/flume/source文件夹下导入一个test.txt文本文件Flume客户端会有如下显示:

一旦Flume成功启动并完成日志收集,再查看source文件夹的test.txt文件时,则变成了test.txt.COMPLETED,sink文件夹中会收集到文件。

2、收集至HDFS

1)在前一个案例基础上,我们参考flume1.conf配置,新建flume2.conf配置文件如下:

 #指定sources、sinks、channels a1.sources=r1 a1.sinks=k1 a1.channels=c1 #配置Source a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/usr/flume/source #配置Sink a1.sinks.k1.type=hdfs #配置收集后的文件,放在HDFS的哪个位置 a1.sinks.k1.hdfs.path=hdfs://master:9820/flume/data a1.sinks.k1.hdfs.rollInterval=0 a1.sinks.k1.hdfs.rollSize=10240000 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=3 a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.round=true a1.sinks.k1.hdfs.roundValue=10 a1.sinks.k1.hdfs.roundUnit=minute a1.sinks.k1.hdfs.useLocalTimeStamp=true #将Channel类型设置为memory a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 #把Source和Sink绑到Channel上 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 

2)启动hadoop

 start-all.sh 

3)启动Flume代理

 flume-ng agent --conf conf --conf-file /usr/flume/flume2.conf --name a1 -Dflume.root.logger=INFO,console 

4)在source目录下新建test2.txt文件,这时我们可能或发现如下报错:

 2021-05-06 17:57:19,198 ERROR hdfs.HDFSEventSink: process failed java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357) at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338) at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679) at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221) at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) 

这是由于flume内依赖的guava.jar和hadoop内的版本不一致会造成,我们需要查看hadoop安装目录下share/hadoop/common/libguava.jar版本,再查看flume安装目录下libguava.jar的版本,如果两者不一致,删除版本低的,并拷贝高版本过去。此处我们发现flume中版本为guava-11.0.2.jar,hadoop中版本为guava-27.0-jre.jar,所以,此处我把flume中的删除,把hadoop中的复制过去

 #删除 rm /usr/flume/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar #复制 cp /usr/hadoop/hadoop-3.2.1/share/hadoop/common/lib/guava-27.0-jre.jar /usr/flume/apache-flume-1.9.0-bin/lib/ 

5)重启flume,发现正常采集到hdfs

3、基于日期分区的数据收集

有些时候,由于收集的文件很大,或者因业务需求,需要将收集到的数据按照日期分开存储。本案例基于上面的HDFS收集,再根据年、月、日、时分分别生成文件,具体操作步骤如下:
1)在前一个案例基础上,我们参考flume2.conf配置,新建flume3.conf配置文件如下:

 #指定sources、sinks、channels a1.sources=r1 a1.sinks=k1 a1.channels=c1 #配置Source a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/usr/flume/source #配置Sink a1.sinks.k1.type=hdfs #配置收集后的文件,放在HDFS的哪个位置,并按日期分别存储 a1.sinks.k1.hdfs.path=hdfs://master:9820/flume/data/%Y-%m-%d/%H%M a1.sinks.k1.hdfs.rollInterval=0 a1.sinks.k1.hdfs.rollSize=10240000 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=3 a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.round=true a1.sinks.k1.hdfs.roundValue=10 a1.sinks.k1.hdfs.roundUnit=minute a1.sinks.k1.hdfs.useLocalTimeStamp=true #将Channel类型设置为memory a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 #把Source和Sink绑到Channel上 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1 

2)启动hadoop和flume代理:

 flume-ng agent --conf conf --conf-file /usr/flume/flume3.conf --name a1 -Dflume.root.logger=INFO,console 

3)在source添加test3.txt文件,发现正常采集到hdfs中并按日期分别存储