李翔-大数据技术

Big data technology!

2026-3-16 实验手册

第5章 Flink 实时数据实验

实验步骤

1.实时数据写入 Kafka

1.1 启动Kafka

# 新建一个master窗口,在master/slave1/slave2启动 ZooKeeper
zkServer.sh start

# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status

# 在master/slave1/slave2启动 Kafka
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties


1.2 写入状态流数据到 Kafka

# 在master节点中操作【新开窗口】
cd /opt/datas

# 启动设备状态模拟脚本,并将数据写入 Kafka
python 01_device_status_sim.py | \
kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic


1.3 在slave1窗口中操作,检查实时数据

kafka-console-consumer.sh --bootstrap-server master:9092 --topic device_status_topic


2.启动 Flink 集群与 SQL Client

# 在 master 启动 Flink 集群
start-cluster.sh

# 在 master 启动 Flink SQL 交互客户端
sql-client.sh

进入后看到下面提示符:

Flink SQL>



3. Kafka 源表定义

3.1 删除流表

DROP TABLE IF EXISTS device_status;

3.2 创建流表(把 Kafka 实时数据映射为 Flink 可查询的表

CREATE TABLE device_status (
  event_type STRING,      -- 事件类型(如状态上报)
  device_id STRING,       -- 设备编号
  status INT,             -- 设备状态(1在线/0离线)
  temperature DOUBLE,     -- 温度
  `load` DOUBLE,          -- 设备负载(注意:load 是关键字,用反引号)
  voltage DOUBLE,         -- 电压
  event_time STRING,      -- 原始时间(字符串格式)
  ts AS TO_TIMESTAMP(event_time),  -- 计算列:将字符串时间转换为时间类型(窗口统计必须用时间类型)
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- 水位线:允许数据最多迟到5秒(解决乱序问题)
) WITH (
  'connector' = 'kafka',            -- 指定数据来源是 Kafka
  'topic' = 'device_status_topic',  -- 订阅的 Kafka 主题
  'properties.bootstrap.servers' = 'master:9092',  -- Kafka 服务器地址
  'properties.group.id' = 'rt_device_20260309_01', -- 消费者组ID(控制消费位置)
  'scan.startup.mode' = 'latest-offset',           -- 从最新数据开始读取
  'format' = 'json'                                -- 数据格式为 JSON
);

3.3 查看当前库中的表

SHOW TABLES;

4.在 Flink SQL 中查询 Kafka 数据

4.1 进入 Flink SQL 客户端

4.2 执行 SQL 查询

在 Flink SQL 客户端中执行以下 SQL 查询,实时查看 Kafka 中的数据:

-- 从 `device_status` 表中实时查询设备 ID、状态、温度和时间戳。
-- 注意:Kafka 源表属于动态表,查询会持续输出数据,需按 Ctrl + C 手动停止
SELECT device_id, status, temperature, ts FROM device_status;


6.4 实验

6.4.1 子实验1:实时在线设备数量统计(10s TUMBLE)

1. 指标说明

  • 任务:每 10 秒统计一次:当前系统中有多少台设备是“在线”的(去重统计)

  • 类型:趋势型指标

  • 窗口:10 秒窗口

2. Flink SQL

(1) 查看窗口的样子

SELECT
  device_id,
  status,
  window_start,
  window_end
FROM TABLE(
  TUMBLE(
    TABLE device_status,
    DESCRIPTOR(ts),
    INTERVAL '10' SECOND
  )
);


(2)每 10 秒统计一次:当前系统中有多少台设备是“在线”的(去重统计)

SELECT
  window_start,                             -- 窗口开始时间
  window_end,                               -- 窗口结束时间
  COUNT(DISTINCT device_id) AS online_count -- 去重统计在线设备数
FROM TABLE(
  TUMBLE(                                   -- 窗口函数(滚动窗口,不重叠)
    TABLE device_status,                    -- 数据来源:Kafka 映射的流表
    DESCRIPTOR(ts),                         -- 使用事件时间 ts 定义窗口
    INTERVAL '10' SECOND                    -- 每10秒一个滚动窗口
  )
)
WHERE status = 1                            -- 只统计在线设备(status=1 表示在线)
GROUP BY window_start, window_end;


6.4.2 子实验2:实时温度趋势(10s/30s HOP)

1. 指标说明

  • 任务:每 10 秒更新一次最近 30 秒内的平均温度和最高温度,用于实时监测设备运行温度变化趋势。

  • 类型:趋势型指标

  • 窗口:30 秒窗口,每 10 秒滑动一次


2. Flink SQL

-- 每 10 秒更新一次“最近 30 秒”的温度趋势
SELECT
  window_start,                             -- 窗口开始时间
  window_end ,                              -- 窗口结束时间
  AVG(temperature) AS avg_temp,             -- 最近30秒的平均温度
  MAX(temperature) AS max_temp              -- 最近30秒的最高温度
FROM TABLE(
  HOP(                                      -- 窗口函数(滑动窗口)
    TABLE device_status,                    -- 数据来源:Kafka流表
    DESCRIPTOR(ts),                         -- 使用 ts 作为事件时间
    INTERVAL '10' SECOND,                   -- 每10秒滑动一次(更新频率)
    INTERVAL '30' SECOND                    -- 窗口大小30秒(统计范围)
  )
)
GROUP BY window_start, window_end;


6.4.3 子实验3:实时告警数量统计(10s TUMBLE)

1. 指标说明

  • 含义:基于 Flink SQL 从设备状态流中实时识别异常并生成告警事件流,再按 10 秒滚动窗口统计系统告警数量,用于实时监控设备运行风险。

  • 类型:规则派生指标

  • 窗口:10 秒滚动窗口

  • 数据来源:device_status_topic(由状态流推导告警)


2. Flink SQL

第一步:由状态流生成告警事件流视图

-- 1.查看当前会话中是否已存在视图
SHOW VIEWS;
-- 2.创建临时视图:告警事件流(只保留异常数据)
CREATE TEMPORARY VIEW v_alarm_event AS
SELECT
  device_id,                                  -- 设备编号
  CASE                                        -- 根据规则判断告警类型
    WHEN temperature >= 80 THEN 'HIGH_TEMP'   -- 温度过高
    WHEN voltage <= 200 THEN 'LOW_VOLT'       -- 电压过低
    WHEN `load` >= 0.90 THEN 'HIGH_LOAD'      -- 负载过高
    WHEN status = 0 THEN 'DEVICE_OFFLINE'     -- 设备离线
  END AS alarm_type,                          -- 告警类型字段
  ts                                          -- 事件时间(用于后续窗口统计)
FROM device_status                            -- 数据来源:设备状态流表
WHERE                                         -- 只筛选“异常数据”
  temperature >= 80
  OR voltage <= 200
  OR `load` >= 0.90
  OR status = 0;
-- 3.查看当前会话中是否已存在视图
SHOW VIEWS;
-- 4.从视图 `v_alarm_event` 中实时查询设备 ID、异常状态和时间戳。
select * from v_alarm_event;

第二步:统计 10 秒内告警数量

-- 每 10 秒统计一次告警总数量

SELECT
  window_start,                               -- 窗口开始时间
  window_end,                                 -- 窗口结束时间
  COUNT(*) AS alarm_count                     -- 该窗口内的告警总次数
FROM TABLE(
  TUMBLE(
    TABLE v_alarm_event,                      -- 数据来源:告警事件流(只包含异常记录)
    DESCRIPTOR(ts),                           -- 使用 ts 作为事件时间
    INTERVAL '10' SECOND                      -- 定义 10 秒滚动窗口(不重叠)
  )
)
GROUP BY window_start, window_end;            -- 每个时间窗口单独统计

3. 验收


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1