李翔-大数据技术

Big data technology!

第11章 Flume整合Kafka数据采集实验

六、Flume整合Kafka(使用 exec数据源)

在实际生产中,可以配置 exec Source 使用 tail -F 命令实时监控日志文件的新增内容,并逐行生成事件。通过 Memory Channel 缓存数据后,使用 Kafka Sink 将日志写入指定的 Kafka 主题,实现实时日志采集与分发。配置文件需指定日志路径、Kafka 集群地址及主题等关键参数,确保流式数据处理。

Exec Source + Kafka Sink 数据流方案

  • Exec Source:通过执行用户定义的命令(如 tail -F)从日志文件中实时获取新增内容。

  • Memory Channel:Flume 的内存通道缓存,用于在 Exec SourceKafka Sink 之间传递事件。

  • Kafka Sink:将缓存的事件转换为 Kafka 消息,写入目标 Kafka 主题 mylog_6


6.1 启动zookeeper集群

# 三台机器都要运行
[root@master ~]# zkServer.sh start


6.2 启动kafka集群

# master/slave1/slave2三台主机都要运行
cd /opt/apps/kafka

kafka-server-start.sh /opt/apps/kafka/config/server.properties &


6.3 在kafka创建topic主题,并查看

# 在master中创建topic,不能存在有相同名称的主题【如之前已经创建mylog主题,可以直接使用不用再创建】
kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic mylog_6 \
--partitions 1 \
--replication-factor 3

# 查看topic主题
kafka-topics.sh  --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092


6.4 创建kafka消费者

# 在master启动Kafka的消费者脚本,让其监听mylog_6主题
kafka-console-consumer.sh \
--bootstrap-server master:9092 \
--topic mylog_6 \
--from-beginning

注意:保持监听窗口不要关闭


6.5 配置flume的conf文件

创建exec-to-kafka.conf文件

# 在slave1主机上运行
cd /opt/apps/flume/conf
# 创建空文件
touch exec-to-kafka.conf

添加以下内容:

# 定义 Flume Agent 的组件
# a1 是该 Agent 的名称,用于后续配置中指代此 Agent
a1.sources = r1
# 定义 Channel 名称为 c1
a1.channels = c1
# 定义 Sink 名称为 k1
a1.sinks = k1

# 配置 Source 组件 r1
# Source 是数据的输入端,通过执行用户定义的命令或脚本来获取数据
a1.sources.r1.type = exec
# 使用 tail 命令实时监控日志文件的新增内容
# 实时捕获 /opt/apps/flume/logs/log-01.txt 文件新增的日志数据
a1.sources.r1.command = tail -F /opt/apps/flume/logs/log-01.txt

# 配置 Channel 组件 c1
# Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
# 配置 Channel 通道类型为 memory,用于在内存中缓存事件
a1.channels.c1.type = memory
# Channel 的容量最多存储 1000 条事件
# 当事件数达到 1000 时,Channel 将暂时停止接收新事件,直到事件被消费腾出空间
a1.channels.c1.capacity = 1000
# Source 和 Sink 每次事务传输的最大事件数量为 100
a1.channels.c1.transactionCapacity = 100

# 配置 Sink 组件 k1
# Sink 是数据的输出端,用于将数据写入 Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 设置 Kafka 主题名称,将日志数据写入 Kafka 的 mylog_6 主题
a1.sinks.k1.topic = mylog_6
# 配置 Kafka 的 broker 列表,格式为 "主机名:端口号"
# master、slave1 和 slave2 是 Kafka 集群的节点
a1.sinks.k1.brokerList = master:9092,slave1:9092,slave2:9092
# 每次批量传输 20 条事件到 Kafka,优化性能
a1.sinks.k1.batchSize = 20

# 绑定 Source、Sink 和 Channel 之间的关系
# 将 Source r1 绑定到 Channel c1,数据从 Source 传递到 Channel
a1.sources.r1.channels = c1
# 将 Sink k1 绑定到 Channel c1,数据从 Channel 传递到 Sink
a1.sinks.k1.channel = c1


6.6 启动flume

# 在slave1启动flume
flume-ng agent \
-n a1 \
-c /opt/apps/flume/conf \
-f /opt/apps/flume/conf/exec-to-kafka.conf \
-Dflume.root.logger=INFO,console

注:保持flume所在窗口不要关闭


6.7 编辑传输文件

# 在slave2 向远程主机 slave1 的文件 /opt/apps/flume/logs/log-01.txt 中追加输入内容。
# 在slave2 中运行下面的命令
ssh slave1 "cat  >> /opt/apps/flume/logs/log-01.txt"
回车后输入以下内容:
hello are you


6.8 Master节点上的Kafka消费数据,并输出在终端

此时Flume可以通过tail -F命令实时监控文件中的新增数据,发现有新数据就写入kafka,然后kafka的消费者脚本实时拉取数据并输出到控制台。

image-20230127160018474



七、多路复用的实时数据采集【重要】

7.1 任务描述

数据采集到 Kafka

  • 在 Master 节点部署 Flume,并配置一个实时数据采集任务。

  • Flume 从 Master 节点的 10050 端口监听 socket 数据流。【 socket 网络套接字是一个通信端点,允许应用程序通过 IP 和端口在网络上进行数据交换】

  • 收集到的数据被写入 Kafka 的 job_info 主题中,该主题配置了 3 个分区

  • 使用 Kafka 自带的消费者工具消费 job_info 主题中的数据,截取前 2 条消费结果并将截图粘贴到报告中。

多路复用存储

  • 通过 Flume 的 多路复用模式,同时将接收到的数据分流至两个目标:


    • Kafka:实时将数据写入 job_info 主题。

    • HDFS:将数据备份到 HDFS 的 /user/flumebackup 目录中。

实现原理

img


7.2 实时数据准备





#!/bin/bash
LOGFILE="./messages"
SOCKET="slave1 10050"
COUNT=0  # 用于记录发送的行数

while true; do
    while IFS= read -r line || [ -n "$line" ]; do
        echo "$line" | nc $SOCKET
        ((COUNT++))  # 增加计数
        echo "成功向slave1的10050端口发送第${COUNT}条数据"
        sleep 1
    done < "$LOGFILE"
done


7.3 任务实现

一、启动环境

1)启动HDFS集群。

# 在master上启动
[root@master ~]# start-dfs.sh
# 在slave1上启动
[root@slave1 ~]# start-yarn.sh

2) 启动zookeeper集群(三台机器都要启动):

zkServer.sh start


二、Flume配置

slave1主机/opt/apps/flume/conf/目录下,创建一个flume配置文件netcat-kafka-hdfs.conf

编辑内容如下:

# 定义 Flume Agent 的组件
# a1 是该 Agent 的名称,用于后续配置中指代此 Agent
a1.sources = r1
# 定义 Sink 名称为 k1 和 k2
a1.sinks = k1 k2
# 定义 Channel 名称为 c1 和 c2
a1.channels = c1 c2

# 配置 Source 组件 r1
# Source 是数据的输入端,负责从 netcat 客户端接收数据
a1.sources.r1.type = netcat
# 将 IP 地址绑定为 0.0.0.0,表示源(source)将监听来自任何网络接口的连接
a1.sources.r1.bind = 0.0.0.0
# Netcat Source 监听的端口号 10050,以接收来自 netcat 的数据
a1.sources.r1.port = 10050

# 配置第1个 Channel 组件 c1(用于 HDFS 输出)
# Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
# 配置 Channel 通道类型为 memory,用于在内存中缓存数据
a1.channels.c1.type = memory
# Channel 的容量最多存储 1000 条事件(event)
# 当事件数达到 1000 时,Channel 将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件
a1.channels.c1.capacity = 1000
# Source 和 Sink 从 memory Channel 每次事务传输的最大事件数量为 100
a1.channels.c1.transactionCapacity = 100

# 配置第2个 Channel 组件 c2(用于 Kafka 输出)
# Channel 是 Flume 中用于连接 Source 和 Sink 的通道,存储传输过程中的数据
# 配置 Channel 通道类型为 memory,用于在内存中缓存数据
a1.channels.c2.type = memory
# Channel 的容量最多存储 1000 条事件(event)
# 当事件数达到 1000 时,Channel 将暂时停止接收新事件,直到有事件被消费(发送到 Sink),腾出空间后才会继续接收新事件
a1.channels.c2.capacity = 1000
# Source 和 Sink 从 memory Channel 每次事务传输的最大事件数量为 100
a1.channels.c2.transactionCapacity = 100

# 配置第1个 Sink 组件 k1(输出到 HDFS)
# Sink 是数据的输出端,用于将数据写入 HDFS
a1.sinks.k1.type = hdfs
# 设置 HDFS 路径,将数据存储在 master 的 HDFS 的 /user/flumebackup/ 目录中
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/flumebackup
# 文件类型为 DataStream,表示数据按流方式写入 HDFS
a1.sinks.k1.hdfs.fileType = DataStream
# 设置写入的文件格式为文本
a1.sinks.k1.hdfs.writeFormat = Text
# 设置为 10,表示每当 HDFS Sink 累积到 10 条事件后,就会强制滚动生成一个新文件。
a1.sinks.k1.hdfs.rollCount = 10
# 禁用基于时间间隔的滚动机制,不会因为时间间隔而触发文件滚动。
# 如果该参数未禁用(非 0),即使事件不足 rollCount,也会在时间到达 rollInterval 时生成新文件。
a1.sinks.k1.hdfs.rollInterval = 0
# 禁用基于文件大小的滚动机制,不会因为文件大小超过指定值而触发文件滚动。
# 如果该参数未禁用(非 0),当文件大小达到 rollSize 时,将生成新文件。
a1.sinks.k1.hdfs.rollSize = 0

# 配置第2个 Sink 组件 k2(输出到 Kafka)
# Sink 是数据的输出端,用于将数据写入 Kafka
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
# 设置 Kafka 的 broker 列表,格式为 "主机名:端口号"
a1.sinks.k2.brokerList = master:9092
# 设置 Kafka 主题名称,将日志数据写入 Kafka 的 job_info 主题
a1.sinks.k2.topic = job_info
# 每次批量传输 10 条事件到 Kafka,优化性能
a1.sinks.k2.batchSize = 10

# 绑定 Source、Sink 和 Channel 之间的关系
# 将 Source r1 绑定到 Channel c1 和 c2,以便事件可以传输到两个 Channel
a1.sources.r1.channels = c1 c2
# 将 Sink k1 绑定到 Channel c1
a1.sinks.k1.channel = c1
# 将 Sink k2 绑定到 Channel c2
a1.sinks.k2.channel = c2

配置详情:

  • Source:配置了一个 netcat 类型的数据源,用于监听和接收来自客户端的数据。

    • 该 Source 运行在主机的 10050 端口上,通过网络连接接收实时数据流。

  • Channel:配置了两个通道(c1c2)作为 Source 和 Sink 之间的数据缓冲区。

    • c1 通道用于连接到 HDFS Sink,负责缓存并传输数据到 HDFS。

    • c2 通道用于连接到 Kafka Sink,负责缓存并传输数据到 Kafka。

  • Sink:配置了两个数据输出端(Sink),分别用于将数据发送到不同的目标:

    • Kafka Sink:将数据写入 Kafka 的指定主题,用于实时数据处理或流式计算。

    • HDFS Sink:将数据以文件形式写入 Hadoop 分布式文件系统(HDFS),用于持久化和后续分析。


三、执行实时数据采集

1) 启动Kafka集群:

# 三台机器都要启动kafka服务
# 启动方法:【后台运行,但终端的屏幕上打印日志,便于检查kafka启动错误】
kafka-server-start.sh /opt/apps/kafka/config/server.properties &

2) 在master上创建Kafka主题job_info,指定使用4个分区:

# 在master上创建job_info主题,不能存在有相同名称的主题
kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic job_info \
--replication-factor 1 --partitions 3

3)查看order主题是否创建成功:

kafka-topics.sh  --list --bootstrap-server master:9092,slave1:9092,slave2:9092

4) 在master上启动Kafka自带的消费者脚本,订阅job_info主题,以便观察Kafka接收到的数据。保持运行:

kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic job_info

5) 启动Flume代理

# 在slave1节点启动Flume代理
flume-ng agent \
-n a1 \
-c /opt/apps/flume/conf \
-f /opt/apps/flume/conf/netcat-kafka-hdfs.conf \
-Dflume.root.logger=DEBUG,console

6) 启动数据生成器脚本,将生成的订单数据发往Socket 10050端口。

# 在slave2上运行send_data.sh脚本
cd /root/data

# 运行脚本
./send_data.sh


7.4 验证结果

1)在master节点,应该可以看到kafka消费的实时数据

image-20241124193101171

2) 查看HDFS的/user/flumebackup/目录,应该可以看到,Flume接收数据注入kafka 的同时,将数据备份到HDFS目录/user/flumebackup下了,如下图所示:(也可以直接使用hdfs shell命令查看):

image-20241124193223308

3)查看HDFS上的/user/flumebackup/*,检查是否接收到信息:

hdfs dfs -cat /user/flumebackup/* | head -5

image-20241124193315930

实验结束


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1