1.1 启动Kafka
# 新建一个master窗口,在master/slave1/slave2启动 ZooKeeper zkServer.sh start # 在master/slave1/slave2检查 ZooKeeper状态 zkServer.sh status # 在master/slave1/slave2启动 Kafka kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties # 检查Topic主题是否存在 kafka-topics.sh --bootstrap-server master:9092 --list
1.2 写入状态流数据到 Kafka
# 在master节点中操作【新开窗口】 cd /opt/datas # 启动设备状态模拟脚本,并将数据写入 Kafka python 01_device_status_sim.py | \ kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic
下载POM文件
2.MySQL 先建库建表
启动 Mysql :
# 进入 mysql 客户端 mysql -uroot -p123456
创建数据库与表:
-- 创建数据库 rt(如果不存在则创建),并设置默认字符集为 utf8mb4(支持中文)【用于存放实时指标结果表】 CREATE DATABASE IF NOT EXISTS rt DEFAULT CHARSET utf8mb4; -- 选择数据库 USE rt;
创建数据表:
-------------------------------------------------- -- 创建表1:10秒粒度的在线设备数量(KPI) -------------------------------------------------- CREATE TABLE IF NOT EXISTS rt.rt_kpi_online_10s ( dt DATE, -- 日期 ts DATETIME, -- 时间戳(精确到秒) online_count BIGINT, -- 在线数 PRIMARY KEY (ts) -- 主键:时间唯一 ); -------------------------------------------------- -- 表2:10秒粒度的温度趋势指标(KPI) -------------------------------------------------- CREATE TABLE IF NOT EXISTS rt.rt_temp_trend_10s ( dt DATE, -- 日期 ts DATETIME, -- 时间戳(精确到秒) avg_temp DOUBLE, -- 平均温度 max_temp DOUBLE, -- 最高温度 PRIMARY KEY (ts) -- 主键:时间唯一 ); -------------------------------------------------- -- 表3:10秒粒度的告警数量(KPI) -------------------------------------------------- CREATE TABLE IF NOT EXISTS rt.rt_kpi_alarm_10s ( dt DATE, -- 日期 ts DATETIME, -- 时间戳(精确到秒) alarm_count BIGINT, -- 告警数量 PRIMARY KEY (ts) -- 主键:时间唯一 ); -------------------------------------------------- -- 表4:1分钟粒度的告警 Top5 排行 -------------------------------------------------- CREATE TABLE IF NOT EXISTS rt.rt_alarm_top5_1m ( dt DATE, -- 日期 window_end DATETIME, -- 窗口结束时间(统计时间点) device_id VARCHAR(64), -- 设备ID cnt BIGINT, -- 告警次数(该设备1分钟内触发多少次) rn BIGINT, -- 排名(Top N,第几名) PRIMARY KEY (dt, window_end, rn) -- 主键:同一时间窗口下排名唯一 );
查看创建好的表:
-- 查看创建的表 use rt; show tables;
3.1 单指标计算:RtOnlineKpiJob
文件路径:
src/main/java/com/demo/flink/kpi/RtOnlineKpiJob.java
功能说明:本作业实现“Kafka 实时数据 → 10 秒在线设备统计 → 数据落库”的完整实时指标链路。
任务目标:
1)从 Kafka 主题 device_status_topic 读取设备状态数据
2)以 10 秒为时间窗口,统计当前在线设备数量(status = 1)
3)将统计结果同时输出到:
🖥 控制台(用于开发阶段调试与观察)
🗄 MySQL 数据库(用于后端接口查询和大屏展示)
数据处理流程: Kafka → Flink SQL 窗口统计 → Print 输出 + MySQL 持久化
完成下面代码:
package com.demo.flink.kpi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 【任务】实时统计在线设备数(每 10 秒一条)
*
* 【数据流】Kafka(设备上报) → Flink(窗口统计) →(控制台打印 + 写入 MySQL)
*
* 【填写说明】共 14 处需要补全,分三种类型:
* ★ 填词型:填一个值或方法名
* ★★ 填句型:填写一整行关键语句
* ★★★ 填块型:填写多行完整逻辑
*/
public class RtOnlineKpiJob {
public static void main(String[] args) throws Exception {
// =========================================================
// 1. 创建 Flink 运行环境
// =========================================================
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// ★ TODO 1:开启 Checkpoint,每 30 秒保存一次进度(填写毫秒数)
env.enableCheckpointing(______);
System.out.println(">>> 第1步完成:创建 Flink 运行环境成功 ✓");
// =========================================================
// 2. 创建 Flink SQL 环境
// =========================================================
// ★ TODO 2:设置为流式处理模式(填写方法名,不含括号)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.______()
.build();
// ★★ TODO 3:创建 StreamTableEnvironment(填写完整一行)
// 参考:StreamTableEnvironment.create(流环境, 配置);
StreamTableEnvironment tEnv = ________________________________;
System.out.println(">>> 第2步完成:创建 Flink SQL 环境成功 ✓");
// =========================================================
// 3. 注册 Kafka 源表
// =========================================================
String createKafkaSource =
"CREATE TABLE device_status (\n" +
" event_type STRING,\n" +
" device_id STRING,\n" +
" status INT,\n" +
" temperature DOUBLE,\n" +
" `load` DOUBLE,\n" +
" voltage DOUBLE,\n" +
" event_time STRING,\n" +
// ★★ TODO 4:填写时间戳派生字段(整行)
// 要求:把字符串 event_time 转成时间戳,字段名为 ts
// 参考:字段名 AS TO_TIMESTAMP(原字段名),
" ______,\n" +
// ★★ TODO 5:填写 Watermark 声明(整行)
// 要求:基于 ts 字段,允许数据最多晚到 5 秒
// 参考:WATERMARK FOR 时间字段 AS 时间字段 - INTERVAL '秒' SECOND
" ______\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'device_status_topic',\n" +
" 'properties.bootstrap.servers' = 'master:9092',\n" +
" 'properties.group.id' = 'rt_online_kpi_g1',\n" +
// ★ TODO 6:填写消费模式(只读最新数据,不回放历史)
" 'scan.startup.mode' = '______',\n" +
" 'format' = 'json',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")";
tEnv.executeSql(createKafkaSource);
System.out.println(">>> 第3步完成:Kafka 源表注册成功 ✓");
// =========================================================
// 4. 编写统计 SQL:每 10 秒统计一次"在线设备数"
// =========================================================
String kpiSql =
"SELECT\n" +
" CAST(window_end AS DATE) AS dt,\n" +
" window_end AS ts,\n" +
// ★★ TODO 7:统计不重复在线设备数(整行)
// 要求:对 device_id 去重计数,结果别名为 online_count
// 参考:COUNT(DISTINCT 字段) AS 别名
" ______\n" +
"FROM TABLE(\n" +
// ★★ TODO 8:填写完整的 TUMBLE 窗口(整行)
// 要求:
// - 窗口类型:TUMBLE(滚动窗口)
// - 数据来源:device_status 表
// - 时间字段:ts
// - 窗口大小:10 秒
// 参考:窗口函数(TABLE 表名, DESCRIPTOR(时间字段), INTERVAL '大小' SECOND)
" ______\n" +
")\n" +
// ★ TODO 9:填写过滤条件的值(status = ? 表示在线)
"WHERE status = ______\n" +
"GROUP BY window_start, window_end";
// ★★ TODO 10:将统计结果注册为临时视图(整行)
// 要求:视图名为 v_kpi_online_10s,供后面 INSERT 使用
// 参考:tEnv.executeSql("CREATE TEMPORARY VIEW 视图名 AS \n" + sql变量);
______;
System.out.println(">>> 第4步完成:统计视图创建成功 ✓");
// =========================================================
// 5. 注册 MySQL Sink 表
// =========================================================
String createMySqlSink =
"CREATE TABLE mysql_kpi_online_10s (\n" +
" dt DATE,\n" +
" ts TIMESTAMP(3),\n" +
" online_count BIGINT\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
" 'table-name'='rt_kpi_online_10s',\n" +
" 'username'='root',\n" +
" 'password'='123456',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
// ★ TODO 11:填写批量写入配置的值
// 要求:攒满 100 行 或 每隔 2 秒,触发一次写入(以先到为准)
" 'sink.buffer-flush.max-rows'='______',\n" +
" 'sink.buffer-flush.interval'='______'\n" +
")";
tEnv.executeSql(createMySqlSink);
System.out.println(">>> 第5步完成:MySQL Sink 表注册成功 ✓");
// =========================================================
// 6. 注册 Print Sink
// =========================================================
// 说明:print 不是存储,只是把结果输出到控制台,方便你看"有没有算出来"
String createPrintSink =
"CREATE TABLE print_kpi_online_10s (\n" +
" dt DATE,\n" +
" ts TIMESTAMP(3),\n" +
" online_count BIGINT\n" +
") WITH (\n" +
// ★ TODO 12:填写连接器名称(输出到控制台)
" 'connector'='______'\n" +
")";
tEnv.executeSql(createPrintSink);
System.out.println(">>> 第6步完成:Print Sink 表注册成功 ✓");
// =========================================================
// 7. 把同一份结果同时写两处:控制台 + MySQL
// =========================================================
StatementSet stmtSet = tEnv.createStatementSet();
// ★★ TODO 13:补全写入控制台的 INSERT 语句
// 要求:从视图 v_kpi_online_10s 查出 dt、ts、online_count,写入 print_kpi_online_10s
stmtSet.addInsertSql(
"INSERT INTO ______ SELECT dt, ts, online_count FROM ______"
);
// ★★ TODO 14:补全写入 MySQL 的 INSERT 语句
// 要求:从视图 v_kpi_online_10s 查出 dt、ts、online_count,写入 mysql_kpi_online_10s
stmtSet.addInsertSql(
"INSERT INTO ______ SELECT dt, ts, online_count FROM ______"
);
System.out.println(">>> 第7步:提交Flink任务,开始实时计算...");
// 运行作业:
// - 程序不会自动结束(Kafka 数据一直来,就一直算)
// - 想停止:在控制台按 Ctrl + C 或在 Flink Web UI 取消作业
stmtSet.execute();
}
}📌 验收结果:
-- 1.在Mysql中查询最新5条在线设备数(验证实时写入是否正常) SELECT * FROM rt.rt_kpi_online_10s ORDER BY ts DESC LIMIT 5; -- 2.统计最近2分钟数据条数(验证是否持续输出,10秒≈1条) SELECT COUNT(*) AS rows_2min FROM rt.rt_kpi_online_10s WHERE ts >= NOW() - INTERVAL 2 MINUTE;
3.2 单指标计算:RtTempTrendJob
文件路径:
src/main/java/com/demo/flink/kpi/RtTempTrendJob.java
功能说明:本作业用于实现 实时温度趋势统计指标,通过使用 HOP 窗口对温度数据进行实时统计和处理。
任务目标:
1)从 Kafka 读取设备状态数据。
2)使用 30 秒窗口,每 10 秒滑动一次(HOP 窗口),实时计算温度趋势。
3)统计温度指标:
平均温度(
avg_temp)最高温度(
max_temp)
4)将统计结果同时输出到:
🖥 控制台(用于调试和观察结果)
🗄 MySQL(用于后端接口查询和大屏展示)
数据处理流程: Kafka → HOP 窗口统计 → Print 输出 + MySQL 持久化
完成下面代码:
package com.demo.flink.kpi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 任 务:实时统计温度趋势
* 规则说明:
* - 窗口长度:30秒
* - 每10秒滑动一次(HOP窗口)
*
* 数据流:
* Kafka → Flink SQL(HOP窗口统计) → (Print + MySQL)
*
* 【填写说明】共 14 处需要补全,分三种类型:
* ★ 填词型:填一个值或方法名
* ★★ 填句型:填写一整行关键语句
* ★★★ 填块型:填写多行完整逻辑
*/
public class RtTempTrendJob {
public static void main(String[] args) throws Exception {
// 1)创建 Flink 运行环境 =====================================
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// ★ TODO 1:设置并行度为 1(填写数字)
env.setParallelism(______);
// ★ TODO 2:开启 Checkpoint,每 30 秒保存一次进度(填写毫秒数)
env.enableCheckpointing(______);
System.out.println(">>> 第1步完成:创建 Flink 运行环境成功 ✓");
// 2)创建 Flink SQL 环境 ====================================
// ★ TODO 3:设置为流式处理模式(填写方法名,不含括号)
EnvironmentSettings settings =
EnvironmentSettings.newInstance()
.______()
.build();
// ★★ TODO 4:创建 StreamTableEnvironment(填写完整一行)
// 参考:StreamTableEnvironment.create(流环境, 配置);
StreamTableEnvironment tEnv =
_______________________________________;
System.out.println(">>> 第2步完成:创建 Flink SQL 环境成功 ✓");
// 3)把 Kafka Topic 映射成 Flink SQL 的"表" ==================
String createKafkaSource =
"CREATE TABLE device_status (\n" +
" event_type STRING,\n" +
" device_id STRING,\n" +
" status INT,\n" +
" temperature DOUBLE,\n" +
" `load` DOUBLE,\n" +
" voltage DOUBLE,\n" +
" event_time STRING,\n" +
// ★★ TODO 5:填写时间戳派生字段(整行含逗号)
// 要求:把字符串 event_time 转成时间戳,字段名为 ts
// 参考:字段名 AS TO_TIMESTAMP(原字段名),
" ______,\n" +
// ★★ TODO 6:填写 Watermark 声明(整行,无逗号)
// 要求:基于 ts 字段,允许数据最多晚到 5 秒
// 参考:WATERMARK FOR 时间字段 AS 时间字段 - INTERVAL '秒' SECOND
" ______\n" +
") WITH (\n" +
" 'connector'='kafka',\n" +
" 'topic'='device_status_topic',\n" +
" 'properties.bootstrap.servers'='master:9092',\n" +
" 'properties.group.id'='rt_temp_trend_g1',\n" +
// ★ TODO 7:填写消费模式(只读最新数据,不回放历史)
" 'scan.startup.mode'='______',\n" +
" 'format'='json',\n" +
" 'json.ignore-parse-errors'='true'\n" +
")";
tEnv.executeSql(createKafkaSource);
System.out.println(">>> 第3步完成:Kafka 源表注册成功 ✓");
// 4)编写趋势统计 SQL(HOP窗口) ============================
// HOP 窗口说明:
// - 第一个时间参数:滑动步长(每隔多久触发一次)
// - 第二个时间参数:窗口长度(每次覆盖多长时间的数据)
// 本任务:每 10 秒触发,覆盖最近 30 秒数据
String trendSql =
"SELECT\n" +
" CAST(window_end AS DATE) AS dt,\n" +
" window_end AS ts,\n" +
// ★★ TODO 8:填写平均温度聚合(整行含逗号)
// 要求:计算平均温度,别名为 avg_temp
// 参考:聚合函数(字段名) AS 别名,
" ______,\n" +
// ★★ TODO 9:填写最高温度聚合(整行,无逗号)
// 要求:计算最高温度,别名为 max_temp
" ______\n" +
"FROM TABLE(\n" +
// ★★★ TODO 10:填写完整的 HOP 窗口(整块,共2行)
// 要求:
// - 窗口类型:HOP(滑动窗口)
// - 数据来源:device_status 表,时间字段:ts
// - 第一个时间参数:滑动步长 10 秒
// - 第二个时间参数:窗口长度 30 秒
// 参考:
// HOP(TABLE 表名, DESCRIPTOR(时间字段),
// INTERVAL '步长' SECOND, INTERVAL '长度' SECOND)
" ______\n" +
" ______\n" +
")\n" +
"GROUP BY window_start, window_end";
// ★★ TODO 11:将统计结果注册为临时视图(整行)
// 要求:视图名为 v_temp_trend_hop,供后面 INSERT 使用
// 参考:tEnv.executeSql("CREATE TEMPORARY VIEW 视图名 AS \n" + sql变量);
______;
System.out.println(">>> 第4步完成:统计视图创建成功 ✓");
// 5)定义 MySQL Sink 表 =====================================
// 注意:rt_temp_trend_10s 必须提前在 MySQL 建好表结构
String createMySqlSink =
"CREATE TABLE mysql_temp_trend_10s (\n" +
" dt DATE,\n" +
" ts TIMESTAMP(3),\n" +
" avg_temp DOUBLE,\n" +
" max_temp DOUBLE\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
" 'table-name'='rt_temp_trend_10s',\n" +
" 'username'='root',\n" +
" 'password'='123456',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
// ★ TODO 12:填写批量写入配置的值
// 要求:攒满 100 行 或 每隔 2 秒,触发一次写入(以先到为准)
" 'sink.buffer-flush.max-rows'='______',\n" +
" 'sink.buffer-flush.interval'='______'\n" +
")";
tEnv.executeSql(createMySqlSink);
System.out.println(">>> 第5步完成:MySQL Sink 表注册成功 ✓");
// 6)定义 Print Sink(控制台输出) ==========================
String createPrintSink =
"CREATE TABLE print_temp_trend_10s (\n" +
" dt DATE,\n" +
" ts TIMESTAMP(3),\n" +
" avg_temp DOUBLE,\n" +
" max_temp DOUBLE\n" +
") WITH (\n" +
// ★ TODO 13:填写连接器名称(输出到控制台)
" 'connector'='______'\n" +
")";
tEnv.executeSql(createPrintSink);
System.out.println(">>> 第6步完成:Print Sink 表注册成功 ✓");
// 7)同时输出两份:Print + MySQL ==========================
StatementSet stmtSet = tEnv.createStatementSet();
// ★★ TODO 14:补全写入控制台的 INSERT 语句
// 要求:从视图 v_temp_trend_hop 查出 dt、ts、avg_temp、max_temp,写入 print_temp_trend_10s
stmtSet.addInsertSql(
"INSERT INTO ______ SELECT dt, ts, avg_temp, max_temp FROM ______"
);
// ★★ TODO 15:补全写入 MySQL 的 INSERT 语句
// 要求:从视图 v_temp_trend_hop 查出 dt、ts、avg_temp、max_temp,写入 mysql_temp_trend_10s
stmtSet.addInsertSql(
"INSERT INTO ______ SELECT dt, ts, avg_temp, max_temp FROM ______"
);
System.out.println(">>> 第7步:提交Flink任务,开始实时计算...");
// ★ TODO 16:执行 StatementSet,启动作业(填写方法名)
// 提示:stmtSet.方法名();
stmtSet.______();
}
}📌 验收结果:
-- 1.在 MySQL 中查询最新5条温度趋势数据(验证实时写入是否正常) SELECT * FROM rt.rt_temp_trend_10s ORDER BY ts DESC LIMIT 5; -- 2.统计最近2分钟数据条数(验证是否持续输出,约10秒1条) SELECT COUNT(*) AS rows_2min FROM rt.rt_temp_trend_10s WHERE ts >= NOW() - INTERVAL 2 MINUTE;
3.3 单指标计算:RtAlarmKpiJob
文件路径:
src/main/java/com/demo/flink/kpi/RtAlarmKpiJob.java
功能说明:本模块用于实现 实时告警数量统计功能(10 秒粒度),根据设备的状态数据实时计算告警数量,并将结果输出到 MySQL 数据库和控制台。
任务目标:
1)从 Kafka 设备状态流中识别“告警事件”
2)使用 10 秒滚动窗口(TUMBLE)统计告警数量
3)将统计结果写入 MySQL 指标表
数据处理流程:
Kafka(设备状态流) → Flink SQL(规则派生告警事件流) → Flink SQL(10秒 TUMBLE 窗口统计) → Print 控制台输出 + MySQL 持久化
完成下面代码:
package com.demo.flink.kpi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 任 务:实时统计告警数量(每10秒)
*
* 数据流:
* Kafka(device_status_topic)
* → Flink SQL(规则派生告警事件流 v_alarm_event)
* → Flink SQL(10s TUMBLE 统计告警数量)
* → (Print + MySQL)
*
* 【填写说明】共 17 处需要补全,分三种类型:
* ★ 填词型:填一个值或方法名
* ★★ 填句型:填写一整行关键语句
* ★★★ 填块型:填写多行完整逻辑
*/
public class RtAlarmKpiJob {
public static void main(String[] args) throws Exception {
// 1)创建 Flink 运行环境 =====================================
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// ★ TODO 1:设置并行度为 1(填写数字)
env.setParallelism(______);
// ★ TODO 2:开启 Checkpoint,每 10 秒保存一次进度(填写毫秒数)
env.enableCheckpointing(______);
System.out.println(">>> 第1步完成:创建 Flink 运行环境成功 ✓");
// 2)创建 Flink SQL 环境 ====================================
// ★ TODO 3:设置为流式处理模式(填写方法名,不含括号)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.______()
.build();
// ★★ TODO 4:创建 StreamTableEnvironment(填写完整一行)
// 参考:StreamTableEnvironment.create(流环境, 配置);
StreamTableEnvironment tEnv = ____________________________;
System.out.println(">>> 第2步完成:创建 Flink SQL 环境成功 ✓");
// 3)把 Kafka Topic 映射成 Flink SQL 的"表" ==================
String createKafkaSource =
"CREATE TABLE device_status (\n" +
" event_type STRING,\n" +
" device_id STRING,\n" +
" status INT,\n" +
" temperature DOUBLE,\n" +
" `load` DOUBLE,\n" +
" voltage DOUBLE,\n" +
" event_time STRING,\n" +
// ★★ TODO 5:填写时间戳派生字段(整行含逗号)
// 要求:把字符串 event_time 转成时间戳,字段名为 ts
// 参考:字段名 AS TO_TIMESTAMP(原字段名),
" ______,\n" +
// ★★ TODO 6:填写 Watermark 声明(整行,无逗号)
// 要求:基于 ts 字段,允许数据最多晚到 5 秒
// 参考:WATERMARK FOR 时间字段 AS 时间字段 - INTERVAL '秒' SECOND
" ______\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'device_status_topic',\n" +
" 'properties.bootstrap.servers' = 'master:9092',\n" +
" 'properties.group.id' = 'rt_alarm_kpi_g1',\n" +
// ★ TODO 7:填写消费模式(只读最新数据,不回放历史)
" 'scan.startup.mode' = '______',\n" +
" 'format' = 'json',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")";
tEnv.executeSql(createKafkaSource);
System.out.println(">>> 第3步完成:Kafka 源表注册成功 ✓");
// 4)第一步:由状态流生成"告警事件流" =========================
// 规则说明:
// - WHERE:过滤出异常记录(只保留满足告警条件的数据)
// - CASE:把异常原因分类成告警类型 alarm_type
// 告警规则:温度>=80 / 电压<=200 / 负载>=0.90 / 设备离线(status=0)
String createAlarmEventView =
"CREATE TEMPORARY VIEW v_alarm_event AS\n" +
"SELECT\n" +
" device_id,\n" +
" CASE\n" +
// ★★★ TODO 8:填写 CASE WHEN 的四个告警分类(整块,共4行)
// 要求:
// 温度>=80 → 'HIGH_TEMP'
// 电压<=200 → 'LOW_VOLT'
// 负载>=0.90 → 'HIGH_LOAD'
// status=0 → 'DEVICE_OFFLINE'
// 参考:WHEN 条件 THEN '告警类型'
" WHEN ______ THEN '______'\n" +
" WHEN ______ THEN '______'\n" +
" WHEN ______ THEN '______'\n" +
" WHEN ______ THEN '______'\n" +
" END AS alarm_type,\n" +
" ts\n" +
"FROM device_status\n" +
"WHERE\n" +
// ★★★ TODO 9:填写 WHERE 过滤条件(整块,共4行)
// 要求:满足上述任意一个告警条件即保留(用 OR 连接)
" ______\n" +
" OR ______\n" +
" OR ______\n" +
" OR ______";
tEnv.executeSql(createAlarmEventView);
System.out.println(">>> 第4步完成:告警事件流视图创建成功 ✓");
// 5)第二步:统计 10 秒内告警数量(10s TUMBLE) ===============
String kpiSql =
"SELECT\n" +
" CAST(window_end AS DATE) AS dt,\n" +
" window_end AS ts,\n" +
// ★★ TODO 10:统计告警总条数(整行)
// 要求:对所有告警事件计数,别名为 alarm_count
// 提示:不需要去重,直接 COUNT(*) 即可
" ______\n" +
"FROM TABLE(\n" +
// ★★ TODO 11:填写完整的 TUMBLE 窗口(整行)
// 要求:对 v_alarm_event 做 10 秒滚动窗口,时间字段为 ts
// 参考:TUMBLE(TABLE 表名, DESCRIPTOR(时间字段), INTERVAL '秒' SECOND)
" ______\n" +
")\n" +
"GROUP BY window_start, window_end";
// ★★ TODO 12:将统计结果注册为临时视图(整行)
// 要求:视图名为 v_kpi_alarm_10s,供后面 INSERT 使用
// 参考:tEnv.executeSql("CREATE TEMPORARY VIEW 视图名 AS \n" + sql变量);
______;
System.out.println(">>> 第5步完成:告警统计视图创建成功 ✓");
// 6)定义 MySQL Sink 表 =====================================
// 注意:MySQL 中 rt_kpi_alarm_10s 必须提前建好表结构
String createMySqlSink =
"CREATE TABLE mysql_kpi_alarm_10s (\n" +
" dt DATE,\n" +
" ts TIMESTAMP(3),\n" +
" alarm_count BIGINT\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
" 'table-name'='rt_kpi_alarm_10s',\n" +
" 'username'='root',\n" +
" 'password'='123456',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
// ★ TODO 13:填写批量写入配置的值
// 要求:攒满 100 行 或 每隔 2 秒,触发一次写入(以先到为准)
" 'sink.buffer-flush.max-rows'='______',\n" +
" 'sink.buffer-flush.interval'='______'\n" +
")";
tEnv.executeSql(createMySqlSink);
System.out.println(">>> 第6步完成:MySQL Sink 表注册成功 ✓");
// 7)定义 Print Sink(控制台输出) ===========================
// 用于课堂观察统计结果(不落库,只打印)
String createPrintSink =
"CREATE TABLE print_kpi_alarm_10s (\n" +
" dt DATE,\n" +
" ts TIMESTAMP(3),\n" +
" alarm_count BIGINT\n" +
") WITH (\n" +
// ★ TODO 14:填写连接器名称(输出到控制台)
" 'connector'='______'\n" +
")";
tEnv.executeSql(createPrintSink);
System.out.println(">>> 第7步完成:Print Sink 表注册成功 ✓");
// 8)同时输出两份:Print + MySQL ============================
// StatementSet:一次提交多个 INSERT 任务
StatementSet stmtSet = tEnv.createStatementSet();
// ★★ TODO 15:补全写入控制台的 INSERT 语句
// 要求:从视图 v_kpi_alarm_10s 查出 dt、ts、alarm_count,写入 print_kpi_alarm_10s
stmtSet.addInsertSql(
"INSERT INTO ______ SELECT dt, ts, alarm_count FROM ______"
);
// ★★ TODO 16:补全写入 MySQL 的 INSERT 语句
// 要求:从视图 v_kpi_alarm_10s 查出 dt、ts、alarm_count,写入 mysql_kpi_alarm_10s
stmtSet.addInsertSql(
"INSERT INTO ______ SELECT dt, ts, alarm_count FROM ______"
);
System.out.println(">>> 第8步:提交Flink任务,开始实时计算...");
// ★ TODO 17:执行 StatementSet,启动作业(填写方法名)
// 提示:stmtSet.方法名();
stmtSet.______();
}
}📌 验收结果:
-- 1.在 MySQL 中查询最新5条告警数量数据(验证实时写入是否正常) SELECT * FROM rt.rt_kpi_alarm_10s ORDER BY ts DESC LIMIT 5; -- 2.统计最近2分钟数据条数(验证是否持续输出,约10秒1条) SELECT COUNT(*) AS rows_2min FROM rt.rt_kpi_alarm_10s WHERE ts >= NOW() - INTERVAL 2 MINUTE;
3.4 单指标计算:RtAlarmTop5Job
路径:
src/main/java/com/demo/flink/kpi/RtAlarmTop5Job.java功能说明:本模块用于实现 实时统计告警 Top5 排行功能,根据每台设备的告警次数,实时计算每个时间窗口内告警次数最多的前 5 个设备,并将结果输出到控制台和 MySQL 数据库。
任务目标:
1)从 Kafka 读取告警事件数据
2)使用 1 分钟窗口统计每台设备的告警次数
3)计算每个窗口内告警次数最多的前 5 个设备
4)将结果输出到:
🖥 控制台(用于调试和观察每个窗口的 Top 5 排行)
🗄 MySQL 数据库(用于后端查询和大屏展示)
数据处理流程: Kafka → Flink SQL(统计每台设备的告警次数) → Flink SQL(Top5 排行) → Print 控制台输出 + MySQL 持久化
完成以下代码:
package com.demo.flink.kpi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 任 务:实时统计「告警 Top5 设备」(每 1 分钟更新一组)
*
* 数据流:
* Kafka(device_status_topic:设备状态流)
* → Flink SQL(规则派生告警事件流 v_alarm_event)
* → Flink SQL(1 分钟窗口统计每台设备告警次数 alarm_cnt)
* → Flink SQL(窗口内按次数排序取 Top5:v_alarm_top5_1m)
* → (Print + MySQL)
*
* 【填写说明】共 8 处需要补全:
* ★★ 填句型:填写一整行关键语句
* ★★★ 填块型:填写多行完整逻辑
*/
public class RtAlarmTop5Job {
public static void main(String[] args) throws Exception {
// 1)创建 Flink 运行环境 =====================================
// ★★★ TODO 1:获取流执行环境、设置并行度为1、开启30秒Checkpoint(共3行)
StreamExecutionEnvironment env = ______;
env.______;
env.______;
System.out.println(">>> 第1步完成:创建 Flink 运行环境成功 ✓");
// 2)创建 Flink SQL 环境 ====================================
// ★★ TODO 2:补全 SQL 环境创建的关键两行(流式模式 + 创建 tEnv)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.______() // 设置为流式模式
.build();
StreamTableEnvironment tEnv = ______;
System.out.println(">>> 第2步完成:创建 Flink SQL 环境成功 ✓");
// 3)注册 Kafka 源表 ========================================
// 已给出建表框架,补全括号内的关键 WITH 配置项
String createKafkaSource =
"CREATE TABLE device_status (\n" +
" event_type STRING,\n" +
" device_id STRING,\n" +
" status INT,\n" +
" temperature DOUBLE,\n" +
" `load` DOUBLE,\n" +
" voltage DOUBLE,\n" +
" event_time STRING,\n" +
" ts AS TO_TIMESTAMP(event_time),\n" +
// ★★ TODO 3:补全 Watermark 定义(允许 5 秒乱序)
" ______\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'device_status_topic',\n" +
" 'properties.bootstrap.servers' = 'master:9092',\n" +
" 'properties.group.id' = 'rt_alarm_top5_g1',\n" +
// ★★ TODO 3(续):补全消费模式(只读最新数据)和 json 忽略解析错误 两项配置
" 'scan.startup.mode' = '______',\n" +
" 'format' = 'json',\n" +
" '______' = 'true'\n" +
")";
tEnv.executeSql(createKafkaSource);
System.out.println(">>> 第3步完成:Kafka 源表注册成功 ✓");
// 4)创建告警事件流 =========================================
// ★★★ TODO 4:补全 CASE WHEN 告警分类 及 WHERE 过滤条件(整块)
// 告警规则:temperature>=80 / voltage<=200 / load>=0.90 / status=0
String alarmEventViewSql =
"CREATE TEMPORARY VIEW v_alarm_event AS\n" +
"SELECT\n" +
" device_id,\n" +
" CASE\n" +
" WHEN ______ THEN 'HIGH_TEMP'\n" +
" WHEN ______ THEN 'LOW_VOLT'\n" +
" WHEN ______ THEN 'HIGH_LOAD'\n" +
" WHEN ______ THEN 'DEVICE_OFFLINE'\n" +
" END AS alarm_type,\n" +
" ts\n" +
"FROM device_status\n" +
"WHERE\n" +
" ______\n" +
" OR ______\n" +
" OR ______\n" +
" OR ______";
tEnv.executeSql(alarmEventViewSql);
System.out.println(">>> 第4步完成:告警事件流视图创建成功 ✓");
// 5)统计每台设备告警次数 ===================================
// ★★★ TODO 5:补全滚动窗口聚合 SQL(整块)
// 要求:TUMBLE 窗口大小 1 分钟,GROUP BY window_start、window_end、device_id
String cntSql =
"CREATE TEMPORARY VIEW alarm_cnt AS\n" +
"SELECT\n" +
" window_end,\n" +
" device_id,\n" +
" COUNT(*) AS cnt\n" +
"FROM TABLE(\n" +
// 填写 TUMBLE 窗口表函数调用(数据来源、时间字段、窗口大小)
" ______\n" +
")\n" +
"GROUP BY ______, ______, ______";
tEnv.executeSql(cntSql);
System.out.println(">>> 第5步完成:设备告警次数统计视图创建成功 ✓");
// 6)计算 Top5 ==============================================
// ★★★ TODO 6:补全 ROW_NUMBER() 排名逻辑及外层过滤(整块)
// 要求:PARTITION BY window_end,ORDER BY cnt DESC,只保留 rn<=5
String top5ViewSql =
"CREATE TEMPORARY VIEW v_alarm_top5_1m AS\n" +
"SELECT\n" +
" CAST(window_end AS DATE) AS dt,\n" +
" window_end, device_id, cnt, rn\n" +
"FROM (\n" +
" SELECT\n" +
" window_end, device_id, cnt,\n" +
" ROW_NUMBER() OVER (PARTITION BY ______ ORDER BY ______) AS rn\n" +
" FROM alarm_cnt\n" +
")\n" +
"WHERE ______";
tEnv.executeSql(top5ViewSql);
System.out.println(">>> 第6步完成:Top5 视图创建成功 ✓");
// 7)注册 MySQL Sink ========================================
// 已给出完整建表语句,仅需补全批量写入的两项关键配置
String createMySqlSink =
"CREATE TABLE mysql_alarm_top5_1m (\n" +
" dt DATE,\n" +
" window_end TIMESTAMP(3),\n" +
" device_id STRING,\n" +
" cnt BIGINT,\n" +
" rn BIGINT,\n" +
" PRIMARY KEY (dt, window_end, rn) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
" 'table-name'='rt_alarm_top5_1m',\n" +
" 'username'='root',\n" +
" 'password'='123456',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
// ★★ TODO 7:补全批量写入两项配置(攒满100行 或 每隔2秒写一次)
" '______'='100',\n" +
" '______'='2s'\n" +
")";
tEnv.executeSql(createMySqlSink);
System.out.println(">>> 第7步完成:MySQL Sink 表注册成功 ✓");
// 8)注册 Print Sink ========================================
// 已给出字段,补全建表语句头和 WITH 配置
String createPrintSink =
// ★★ TODO 8:补全 CREATE TABLE 语句及 connector 配置(print)
"______ print_alarm_top5_1m (\n" +
" dt DATE,\n" +
" window_end TIMESTAMP(3),\n" +
" device_id STRING,\n" +
" cnt BIGINT,\n" +
" rn BIGINT\n" +
") WITH (\n" +
" 'connector'='______'\n" +
")";
tEnv.executeSql(createPrintSink);
System.out.println(">>> 第8步完成:Print Sink 表注册成功 ✓");
// 9)输出到 Print + MySQL ===================================
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(
"INSERT INTO print_alarm_top5_1m " +
"SELECT dt, window_end, device_id, cnt, rn FROM v_alarm_top5_1m"
);
stmtSet.addInsertSql(
"INSERT INTO mysql_alarm_top5_1m " +
"SELECT dt, window_end, device_id, cnt, rn FROM v_alarm_top5_1m"
);
System.out.println(">>> 第9步:提交Flink任务,开始实时计算...");
stmtSet.execute();
}
}📌 验收结果:
-- 1.验证“告警 Top5 排行”是否正常写入 -- 作用: -- 查询最新一个窗口的 Top 排行数据 -- - 按 window_end 倒序(最新窗口在前) -- - 按 rn 升序(第1名在前) -- 验收点: -- ✔ 能查到数据 -- ✔ rn 从 1 开始递增 -- ✔ window_end 为最近时间 SELECT * FROM rt.rt_alarm_top5_1m ORDER BY window_end DESC, rn ASC LIMIT 5; -- 2.查看最近 2 分钟是否持续写入(10秒一条) SELECT COUNT(*) AS rows_2min FROM rt.rt_kpi_alarm_10s WHERE ts >= NOW() - INTERVAL 2 MINUTE;
