李翔-大数据技术

Big data technology!

实时任务04

实验任务4:

Flink 实时消费 Kafka 数据,统计城市的出现频次,并写入 Redis。

注意:此 Redis 表的数据是数据可视化【大屏】地图(实时)的数据来源


实验目标:

  • 掌握 Flink 流处理任务的基本构建流程与语法结构。

  • 熟悉 Kafka 新版 KafkaSource 的数据读取方式。

  • 学会使用 case class 封装数据,提升可读性与可维护性。

  • 掌握 Flink 中基于 keyBy + sum 的实时分组聚合统计。

  • 学会使用 RedisSink 将处理结果写入 Redis 实时存储。

  • 实现从 Kafka → 实时统计 → 控制台打印 → Redis 写入 的完整流式链路。


实验任务:

  1. 构建 Flink 流处理环境,启用 checkpoint,提升容错。

  2. 从 Kafka 中消费消息,提取城市字段,封装为结构化对象 CityCount

  3. 基于城市字段 city,对城市进行分组计数统计。

  4. 控制台输出结果,格式为 (城市, 次数),便于验证。

  5. 使用 RedisSink 将 (城市, 次数) 写入 Redis,城市名为 key,数量为 value。

  6. 实现 Redis 实时缓存功能,支持其他系统快速查询。


实验步骤:

第一步:初始化 Flink 流处理环境

  • 获取 StreamExecutionEnvironment

  • 设置并行度为 1(便于调试观察)。

  • 启用 checkpoint,每 5 秒执行一次状态快照,提高容错能力。

第二步:创建 Kafka 数据源

  • 使用新版 KafkaSource

  • 设置:

    • Kafka 地址为 master:9092

    • 订阅主题为 my-topic

    • 消费者组 ID 为 flink-consumer-group

  • 设置从最新位置读取(OffsetsInitializer.latest())。

  • 只解析消息的 value 部分(使用 SimpleStringSchema)。

第三步:数据清洗与结构化转换

  • 将 Kafka 中每条消息按逗号 , 拆分字段。

  • 提取第 0 列作为城市字段。

  • 使用 case class CityCount(city: String, count: Int) 封装为结构化数据。

第四步:按城市分组并统计数量

  • 使用 keyBy(_.city) 对城市字段进行分组。

  • 使用 .sum("count") 对每个城市的统计数量进行累加。

第五步:控制台输出调试结果

  • 使用 .map(record => (record.city, record.count))CityCount 转为元组。

  • 使用 .print() 打印 (城市, 数量),方便观察每条数据处理情况。

第六步:写入 Redis 实现数据落地

  • 配置 Redis 连接,主机为 master,端口为 6379

  • 使用 RedisSink 写入数据。

  • 使用自定义 RedisMapper,将 (城市, 数量) 写入 Redis,命令类型为 SET

    • key:城市名

    • value:城市对应的统计数量

第七步:启动 Flink 作业

  • 使用 env.execute("Kafka 城市统计写入 Redis") 启动作业并开始运行。

配置Redis

(1)在/opt/apps/redis/目录下,修改配置文件redis.conf

# 允许所有 IP 访问 Redis
bind 0.0.0.0

# 关闭保护模式,允许任何机器连接到 Redis
protected-mode no

完整启动环境

1.启动Hadoop集群

start-dfs.sh
start-yarn.sh

2.启动Zookeeper(需先启动Hadoop服务,且三个主从节点均需启动Zookeeper)

# 在三个节点启动Zookeeper
zkServer.sh start

# 检测Zookeeper是否成功启动
zkServer.sh status

3.启动kafka服务(三个主从节点均需启动)

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

4.启动模拟数据流

# 在第1个master节点运行
# 1.进入根目录
cd /

# 2.运行Python程序,产生模拟数据
python3 main.py

5.启动Flume

# 在第2个 master节点运行下面的命令,将读取到的数据推送至Kafka指定topic中
flume-ng agent \
 -n a1 \
 -c /opt/apps/flume/conf/dataCleanLog \
 -f /opt/apps/flume/conf/dataCleanLog/flume_To_Kafka_dataClean.properties \
 -Dflume.root.logger=info,console

6.启动Kafka消费者

# 在第3个master节点启动kafka消费者,消费数据流
kafka-console-consumer.sh --bootstrap-server master:9092 --topic my-topic

7.启动redis服务

# 1.在第4个master节点启动Redis服务
cd /opt/apps/redis/src/

# 2.启动 Redis 服务器,使用指定的配置文件 redis.conf
./redis-server /opt/apps/redis/redis.conf

# 3.检查Redis服务是否启动,返回PONG即可已启动
redis-cli ping


完整代码如下:

package org.example

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// 主程序入口
object FlinkCityToRedis {

 // 定义一个结构体类(case class),表示每条数据的结构:城市名 + 统计数量
 case class CityCount(city: String, count: Int)

 def main(args: Array[String]): Unit = {

   // 1.创建 Flink 执行环境(负责调度任务)
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // 设置并行度为 1,方便打印观察(实际部署时可设为更高)
   env.setParallelism(1)
   // 启用检查点(Checkpoint),提高程序容错性
   env.enableCheckpointing(5000)

   println("1.Flink 执行环境已启动...")

   // 2.配置 Kafka 数据源(新 API)
   val kafkaSource = KafkaSource.builder[String]
     .setBootstrapServers("master:9092")       // Kafka 服务地址
     .setTopics("my-topic")                    // 监听的 Kafka 主题
     .setGroupId("flink-consumer-group")       // 消费者组,用于管理消费进度
     .setStartingOffsets(OffsetsInitializer.latest()) // 从最新数据开始消费
     .setValueOnlyDeserializer(new SimpleStringSchema()) // 只读取消息体部分
     .build()

   println("2.KafkaSource 构建完成,准备接收数据...")

   // 3.创建数据流,从 Kafka 获取字符串消息
   val stream: DataStream[String] = env.fromSource(
     kafkaSource,
     WatermarkStrategy.noWatermarks(),         // 不启用事件时间机制
     "Kafka Source"                            // 数据源名称
   )

   // 打印原始 Kafka 数据
   stream.map { msg =>
     println(s"接收到 Kafka 消息:$msg")
     msg
   }

   /*
   把 Kafka 中的原始数据流 stream 中的每条消息通过 .map 方法处理,
   提取出城市字段并转换为 CityCount(city, 1) 对象,
   形成一个新的结构化数据流 cityStream,其每条数据都是 CityCount 类型,用于后续的按城市分组和统计。
   : DataStream[CityCount] 指定变量的类型为 Flink 的数据流,里面的每个元素都是 CityCount 类型的对象。
   stream.map { ... } 对原始的 stream 数据流做 .map 转换,把每条原始数据处理后变成 CityCount 类型。
   */
   // 4.清洗数据,提取城市字段,并封装为 CityCount(city, 1)
   val cityStream: DataStream[CityCount] = stream
     .map { line =>
       // 每条 Kafka 消息假设格式为:城市名,其他字段,...
       val parts = line.split(",")  // 拆分字符串
       val city = if (parts.nonEmpty) parts(0).trim else "unknown" // 取第 0 列作为城市名
       CityCount(city, 1)  // 每来一条城市记录,初始化为计数 1
     }

   // 5.统计每个城市的出现次数(按城市名分组后,对 count 累加)
   val cityCounts: DataStream[CityCount] = cityStream
     .keyBy(record => record.city)      // 根据 city 字段分组,相当于 SQL 的 GROUP BY
     .sum("count")       // 每有一条新记录,count + 1

//    // 6.将统计结果打印出来,格式化为 (城市, 次数)
//    cityCounts
//      .map(record => (record.city, record.count)) // 转为元组类型,方便输出更简洁
//      .print() // 控制台打印:示例 (广州, 3)

   // 7.配置 Redis 连接参数
   val redisConfig = new FlinkJedisPoolConfig.Builder()
     .setHost("master")   // Redis 主机名(可改为实际 IP)
     .setPort(6379)       // Redis 默认端口
     .build()

   // 8.将统计结果写入 Redis:键为城市名,值为数量
   cityCounts
     .map{record =>
       println(s"💾 准备写入 Redis:key=${record.city}, value=${record.count}")
       (record.city, record.count) // 转为 (String, Int),便于 RedisSink 使用
     }
       // 将 (城市名, 次数) 写入 Redis,使用自定义映射器指定写入方式
     .addSink(new RedisSink[(String, Int)](redisConfig, new CityRedisMapper))

   println("3.Flink 流任务即将启动...")
   // 9.启动作业
   env.execute("Kafka Redis")
 }

 // 自定义 Redis 映射器:告诉 Flink 如何把数据写入 Redis
 class CityRedisMapper extends RedisMapper[(String, Int)] {

   // 使用 Redis 的 SET 命令:例如 set "广州" 3
   override def getCommandDescription: RedisCommandDescription =
     new RedisCommandDescription(RedisCommand.SET)

   // 获取 Redis 的 key(城市名)
   // 从元组 (city, count) 中取出第一个字段(即城市名)作为 Redis 的 键 key;
   // 参数 data 就是 Flink 流中的一条数据记录,它的类型是:(String, Int)
   override def getKeyFromData(data: (String, Int)): String = data._1

   // 获取 Redis 的 value(城市计数值),转成字符串
   // 从元组中取出第二个字段(计数值),并转换为字符串,作为 Redis 的 值 value;
   override def getValueFromData(data: (String, Int)): String = data._2.toString
 }
}

程序运行结果:

image-20250501120758474

在Redis中查看结果:

# 方法一: 
# 1.以原始格式启动 Redis 客户端,支持中文和非 ASCII 字符的正常显示。
redis-cli --raw
# 查看当前数据库中 key 的数量
dbsize
# 查看所有的Key
keys *
# 查看键对应的值
get 北京市

# 方法二
# 直接在Master的命令行【不用进入Redis终端】查看Key是北京市的值
redis-cli --raw get 北京市

注意:此程序 不会自动停止,因为它是一个流式处理程序

Flink 的流处理程序设计为 长期运行 的,它会持续地监听数据源(如 Kafka)并实时处理数据,直到手动停止

在这个程序中,数据源是 Kafka 的 my-topic 主题,程序会持续消费该主题中消息,并将结果更新到 Redis 中。



发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1