李翔-大数据技术

Big data technology!

第9章 Flume的部署与使用

日志采集工具Flume的部署

一. Flume概述与核心架构

1.1 概述

  • 系统特点Flume是一个分布式、高可靠、高可用的海量日志数据的采集、聚合和传输系统。

  • 数据源支持:能够采集多种形式的数据源(如socket数据包、日志文件、日志目录、Kafka等),这些在Flume中被称为source

  • 数据输出channel是作为source(数据源)和sink(数据输出目的地)之间的缓冲和临时存储机制,sink(数据输出目的地)包括HDFSHBaseHiveKafka等。

1.2 基本架构与组件

  • 核心角色 - AgentFlume的基本架构中心是Agent(代理),一个运行在JVM进程中的关键组件。Agent负责协调和管理数据从外部源到目的地的整个流程。

  • 组件构成

    • Memory Channel:存储数据于JVM堆内存中,适用于快速数据流,但存在数据丢失风险。

    • File Channel:将数据存储在本地磁盘,适合大量数据处理,提供持久化存储。

    • Kafka Channel:用于Kafka集成,将数据存储在Kafka中。

    • Source(源头):数据流的起点,负责从各种数据源获取数据,并将其以事件(event)的形式发送到Channel中。

    • Channel(通道):作为数据传输的中间环节,负责接收来自Source的数据,并且临时存储这些数据,直到它们被Sink处理。Channel可以是基于内存的缓冲区或基于文本持久化存储。

    • Sink(目的地):数据流的终点,从Channel中提取数据,并执行后续操作,如存储到数据库、发送到远程服务器或转换数据格式。

1.3 系统特性

  • 分布式设计Flume是由多个agent连接起来形成的分布式系统。每个agent都是独立工作的,负责特定部分的数据处理。

  • 数据传递与管理:每个agent在内部包含SourceChannelSink三个核心组件,共同负责数据的传递和管理。

event /ɪˈvent/ 事件 sink /sɪŋk/ 下沉 channel /ˈtʃænl/ 渠道,传输 Agent /ˈeɪdʒənt/ 代理

image-20230127111912152

主要作用:

实时读取服务器本地磁盘或指定端口的数据,将数据写入到HDFS或者kafka。

image-20220929121330318

1.4 Flume Event的概念

SourceChannelSink之间传递数据的形式是Event事件;Event事件是一个基本数据流单元。

img


  1. Event的构成

    • Key(Header):是数据的元数据,包含如时间戳、数据来源等描述信息。

    • Value(Body):则是数据的具体内容。

    • 基本单元:在Flume中,Event是数据流的基本单位。

    • 键值对结构:Event包含两个部分,即Key(Header)和Value(Body)。

  2. 类比解释

    • 邮件内容(Body):对应Event的具体数据内容。

    • 邮件主题(Subject):对应Event的Header,包含了数据的元数据,比如来源和时间戳等。

    • 电子邮件类比:可以将Flume的Event比作一封电子邮件。

  3. 具体示例

    • Flume Agent:类似于邮递员,负责采集、传输、处理和存储数据。

    • 日志Event:类似于一封邮件,每条访问日志被视作一个Event。

    • 日志内容和元数据:日志的具体内容对应邮件的主体,而服务器名和时间戳等元数据则对应邮件的主题。

    • 目的地:类似于邮件的收件地址,将访问日志从多个服务器采集并写入HDFS。

    • 访问日志的采集和存储过程:

通俗地讲,Flume中的Agent和Event之间的关系可以理解为传输与处理的关系。Event是Agent中用于传输、处理的基本数据单位,而Agent则负责控制Event的传输与处理过程。

这样的表达更顺畅些,内容上是正确的。你很准确地理解了Flume中Event和Agent的功能和关系!


1.5 Flume采集系统结构图

1.简单结构

单个agent采集数据

Flume系统的核心角色是agent,它是一个Java进程,通常运行在日志收集节点上,以事件的形式将数据从源头传输到目的地

img


2. 复杂结构

多级Agent之间串联

img


使用场景:

一家大型购物网站在北京、上海和广州分别运行了三个Web服务器,每个服务器生成访问日志和用户行为数据。该网站希望将所有日志汇总到HDFS中,供后续分析和推荐模型训练使用。

示例分布:

  1. 数据采集(一级Agent)

    • 北京的Agent1、上海的Agent2、广州的Agent3分别从各自的Web服务器收集日志数据,并将数据打包成Event。

  2. 数据汇总(二级Agent)

    • Agent1、Agent2和Agent3将日志数据传输到中央服务器上的Agent4,进行汇总。

  3. 数据存储(HDFS)

    • Agent4将汇总后的数据发送到HDFS,供数据团队进行分析和模型训练。

这个结构实现了不同区域的日志数据统一收集和存储,方便后续分析处理。


二.flume的部署

2.1 伪分布环境-安装flume

伪分布式环境指的是在一台机器上模拟多台机器的分布式环境,用于学习和测试分布式系统。


2.2 集群环境-安装flume

2.2.1 规划

三台主机的主机名分别为masterslave1slave2(且三台主机的防火墙已关闭)

Flume下载地址: flume下载官网


2.2.2 解压安装包

上传apache-flume-1.9.0-bin.tar.gzmaster上的/opt/software文件夹下

# 解压
cd /opt/software
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/apps

# 改名
cd /opt/apps
mv ./apache-flume-1.9.0-bin ./flume   #将解压的文件修改名字为flume,简化操作


2.2.3 配置环境变量

# 在master上打开环境变量
vi /etc/profile

# 添加以下内容
# Flume
export FLUME_HOME=/opt/apps/flume
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin

# 分发环境变量文件到 slave1 和 slave2 主机
scp -r /etc/profile slave1:/etc
scp -r /etc/profile slave2:/etc

# 分别在master/slave1/slave2主机运行下面的命令,使环境变量生效:
source /etc/profile


2.2.4 配置flume-env.sh运行环境变量

cd /opt/apps/flume/conf 

# 复制文件
cp ./flume-env.sh.template ./flume-env.sh

# 编辑文件
vi ./flume-env.sh
# 添加以下内容
export JAVA_HOME=/opt/apps/jdk


2.2.5 解决Flume 和 Hadoop 使用的 Guava库版本不一致而导致的冲突

# 修改guava文件名,使之失效
mv /opt/apps/flume/lib/guava-11.0.2.jar /opt/apps/flume/lib/guava-11.0.2.jar.bf

# 复制hadoop下的高版本guava库到Flume中
cp /opt/apps/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apps/flume/lib/


2.2.6 查看flume版本信息

# 查看flume版本信息
flume-ng version

# 运行结果
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523


可能出现的问题:在查询版本时报下图的错误

image-20230628173025286

问题描述: 如果系统安装了 HBase,运行该命令时可能会遇到错误,提示找不到或无法加载主类 org.apache.flume.tools.GetJavaProperty。这是因为 HBase 的 HBASE_CLASSPATH 设置会干扰 Flume 的类路径。

解决方法

# 修改hbase-env.sh配置文件
cd  /opt/apps/hbase/conf
vi  hbase-env.sh

更改如下:

# 修改 HBASE_CLASSPATH 配置
export HBASE_CLASSPATH_PREFIX=/opt/apps/hadoop/etc/hadoop


2.2.7 创建日志文件目录和检查点目录

# 创建日志文件目录logs目录(必须为空,否则后面会报错)
cd  /opt/apps/flume
mkdir logs

# 创建检查点目录(还原点,用于恢复数据)
cd /opt/apps/flume
mkdir -p data/flume/checkpoint

解释:

1.日志文件目录

  • 确保 logs 目录为空,因为 Flume 可能会在启动时检查此目录是否有遗留数据或权限问题。空的 logs 目录可以确保 Flume 启动时不会因该目录出错。

2.检查点目录 checkpoint 的作用

  • 这个目录是 Flume 的 File Channel 的检查点目录,用于在 Flume 出现异常停止或系统宕机时恢复数据。

  • 用途:存储 Channel 中未消费的事件及相关元数据,确保在 Flume 重启后能够恢复到上一次处理的状态,保证数据的持久性和可靠性。


2.2.8 分发Flume到slave各节点

把在master节点配置好的flume分发到slave各节点上(slave1,slave2)

scp -r /opt/apps/flume/ slave1:/opt/apps/
scp -r /opt/apps/flume/ slave2:/opt/apps/


至此,安装flume(集群模式)完成


实验1:使用 Avro Source 进行分布式数据采集

【实验任务】

该实验将演示如何配置 Flume 在集群中运行,数据将从 master 上的 Avro 客户端发送到 slave1 上的 Flume Agent,并最终存储到 HDFS 中。

Avro: Avro 客户端把数据转换成二进制 Avro 格式后在分布式系统中进行传输。

【实验目标】

  • 使用 Flume 在集群环境中采集数据;

  • 配置 Flume Agent 监听 Avro 客户端数据;

  • 将数据存储到 HDFS 中。

【实验步骤】

步骤 1:在 slave1 上配置 Flume Agent

  1. 创建配置文件

    cd /opt/apps/flume/conf
    # 创建空文件
    touch test1_avro_hdfs.conf
    • slave1 的 Flume 配置目录 /opt/apps/flume/conf 下创建配置文件 test1_avro_hdfs.conf

  2. 编辑配置文件

    # 定义 Flume Agent 的组件
    # a1 是该 Agent 的名称,用于后续配置中指代此 Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置 Source 组件 r1
    # Source 是数据的输入端,负责从 Avro 客户端接收数据
    a1.sources.r1.type = avro
    # 绑定 IP 地址为 0.0.0.0,表示源(source)将监听并接受来自所有网络接口的连接。
    a1.sources.r1.bind = 0.0.0.0
    # Avro Source 监听的端口号 4141,以接收来自avro-client的数据。
    a1.sources.r1.port = 4141
    # 定义拦截器,为消息添加时间戳
    a1.sources.r1.interceptors = i1
    # 指定的拦截器类型,并且它会为每个事件添加一个时间戳属性,这个时间戳将反映事件被 Flume source 处理的时间
    a1.sources.r1.interceptors.i1.type = timestamp

    # 配置 Sink 组件 k1
    # Sink 是数据的输出端,用于将数据写入 HDFS
    a1.sinks.k1.type = hdfs
    # 设置 HDFS 路径,将数据存储在 master 的 HDFS 的 /flume_logs/ 目录中
    a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test1/%Y%m%d
    # 文件类型为 DataStream,表示数据按流方式写入 HDFS
    a1.sinks.k1.hdfs.fileType = DataStream
    # 设置写入的文件格式为文本
    a1.sinks.k1.hdfs.writeFormat = Text
    # 每次批量写入1000条(events)到 HDFS
    # 即在数据传输过程中,Sink组件会累积100条(events)后再一次性批量写入 HDFS,减少逐条写数据时I/O操作的开销
    a1.sinks.k1.hdfs.batchSize = 100
    # 文件滚动间隔设置为 0,表示不断开文件写入(实时写入数据)
    a1.sinks.k1.hdfs.rollInterval = 0

    # 配置 Channel 组件 c1
    # Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
    # # 配置Channel通道类型为 memory,用于在内存中缓存数据
    a1.channels.c1.type = memory
    # Channel 的容量最多存储1000条事件(events)
    # 当(events)事件数达到1000时,Channel将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件。
    a1.channels.c1.capacity = 1000
    # source 和 sink 从内存 channel 每次事务传输的最大事件数量为1000(events)
    a1.channels.c1.transactionCapacity = 100

    ## 将 Source 与 Sink 和 Channel 连接起来
    # 将 source r1 绑定到 channel c1,使数据从 source r1 发送到 channel c1
    a1.sources.r1.channels = c1
    # 将 sink k1 绑定到 channel c1,使 sink k1 从 channel c1 接收数据
    a1.sinks.k1.channel = c1
    • test1_avro_hdfs.conf 中输入以下内容:


步骤 2:在 slave1 上启动 Flume Agent

cd /opt/apps/flume

# 启动 Flume Agent,加载 `test1_avro_hdfs.conf` 配置文件
# 实现从数据源(Source)接收数据并通过通道(Channel)将数据传输到目标(Sink)
# 即将数据从 Avro Source 接收到后存入 HDFS。
flume-ng agent \
-n a1 \
-c /opt/apps/flume/conf \
-f /opt/apps/flume/conf/test1_avro_hdfs.conf \
-Dflume.root.logger=INFO,console

解释:

  • flume-ng agent:启动 Flume 数据采集代理,用于数据流的收集、传输和处理。

  • -n a1:指定 Flume Agent 的名称为 a1

  • -c conf:指定 Flume 的配置文件所在目录。

  • -f /opt/apps/flume/conf/test1_avro_hdfs.conf:指定 Flume 使用的配置文件。

  • -Dflume.root.logger=INFO,console:设置日志级别为 INFO ,并将日志输出到控制台。


步骤 3:在 master 上创建数据文件

master 上创建一个数据文件 log.00,其中包含一些测试数据:

cd /opt/apps/flume
echo "hello world from cluster" >> log.00


步骤 4:在 master 上启动 Avro 客户端

master 上运行 Avro客户端,将 log.00 文件的数据通过 Avro 协议发送到 slave1 上运行的 Flume Agent

cd /opt/apps/flume

# 使用Flume的avro-client工具,将log.00文件内容通过 Avro 协议发送到主机 slave1 上的4141端口
flume-ng avro-client \
-c conf \
-H slave1 \
-p 4141 \
-F /opt/apps/flume/log.00

解释:

  • flume-ng avro-client: Flume 中的 Avro 客户端命令,用于发送数据到配置好的 Flume 代理的 Avro Source(数据接收端)。

  • -c conf:指定 Flume 配置文件所在的目录。

  • -H slave1:设置要连接的 Flume 代理(agent)的主机名。

  • -p 4141:连接到 slave1 主机上端口为 4141 的 Flume 代理的 Avro Source。

  • -F /opt/apps/flume/log.00:指定要发送的文件路径


步骤 5:验证数据存储

方法一:

# 在浏览器中查看HDFS上的数据
192.168.36.100:9870

方法二:

# 在 `slave2` 的查看HDFS上的 `/flume_logs/` 目录,检查数据是否成功存储。
hdfs dfs -ls /flume_logs/


数据流说明

  • Avro 客户端master 上发送数据到 slave1

  • Flume Agentslave1 上接收数据,并将数据通过内存通道传输到 Sink。

  • HDFS Sink 将数据存储在 HDFS 中。



实验2:使用 NetcatSource 数据采集

netcat工具介绍

netcat(简称nc)是一款灵活的网络工具,用于TCP和UDP连接,可用于创建连接、监听端口、数据传输等操作。它常被用于两台设备间的交互,主要工作在监听模式传输模式

netcat工作原理

netcat读取一个端点的数据并将其输出到另一个端点。该端点可以是网络上另一台设备,也可以是本地计算机的不同端口。netcat的用途包括从一台设备向另一台设备发送数据,或监听特定端口以接收连接和数据。

netcat工具安装

# 分别在master/slave1/slave2下操作,安装netcat工具

# 1.将安装包libpcap-1.5.3-13.el7_9.x86_64和 map-ncat-6.40-19.el7.x86_64 复制到 /opt/software 目录。

# 2.进入目录,如果slave1/slave2下没有此目录,先创建
cd /opt/software

# 3.安装依赖和工具
rpm -ivh libpcap-1.5.3-13.el7_9.x86_64.rpm
rpm -ivh nmap-ncat-6.40-19.el7.x86_64.rpm

netcat工具测试

步骤1:在 master 上监听端口

在 master 上使用以下命令监听端口 44444:

# -l 选项表示监听模式,-k 选项表示保持监听状态
[root@master ~]# nc -l -k 44444
.....监听并等待接收数据

步骤2:在 slave1 上连接并发送数据

在 slave1 上运行以下命令与 master 建立连接并发送数据:

[root@master ~]# nc master 44444
.....编写并发送数据

此时,可以通过终端输入数据并实时传输到 master。

注意事项

  • 若 master 端口 10000 正在监听,netcat 将建立 TCP 连接,双方可在连接上交互数据。

  • 若 master 未监听端口 10000,连接将失败,出现错误提示:Ncat: Connection refused.


【实验任务】

在分布式 Flume 环境中,通过 NetcatSource 从远程主机采集数据,并将数据存储到 HDFS 中。

【实验目的】

  • 学习如何在分布式环境中使用 Flume 的 NetcatSource 采集远程数据。

  • 将采集的数据存储到 HDFS,实现跨主机的数据采集和存储。

  • 了解 Flume 在分布式日志收集和数据存储中的实际应用。

【实验步骤】

  • master:运行 Flume Agent,监听远程连接并接收数据。

  • slave1:作为数据发送端,通过 netcat 工具向 master 发送数据。

  • master:存储数据到 HDFS。

环境配置:确保所有机器上都安装了 Netcat工具

确保Hadoop集群已经启动


步骤 1:在 master 上配置 Flume Agent

  1. 创建 Flume 配置文件 master 的 Flume 配置目录 /opt/apps/flume/conf 中创建一个配置文件 test2_netcat_hdfs.conf

    cd /opt/apps/flume/conf
    vim test2_netcat_hdfs.conf
  2. 编辑配置文件 test2_netcat_hdfs.conf 文件中输入以下内容,并添加详细注释:

    # Flume 配置文件:test2_netcat_hdfs.conf

    # 定义 Flume Agent 的组件
    # a1 是该 Agent 的名称,用于后续配置中指代此 Agent
    a1.sources = r1
    # 定义 Sink 名称为 k1
    a1.sinks = k1
    # 定义 Channel 名称为 c1
    a1.channels = c1

    # 配置 Source 组件 r1
    # Source 是数据的输入端,负责从 netcat 客户端接收数据
    a1.sources.r1.type = netcat
    # 绑定 IP 地址为 0.0.0.0,表示源(source)将监听并接受来自所有网络接口的连接
    a1.sources.r1.bind = 0.0.0.0
    # Avro Source 监听的端口号 4141,以接收来自netcat的数据[netcat客户端将通过该端口发送数据]
    a1.sources.r1.port = 44444
    # 定义拦截器,为消息添加时间戳
    a1.sources.r1.interceptors = i1
    # 指定的拦截器类型,并且它会为每个事件添加一个时间戳属性,这个时间戳将反映事件被 Flume source 处理的时间
    a1.sources.r1.interceptors.i1.type = timestamp

    # 配置 Sink 组件 k1
    # Sink 是数据的输出端,用于将数据写入 HDFS
    a1.sinks.k1.type = hdfs
    # 设置 HDFS 路径,将数据存储在 master 的 HDFS 的 /flume_logs/ 目录中
    a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test2/%Y%m%d
    # 文件类型为 DataStream,表示数据按流方式写入 HDFS
    a1.sinks.k1.hdfs.fileType = DataStream
    # 设置写入的文件格式为文本
    a1.sinks.k1.hdfs.writeFormat = Text
    # 每次批量写入 200 条数据到 HDFS
    # 即在数据传输过程中,Sink 组件会累积 200条数据后再一次性批量写入 HDFS,减少逐条写数据时的 I/O 操作开销
    a1.sinks.k1.hdfs.batchSize = 200
    # 文件滚动间隔设置为 0,表示不断开文件写入(实时写入数据)
    a1.sinks.k1.hdfs.rollInterval = 0

    # 配置 Channel 组件 c1
    # Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
    # 配置 Channel 通道类型为 memory,用于在内存中缓存数据
    a1.channels.c1.type = memory
    # Channel 的容量最多存储 1000 条事件(event)
    # 当事件数达到 1000 时,Channel 将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件
    a1.channels.c1.capacity = 1000
    # Source 和 Sink 从 memory Channel 每次事务传输的最大事件数量为 200
    a1.channels.c1.transactionCapacity = 200

    # 绑定 Source、Sink 和 Channel 之间的关系
    # 将 Source r1 绑定到 Channel c1
    a1.sources.r1.channels = c1
    # 将 Sink k1 绑定到 Channel c1
    a1.sinks.k1.channel = c1


步骤 2:在 master 上启动 Flume Agent

cd /opt/apps/flume

# 启动 Flume Agent,加载 `test2_netcat_hdfs.conf` 配置文件
flume-ng agent \
-n a1 \
-c ./conf \

-f ./conf/test2_netcat_hdfs.conf \
-Dflume.root.logger=INFO,console

解释:

  • -c ./conf:指定 Flume 配置文件所在目录。

  • -f ./conf/example_hdfs_slave2.conf:指定 Flume 使用的配置文件。

  • -n a1:指定 Agent 名称为 a1

  • -Dflume.root.logger=INFO,console:设置日志输出级别为 INFO,并将日志输出到控制台。



步骤 3:在 slave1 上使用 nc 发送数据

  1. 使用 netcat 连接到 master

    # 在 `slave1` 上打开终端,使用 `netcat` 工具连接到 `master` 的 IP 地址和端口 `44444`
    nc master 44444

    # 如果连接成功,会显示 `Connected to master` 等提示信息,表示连接已建立。
  2. 发送数据

    # 在连接成功的 `nc` 会话中,输入以下字符后按回车键
    hello from slave1!
  3. 查看 HDFS 存储

    # 在slave2下查看HDFS上的数据,查看刚刚传输的内容
    hdfs dfs -cat /flume/test1/.............

实验结果验证

  • 成功后,可以在 slave2 上的 HDFS 中 /flume_logs/ 路径中看到传输的数据文件,内容为从 slave1 发送的数据。

  • 实验证明,NetcatSource 可以成功接收来自远程设备的数据,并通过 Flume Agent 的 HDFS Sink 将数据存储到 HDFS 中。


注意事项

  • 如果 Flume Agent 启动时报端口冲突错误,可以使用以下命令查看端口占用情况:

 netstat -tunlp | grep 44444
# 或者
ss -tunlp | grep 44444
  • 找到占用端口的进程 ID 后,可以使用 kill 命令终止该进程。


实验总结

通过本实验,学会了如何配置 Flume 的 NetcatSource 以接收来自远程设备的数据传输,并使用 HDFS Sink 将数据存储到指定的 HDFS 路径中。实验演示了如何在分布式环境中实现数据的采集和集中存储,适用于实际场景中的日志采集和数据存储。


【实验改进】

通过 Flume 配置实现从 netcat 客户端采集数据,并将数据同时输出到 HDFS 和控制台。

【实验步骤】

1.重新创建一个配置文件test2-2_netcat_hdfs.conf

# 在master下操作
cd /opt/apps/flume/conf
# 创建空文件
touch test2-2_netcat_hdfs.conf

编辑内容如下:

# 定义 Flume Agent 的组件
# a1 是该 Agent 的名称,用于后续配置中指代此 Agent
a1.sources = r1
# 定义 Sink 名称为 k1, k2
a1.sinks = k1 k2
# 定义 Channel 名称为 c1, c2
a1.channels = c1 c2

# 配置 Source 组件 r1
# Source 是数据的输入端,负责从 netcat 客户端接收数据
a1.sources.r1.type = netcat
# 绑定 IP 地址为 0.0.0.0,表示源(source)将监听并接受来自所有网络接口的连接
a1.sources.r1.bind = 0.0.0.0
# Netcat Source 监听的端口号 44444,以接收来自 netcat 的数据
a1.sources.r1.port = 44444
# 定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
# 指定的拦截器类型,并且它会为每个事件添加一个时间戳属性,这个时间戳将反映事件被 Flume source 处理的时间
a1.sources.r1.interceptors.i1.type = timestamp

# 配置第1个 Sink 组件 k1(输出到 HDFS)
# Sink 是数据的输出端,用于将数据写入 HDFS
a1.sinks.k1.type = hdfs
# 设置 HDFS 路径,将数据存储在 master 的 HDFS 的 /flume_logs/ 目录中
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test2/%Y%m%d
# 文件类型为 DataStream,表示数据按流方式写入 HDFS
a1.sinks.k1.hdfs.fileType = DataStream
# 设置写入的文件格式为文本
a1.sinks.k1.hdfs.writeFormat = Text
# 每次批量写入 100 条数据到 HDFS
# 即在数据传输过程中,Sink 组件会累积 200 条数据后再一次性批量写入 HDFS,减少逐条写数据时的 I/O 操作开销
a1.sinks.k1.hdfs.batchSize = 200
# 文件滚动间隔设置为 0,表示不断开文件写入(实时写入数据)
a1.sinks.k1.hdfs.rollInterval = 0

# 配置第2个 Sink 组件 k2(输出到控制台)
a1.sinks.k2.type = logger

# 配置第1个 Channel 组件 c1(用于 HDFS 输出)
# Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
# 配置 Channel 通道类型为 memory,用于在内存中缓存数据
a1.channels.c1.type = memory
# Channel 的容量最多存储 1000 条事件(event)
# 当事件数达到 1000 时,Channel 将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件
a1.channels.c1.capacity = 1000
# Source 和 Sink 从 memory Channel 每次事务传输的最大事件数量为 200
a1.channels.c1.transactionCapacity = 200

# 配置第2个 Channel 组件 c2(用于 Logger Sink 输出)
# Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
# 配置 Channel 通道类型为 memory,用于在内存中缓存数据
a1.channels.c2.type = memory
# Channel 的容量最多存储 1000 条事件(event)
# 当事件数达到 1000 时,Channel 将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件
a1.channels.c2.capacity = 1000
# Source 和 Sink 从 memory Channel 每次事务传输的最大事件数量为 200
a1.channels.c2.transactionCapacity = 200

# 绑定 Source、Sink 和 Channel 之间的关系
# 将 Source r1 绑定到 Channel c1 和 c2,以便事件可以传输到两个 Channel
a1.sources.r1.channels = c1 c2
# 将 Sink k1 绑定到 Channel c1
a1.sinks.k1.channel = c1
# 将 Sink k2 绑定到 Channel c2
a1.sinks.k2.channel = c2


2.重新启动flume 和 netcat,检验数据多路输出

步骤 2:在 master 上启动 Flume Agent

cd /opt/apps/flume

# 启动 Flume Agent,加载 `test2-2_netcat_hdfs.conf` 配置文件
flume-ng agent \
-n a1 \
-c ./conf \

-f ./conf/test2-2_netcat_hdfs.conf \
-Dflume.root.logger=INFO,console


步骤 3:在 slave1 上使用 nc 发送数据

  1. 使用 netcat 连接到 master

    # 在 `slave1` 上打开终端,使用 `netcat` 工具连接到 `master` 的 IP 地址和端口 `44444`
    nc master 44444
  2. 发送数据

    # 在连接成功的 `nc` 会话中,输入以下字符后按回车键
    hello !
  3. 查看 HDFS 存储

    # 在slave2下查看HDFS上的数据,查看刚刚传输的内容
    hdfs dfs -cat /flume/test2/.............




实验3:Flume收集并传输日志数据到 HDFS

【实验任务】设置并运行 Flume 以收集并传输日志数据到 HDFS

【实验目标】本实验旨在使用 Flume 配置文件来收集系统日志文件,并将其实时传输到 Hadoop 分布式文件系统(HDFS)。实验将涉及配置 Flume agent、source、sink 和 channel。

【实验步骤】

1.编辑 Flume 配置文件

master/opt/apps/flume/conf 中创建配置文件,命名为 test3_log_hdfs.conf

cd /opt/apps/flume/conf 
# 创建空文件
touch test3_log_hdfs.conf

双击打开编辑,使用以下配置内容:

## 定义 Agent 名称
# 为 Flume 配置的 Agent 定义各组件的名称
a1.sources = r1  
a1.sinks = k1    
a1.channels = c1

## 配置 Source
# 配置 Source 组件,类型为 exec, Flume 会通过执行某个系统命令[tail]来获取数据流
# exec 类型的 source 常用于实时采集日志或其他流式数据。
a1.sources.r1.type = exec
# 从指定的日志文件中实时不间断读取日志内容,默认显示文件的最后 10行 内容。
a1.sources.r1.command = tail -F /opt/apps/flume/data/logfile.log
# 为 source 配置一个拦截器i1,用于添加时间戳。
a1.sources.r1.interceptors = i1
# 配置拦截器 i1 的类型为 timestamp,作用是自动在每个事件中添加时间戳
a1.sources.r1.interceptors.i1.type = timestamp  

## 配置 HDFS Sink
# 配置 Sink 组件,类型为 hdfs,将数据写入到 HDFS
a1.sinks.k1.type = hdfs
# 指定 HDFS 存储路径,文件夹按日期自动组织,格式为年月日
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test3/%Y%m%d
# 设置文件类型为 DataStream,表示以流的形式写入数据
a1.sinks.k1.hdfs.fileType = DataStream
# 指定写入格式为 Text,以纯文本格式写入到 HDFS 中
a1.sinks.k1.hdfs.writeFormat = Text

## 配置 Channel
# 配置 Channel 组件,类型为 memory,数据临时存储在内存中
a1.channels.c1.type = memory
# Channel 的容量最多存储1000条事件event
# 当事件数达到1000时,Channel将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件。
a1.channels.c1.capacity = 1000
#  source 和 sink 从内存 channel 每次事务传输的最大事件数量为100
a1.channels.c1.transactionCapacity = 100

## 将 Source 与 Sink 和 Channel 连接起来
# 为 Source r1 绑定 Channel c1,使数据可以从 Source 传输到 Channel
a1.sources.r1.channels = c1
# 为 Sink k1 绑定 Channel c1,使数据可以从 Channel 传输到 Sink
a1.sinks.k1.channel = c1

4.启动 Flume Agent

# 在master上操作
cd /opt/apps/flume

# 在master节点上启动 Flume Agent
flume-ng agent \
-n a1 \
-c conf \
-f conf/test3_log_hdfs.conf \
-Dflume.root.logger=INFO,console

5.模拟实时日志,输入文本验证

# 在slave1上运行下面的命令添加文本,模拟实时日志
# 通过SSH连接到 master 服务器,并将输入的数据追加写入到logfile.log文件中
ssh master "cat  >> /opt/apps/flume/data/logfile.log"

6.验证和监控:

检查 HDFS 的 /flume/test3/当天的日期 目录,确认日志文件是否成功上传。

# 查看方法一:打开浏览器,输入以上网址
 master:9870
 # 或者
 192.168.36.100:9870

# 查看方法二:命令法
# 在slave2下运行下面的命令,查看hdfs上的文本内容
 hdfs dfs -ls -R /
 hdfs dfs -cat /flume/test3/当天的日期/*

7.实验报告:

编写实验报告,其中包括配置文件的解释、启动过程、遇到的问题及其解决方案,以及成功传输日志到 HDFS 的证据截图。



实验4:flume采集目录到hdfs

【实验任务】

通过 Flume 采集指定目录中的新文件,将其内容发送至 HDFS 进行存储,模拟日志数据实时收集和分布式存储的流程。

【实验目标】

  • 掌握 Flume Source、Channel、Sink 的配置和使用。

  • 实现多节点间的数据采集和传输,并将数据上传至 HDFS。

  • 探索 spooldir 和 avro source 的应用场景,并了解数据传输格式和文件滚动策略。

【实验步骤】

首先启动hadoop集群

start-dfs.sh        # 在master上启动HDFS
start-yarn.sh       # 在slave1上启动yarn


1.  配置test4_file_avro.conf 文件

# 在master节点上运行
cd /opt/apps/flume/conf
# 创建空文件
touch test4_file_avro.conf
# 编辑此文件
vi test4_file_avro.conf

配置文件作用:slave1 主机上监控指定目录中的新增文件,采集内容后通过 File Channel 缓存数据。数据经 Avro 格式转换后,发送至目标 agent 的 RPC 端口,实现数据传输。

# 配置test4_file_avro.conf

## 定义 Agent 名称
# 为 Flume 配置的 Agent 定义各组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

## source的配置描述
# 定义source (监视一个'目录',如果目录中出现了新的文件,就把文件内容采集过来。)
# spooldir 是 Flume 的一种 source 类型,用于监控目录并收集其中的新文件
a1.sources.r1.type = spooldir
# 创建此目录,保证里面空的【被监视的目录(从这个目录收集数据,如不为空则一并会被读取)】
a1.sources.r1.spoolDir = /opt/apps/flume/log_data

# sink的配置描述
# 配置 sink 类型为 avro,"k1" 会将数据以 Avro 格式传输到指定的目标
a1.sinks.k1.type = avro
# hostname 指定数据发送的目标主机名称或 IP 地址
a1.sinks.k1.hostname = master
# Sink 组件 k1 会将数据发送到 master 主机的 44444 端口
a1.sinks.k1.port = 44444

## channel的配置描述
# 使用File Channel做数据的临时缓存。
a1.channels.c1.type = file
# 定义 File Channel 数据存储目录,即将 Channel 中缓存的 Event 存储在该目录下的文件中。
a1.channels.c1.dataDirs = /opt/apps/flume/data
# 创建检查点目录,用于存储Channel的状态和偏移信息【读取位置】,如果Flume意外关闭,数据和状态可通过该目录下的检查点文件恢复
a1.channels.c1.checkpointDir = /opt/apps/flume/data/flume/checkpoint

## 将 Source 与 Sink 和 Channel 连接起来
# 为 Source r1 绑定 Channel c1,使数据可以从 Source 传输到 Channel
a1.sources.r1.channels = c1
# 为 Sink k1 绑定 Channel c1,使数据可以从 Channel 传输到 Sink
a1.sinks.k1.channel = c1
  • spooldir:“斯普尔·迪尔”是指Flume的一个Source组件,用于监控指定目录下的文件并将文件内容作为Event发送给Channel组件,以进行后续的数据处理和传输。

  • Avro: /ævroʊ/Hadoop中的一个子项目,Avro是一个基于二进制数据传输高性能的中间件,Avro能够将数据转换为更高效的二进制格式,方便在应用程序间传输和存储,适合处理结构化数据如数据库和日志文件。


3. 分发test4_file_avro.conf

把在master节点配置好的test4_file_avro.conf分发到slave1/slave2节点上

scp  /opt/apps/flume/conf/test4_file_avro.conf slave1:/opt/apps/flume/conf/
scp  /opt/apps/flume/conf/test4_file_avro.conf slave2:/opt/apps/flume/conf/


4.创建监听日志文件目录logs目录(必须为空,否则后面会报错)

# 在slave1上创建
cd /opt/apps/flume
mkdir log_data


5.  配置test4_avro_hdfs.conf文件

master主节点上面创建test4_avro_hdfs.conf文件,编辑配置

cd /opt/apps/flume/conf

# 创建空文件
touch test4_avro_hdfs.conf
# 编辑文件
vi test4_avro_hdfs.conf

配置文件作用:master 主机上通过 Avro source 接收数据,使用 memory channel 缓存后传输。数据按日期存储到 HDFS,并根据文件大小或时间间隔进行文件滚动管理。

# test4_avro_hdfs配置

## 定义 Agent 名称
# 为 Flume 配置的 Agent 定义各组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

## source的配置描述
# 是指定的source类型为Avro协议从某个来源接收数据。
a1.sources.r1.type = avro
# 接收的主机名和端口号
a1.sources.r1.bind = master
# Avro Source 监听的端口号44444,以接收来自slave1上的Agent的Avro Sink的数据传输。
a1.sources.r1.port = 44444
# 定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
# 指定的拦截器类型,并且它会为每个事件添加一个时间戳属性,这个时间戳将反映事件被 Flume source 处理的时间
a1.sources.r1.interceptors.i1.type = timestamp

## sink的配置描述
# 数据传递到hdfs上面
a1.sinks.k1.type = hdfs
# 设置master的hdfs路径地址(core.site.xml里设置的hdfs)
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/test4/%Y%m%d
# 数据写入到 HDFS 文件系统中,并且生成的文件名会以 "events-" 开头
a1.sinks.k1.hdfs.filePrefix = events-
# 文件类型被设置为 "DataStream",它是一种 HDFS 文件类型,允许数据以流式方式写入和读取。
a1.sinks.k1.hdfs.fileType = DataStream
# rollCount 指定了每个文件包含的事件数。当一个文件中的事件数达到 rollCount 值时,Flume 会自动滚动生成一个新的文件,以便存储新的事件。
# 表示数据汇不会根据事件数量来滚动生成文件。即Flume 会将所有的事件写入到同一个文件中,直到数据汇被关闭
a1.sinks.k1.hdfs.rollCount = 0
# HDFS上的文件达到128M时生成一个文件
a1.sinks.k1.hdfs.rollSize = 134217728
# HDFS上的文件达到60秒生成一个文件
a1.sinks.k1.hdfs.rollInterval = 60

## channel的配置描述
# 使用内存缓冲区域做数据的临时缓存
a1.channels.c1.type = memory
# 设置channel缓存的event事件数量最大为1000个,如超过,则不再接收新的 event,直到传递到sink
a1.channels.c1.capacity = 1000
# 设置channel在单个事务(transaction)中能处理的最大事件数量为 100
a1.channels.c1.transactionCapacity = 100

## 将 Source 与 Sink 和 Channel 连接起来
# 为 Source r1 绑定 Channel c1,使数据可以从 Source 传输到 Channel
a1.sources.r1.channels = c1
# 为 Sink k1 绑定 Channel c1,使数据可以从 Channel 传输到 Sink
a1.sinks.k1.channel = c1


5. 启动flume

5.1 先启动master节点的flume (如果先启动slave上的会导致拒绝连接)

cd /opt/apps/flume/

flume-ng agent \
-n a1 \
-c conf \
-f /opt/apps/flume/conf/test4_avro_hdfs.conf \
-Dflume.root.logger=INFO,console

# 解释:
flume-ng    # Flume的启动脚本,启动时需要指定Agent的名字、配置文件的目录和配置文件的名称
-n a1       # 指定我们这个agent的名字
-c conf     # 指定flume自身的配置文件所在目录,包含flume-env.sh和log4j的配置文件
-f /opt/apps/flume/conf/test4_avro_hdfs.conf   # 指定我们所描述的采集方案(配置文件)
-Dflume.root.logger=INFO,console      # 表示将运行日志输出到控制台
# 将 Flume 的根日志记录器配置为 info,console。这意味着 Flume 将同时将日志信息输出到控制台(console)和日志文件,其中日志级别为 info。日志级别包括:log、info、warn、error

image-20220929230455774


# 如果运行以上程序,出现以下提示:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/apps/flume/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/apps/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

# 原因:这个提示表示在程序的类路径中有多个 SLF4J 绑定,可能导致日志记录出现问题。建议检查程序的依赖,移除其中一个绑定

# 解决,移除低版本日志jar包:
mv /opt/apps/flume/lib/slf4j-log4j12-1.6.1.jar /opt/apps/flume/lib/slf4j-log4j12-1.6.1.jar_bak


5.2 在slave1节点上启动test4_file_avro.conf

cd /opt/apps/flume

# 运行flume采集和监听日志目录
flume-ng agent \
-n a1 \
-c conf \
-f /opt/apps/flume/conf/test4_file_avro.conf \
-Dflume.root.logger=INFO,console

image-20241109191255976


或者可以后台启动命令:(这样可以不用另开窗口操作)

# 命令后加 &
flume-ng agent \
-n a1 \
-c conf \
-f /opt/apps/flume/conf/test4_file_avro.conf \
-Dflume.root.logger=INFO,console &


在slave2节点上也可以按上述方法启动conf配置文件

因我们只需要在slave1上完成测试,故没有启动


4.3 启动成功后,在slave2上编辑文件并发送到slave1的监听目录下

# 在slave2下操作
cd ~
touch test

# 随便写些内容
hello test
hello

将其复制到slave1监听目录/opt/apps/flume/logs文件夹下

scp test slave1:/opt/apps/flume/log_data/

注意:不要将相同文件名的文件复制到监听目录 /opt/apps/flume/logs,否则会报错:File name has been re-used with different files. Spooling assumptions violated for /opt/apps/flume/logs/test2.COMPLETED.

原因:Flume 的 Spool Directory Source 组件会监视目录中新文件,并在处理完成后将文件标记为已完成(COMPLETED)。如果相同文件名被重复使用,Flume 会认为该文件已处理过,从而导致错误。

解决方法:删除相同名称但内容不同的文件 test 并重新启动 Flume。如果需要重复测试,确保删除 test 文件或使用不同的文件名。


4.4 查看master节点运行结果

image-20241109192345180

该日志显示了 Flume 的 HDFS Sink 从创建临时文件、写入数据到最终关闭和重命名的完整流程,确保数据可靠写入 HDFS。

文件写入路径: hdfs://master:9000/flume/test4/20241109/events-.1731150892182



4.5 web端查看

http://master:9870
# 或者
192.168.36.100:9870

image-20221209210144163

image-20221209210106730

image-20221209210030434

image-20241109191628034



发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1