李翔-大数据技术

Big data technology!

2026-3-23 实验手册

1. 启动实验环境

1.1 启动Kafka

# 新建一个master窗口,在master/slave1/slave2启动 ZooKeeper
zkServer.sh start

# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status

# 在master/slave1/slave2启动 Kafka
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties

#  检查Topic主题是否存在
kafka-topics.sh --bootstrap-server master:9092 --list


1.2 写入状态流数据到 Kafka

# 在master节点中操作【新开窗口】
cd /opt/datas
# 启动设备状态模拟脚本,并将数据写入 Kafka
python 01_device_status_sim.py | \
kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic



下载POM文件

flink-pom.zip



2.MySQL 先建库建表

启动 Mysql :

# 进入 mysql 客户端
mysql -uroot -p123456


创建数据库与表:

-- 创建数据库 rt(如果不存在则创建),并设置默认字符集为 utf8mb4(支持中文)【用于存放实时指标结果表】
CREATE DATABASE IF NOT EXISTS rt DEFAULT CHARSET utf8mb4;

-- 选择数据库
USE rt;

创建数据表:

--------------------------------------------------
-- 创建表1:10秒粒度的在线设备数量(KPI)
--------------------------------------------------
CREATE TABLE IF NOT EXISTS rt.rt_kpi_online_10s (
  dt DATE,               -- 日期
  ts DATETIME,           -- 时间戳(精确到秒)
  online_count BIGINT,   -- 在线数
  PRIMARY KEY (ts)       -- 主键:时间唯一
);

--------------------------------------------------
-- 表2:10秒粒度的温度趋势指标(KPI)
--------------------------------------------------
CREATE TABLE IF NOT EXISTS rt.rt_temp_trend_10s (
  dt DATE,              -- 日期
  ts DATETIME,          -- 时间戳(精确到秒)
  avg_temp DOUBLE,      -- 平均温度
  max_temp DOUBLE,      -- 最高温度
  PRIMARY KEY (ts)      -- 主键:时间唯一
);

--------------------------------------------------
-- 表3:10秒粒度的告警数量(KPI)
--------------------------------------------------
CREATE TABLE IF NOT EXISTS rt.rt_kpi_alarm_10s (
  dt DATE,              -- 日期
  ts DATETIME,          -- 时间戳(精确到秒)
  alarm_count BIGINT,   -- 告警数量
  PRIMARY KEY (ts)      -- 主键:时间唯一
);

--------------------------------------------------
-- 表4:1分钟粒度的告警 Top5 排行
--------------------------------------------------
CREATE TABLE IF NOT EXISTS rt.rt_alarm_top5_1m (
  dt DATE,                          -- 日期
  window_end DATETIME,              -- 窗口结束时间(统计时间点)
  device_id VARCHAR(64),            -- 设备ID
  cnt BIGINT,                       -- 告警次数(该设备1分钟内触发多少次)
  rn BIGINT,                        -- 排名(Top N,第几名)
  PRIMARY KEY (dt, window_end, rn)  -- 主键:同一时间窗口下排名唯一
);

查看创建好的表:

-- 查看创建的表
use rt;
show tables;




3. 单指标 Job 实现方案

3.1 单指标计算:RtOnlineKpiJob

文件路径:src/main/java/com/demo/flink/kpi/RtOnlineKpiJob.java


功能说明:本作业实现“Kafka 实时数据 → 10 秒在线设备统计 → 数据落库”的完整实时指标链路。

任务目标:

1)从 Kafka 主题 device_status_topic 读取设备状态数据

2)以 10 秒为时间窗口,统计当前在线设备数量(status = 1

3)将统计结果同时输出到:

  • 🖥 控制台(用于开发阶段调试与观察)

  • 🗄 MySQL 数据库(用于后端接口查询和大屏展示)

数据处理流程: Kafka → Flink SQL 窗口统计 → Print 输出 + MySQL 持久化

完成下面代码:

package com.demo.flink.kpi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 【任务】实时统计在线设备数(每 10 秒一条)
 *
 * 【数据流】Kafka(设备上报) → Flink(窗口统计) →(控制台打印 + 写入 MySQL)
 *
 * 【填写说明】共 14 处需要补全,分三种类型:
 *   ★     填词型:填一个值或方法名
 *   ★★    填句型:填写一整行关键语句
 *   ★★★  填块型:填写多行完整逻辑
 */
public class RtOnlineKpiJob {

    public static void main(String[] args) throws Exception {

        // =========================================================
        // 1. 创建 Flink 运行环境
        // =========================================================
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // ★ TODO 1:开启 Checkpoint,每 30 秒保存一次进度(填写毫秒数)
        env.enableCheckpointing(______);

        System.out.println(">>> 第1步完成:创建 Flink 运行环境成功 ✓");

        // =========================================================
        // 2. 创建 Flink SQL 环境
        // =========================================================

        // ★ TODO 2:设置为流式处理模式(填写方法名,不含括号)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .______()
                .build();

        // ★★ TODO 3:创建 StreamTableEnvironment(填写完整一行)
        // 参考:StreamTableEnvironment.create(流环境, 配置);
        StreamTableEnvironment tEnv = ________________________________;

        System.out.println(">>> 第2步完成:创建 Flink SQL 环境成功 ✓");

        // =========================================================
        // 3. 注册 Kafka 源表
        // =========================================================
        String createKafkaSource =
                "CREATE TABLE device_status (\n" +
                        "  event_type STRING,\n" +
                        "  device_id STRING,\n" +
                        "  status INT,\n" +
                        "  temperature DOUBLE,\n" +
                        "  `load` DOUBLE,\n" +
                        "  voltage DOUBLE,\n" +
                        "  event_time STRING,\n" +

                        // ★★ TODO 4:填写时间戳派生字段(整行)
                        // 要求:把字符串 event_time 转成时间戳,字段名为 ts
                        // 参考:字段名 AS TO_TIMESTAMP(原字段名),
                        "  ______,\n" +

                        // ★★ TODO 5:填写 Watermark 声明(整行)
                        // 要求:基于 ts 字段,允许数据最多晚到 5 秒
                        // 参考:WATERMARK FOR 时间字段 AS 时间字段 - INTERVAL '秒' SECOND
                        "  ______\n" +

                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'device_status_topic',\n" +
                        "  'properties.bootstrap.servers' = 'master:9092',\n" +
                        "  'properties.group.id' = 'rt_online_kpi_g1',\n" +

                        // ★ TODO 6:填写消费模式(只读最新数据,不回放历史)
                        "  'scan.startup.mode' = '______',\n" +

                        "  'format' = 'json',\n" +
                        "  'json.ignore-parse-errors' = 'true'\n" +
                        ")";

        tEnv.executeSql(createKafkaSource);

        System.out.println(">>> 第3步完成:Kafka 源表注册成功 ✓");

        // =========================================================
        // 4. 编写统计 SQL:每 10 秒统计一次"在线设备数"
        // =========================================================
        String kpiSql =
                "SELECT\n" +
                        "  CAST(window_end AS DATE) AS dt,\n" +
                        "  window_end AS ts,\n" +

                        // ★★ TODO 7:统计不重复在线设备数(整行)
                        // 要求:对 device_id 去重计数,结果别名为 online_count
                        // 参考:COUNT(DISTINCT 字段) AS 别名
                        "  ______\n" +

                        "FROM TABLE(\n" +

                        // ★★ TODO 8:填写完整的 TUMBLE 窗口(整行)
                        // 要求:
                        //   - 窗口类型:TUMBLE(滚动窗口)
                        //   - 数据来源:device_status 表
                        //   - 时间字段:ts
                        //   - 窗口大小:10 秒
                        // 参考:窗口函数(TABLE 表名, DESCRIPTOR(时间字段), INTERVAL '大小' SECOND)
                        "  ______\n" +

                        ")\n" +

                        // ★ TODO 9:填写过滤条件的值(status = ? 表示在线)
                        "WHERE status = ______\n" +

                        "GROUP BY window_start, window_end";

        // ★★ TODO 10:将统计结果注册为临时视图(整行)
        // 要求:视图名为 v_kpi_online_10s,供后面 INSERT 使用
        // 参考:tEnv.executeSql("CREATE TEMPORARY VIEW 视图名 AS \n" + sql变量);
        ______;

        System.out.println(">>> 第4步完成:统计视图创建成功 ✓");

        // =========================================================
        // 5. 注册 MySQL Sink 表
        // =========================================================
        String createMySqlSink =
                "CREATE TABLE mysql_kpi_online_10s (\n" +
                        "  dt DATE,\n" +
                        "  ts TIMESTAMP(3),\n" +
                        "  online_count BIGINT\n" +
                        ") WITH (\n" +
                        "  'connector'='jdbc',\n" +
                        "  'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
                        "  'table-name'='rt_kpi_online_10s',\n" +
                        "  'username'='root',\n" +
                        "  'password'='123456',\n" +
                        "  'driver'='com.mysql.cj.jdbc.Driver',\n" +

                        // ★ TODO 11:填写批量写入配置的值
                        // 要求:攒满 100 行 或 每隔 2 秒,触发一次写入(以先到为准)
                        "  'sink.buffer-flush.max-rows'='______',\n" +
                        "  'sink.buffer-flush.interval'='______'\n" +

                        ")";

        tEnv.executeSql(createMySqlSink);

        System.out.println(">>> 第5步完成:MySQL Sink 表注册成功 ✓");

        // =========================================================
        // 6. 注册 Print Sink
        // =========================================================
        // 说明:print 不是存储,只是把结果输出到控制台,方便你看"有没有算出来"
        String createPrintSink =
                "CREATE TABLE print_kpi_online_10s (\n" +
                        "  dt DATE,\n" +
                        "  ts TIMESTAMP(3),\n" +
                        "  online_count BIGINT\n" +
                        ") WITH (\n" +

                        // ★ TODO 12:填写连接器名称(输出到控制台)
                        "  'connector'='______'\n" +

                        ")";

        tEnv.executeSql(createPrintSink);

        System.out.println(">>> 第6步完成:Print Sink 表注册成功 ✓");

        // =========================================================
        // 7. 把同一份结果同时写两处:控制台 + MySQL
        // =========================================================
        StatementSet stmtSet = tEnv.createStatementSet();

        // ★★ TODO 13:补全写入控制台的 INSERT 语句
        // 要求:从视图 v_kpi_online_10s 查出 dt、ts、online_count,写入 print_kpi_online_10s
        stmtSet.addInsertSql(
                "INSERT INTO ______ SELECT dt, ts, online_count FROM ______"
        );

        // ★★ TODO 14:补全写入 MySQL 的 INSERT 语句
        // 要求:从视图 v_kpi_online_10s 查出 dt、ts、online_count,写入 mysql_kpi_online_10s
        stmtSet.addInsertSql(
                "INSERT INTO ______ SELECT dt, ts, online_count FROM ______"
        );

        System.out.println(">>> 第7步:提交Flink任务,开始实时计算...");
        
        // 运行作业:
        // - 程序不会自动结束(Kafka 数据一直来,就一直算)
        // - 想停止:在控制台按 Ctrl + C 或在 Flink Web UI 取消作业
        stmtSet.execute();
    }
}

📌 验收结果:

-- 1.在Mysql中查询最新5条在线设备数(验证实时写入是否正常)
SELECT * 
FROM rt.rt_kpi_online_10s 
ORDER BY ts DESC 
LIMIT 5;

-- 2.统计最近2分钟数据条数(验证是否持续输出,10秒≈1条)
SELECT COUNT(*) AS rows_2min
FROM rt.rt_kpi_online_10s
WHERE ts >= NOW() - INTERVAL 2 MINUTE;



3.2  单指标计算:RtTempTrendJob

文件路径:src/main/java/com/demo/flink/kpi/RtTempTrendJob.java


功能说明:本作业用于实现 实时温度趋势统计指标,通过使用 HOP 窗口对温度数据进行实时统计和处理。

任务目标:

1)从 Kafka 读取设备状态数据。

2)使用 30 秒窗口,每 10 秒滑动一次(HOP 窗口),实时计算温度趋势。

3)统计温度指标:

  • 平均温度(avg_temp

  • 最高温度(max_temp

4)将统计结果同时输出到:

  • 🖥 控制台(用于调试和观察结果)

  • 🗄 MySQL(用于后端接口查询和大屏展示)

数据处理流程: Kafka → HOP 窗口统计 → Print 输出 + MySQL 持久化


完成下面代码:

package com.demo.flink.kpi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 任  务:实时统计温度趋势
 * 规则说明:
 * - 窗口长度:30秒
 * - 每10秒滑动一次(HOP窗口)
 *
 * 数据流:
 * Kafka → Flink SQL(HOP窗口统计) → (Print + MySQL)
 *
 * 【填写说明】共 14 处需要补全,分三种类型:
 *   ★     填词型:填一个值或方法名
 *   ★★    填句型:填写一整行关键语句
 *   ★★★  填块型:填写多行完整逻辑
 */
public class RtTempTrendJob {

    public static void main(String[] args) throws Exception {

        // 1)创建 Flink 运行环境 =====================================
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // ★ TODO 1:设置并行度为 1(填写数字)
        env.setParallelism(______);

        // ★ TODO 2:开启 Checkpoint,每 30 秒保存一次进度(填写毫秒数)
        env.enableCheckpointing(______);

        System.out.println(">>> 第1步完成:创建 Flink 运行环境成功 ✓");

        // 2)创建 Flink SQL 环境 ====================================

        // ★ TODO 3:设置为流式处理模式(填写方法名,不含括号)
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance()
                        .______()
                        .build();

        // ★★ TODO 4:创建 StreamTableEnvironment(填写完整一行)
        // 参考:StreamTableEnvironment.create(流环境, 配置);
        StreamTableEnvironment tEnv =
                _______________________________________;

        System.out.println(">>> 第2步完成:创建 Flink SQL 环境成功 ✓");

        // 3)把 Kafka Topic 映射成 Flink SQL 的"表" ==================
        String createKafkaSource =
                "CREATE TABLE device_status (\n" +
                        "  event_type STRING,\n" +
                        "  device_id STRING,\n" +
                        "  status INT,\n" +
                        "  temperature DOUBLE,\n" +
                        "  `load` DOUBLE,\n" +
                        "  voltage DOUBLE,\n" +
                        "  event_time STRING,\n" +

                        // ★★ TODO 5:填写时间戳派生字段(整行含逗号)
                        // 要求:把字符串 event_time 转成时间戳,字段名为 ts
                        // 参考:字段名 AS TO_TIMESTAMP(原字段名),
                        "  ______,\n" +

                        // ★★ TODO 6:填写 Watermark 声明(整行,无逗号)
                        // 要求:基于 ts 字段,允许数据最多晚到 5 秒
                        // 参考:WATERMARK FOR 时间字段 AS 时间字段 - INTERVAL '秒' SECOND
                        "  ______\n" +

                        ") WITH (\n" +
                        "  'connector'='kafka',\n" +
                        "  'topic'='device_status_topic',\n" +
                        "  'properties.bootstrap.servers'='master:9092',\n" +
                        "  'properties.group.id'='rt_temp_trend_g1',\n" +

                        // ★ TODO 7:填写消费模式(只读最新数据,不回放历史)
                        "  'scan.startup.mode'='______',\n" +

                        "  'format'='json',\n" +
                        "  'json.ignore-parse-errors'='true'\n" +
                        ")";

        tEnv.executeSql(createKafkaSource);

        System.out.println(">>> 第3步完成:Kafka 源表注册成功 ✓");

        // 4)编写趋势统计 SQL(HOP窗口) ============================
        // HOP 窗口说明:
        // - 第一个时间参数:滑动步长(每隔多久触发一次)
        // - 第二个时间参数:窗口长度(每次覆盖多长时间的数据)
        // 本任务:每 10 秒触发,覆盖最近 30 秒数据

        String trendSql =
                "SELECT\n" +
                        "  CAST(window_end AS DATE) AS dt,\n" +
                        "  window_end AS ts,\n" +

                        // ★★ TODO 8:填写平均温度聚合(整行含逗号)
                        // 要求:计算平均温度,别名为 avg_temp
                        // 参考:聚合函数(字段名) AS 别名,
                        "  ______,\n" +

                        // ★★ TODO 9:填写最高温度聚合(整行,无逗号)
                        // 要求:计算最高温度,别名为 max_temp
                        "  ______\n" +

                        "FROM TABLE(\n" +

                        // ★★★ TODO 10:填写完整的 HOP 窗口(整块,共2行)
                        // 要求:
                        //   - 窗口类型:HOP(滑动窗口)
                        //   - 数据来源:device_status 表,时间字段:ts
                        //   - 第一个时间参数:滑动步长 10 秒
                        //   - 第二个时间参数:窗口长度 30 秒
                        // 参考:
                        //   HOP(TABLE 表名, DESCRIPTOR(时间字段),
                        //       INTERVAL '步长' SECOND, INTERVAL '长度' SECOND)
                        "  ______\n" +
                        "      ______\n" +

                        ")\n" +
                        "GROUP BY window_start, window_end";

        // ★★ TODO 11:将统计结果注册为临时视图(整行)
        // 要求:视图名为 v_temp_trend_hop,供后面 INSERT 使用
        // 参考:tEnv.executeSql("CREATE TEMPORARY VIEW 视图名 AS \n" + sql变量);
        ______;

        System.out.println(">>> 第4步完成:统计视图创建成功 ✓");

        // 5)定义 MySQL Sink 表 =====================================
        // 注意:rt_temp_trend_10s 必须提前在 MySQL 建好表结构
        String createMySqlSink =
                "CREATE TABLE mysql_temp_trend_10s (\n" +
                        "  dt DATE,\n" +
                        "  ts TIMESTAMP(3),\n" +
                        "  avg_temp DOUBLE,\n" +
                        "  max_temp DOUBLE\n" +
                        ") WITH (\n" +
                        "  'connector'='jdbc',\n" +
                        "  'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
                        "  'table-name'='rt_temp_trend_10s',\n" +
                        "  'username'='root',\n" +
                        "  'password'='123456',\n" +
                        "  'driver'='com.mysql.cj.jdbc.Driver',\n" +

                        // ★ TODO 12:填写批量写入配置的值
                        // 要求:攒满 100 行 或 每隔 2 秒,触发一次写入(以先到为准)
                        "  'sink.buffer-flush.max-rows'='______',\n" +
                        "  'sink.buffer-flush.interval'='______'\n" +

                        ")";

        tEnv.executeSql(createMySqlSink);

        System.out.println(">>> 第5步完成:MySQL Sink 表注册成功 ✓");

        // 6)定义 Print Sink(控制台输出) ==========================
        String createPrintSink =
                "CREATE TABLE print_temp_trend_10s (\n" +
                        "  dt DATE,\n" +
                        "  ts TIMESTAMP(3),\n" +
                        "  avg_temp DOUBLE,\n" +
                        "  max_temp DOUBLE\n" +
                        ") WITH (\n" +

                        // ★ TODO 13:填写连接器名称(输出到控制台)
                        "  'connector'='______'\n" +

                        ")";

        tEnv.executeSql(createPrintSink);

        System.out.println(">>> 第6步完成:Print Sink 表注册成功 ✓");

        // 7)同时输出两份:Print + MySQL ==========================
        StatementSet stmtSet = tEnv.createStatementSet();

        // ★★ TODO 14:补全写入控制台的 INSERT 语句
        // 要求:从视图 v_temp_trend_hop 查出 dt、ts、avg_temp、max_temp,写入 print_temp_trend_10s
        stmtSet.addInsertSql(
                "INSERT INTO ______ SELECT dt, ts, avg_temp, max_temp FROM ______"
        );

        // ★★ TODO 15:补全写入 MySQL 的 INSERT 语句
        // 要求:从视图 v_temp_trend_hop 查出 dt、ts、avg_temp、max_temp,写入 mysql_temp_trend_10s
        stmtSet.addInsertSql(
                "INSERT INTO ______ SELECT dt, ts, avg_temp, max_temp FROM ______"
        );

        System.out.println(">>> 第7步:提交Flink任务,开始实时计算...");
        
        // ★ TODO 16:执行 StatementSet,启动作业(填写方法名)
        // 提示:stmtSet.方法名();
        stmtSet.______();
    }
}

📌 验收结果:

-- 1.在 MySQL 中查询最新5条温度趋势数据(验证实时写入是否正常)
SELECT * 
FROM rt.rt_temp_trend_10s 
ORDER BY ts DESC 
LIMIT 5;

-- 2.统计最近2分钟数据条数(验证是否持续输出,约10秒1条)
SELECT COUNT(*) AS rows_2min
FROM rt.rt_temp_trend_10s
WHERE ts >= NOW() - INTERVAL 2 MINUTE;



3.3  单指标计算:RtAlarmKpiJob

文件路径:src/main/java/com/demo/flink/kpi/RtAlarmKpiJob.java


功能说明:本模块用于实现 实时告警数量统计功能(10 秒粒度),根据设备的状态数据实时计算告警数量,并将结果输出到 MySQL 数据库和控制台。

任务目标:

1)从 Kafka 设备状态流中识别“告警事件”

2)使用 10 秒滚动窗口(TUMBLE)统计告警数量

3)将统计结果写入 MySQL 指标表

数据处理流程:

Kafka(设备状态流) → Flink SQL(规则派生告警事件流) → Flink SQL(10秒 TUMBLE 窗口统计) → Print 控制台输出 + MySQL 持久化


完成下面代码:

package com.demo.flink.kpi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 任  务:实时统计告警数量(每10秒)
 *
 * 数据流:
 * Kafka(device_status_topic)
 *   → Flink SQL(规则派生告警事件流 v_alarm_event)
 *   → Flink SQL(10s TUMBLE 统计告警数量)
 *   → (Print + MySQL)
 *
 * 【填写说明】共 17 处需要补全,分三种类型:
 *   ★     填词型:填一个值或方法名
 *   ★★    填句型:填写一整行关键语句
 *   ★★★  填块型:填写多行完整逻辑
 */
public class RtAlarmKpiJob {

    public static void main(String[] args) throws Exception {

        // 1)创建 Flink 运行环境 =====================================
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // ★ TODO 1:设置并行度为 1(填写数字)
        env.setParallelism(______);

        // ★ TODO 2:开启 Checkpoint,每 10 秒保存一次进度(填写毫秒数)
        env.enableCheckpointing(______);

        System.out.println(">>> 第1步完成:创建 Flink 运行环境成功 ✓");

        // 2)创建 Flink SQL 环境 ====================================

        // ★ TODO 3:设置为流式处理模式(填写方法名,不含括号)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .______()
                .build();

        // ★★ TODO 4:创建 StreamTableEnvironment(填写完整一行)
        // 参考:StreamTableEnvironment.create(流环境, 配置);
        StreamTableEnvironment tEnv = ____________________________;

        System.out.println(">>> 第2步完成:创建 Flink SQL 环境成功 ✓");

        // 3)把 Kafka Topic 映射成 Flink SQL 的"表" ==================
        String createKafkaSource =
                "CREATE TABLE device_status (\n" +
                        "  event_type STRING,\n" +
                        "  device_id STRING,\n" +
                        "  status INT,\n" +
                        "  temperature DOUBLE,\n" +
                        "  `load` DOUBLE,\n" +
                        "  voltage DOUBLE,\n" +
                        "  event_time STRING,\n" +

                        // ★★ TODO 5:填写时间戳派生字段(整行含逗号)
                        // 要求:把字符串 event_time 转成时间戳,字段名为 ts
                        // 参考:字段名 AS TO_TIMESTAMP(原字段名),
                        "  ______,\n" +

                        // ★★ TODO 6:填写 Watermark 声明(整行,无逗号)
                        // 要求:基于 ts 字段,允许数据最多晚到 5 秒
                        // 参考:WATERMARK FOR 时间字段 AS 时间字段 - INTERVAL '秒' SECOND
                        "  ______\n" +

                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'device_status_topic',\n" +
                        "  'properties.bootstrap.servers' = 'master:9092',\n" +
                        "  'properties.group.id' = 'rt_alarm_kpi_g1',\n" +

                        // ★ TODO 7:填写消费模式(只读最新数据,不回放历史)
                        "  'scan.startup.mode' = '______',\n" +

                        "  'format' = 'json',\n" +
                        "  'json.ignore-parse-errors' = 'true'\n" +
                        ")";
        tEnv.executeSql(createKafkaSource);

        System.out.println(">>> 第3步完成:Kafka 源表注册成功 ✓");

        // 4)第一步:由状态流生成"告警事件流" =========================
        // 规则说明:
        // - WHERE:过滤出异常记录(只保留满足告警条件的数据)
        // - CASE:把异常原因分类成告警类型 alarm_type
        // 告警规则:温度>=80 / 电压<=200 / 负载>=0.90 / 设备离线(status=0)

        String createAlarmEventView =
                "CREATE TEMPORARY VIEW v_alarm_event AS\n" +
                        "SELECT\n" +
                        "  device_id,\n" +
                        "  CASE\n" +

                        // ★★★ TODO 8:填写 CASE WHEN 的四个告警分类(整块,共4行)
                        // 要求:
                        //   温度>=80      → 'HIGH_TEMP'
                        //   电压<=200     → 'LOW_VOLT'
                        //   负载>=0.90    → 'HIGH_LOAD'
                        //   status=0      → 'DEVICE_OFFLINE'
                        // 参考:WHEN 条件 THEN '告警类型'
                        "    WHEN ______ THEN '______'\n" +
                        "    WHEN ______ THEN '______'\n" +
                        "    WHEN ______ THEN '______'\n" +
                        "    WHEN ______ THEN '______'\n" +

                        "  END AS alarm_type,\n" +
                        "  ts\n" +
                        "FROM device_status\n" +
                        "WHERE\n" +

                        // ★★★ TODO 9:填写 WHERE 过滤条件(整块,共4行)
                        // 要求:满足上述任意一个告警条件即保留(用 OR 连接)
                        "  ______\n" +
                        "  OR ______\n" +
                        "  OR ______\n" +
                        "  OR ______";
        tEnv.executeSql(createAlarmEventView);

        System.out.println(">>> 第4步完成:告警事件流视图创建成功 ✓");

        // 5)第二步:统计 10 秒内告警数量(10s TUMBLE) ===============
        String kpiSql =
                "SELECT\n" +
                        "  CAST(window_end AS DATE) AS dt,\n" +
                        "  window_end AS ts,\n" +

                        // ★★ TODO 10:统计告警总条数(整行)
                        // 要求:对所有告警事件计数,别名为 alarm_count
                        // 提示:不需要去重,直接 COUNT(*) 即可
                        "  ______\n" +

                        "FROM TABLE(\n" +

                        // ★★ TODO 11:填写完整的 TUMBLE 窗口(整行)
                        // 要求:对 v_alarm_event 做 10 秒滚动窗口,时间字段为 ts
                        // 参考:TUMBLE(TABLE 表名, DESCRIPTOR(时间字段), INTERVAL '秒' SECOND)
                        "  ______\n" +

                        ")\n" +
                        "GROUP BY window_start, window_end";

        // ★★ TODO 12:将统计结果注册为临时视图(整行)
        // 要求:视图名为 v_kpi_alarm_10s,供后面 INSERT 使用
        // 参考:tEnv.executeSql("CREATE TEMPORARY VIEW 视图名 AS \n" + sql变量);
        ______;

        System.out.println(">>> 第5步完成:告警统计视图创建成功 ✓");

        // 6)定义 MySQL Sink 表 =====================================
        // 注意:MySQL 中 rt_kpi_alarm_10s 必须提前建好表结构
        String createMySqlSink =
                "CREATE TABLE mysql_kpi_alarm_10s (\n" +
                        "  dt DATE,\n" +
                        "  ts TIMESTAMP(3),\n" +
                        "  alarm_count BIGINT\n" +
                        ") WITH (\n" +
                        "  'connector'='jdbc',\n" +
                        "  'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
                        "  'table-name'='rt_kpi_alarm_10s',\n" +
                        "  'username'='root',\n" +
                        "  'password'='123456',\n" +
                        "  'driver'='com.mysql.cj.jdbc.Driver',\n" +

                        // ★ TODO 13:填写批量写入配置的值
                        // 要求:攒满 100 行 或 每隔 2 秒,触发一次写入(以先到为准)
                        "  'sink.buffer-flush.max-rows'='______',\n" +
                        "  'sink.buffer-flush.interval'='______'\n" +

                        ")";
        tEnv.executeSql(createMySqlSink);

        System.out.println(">>> 第6步完成:MySQL Sink 表注册成功 ✓");

        // 7)定义 Print Sink(控制台输出) ===========================
        // 用于课堂观察统计结果(不落库,只打印)
        String createPrintSink =
                "CREATE TABLE print_kpi_alarm_10s (\n" +
                        "  dt DATE,\n" +
                        "  ts TIMESTAMP(3),\n" +
                        "  alarm_count BIGINT\n" +
                        ") WITH (\n" +

                        // ★ TODO 14:填写连接器名称(输出到控制台)
                        "  'connector'='______'\n" +

                        ")";
        tEnv.executeSql(createPrintSink);

        System.out.println(">>> 第7步完成:Print Sink 表注册成功 ✓");

        // 8)同时输出两份:Print + MySQL ============================
        // StatementSet:一次提交多个 INSERT 任务
        StatementSet stmtSet = tEnv.createStatementSet();

        // ★★ TODO 15:补全写入控制台的 INSERT 语句
        // 要求:从视图 v_kpi_alarm_10s 查出 dt、ts、alarm_count,写入 print_kpi_alarm_10s
        stmtSet.addInsertSql(
                "INSERT INTO ______ SELECT dt, ts, alarm_count FROM ______"
        );

        // ★★ TODO 16:补全写入 MySQL 的 INSERT 语句
        // 要求:从视图 v_kpi_alarm_10s 查出 dt、ts、alarm_count,写入 mysql_kpi_alarm_10s
        stmtSet.addInsertSql(
                "INSERT INTO ______ SELECT dt, ts, alarm_count FROM ______"
        );

        System.out.println(">>> 第8步:提交Flink任务,开始实时计算...");
        // ★ TODO 17:执行 StatementSet,启动作业(填写方法名)
        // 提示:stmtSet.方法名();
        stmtSet.______();
    }
}

📌 验收结果:

-- 1.在 MySQL 中查询最新5条告警数量数据(验证实时写入是否正常)
SELECT *
FROM rt.rt_kpi_alarm_10s
ORDER BY ts DESC
LIMIT 5;

-- 2.统计最近2分钟数据条数(验证是否持续输出,约10秒1条)
SELECT COUNT(*) AS rows_2min
FROM rt.rt_kpi_alarm_10s
WHERE ts >= NOW() - INTERVAL 2 MINUTE;



3.4  单指标计算:RtAlarmTop5Job

路径:src/main/java/com/demo/flink/kpi/RtAlarmTop5Job.java 功能说明:本模块用于实现 实时统计告警 Top5 排行功能,根据每台设备的告警次数,实时计算每个时间窗口内告警次数最多的前 5 个设备,并将结果输出到控制台和 MySQL 数据库。

任务目标:

1)从 Kafka 读取告警事件数据

2)使用 1 分钟窗口统计每台设备的告警次数

3)计算每个窗口内告警次数最多的前 5 个设备

4)将结果输出到:

  • 🖥 控制台(用于调试和观察每个窗口的 Top 5 排行)

  • 🗄 MySQL 数据库(用于后端查询和大屏展示)

数据处理流程: Kafka → Flink SQL(统计每台设备的告警次数) → Flink SQL(Top5 排行) → Print 控制台输出 + MySQL 持久化


完成以下代码:

package com.demo.flink.kpi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 任  务:实时统计「告警 Top5 设备」(每 1 分钟更新一组)
 *
 * 数据流:
 * Kafka(device_status_topic:设备状态流)
 *   → Flink SQL(规则派生告警事件流 v_alarm_event)
 *   → Flink SQL(1 分钟窗口统计每台设备告警次数 alarm_cnt)
 *   → Flink SQL(窗口内按次数排序取 Top5:v_alarm_top5_1m)
 *   → (Print + MySQL)
 *
 * 【填写说明】共 8 处需要补全:
 *   ★★    填句型:填写一整行关键语句
 *   ★★★  填块型:填写多行完整逻辑
 */
public class RtAlarmTop5Job {

    public static void main(String[] args) throws Exception {

        // 1)创建 Flink 运行环境 =====================================

        // ★★★ TODO 1:获取流执行环境、设置并行度为1、开启30秒Checkpoint(共3行)
        StreamExecutionEnvironment env = ______;
        env.______;
        env.______;

        System.out.println(">>> 第1步完成:创建 Flink 运行环境成功 ✓");

        // 2)创建 Flink SQL 环境 ====================================

        // ★★ TODO 2:补全 SQL 环境创建的关键两行(流式模式 + 创建 tEnv)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .______()   // 设置为流式模式
                .build();
        StreamTableEnvironment tEnv = ______;

        System.out.println(">>> 第2步完成:创建 Flink SQL 环境成功 ✓");

        // 3)注册 Kafka 源表 ========================================
        // 已给出建表框架,补全括号内的关键 WITH 配置项

        String createKafkaSource =
                "CREATE TABLE device_status (\n" +
                "  event_type STRING,\n" +
                "  device_id STRING,\n" +
                "  status INT,\n" +
                "  temperature DOUBLE,\n" +
                "  `load` DOUBLE,\n" +
                "  voltage DOUBLE,\n" +
                "  event_time STRING,\n" +
                "  ts AS TO_TIMESTAMP(event_time),\n" +
                // ★★ TODO 3:补全 Watermark 定义(允许 5 秒乱序)
                "  ______\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'device_status_topic',\n" +
                "  'properties.bootstrap.servers' = 'master:9092',\n" +
                "  'properties.group.id' = 'rt_alarm_top5_g1',\n" +
                // ★★ TODO 3(续):补全消费模式(只读最新数据)和 json 忽略解析错误 两项配置
                "  'scan.startup.mode' = '______',\n" +
                "  'format' = 'json',\n" +
                "  '______' = 'true'\n" +
                ")";
        tEnv.executeSql(createKafkaSource);

        System.out.println(">>> 第3步完成:Kafka 源表注册成功 ✓");

        // 4)创建告警事件流 =========================================

        // ★★★ TODO 4:补全 CASE WHEN 告警分类 及 WHERE 过滤条件(整块)
        // 告警规则:temperature>=80 / voltage<=200 / load>=0.90 / status=0
        String alarmEventViewSql =
                "CREATE TEMPORARY VIEW v_alarm_event AS\n" +
                "SELECT\n" +
                "  device_id,\n" +
                "  CASE\n" +
                "    WHEN ______ THEN 'HIGH_TEMP'\n" +
                "    WHEN ______ THEN 'LOW_VOLT'\n" +
                "    WHEN ______ THEN 'HIGH_LOAD'\n" +
                "    WHEN ______ THEN 'DEVICE_OFFLINE'\n" +
                "  END AS alarm_type,\n" +
                "  ts\n" +
                "FROM device_status\n" +
                "WHERE\n" +
                "  ______\n" +
                "  OR ______\n" +
                "  OR ______\n" +
                "  OR ______";
        tEnv.executeSql(alarmEventViewSql);

        System.out.println(">>> 第4步完成:告警事件流视图创建成功 ✓");

        // 5)统计每台设备告警次数 ===================================

        // ★★★ TODO 5:补全滚动窗口聚合 SQL(整块)
        // 要求:TUMBLE 窗口大小 1 分钟,GROUP BY window_start、window_end、device_id
        String cntSql =
                "CREATE TEMPORARY VIEW alarm_cnt AS\n" +
                "SELECT\n" +
                "  window_end,\n" +
                "  device_id,\n" +
                "  COUNT(*) AS cnt\n" +
                "FROM TABLE(\n" +
                // 填写 TUMBLE 窗口表函数调用(数据来源、时间字段、窗口大小)
                "  ______\n" +
                ")\n" +
                "GROUP BY ______, ______, ______";
        tEnv.executeSql(cntSql);

        System.out.println(">>> 第5步完成:设备告警次数统计视图创建成功 ✓");

        // 6)计算 Top5 ==============================================

        // ★★★ TODO 6:补全 ROW_NUMBER() 排名逻辑及外层过滤(整块)
        // 要求:PARTITION BY window_end,ORDER BY cnt DESC,只保留 rn<=5
        String top5ViewSql =
                "CREATE TEMPORARY VIEW v_alarm_top5_1m AS\n" +
                "SELECT\n" +
                "  CAST(window_end AS DATE) AS dt,\n" +
                "  window_end, device_id, cnt, rn\n" +
                "FROM (\n" +
                "  SELECT\n" +
                "    window_end, device_id, cnt,\n" +
                "    ROW_NUMBER() OVER (PARTITION BY ______ ORDER BY ______) AS rn\n" +
                "  FROM alarm_cnt\n" +
                ")\n" +
                "WHERE ______";
        tEnv.executeSql(top5ViewSql);

        System.out.println(">>> 第6步完成:Top5 视图创建成功 ✓");

        // 7)注册 MySQL Sink ========================================
        // 已给出完整建表语句,仅需补全批量写入的两项关键配置

        String createMySqlSink =
                "CREATE TABLE mysql_alarm_top5_1m (\n" +
                "  dt DATE,\n" +
                "  window_end TIMESTAMP(3),\n" +
                "  device_id STRING,\n" +
                "  cnt BIGINT,\n" +
                "  rn BIGINT,\n" +
                "  PRIMARY KEY (dt, window_end, rn) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector'='jdbc',\n" +
                "  'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
                "  'table-name'='rt_alarm_top5_1m',\n" +
                "  'username'='root',\n" +
                "  'password'='123456',\n" +
                "  'driver'='com.mysql.cj.jdbc.Driver',\n" +
                // ★★ TODO 7:补全批量写入两项配置(攒满100行 或 每隔2秒写一次)
                "  '______'='100',\n" +
                "  '______'='2s'\n" +
                ")";
        tEnv.executeSql(createMySqlSink);

        System.out.println(">>> 第7步完成:MySQL Sink 表注册成功 ✓");

        // 8)注册 Print Sink ========================================
        // 已给出字段,补全建表语句头和 WITH 配置

        String createPrintSink =
                // ★★ TODO 8:补全 CREATE TABLE 语句及 connector 配置(print)
                "______ print_alarm_top5_1m (\n" +
                "  dt DATE,\n" +
                "  window_end TIMESTAMP(3),\n" +
                "  device_id STRING,\n" +
                "  cnt BIGINT,\n" +
                "  rn BIGINT\n" +
                ") WITH (\n" +
                "  'connector'='______'\n" +
                ")";
        tEnv.executeSql(createPrintSink);

        System.out.println(">>> 第8步完成:Print Sink 表注册成功 ✓");

        // 9)输出到 Print + MySQL ===================================

        StatementSet stmtSet = tEnv.createStatementSet();

        stmtSet.addInsertSql(
                "INSERT INTO print_alarm_top5_1m " +
                "SELECT dt, window_end, device_id, cnt, rn FROM v_alarm_top5_1m"
        );
        stmtSet.addInsertSql(
                "INSERT INTO mysql_alarm_top5_1m " +
                "SELECT dt, window_end, device_id, cnt, rn FROM v_alarm_top5_1m"
        );

        System.out.println(">>> 第9步:提交Flink任务,开始实时计算...");
        stmtSet.execute();
    }
}

📌 验收结果:

-- 1.验证“告警 Top5 排行”是否正常写入
-- 作用:
--   查询最新一个窗口的 Top 排行数据
--   - 按 window_end 倒序(最新窗口在前)
--   - 按 rn 升序(第1名在前)
-- 验收点:
--   ✔ 能查到数据
--   ✔ rn 从 1 开始递增
--   ✔ window_end 为最近时间
SELECT *
FROM rt.rt_alarm_top5_1m
ORDER BY window_end DESC, rn ASC
LIMIT 5;

-- 2.查看最近 2 分钟是否持续写入(10秒一条)
SELECT COUNT(*) AS rows_2min
FROM rt.rt_kpi_alarm_10s
WHERE ts >= NOW() - INTERVAL 2 MINUTE;


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1