第5章 Flink 实时数据实验
实验步骤
1.实时数据写入 Kafka
1.1 启动Kafka
# 新建一个master窗口,在master/slave1/slave2启动 ZooKeeper zkServer.sh start # 在master/slave1/slave2检查 ZooKeeper状态 zkServer.sh status # 在master/slave1/slave2启动 Kafka kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties
1.2 写入状态流数据到 Kafka
# 在master节点中操作【新开窗口】 cd /opt/datas # 启动设备状态模拟脚本,并将数据写入 Kafka python 01_device_status_sim.py | \ kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic
1.3 在slave1窗口中操作,检查实时数据
kafka-console-consumer.sh --bootstrap-server master:9092 --topic device_status_topic
2.启动 Flink 集群与 SQL Client
# 在 master 启动 Flink 集群 start-cluster.sh # 在 master 启动 Flink SQL 交互客户端 sql-client.sh
进入后看到下面提示符:
Flink SQL>
3. Kafka 源表定义
3.1 删除流表
DROP TABLE IF EXISTS device_status;
3.2 创建流表(把 Kafka 实时数据映射为 Flink 可查询的表
CREATE TABLE device_status ( event_type STRING, -- 事件类型(如状态上报) device_id STRING, -- 设备编号 status INT, -- 设备状态(1在线/0离线) temperature DOUBLE, -- 温度 `load` DOUBLE, -- 设备负载(注意:load 是关键字,用反引号) voltage DOUBLE, -- 电压 event_time STRING, -- 原始时间(字符串格式) ts AS TO_TIMESTAMP(event_time), -- 计算列:将字符串时间转换为时间类型(窗口统计必须用时间类型) WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 水位线:允许数据最多迟到5秒(解决乱序问题) ) WITH ( 'connector' = 'kafka', -- 指定数据来源是 Kafka 'topic' = 'device_status_topic', -- 订阅的 Kafka 主题 'properties.bootstrap.servers' = 'master:9092', -- Kafka 服务器地址 'properties.group.id' = 'rt_device_20260309_01', -- 消费者组ID(控制消费位置) 'scan.startup.mode' = 'latest-offset', -- 从最新数据开始读取 'format' = 'json' -- 数据格式为 JSON );
3.3 查看当前库中的表
SHOW TABLES;
4.在 Flink SQL 中查询 Kafka 数据
4.1 进入 Flink SQL 客户端
4.2 执行 SQL 查询
在 Flink SQL 客户端中执行以下 SQL 查询,实时查看 Kafka 中的数据:
-- 从 `device_status` 表中实时查询设备 ID、状态、温度和时间戳。 -- 注意:Kafka 源表属于动态表,查询会持续输出数据,需按 Ctrl + C 手动停止 SELECT device_id, status, temperature, ts FROM device_status;
6.4 实验
6.4.1 子实验1:实时在线设备数量统计(10s TUMBLE)
1. 指标说明
任务:每 10 秒统计一次:当前系统中有多少台设备是“在线”的(去重统计)
类型:趋势型指标
窗口:10 秒窗口
2. Flink SQL
(1) 查看窗口的样子
SELECT device_id, status, window_start, window_end FROM TABLE( TUMBLE( TABLE device_status, DESCRIPTOR(ts), INTERVAL '10' SECOND ) );
(2)每 10 秒统计一次:当前系统中有多少台设备是“在线”的(去重统计)
SELECT window_start, -- 窗口开始时间 window_end, -- 窗口结束时间 COUNT(DISTINCT device_id) AS online_count -- 去重统计在线设备数 FROM TABLE( TUMBLE( -- 窗口函数(滚动窗口,不重叠) TABLE device_status, -- 数据来源:Kafka 映射的流表 DESCRIPTOR(ts), -- 使用事件时间 ts 定义窗口 INTERVAL '10' SECOND -- 每10秒一个滚动窗口 ) ) WHERE status = 1 -- 只统计在线设备(status=1 表示在线) GROUP BY window_start, window_end;
6.4.2 子实验2:实时温度趋势(10s/30s HOP)
1. 指标说明
任务:每 10 秒更新一次最近 30 秒内的平均温度和最高温度,用于实时监测设备运行温度变化趋势。
类型:趋势型指标
窗口:30 秒窗口,每 10 秒滑动一次
2. Flink SQL
-- 每 10 秒更新一次“最近 30 秒”的温度趋势 SELECT window_start, -- 窗口开始时间 window_end , -- 窗口结束时间 AVG(temperature) AS avg_temp, -- 最近30秒的平均温度 MAX(temperature) AS max_temp -- 最近30秒的最高温度 FROM TABLE( HOP( -- 窗口函数(滑动窗口) TABLE device_status, -- 数据来源:Kafka流表 DESCRIPTOR(ts), -- 使用 ts 作为事件时间 INTERVAL '10' SECOND, -- 每10秒滑动一次(更新频率) INTERVAL '30' SECOND -- 窗口大小30秒(统计范围) ) ) GROUP BY window_start, window_end;
6.4.3 子实验3:实时告警数量统计(10s TUMBLE)
1. 指标说明
含义:基于 Flink SQL 从设备状态流中实时识别异常并生成告警事件流,再按 10 秒滚动窗口统计系统告警数量,用于实时监控设备运行风险。
类型:规则派生指标
窗口:10 秒滚动窗口
数据来源:device_status_topic(由状态流推导告警)
2. Flink SQL
第一步:由状态流生成告警事件流视图
-- 1.查看当前会话中是否已存在视图 SHOW VIEWS;
-- 2.创建临时视图:告警事件流(只保留异常数据) CREATE TEMPORARY VIEW v_alarm_event AS SELECT device_id, -- 设备编号 CASE -- 根据规则判断告警类型 WHEN temperature >= 80 THEN 'HIGH_TEMP' -- 温度过高 WHEN voltage <= 200 THEN 'LOW_VOLT' -- 电压过低 WHEN `load` >= 0.90 THEN 'HIGH_LOAD' -- 负载过高 WHEN status = 0 THEN 'DEVICE_OFFLINE' -- 设备离线 END AS alarm_type, -- 告警类型字段 ts -- 事件时间(用于后续窗口统计) FROM device_status -- 数据来源:设备状态流表 WHERE -- 只筛选“异常数据” temperature >= 80 OR voltage <= 200 OR `load` >= 0.90 OR status = 0;
-- 3.查看当前会话中是否已存在视图 SHOW VIEWS;
-- 4.从视图 `v_alarm_event` 中实时查询设备 ID、异常状态和时间戳。 select * from v_alarm_event;
第二步:统计 10 秒内告警数量
-- 每 10 秒统计一次告警总数量 SELECT window_start, -- 窗口开始时间 window_end, -- 窗口结束时间 COUNT(*) AS alarm_count -- 该窗口内的告警总次数 FROM TABLE( TUMBLE( TABLE v_alarm_event, -- 数据来源:告警事件流(只包含异常记录) DESCRIPTOR(ts), -- 使用 ts 作为事件时间 INTERVAL '10' SECOND -- 定义 10 秒滚动窗口(不重叠) ) ) GROUP BY window_start, window_end; -- 每个时间窗口单独统计
3. 验收