日志采集工具Flume的部署
一. Flume概述与核心架构
1.1 概述
系统特点:
Flume是一个分布式、高可靠、高可用的海量日志数据的采集、聚合和传输系统。数据源支持:能够采集多种形式的数据源(如
socket数据包、日志文件、日志目录、Kafka等),这些在Flume中被称为source。数据输出:
channel是作为source(数据源)和sink(数据输出目的地)之间的缓冲和临时存储机制,sink(数据输出目的地)包括HDFS、HBase、Hive、Kafka等。
1.2 基本架构与组件
核心角色 -
Agent:Flume的基本架构中心是Agent(代理),一个运行在JVM进程中的关键组件。Agent负责协调和管理数据从外部源到目的地的整个流程。组件构成
Memory Channel:存储数据于JVM堆内存中,适用于快速数据流,但存在数据丢失风险。File Channel:将数据存储在本地磁盘,适合大量数据处理,提供持久化存储。Kafka Channel:用于Kafka集成,将数据存储在Kafka中。Source(源头):数据流的起点,负责从各种数据源获取数据,并将其以事件(event)的形式发送到Channel中。Channel(通道):作为数据传输的中间环节,负责接收来自Source的数据,并且临时存储这些数据,直到它们被Sink处理。Channel可以是基于内存的缓冲区或基于文本持久化存储。Sink(目的地):数据流的终点,从Channel中提取数据,并执行后续操作,如存储到数据库、发送到远程服务器或转换数据格式。
1.3 系统特性
分布式设计:
Flume是由多个agent连接起来形成的分布式系统。每个agent都是独立工作的,负责特定部分的数据处理。数据传递与管理:每个
agent在内部包含Source、Channel和Sink三个核心组件,共同负责数据的传递和管理。
event /ɪˈvent/ 事件 sink /sɪŋk/ 下沉 channel /ˈtʃænl/ 渠道,传输 Agent /ˈeɪdʒənt/ 代理

主要作用:
实时读取服务器本地磁盘或指定端口的数据,将数据写入到HDFS或者kafka。

1.4 Flume Event的概念
Source 到 Channel 到 Sink之间传递数据的形式是Event事件;Event事件是一个基本数据流单元。

Event的构成
Key(Header):是数据的元数据,包含如时间戳、数据来源等描述信息。
Value(Body):则是数据的具体内容。
基本单元:在Flume中,Event是数据流的基本单位。
键值对结构:Event包含两个部分,即Key(Header)和Value(Body)。
类比解释
邮件内容(Body):对应Event的具体数据内容。
邮件主题(Subject):对应Event的Header,包含了数据的元数据,比如来源和时间戳等。
电子邮件类比:可以将Flume的Event比作一封电子邮件。
具体示例
Flume Agent:类似于邮递员,负责采集、传输、处理和存储数据。
日志Event:类似于一封邮件,每条访问日志被视作一个Event。
日志内容和元数据:日志的具体内容对应邮件的主体,而服务器名和时间戳等元数据则对应邮件的主题。
目的地:类似于邮件的收件地址,将访问日志从多个服务器采集并写入HDFS。
访问日志的采集和存储过程:
通俗地讲,Flume中的Agent和Event之间的关系可以理解为传输与处理的关系。Event是Agent中用于传输、处理的基本数据单位,而Agent则负责控制Event的传输与处理过程。
这样的表达更顺畅些,内容上是正确的。你很准确地理解了Flume中Event和Agent的功能和关系!
1.5 Flume采集系统结构图
1.简单结构
单个agent采集数据
Flume系统的核心角色是agent,它是一个Java进程,通常运行在日志收集节点上,以事件的形式将数据从源头传输到目的地

2. 复杂结构
多级Agent之间串联

使用场景:
一家大型购物网站在北京、上海和广州分别运行了三个Web服务器,每个服务器生成访问日志和用户行为数据。该网站希望将所有日志汇总到HDFS中,供后续分析和推荐模型训练使用。
示例分布:
数据采集(一级Agent)
北京的Agent1、上海的Agent2、广州的Agent3分别从各自的Web服务器收集日志数据,并将数据打包成Event。
数据汇总(二级Agent)
Agent1、Agent2和Agent3将日志数据传输到中央服务器上的Agent4,进行汇总。
数据存储(HDFS)
Agent4将汇总后的数据发送到HDFS,供数据团队进行分析和模型训练。
这个结构实现了不同区域的日志数据统一收集和存储,方便后续分析处理。
二.flume的部署
2.1 伪分布环境-安装flume
伪分布式环境指的是在一台机器上模拟多台机器的分布式环境,用于学习和测试分布式系统。
2.2 集群环境-安装flume
2.2.1 规划
三台主机的主机名分别为master,slave1,slave2(且三台主机的防火墙已关闭)
Flume下载地址: flume下载官网
2.2.2 解压安装包
上传apache-flume-1.9.0-bin.tar.gz在master上的/opt/software文件夹下
# 解压
cd /opt/software
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/apps
# 改名
cd /opt/apps
mv ./apache-flume-1.9.0-bin ./flume #将解压的文件修改名字为flume,简化操作2.2.3 配置环境变量
# 在master上打开环境变量
vi /etc/profile
# 添加以下内容
# Flume
export FLUME_HOME=/opt/apps/flume
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
# 分发环境变量文件到 slave1 和 slave2 主机
scp -r /etc/profile slave1:/etc
scp -r /etc/profile slave2:/etc
# 分别在master/slave1/slave2主机运行下面的命令,使环境变量生效:
source /etc/profile2.2.4 配置flume-env.sh运行环境变量
cd /opt/apps/flume/conf
# 复制文件
cp ./flume-env.sh.template ./flume-env.sh
# 编辑文件
vi ./flume-env.sh
# 添加以下内容
export JAVA_HOME=/opt/apps/jdk2.2.5 解决Flume 和 Hadoop 使用的 Guava库版本不一致而导致的冲突
# 修改guava文件名,使之失效
mv /opt/apps/flume/lib/guava-11.0.2.jar /opt/apps/flume/lib/guava-11.0.2.jar.bf
# 复制hadoop下的高版本guava库到Flume中
cp /opt/apps/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apps/flume/lib/2.2.6 查看flume版本信息
# 查看flume版本信息
flume-ng version
# 运行结果
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523可能出现的问题:在查询版本时报下图的错误

问题描述: 如果系统安装了 HBase,运行该命令时可能会遇到错误,提示找不到或无法加载主类 org.apache.flume.tools.GetJavaProperty。这是因为 HBase 的 HBASE_CLASSPATH 设置会干扰 Flume 的类路径。
解决方法:
# 修改hbase-env.sh配置文件
cd /opt/apps/hbase/conf
vi hbase-env.sh更改如下:
# 修改 HBASE_CLASSPATH 配置
export HBASE_CLASSPATH_PREFIX=/opt/apps/hadoop/etc/hadoop2.2.7 创建日志文件目录和检查点目录
# 创建日志文件目录logs目录(必须为空,否则后面会报错)
cd /opt/apps/flume
mkdir logs
# 创建检查点目录(还原点,用于恢复数据)
cd /opt/apps/flume
mkdir -p data/flume/checkpoint解释:
1.日志文件目录:
确保
logs目录为空,因为 Flume 可能会在启动时检查此目录是否有遗留数据或权限问题。空的logs目录可以确保 Flume 启动时不会因该目录出错。2.检查点目录
checkpoint的作用:
这个目录是 Flume 的 File Channel 的检查点目录,用于在 Flume 出现异常停止或系统宕机时恢复数据。
用途:存储 Channel 中未消费的事件及相关元数据,确保在 Flume 重启后能够恢复到上一次处理的状态,保证数据的持久性和可靠性。
2.2.8 分发Flume到slave各节点
把在master节点配置好的flume分发到slave各节点上(slave1,slave2)
scp -r /opt/apps/flume/ slave1:/opt/apps/
scp -r /opt/apps/flume/ slave2:/opt/apps/至此,安装flume(集群模式)完成
实验1:使用 Avro Source 进行分布式数据采集
【实验任务】
该实验将演示如何配置 Flume 在集群中运行,数据将从 master 上的 Avro 客户端发送到 slave1 上的 Flume Agent,并最终存储到 HDFS 中。
Avro:Avro 客户端把数据转换成二进制 Avro 格式后在分布式系统中进行传输。
【实验目标】
使用
Flume在集群环境中采集数据;配置
Flume Agent监听Avro客户端数据;将数据存储到
HDFS中。
【实验步骤】
步骤 1:在 slave1 上配置 Flume Agent
创建配置文件:
cd /opt/apps/flume/conf
# 创建空文件
touch test1_avro_hdfs.conf在
slave1的 Flume 配置目录/opt/apps/flume/conf下创建配置文件test1_avro_hdfs.conf。编辑配置文件:
# 定义 Flume Agent 的组件
# a1 是该 Agent 的名称,用于后续配置中指代此 Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置 Source 组件 r1
# Source 是数据的输入端,负责从 Avro 客户端接收数据
a1.sources.r1.type = avro
# 绑定 IP 地址为 0.0.0.0,表示源(source)将监听并接受来自所有网络接口的连接。
a1.sources.r1.bind = 0.0.0.0
# Avro Source 监听的端口号 4141,以接收来自avro-client的数据。
a1.sources.r1.port = 4141
# 定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
# 指定的拦截器类型,并且它会为每个事件添加一个时间戳属性,这个时间戳将反映事件被 Flume source 处理的时间
a1.sources.r1.interceptors.i1.type = timestamp
# 配置 Sink 组件 k1
# Sink 是数据的输出端,用于将数据写入 HDFS
a1.sinks.k1.type = hdfs
# 设置 HDFS 路径,将数据存储在 master 的 HDFS 的 /flume_logs/ 目录中
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test1/%Y%m%d
# 文件类型为 DataStream,表示数据按流方式写入 HDFS
a1.sinks.k1.hdfs.fileType = DataStream
# 设置写入的文件格式为文本
a1.sinks.k1.hdfs.writeFormat = Text
# 每次批量写入1000条(events)到 HDFS
# 即在数据传输过程中,Sink组件会累积100条(events)后再一次性批量写入 HDFS,减少逐条写数据时I/O操作的开销
a1.sinks.k1.hdfs.batchSize = 100
# 文件滚动间隔设置为 0,表示不断开文件写入(实时写入数据)
a1.sinks.k1.hdfs.rollInterval = 0
# 配置 Channel 组件 c1
# Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
# # 配置Channel通道类型为 memory,用于在内存中缓存数据
a1.channels.c1.type = memory
# Channel 的容量最多存储1000条事件(events)
# 当(events)事件数达到1000时,Channel将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件。
a1.channels.c1.capacity = 1000
# source 和 sink 从内存 channel 每次事务传输的最大事件数量为1000(events)
a1.channels.c1.transactionCapacity = 100
## 将 Source 与 Sink 和 Channel 连接起来
# 将 source r1 绑定到 channel c1,使数据从 source r1 发送到 channel c1
a1.sources.r1.channels = c1
# 将 sink k1 绑定到 channel c1,使 sink k1 从 channel c1 接收数据
a1.sinks.k1.channel = c1在
test1_avro_hdfs.conf中输入以下内容:
步骤 2:在 slave1 上启动 Flume Agent
cd /opt/apps/flume
# 启动 Flume Agent,加载 `test1_avro_hdfs.conf` 配置文件
# 实现从数据源(Source)接收数据并通过通道(Channel)将数据传输到目标(Sink)
# 即将数据从 Avro Source 接收到后存入 HDFS。
flume-ng agent \
-n a1 \
-c /opt/apps/flume/conf \
-f /opt/apps/flume/conf/test1_avro_hdfs.conf \
-Dflume.root.logger=INFO,console解释:
flume-ng agent:启动 Flume 数据采集代理,用于数据流的收集、传输和处理。-n a1:指定 Flume Agent 的名称为
a1。-c conf:指定 Flume 的配置文件所在目录。
-f /opt/apps/flume/conf/test1_avro_hdfs.conf:指定 Flume 使用的配置文件。
-Dflume.root.logger=INFO,console:设置日志级别为
INFO,并将日志输出到控制台。
步骤 3:在 master 上创建数据文件
在 master 上创建一个数据文件 log.00,其中包含一些测试数据:
cd /opt/apps/flume
echo "hello world from cluster" >> log.00步骤 4:在 master 上启动 Avro 客户端
在 master 上运行 Avro客户端,将 log.00 文件的数据通过 Avro 协议发送到 slave1 上运行的 Flume Agent:
cd /opt/apps/flume
# 使用Flume的avro-client工具,将log.00文件内容通过 Avro 协议发送到主机 slave1 上的4141端口
flume-ng avro-client \
-c conf \
-H slave1 \
-p 4141 \
-F /opt/apps/flume/log.00解释:
flume-ng avro-client: Flume 中的 Avro 客户端命令,用于发送数据到配置好的 Flume 代理的 Avro Source(数据接收端)。-c conf:指定 Flume 配置文件所在的目录。-H slave1:设置要连接的 Flume 代理(agent)的主机名。-p 4141::连接到slave1主机上端口为4141的 Flume 代理的 Avro Source。-F /opt/apps/flume/log.00:指定要发送的文件路径
步骤 5:验证数据存储
方法一:
# 在浏览器中查看HDFS上的数据
192.168.36.100:9870方法二:
# 在 `slave2` 的查看HDFS上的 `/flume_logs/` 目录,检查数据是否成功存储。
hdfs dfs -ls /flume_logs/数据流说明
Avro 客户端 在
master上发送数据到slave1。Flume Agent 在
slave1上接收数据,并将数据通过内存通道传输到 Sink。HDFS Sink 将数据存储在 HDFS 中。
实验2:使用 NetcatSource 数据采集
netcat工具介绍
netcat(简称nc)是一款灵活的网络工具,用于TCP和UDP连接,可用于创建连接、监听端口、数据传输等操作。它常被用于两台设备间的交互,主要工作在监听模式和传输模式。
netcat工作原理
netcat读取一个端点的数据并将其输出到另一个端点。该端点可以是网络上另一台设备,也可以是本地计算机的不同端口。netcat的用途包括从一台设备向另一台设备发送数据,或监听特定端口以接收连接和数据。
netcat工具安装
# 分别在master/slave1/slave2下操作,安装netcat工具
# 1.将安装包libpcap-1.5.3-13.el7_9.x86_64和 map-ncat-6.40-19.el7.x86_64 复制到 /opt/software 目录。
# 2.进入目录,如果slave1/slave2下没有此目录,先创建
cd /opt/software
# 3.安装依赖和工具
rpm -ivh libpcap-1.5.3-13.el7_9.x86_64.rpm
rpm -ivh nmap-ncat-6.40-19.el7.x86_64.rpmnetcat工具测试
步骤1:在 master 上监听端口
在 master 上使用以下命令监听端口 44444:
# -l 选项表示监听模式,-k 选项表示保持监听状态
[root@master ~]# nc -l -k 44444
.....监听并等待接收数据步骤2:在 slave1 上连接并发送数据
在 slave1 上运行以下命令与 master 建立连接并发送数据:
[root@master ~]# nc master 44444
.....编写并发送数据此时,可以通过终端输入数据并实时传输到 master。
注意事项
若 master 端口 10000 正在监听,
netcat将建立 TCP 连接,双方可在连接上交互数据。若 master 未监听端口 10000,连接将失败,出现错误提示:
Ncat: Connection refused.
【实验任务】
在分布式 Flume 环境中,通过 NetcatSource 从远程主机采集数据,并将数据存储到 HDFS 中。
【实验目的】
学习如何在分布式环境中使用 Flume 的
NetcatSource采集远程数据。将采集的数据存储到 HDFS,实现跨主机的数据采集和存储。
了解 Flume 在分布式日志收集和数据存储中的实际应用。
【实验步骤】
master:运行 Flume Agent,监听远程连接并接收数据。slave1:作为数据发送端,通过netcat工具向master发送数据。master:存储数据到 HDFS。
环境配置:确保所有机器上都安装了 Netcat工具
确保Hadoop集群已经启动
步骤 1:在 master 上配置 Flume Agent
创建 Flume 配置文件: 在
master的 Flume 配置目录/opt/apps/flume/conf中创建一个配置文件test2_netcat_hdfs.conf:cd /opt/apps/flume/conf
vim test2_netcat_hdfs.conf编辑配置文件: 在
test2_netcat_hdfs.conf文件中输入以下内容,并添加详细注释:# Flume 配置文件:test2_netcat_hdfs.conf
# 定义 Flume Agent 的组件
# a1 是该 Agent 的名称,用于后续配置中指代此 Agent
a1.sources = r1
# 定义 Sink 名称为 k1
a1.sinks = k1
# 定义 Channel 名称为 c1
a1.channels = c1
# 配置 Source 组件 r1
# Source 是数据的输入端,负责从 netcat 客户端接收数据
a1.sources.r1.type = netcat
# 绑定 IP 地址为 0.0.0.0,表示源(source)将监听并接受来自所有网络接口的连接
a1.sources.r1.bind = 0.0.0.0
# Avro Source 监听的端口号 4141,以接收来自netcat的数据[netcat客户端将通过该端口发送数据]
a1.sources.r1.port = 44444
# 定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
# 指定的拦截器类型,并且它会为每个事件添加一个时间戳属性,这个时间戳将反映事件被 Flume source 处理的时间
a1.sources.r1.interceptors.i1.type = timestamp
# 配置 Sink 组件 k1
# Sink 是数据的输出端,用于将数据写入 HDFS
a1.sinks.k1.type = hdfs
# 设置 HDFS 路径,将数据存储在 master 的 HDFS 的 /flume_logs/ 目录中
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test2/%Y%m%d
# 文件类型为 DataStream,表示数据按流方式写入 HDFS
a1.sinks.k1.hdfs.fileType = DataStream
# 设置写入的文件格式为文本
a1.sinks.k1.hdfs.writeFormat = Text
# 每次批量写入 200 条数据到 HDFS
# 即在数据传输过程中,Sink 组件会累积 200条数据后再一次性批量写入 HDFS,减少逐条写数据时的 I/O 操作开销
a1.sinks.k1.hdfs.batchSize = 200
# 文件滚动间隔设置为 0,表示不断开文件写入(实时写入数据)
a1.sinks.k1.hdfs.rollInterval = 0
# 配置 Channel 组件 c1
# Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
# 配置 Channel 通道类型为 memory,用于在内存中缓存数据
a1.channels.c1.type = memory
# Channel 的容量最多存储 1000 条事件(event)
# 当事件数达到 1000 时,Channel 将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件
a1.channels.c1.capacity = 1000
# Source 和 Sink 从 memory Channel 每次事务传输的最大事件数量为 200
a1.channels.c1.transactionCapacity = 200
# 绑定 Source、Sink 和 Channel 之间的关系
# 将 Source r1 绑定到 Channel c1
a1.sources.r1.channels = c1
# 将 Sink k1 绑定到 Channel c1
a1.sinks.k1.channel = c1
步骤 2:在 master 上启动 Flume Agent
cd /opt/apps/flume
# 启动 Flume Agent,加载 `test2_netcat_hdfs.conf` 配置文件
flume-ng agent \
-n a1 \
-c ./conf \
-f ./conf/test2_netcat_hdfs.conf \
-Dflume.root.logger=INFO,console解释:
-c ./conf:指定 Flume 配置文件所在目录。
-f ./conf/example_hdfs_slave2.conf:指定 Flume 使用的配置文件。
-n a1:指定 Agent 名称为
a1。-Dflume.root.logger=INFO,console:设置日志输出级别为
INFO,并将日志输出到控制台。
步骤 3:在 slave1 上使用 nc 发送数据
使用 netcat 连接到 master:
# 在 `slave1` 上打开终端,使用 `netcat` 工具连接到 `master` 的 IP 地址和端口 `44444`
nc master 44444
# 如果连接成功,会显示 `Connected to master` 等提示信息,表示连接已建立。发送数据:
# 在连接成功的 `nc` 会话中,输入以下字符后按回车键
hello from slave1!查看 HDFS 存储:
# 在slave2下查看HDFS上的数据,查看刚刚传输的内容
hdfs dfs -cat /flume/test1/.............
实验结果验证
成功后,可以在
slave2上的 HDFS 中/flume_logs/路径中看到传输的数据文件,内容为从slave1发送的数据。实验证明,
NetcatSource可以成功接收来自远程设备的数据,并通过 Flume Agent 的 HDFS Sink 将数据存储到 HDFS 中。
注意事项
如果 Flume Agent 启动时报端口冲突错误,可以使用以下命令查看端口占用情况:
netstat -tunlp | grep 44444
# 或者
ss -tunlp | grep 44444找到占用端口的进程 ID 后,可以使用
kill命令终止该进程。
实验总结
通过本实验,学会了如何配置 Flume 的 NetcatSource 以接收来自远程设备的数据传输,并使用 HDFS Sink 将数据存储到指定的 HDFS 路径中。实验演示了如何在分布式环境中实现数据的采集和集中存储,适用于实际场景中的日志采集和数据存储。
【实验改进】
通过 Flume 配置实现从 netcat 客户端采集数据,并将数据同时输出到 HDFS 和控制台。
【实验步骤】
1.重新创建一个配置文件test2-2_netcat_hdfs.conf:
# 在master下操作
cd /opt/apps/flume/conf
# 创建空文件
touch test2-2_netcat_hdfs.conf编辑内容如下:
# 定义 Flume Agent 的组件
# a1 是该 Agent 的名称,用于后续配置中指代此 Agent
a1.sources = r1
# 定义 Sink 名称为 k1, k2
a1.sinks = k1 k2
# 定义 Channel 名称为 c1, c2
a1.channels = c1 c2
# 配置 Source 组件 r1
# Source 是数据的输入端,负责从 netcat 客户端接收数据
a1.sources.r1.type = netcat
# 绑定 IP 地址为 0.0.0.0,表示源(source)将监听并接受来自所有网络接口的连接
a1.sources.r1.bind = 0.0.0.0
# Netcat Source 监听的端口号 44444,以接收来自 netcat 的数据
a1.sources.r1.port = 44444
# 定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
# 指定的拦截器类型,并且它会为每个事件添加一个时间戳属性,这个时间戳将反映事件被 Flume source 处理的时间
a1.sources.r1.interceptors.i1.type = timestamp
# 配置第1个 Sink 组件 k1(输出到 HDFS)
# Sink 是数据的输出端,用于将数据写入 HDFS
a1.sinks.k1.type = hdfs
# 设置 HDFS 路径,将数据存储在 master 的 HDFS 的 /flume_logs/ 目录中
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test2/%Y%m%d
# 文件类型为 DataStream,表示数据按流方式写入 HDFS
a1.sinks.k1.hdfs.fileType = DataStream
# 设置写入的文件格式为文本
a1.sinks.k1.hdfs.writeFormat = Text
# 每次批量写入 100 条数据到 HDFS
# 即在数据传输过程中,Sink 组件会累积 200 条数据后再一次性批量写入 HDFS,减少逐条写数据时的 I/O 操作开销
a1.sinks.k1.hdfs.batchSize = 200
# 文件滚动间隔设置为 0,表示不断开文件写入(实时写入数据)
a1.sinks.k1.hdfs.rollInterval = 0
# 配置第2个 Sink 组件 k2(输出到控制台)
a1.sinks.k2.type = logger
# 配置第1个 Channel 组件 c1(用于 HDFS 输出)
# Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
# 配置 Channel 通道类型为 memory,用于在内存中缓存数据
a1.channels.c1.type = memory
# Channel 的容量最多存储 1000 条事件(event)
# 当事件数达到 1000 时,Channel 将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件
a1.channels.c1.capacity = 1000
# Source 和 Sink 从 memory Channel 每次事务传输的最大事件数量为 200
a1.channels.c1.transactionCapacity = 200
# 配置第2个 Channel 组件 c2(用于 Logger Sink 输出)
# Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
# 配置 Channel 通道类型为 memory,用于在内存中缓存数据
a1.channels.c2.type = memory
# Channel 的容量最多存储 1000 条事件(event)
# 当事件数达到 1000 时,Channel 将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件
a1.channels.c2.capacity = 1000
# Source 和 Sink 从 memory Channel 每次事务传输的最大事件数量为 200
a1.channels.c2.transactionCapacity = 200
# 绑定 Source、Sink 和 Channel 之间的关系
# 将 Source r1 绑定到 Channel c1 和 c2,以便事件可以传输到两个 Channel
a1.sources.r1.channels = c1 c2
# 将 Sink k1 绑定到 Channel c1
a1.sinks.k1.channel = c1
# 将 Sink k2 绑定到 Channel c2
a1.sinks.k2.channel = c22.重新启动flume 和 netcat,检验数据多路输出
步骤 2:在 master 上启动 Flume Agent
cd /opt/apps/flume
# 启动 Flume Agent,加载 `test2-2_netcat_hdfs.conf` 配置文件
flume-ng agent \
-n a1 \
-c ./conf \
-f ./conf/test2-2_netcat_hdfs.conf \
-Dflume.root.logger=INFO,console步骤 3:在 slave1 上使用 nc 发送数据
使用 netcat 连接到 master:
# 在 `slave1` 上打开终端,使用 `netcat` 工具连接到 `master` 的 IP 地址和端口 `44444`
nc master 44444发送数据:
# 在连接成功的 `nc` 会话中,输入以下字符后按回车键
hello !查看 HDFS 存储:
# 在slave2下查看HDFS上的数据,查看刚刚传输的内容
hdfs dfs -cat /flume/test2/.............
实验3:Flume收集并传输日志数据到 HDFS
【实验任务】设置并运行 Flume 以收集并传输日志数据到 HDFS
【实验目标】本实验旨在使用 Flume 配置文件来收集系统日志文件,并将其实时传输到 Hadoop 分布式文件系统(HDFS)。实验将涉及配置 Flume agent、source、sink 和 channel。
【实验步骤】
1.编辑 Flume 配置文件
在master的/opt/apps/flume/conf 中创建配置文件,命名为 test3_log_hdfs.conf
cd /opt/apps/flume/conf
# 创建空文件
touch test3_log_hdfs.conf双击打开编辑,使用以下配置内容:
## 定义 Agent 名称
# 为 Flume 配置的 Agent 定义各组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
## 配置 Source
# 配置 Source 组件,类型为 exec, Flume 会通过执行某个系统命令[tail]来获取数据流
# exec 类型的 source 常用于实时采集日志或其他流式数据。
a1.sources.r1.type = exec
# 从指定的日志文件中实时不间断读取日志内容,默认显示文件的最后 10行 内容。
a1.sources.r1.command = tail -F /opt/apps/flume/data/logfile.log
# 为 source 配置一个拦截器i1,用于添加时间戳。
a1.sources.r1.interceptors = i1
# 配置拦截器 i1 的类型为 timestamp,作用是自动在每个事件中添加时间戳
a1.sources.r1.interceptors.i1.type = timestamp
## 配置 HDFS Sink
# 配置 Sink 组件,类型为 hdfs,将数据写入到 HDFS
a1.sinks.k1.type = hdfs
# 指定 HDFS 存储路径,文件夹按日期自动组织,格式为年月日
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test3/%Y%m%d
# 设置文件类型为 DataStream,表示以流的形式写入数据
a1.sinks.k1.hdfs.fileType = DataStream
# 指定写入格式为 Text,以纯文本格式写入到 HDFS 中
a1.sinks.k1.hdfs.writeFormat = Text
## 配置 Channel
# 配置 Channel 组件,类型为 memory,数据临时存储在内存中
a1.channels.c1.type = memory
# Channel 的容量最多存储1000条事件event
# 当事件数达到1000时,Channel将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件。
a1.channels.c1.capacity = 1000
# source 和 sink 从内存 channel 每次事务传输的最大事件数量为100
a1.channels.c1.transactionCapacity = 100
## 将 Source 与 Sink 和 Channel 连接起来
# 为 Source r1 绑定 Channel c1,使数据可以从 Source 传输到 Channel
a1.sources.r1.channels = c1
# 为 Sink k1 绑定 Channel c1,使数据可以从 Channel 传输到 Sink
a1.sinks.k1.channel = c14.启动 Flume Agent:
# 在master上操作
cd /opt/apps/flume
# 在master节点上启动 Flume Agent
flume-ng agent \
-n a1 \
-c conf \
-f conf/test3_log_hdfs.conf \
-Dflume.root.logger=INFO,console5.模拟实时日志,输入文本验证
# 在slave1上运行下面的命令添加文本,模拟实时日志
# 通过SSH连接到 master 服务器,并将输入的数据追加写入到logfile.log文件中
ssh master "cat >> /opt/apps/flume/data/logfile.log"6.验证和监控:
检查 HDFS 的 /flume/test3/当天的日期 目录,确认日志文件是否成功上传。
# 查看方法一:打开浏览器,输入以上网址
master:9870
# 或者
192.168.36.100:9870
# 查看方法二:命令法
# 在slave2下运行下面的命令,查看hdfs上的文本内容
hdfs dfs -ls -R /
hdfs dfs -cat /flume/test3/当天的日期/*7.实验报告:
编写实验报告,其中包括配置文件的解释、启动过程、遇到的问题及其解决方案,以及成功传输日志到 HDFS 的证据截图。
实验4:flume采集目录到hdfs
【实验任务】
通过 Flume 采集指定目录中的新文件,将其内容发送至 HDFS 进行存储,模拟日志数据实时收集和分布式存储的流程。
【实验目标】
掌握 Flume Source、Channel、Sink 的配置和使用。
实现多节点间的数据采集和传输,并将数据上传至 HDFS。
探索 spooldir 和 avro source 的应用场景,并了解数据传输格式和文件滚动策略。
【实验步骤】
首先启动hadoop集群
start-dfs.sh # 在master上启动HDFS
start-yarn.sh # 在slave1上启动yarn1. 配置test4_file_avro.conf 文件
# 在master节点上运行
cd /opt/apps/flume/conf
# 创建空文件
touch test4_file_avro.conf
# 编辑此文件
vi test4_file_avro.conf配置文件作用:在 slave1 主机上监控指定目录中的新增文件,采集内容后通过 File Channel 缓存数据。数据经 Avro 格式转换后,发送至目标 agent 的 RPC 端口,实现数据传输。
# 配置test4_file_avro.conf
## 定义 Agent 名称
# 为 Flume 配置的 Agent 定义各组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
## source的配置描述
# 定义source (监视一个'目录',如果目录中出现了新的文件,就把文件内容采集过来。)
# spooldir 是 Flume 的一种 source 类型,用于监控目录并收集其中的新文件
a1.sources.r1.type = spooldir
# 创建此目录,保证里面空的【被监视的目录(从这个目录收集数据,如不为空则一并会被读取)】
a1.sources.r1.spoolDir = /opt/apps/flume/log_data
# sink的配置描述
# 配置 sink 类型为 avro,"k1" 会将数据以 Avro 格式传输到指定的目标
a1.sinks.k1.type = avro
# hostname 指定数据发送的目标主机名称或 IP 地址
a1.sinks.k1.hostname = master
# Sink 组件 k1 会将数据发送到 master 主机的 44444 端口
a1.sinks.k1.port = 44444
## channel的配置描述
# 使用File Channel做数据的临时缓存。
a1.channels.c1.type = file
# 定义 File Channel 数据存储目录,即将 Channel 中缓存的 Event 存储在该目录下的文件中。
a1.channels.c1.dataDirs = /opt/apps/flume/data
# 创建检查点目录,用于存储Channel的状态和偏移信息【读取位置】,如果Flume意外关闭,数据和状态可通过该目录下的检查点文件恢复
a1.channels.c1.checkpointDir = /opt/apps/flume/data/flume/checkpoint
## 将 Source 与 Sink 和 Channel 连接起来
# 为 Source r1 绑定 Channel c1,使数据可以从 Source 传输到 Channel
a1.sources.r1.channels = c1
# 为 Sink k1 绑定 Channel c1,使数据可以从 Channel 传输到 Sink
a1.sinks.k1.channel = c1spooldir:“斯普尔·迪尔”是指Flume的一个Source组件,用于监控指定目录下的文件并将文件内容作为Event发送给Channel组件,以进行后续的数据处理和传输。Avro: /ævroʊ/是Hadoop中的一个子项目,Avro是一个基于二进制数据传输高性能的中间件,Avro能够将数据转换为更高效的二进制格式,方便在应用程序间传输和存储,适合处理结构化数据如数据库和日志文件。
3. 分发test4_file_avro.conf
把在master节点配置好的test4_file_avro.conf分发到slave1/slave2节点上
scp /opt/apps/flume/conf/test4_file_avro.conf slave1:/opt/apps/flume/conf/
scp /opt/apps/flume/conf/test4_file_avro.conf slave2:/opt/apps/flume/conf/4.创建监听日志文件目录logs目录(必须为空,否则后面会报错)
# 在slave1上创建
cd /opt/apps/flume
mkdir log_data5. 配置test4_avro_hdfs.conf文件
在master主节点上面创建test4_avro_hdfs.conf文件,编辑配置
cd /opt/apps/flume/conf
# 创建空文件
touch test4_avro_hdfs.conf
# 编辑文件
vi test4_avro_hdfs.conf配置文件作用:在 master 主机上通过 Avro source 接收数据,使用 memory channel 缓存后传输。数据按日期存储到 HDFS,并根据文件大小或时间间隔进行文件滚动管理。
# test4_avro_hdfs配置
## 定义 Agent 名称
# 为 Flume 配置的 Agent 定义各组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
## source的配置描述
# 是指定的source类型为Avro协议从某个来源接收数据。
a1.sources.r1.type = avro
# 接收的主机名和端口号
a1.sources.r1.bind = master
# Avro Source 监听的端口号44444,以接收来自slave1上的Agent的Avro Sink的数据传输。
a1.sources.r1.port = 44444
# 定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
# 指定的拦截器类型,并且它会为每个事件添加一个时间戳属性,这个时间戳将反映事件被 Flume source 处理的时间
a1.sources.r1.interceptors.i1.type = timestamp
## sink的配置描述
# 数据传递到hdfs上面
a1.sinks.k1.type = hdfs
# 设置master的hdfs路径地址(core.site.xml里设置的hdfs)
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test4/%Y%m%d
# 数据写入到 HDFS 文件系统中,并且生成的文件名会以 "events-" 开头
a1.sinks.k1.hdfs.filePrefix = events-
# 文件类型被设置为 "DataStream",它是一种 HDFS 文件类型,允许数据以流式方式写入和读取。
a1.sinks.k1.hdfs.fileType = DataStream
# rollCount 指定了每个文件包含的事件数。当一个文件中的事件数达到 rollCount 值时,Flume 会自动滚动生成一个新的文件,以便存储新的事件。
# 表示数据汇不会根据事件数量来滚动生成文件。即Flume 会将所有的事件写入到同一个文件中,直到数据汇被关闭
a1.sinks.k1.hdfs.rollCount = 0
# HDFS上的文件达到128M时生成一个文件
a1.sinks.k1.hdfs.rollSize = 134217728
# HDFS上的文件达到60秒生成一个文件
a1.sinks.k1.hdfs.rollInterval = 60
## channel的配置描述
# 使用内存缓冲区域做数据的临时缓存
a1.channels.c1.type = memory
# 设置channel缓存的event事件数量最大为1000个,如超过,则不再接收新的 event,直到传递到sink
a1.channels.c1.capacity = 1000
# 设置channel在单个事务(transaction)中能处理的最大事件数量为 100
a1.channels.c1.transactionCapacity = 100
## 将 Source 与 Sink 和 Channel 连接起来
# 为 Source r1 绑定 Channel c1,使数据可以从 Source 传输到 Channel
a1.sources.r1.channels = c1
# 为 Sink k1 绑定 Channel c1,使数据可以从 Channel 传输到 Sink
a1.sinks.k1.channel = c15. 启动flume
5.1 先启动master节点的flume (如果先启动slave上的会导致拒绝连接)
cd /opt/apps/flume/
flume-ng agent \
-n a1 \
-c conf \
-f /opt/apps/flume/conf/test4_avro_hdfs.conf \
-Dflume.root.logger=INFO,console
# 解释:
flume-ng # Flume的启动脚本,启动时需要指定Agent的名字、配置文件的目录和配置文件的名称
-n a1 # 指定我们这个agent的名字
-c conf # 指定flume自身的配置文件所在目录,包含flume-env.sh和log4j的配置文件
-f /opt/apps/flume/conf/test4_avro_hdfs.conf # 指定我们所描述的采集方案(配置文件)
-Dflume.root.logger=INFO,console # 表示将运行日志输出到控制台
# 将 Flume 的根日志记录器配置为 info,console。这意味着 Flume 将同时将日志信息输出到控制台(console)和日志文件,其中日志级别为 info。日志级别包括:log、info、warn、error
# 如果运行以上程序,出现以下提示:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/apps/flume/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/apps/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
# 原因:这个提示表示在程序的类路径中有多个 SLF4J 绑定,可能导致日志记录出现问题。建议检查程序的依赖,移除其中一个绑定
# 解决,移除低版本日志jar包:
mv /opt/apps/flume/lib/slf4j-log4j12-1.6.1.jar /opt/apps/flume/lib/slf4j-log4j12-1.6.1.jar_bak5.2 在slave1节点上启动test4_file_avro.conf
cd /opt/apps/flume
# 运行flume采集和监听日志目录
flume-ng agent \
-n a1 \
-c conf \
-f /opt/apps/flume/conf/test4_file_avro.conf \
-Dflume.root.logger=INFO,console
或者可以后台启动命令:(这样可以不用另开窗口操作)
# 命令后加 &
flume-ng agent \
-n a1 \
-c conf \
-f /opt/apps/flume/conf/test4_file_avro.conf \
-Dflume.root.logger=INFO,console &
在slave2节点上也可以按上述方法启动conf配置文件因我们只需要在
slave1上完成测试,故没有启动
4.3 启动成功后,在slave2上编辑文件并发送到slave1的监听目录下
# 在slave2下操作
cd ~
touch test
# 随便写些内容
hello test
hello将其复制到slave1监听目录/opt/apps/flume/logs文件夹下
scp test slave1:/opt/apps/flume/log_data/
注意:不要将相同文件名的文件复制到监听目录 /opt/apps/flume/logs,否则会报错:File name has been re-used with different files. Spooling assumptions violated for /opt/apps/flume/logs/test2.COMPLETED.
原因:Flume 的 Spool Directory Source 组件会监视目录中新文件,并在处理完成后将文件标记为已完成(COMPLETED)。如果相同文件名被重复使用,Flume 会认为该文件已处理过,从而导致错误。
解决方法:删除相同名称但内容不同的文件 test 并重新启动 Flume。如果需要重复测试,确保删除 test 文件或使用不同的文件名。
4.4 查看master节点运行结果

该日志显示了 Flume 的 HDFS Sink 从创建临时文件、写入数据到最终关闭和重命名的完整流程,确保数据可靠写入 HDFS。
文件写入路径: hdfs://master:9000/flume/test4/20241109/events-.1731150892182
4.5 web端查看
http://master:9870
# 或者
192.168.36.100:9870


