1.1 启动Kafka
# 新建一个master窗口,在master/slave1/slave2启动 ZooKeeper
zkServer.sh start
# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status
# 在master/slave1/slave2启动 Kafka
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties
# 检查Topic主题是否存在
kafka-topics.sh --bootstrap-server master:9092 --list
1.2 写入状态流数据到 Kafka
# 在master节点中操作【新开窗口】
cd /opt/datas
# 启动设备状态模拟脚本,并将数据写入 Kafka
python 01_device_status_sim.py | \
kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic
2.MySQL 先建库建表
启动 Mysql :
# 进入 mysql 客户端
mysql -uroot -p123456
查看数据库与表:
-- 创建数据库 rt(如果不存在则创建),并设置默认字符集为 utf8mb4(支持中文)【用于存放实时指标结果表】
show databases;
-- 选择数据库
USE rt;
-- 查看数据表
show tables;
8.3 代码设计
本方案基于前面“单指标 Job 独立运行”的实现方式进行升级, 通过统一入口程序 + 指标模块注册机制,实现多指标统一计算与统一提交, 更贴近企业真实生产环境的实时计算架构。
一、模块命名规范
1)推荐命名
主程序入口:
RtMetricsSqlJob:统一入口,一键启动所有指标指标模块:
Module(不可直接运行,只负责 SQL 注册)OnlineKpi10sModuleTempTrendHopModuleAlarmKpi10sModuleAlarmTop5_1mModule
📌 解释:
Job = 能跑的主程序入口 Module = 插件模块,只负责把 SQL 注册进去
二、项目结构
src/main/java/com/demo/flink/
├─ RtMetricsSqlJob.java # ✅ 主程序入口
└─ module/
├─ OnlineKpi10sModule.java # ✅ 在线KPI模块
├─ TempTrendHopModule.java # ✅ 温度趋势模块
├─ AlarmKpi10sModule.java # ✅ 告警KPI模块
└─ AlarmTop5_1mModule.java # ✅ Top5模块
说明:放到
module/子包里,项目结构更清爽。
三、主程序职责
主程序 RtMetricsSqlJob 作为统一实时指标作业入口, 在同一个 Flink 作业中完成全部四个指标计算与写入。
主要完成以下 5 件事:
1)创建 Kafka Source 表(device_status / v_alarm_event,仅创建一次) 2)创建 MySQL Sink 表(4 张指标表,仅创建一次) 3)创建 StatementSet,用于统一提交多个 INSERT 4)调用各指标模块的 register() 方法 5)统一 execute(),实现多指标并行计算
8.3.1 在线设备数模块(OnlineKpi10sModule)
路径:
src/main/java/com/demo/flink/module/OnlineKpi10sModule.java
package com.demo.flink.module;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 模块:在线设备数 KPI(10秒)
* 作用:
* ① 计算每10秒在线设备数量
* ② 将结果写入 MySQL 表
*
* 注意:
* - 这里只负责“计算逻辑 + 写入逻辑”
* - Kafka源表、MySQL表由主程序统一创建
*/
public class OnlineKpi10sModule {
public static void register(StreamTableEnvironment tEnv, StatementSet stmtSet) {
// =====================================================
// 1.创建临时视图:计算在线设备数(10秒窗口)
// =====================================================
// 说明:
// - 使用 TUMBLE 滚动窗口(每10秒统计一次)
// - 只统计 status = 1(在线设备)
// - COUNT(DISTINCT device_id):去重统计设备数
String viewSql =
// TODO-1:补全临时视图名
"CREATE TEMPORARY VIEW ________________ AS \n" +
"SELECT\n" +
" CAST(window_end AS DATE) AS dt, -- 日期(用于分区)\n" +
" window_end AS ts, -- 窗口结束时间\n" +
// TODO-2:补全去重计数字段
" COUNT(DISTINCT ________________) AS online_count -- 在线设备数\n" +
"FROM TABLE(\n" +
// TODO-3:补全窗口秒数
" TUMBLE(TABLE device_status, DESCRIPTOR(ts), INTERVAL '________________' SECOND)\n" +
")\n" +
// TODO-4:补全在线状态值
"WHERE status = ________________\n" +
"GROUP BY window_start, window_end";
// 执行SQL,生成临时视图(只在当前会话有效)
tEnv.executeSql(viewSql);
// =====================================================
// 2.写入 MySQL 表
// =====================================================
// 说明:
// - 将计算结果插入 mysql_kpi_online_10s 表
// - StatementSet 可以一次提交多个任务(推荐做法)
stmtSet.addInsertSql(
// TODO-5:补全目标MySQL表名
"INSERT INTO ________________ " +
"SELECT dt, ts, online_count FROM v_kpi_online_10s"
);
}
}8.3.2 温度趋势模块(TempTrendHopModule)
路径:
src/main/java/com/demo/flink/module/TempTrendHopModule.java
package com.demo.flink.module;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 模块:温度趋势(30秒窗口,每10秒滑动)
* 作用:
* ① 计算温度平均值和最大值
* ② 形成连续的温度变化趋势
*
* 注意:
* - 使用滑动窗口(HOP)
* - Source / Sink 由主程序统一创建
*/
public class TempTrendHopModule {
public static void register(StreamTableEnvironment tEnv, StatementSet stmtSet) {
// =====================================================
// 1.创建临时视图:温度趋势计算
// =====================================================
// 说明:
// - 窗口大小:30秒
// - 滑动步长:10秒(每10秒输出一次结果)
// - AVG:平均温度
// - MAX:最高温度
String viewSql =
// TODO-1:补全临时视图名
"CREATE TEMPORARY VIEW ________________ AS \n" +
"SELECT\n" +
" CAST(window_end AS DATE) AS dt, -- 日期\n" +
" window_end AS ts, -- 窗口结束时间\n" +
// TODO-2:补全平均温度计算表达式
" ________________ AS avg_temp, -- 平均温度\n" +
// TODO-3:补全最高温度计算表达式
" ________________ AS max_temp -- 最高温度\n" +
"FROM TABLE(\n" +
" HOP(TABLE device_status, DESCRIPTOR(ts),\n" +
// TODO-4:补全滑动步长秒数与窗口大小秒数
" INTERVAL '________________' SECOND, INTERVAL '________________' SECOND)\n" +
")\n" +
"GROUP BY window_start, window_end";
// 执行SQL,生成临时视图
tEnv.executeSql(viewSql);
// =====================================================
// 2.写入 MySQL 表
// =====================================================
// 说明:
// - 每10秒写入一次温度趋势数据
// - 用于前端折线图展示
stmtSet.addInsertSql(
// TODO-5:补全目标MySQL表名
"INSERT INTO ________________ " +
// 如果你在 TODO-1 修改了视图名,这里也要同步修改
"SELECT dt, ts, avg_temp, max_temp FROM v_temp_trend_hop"
);
}
}8.3.3 告警数模块(AlarmKpi10sModule)
路径:
src/main/java/com/demo/flink/module/AlarmKpi10sModule.java
package com.demo.flink.module;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 模块:告警数量 KPI(10秒)
* 作用:
* ① 统计每10秒内的告警数量
* ② 反映系统异常情况
*
* 注意:
* - 数据来源是 alarm_event(已过滤出的异常数据)
* - Source / Sink 由主程序统一创建
*/
public class AlarmKpi10sModule {
public static void register(StreamTableEnvironment tEnv, StatementSet stmtSet) {
// =====================================================
// 1.创建临时视图:告警数量统计
// =====================================================
// 说明:
// - 使用 TUMBLE 滚动窗口(每10秒统计一次)
// - COUNT(*):统计告警总数
String viewSql =
// TODO-1:补全临时视图名
"CREATE TEMPORARY VIEW ________________ AS \n" +
"SELECT\n" +
" CAST(window_end AS DATE) AS dt, -- 日期\n" +
" window_end AS ts, -- 窗口结束时间\n" +
// TODO-2:补全告警数量聚合表达式
" ________________ AS alarm_count -- 告警数量\n" +
"FROM TABLE(\n" +
// TODO-3:补全窗口来源表名(告警事件视图)
" TUMBLE(TABLE ________________, DESCRIPTOR(ts), INTERVAL '10' SECOND)\n" +
")\n" +
"GROUP BY window_start, window_end";
// 执行SQL,生成临时视图
tEnv.executeSql(viewSql);
// =====================================================
// 2.写入 MySQL 表
// =====================================================
// 说明:
// - 每10秒输出一次告警统计结果
// - 可用于大屏告警指标展示
stmtSet.addInsertSql(
// TODO-4:补全目标MySQL表名
"INSERT INTO ________________ " +
// TODO-5:补全查询来源视图名(应与 TODO-1 一致)
"SELECT dt, ts, alarm_count FROM ________________"
);
}
}8.3.4 告警Top5模块(AlarmTop5_1mModule)
路径:
src/main/java/com/demo/flink/module/AlarmTop5_1mModule.java
package com.demo.flink.module;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 模块:告警 Top5(1分钟窗口)
* 作用:
* ① 统计每台设备1分钟内的告警次数
* ② 找出每个时间窗口内告警次数最多的前5台设备
*
* 注意:
* - 本模块分两步完成:先统计,再排名
* - Source / Sink 由主程序统一创建
*/
public class AlarmTop5_1mModule {
public static void register(StreamTableEnvironment tEnv, StatementSet stmtSet) {
// =====================================================
// 1.创建临时视图:统计每台设备每1分钟内的告警次数
// =====================================================
// 说明:
// - 使用 TUMBLE 滚动窗口,窗口大小为1分钟
// - 按 device_id 分组
// - COUNT(*) 表示该设备在这个窗口内触发了多少次告警
String cntView =
// TODO-1:补全“每分钟告警次数统计”视图名
"CREATE TEMPORARY VIEW ________________ AS \n" +
"SELECT\n" +
" window_end, -- 窗口结束时间\n" +
" device_id, -- 设备编号\n" +
// TODO-2:补全告警次数聚合表达式
" ________________ AS cnt -- 告警次数\n" +
"FROM TABLE(\n" +
// TODO-3:补全告警来源表(告警事件视图)
" TUMBLE(TABLE ________________, DESCRIPTOR(ts), INTERVAL '1' MINUTE)\n" +
")\n" +
"GROUP BY window_start, window_end, device_id";
// 执行SQL,生成“每台设备每分钟告警次数”视图
tEnv.executeSql(cntView);
// =====================================================
// 2.创建临时视图:对每个窗口内的设备进行排名,取前5名
// =====================================================
// 说明:
// - ROW_NUMBER():给每个窗口内的设备按告警次数排序编号
// - PARTITION BY window_end:每个窗口单独排名
// - ORDER BY cnt DESC:按告警次数从大到小排序
// - WHERE rn <= 5:只保留前5名
String top5View =
// TODO-4:补全 Top5 结果视图名
"CREATE TEMPORARY VIEW ________________ AS \n" +
"SELECT\n" +
" CAST(window_end AS DATE) AS dt, -- 日期\n" +
" window_end, -- 窗口结束时间\n" +
" device_id, -- 设备编号\n" +
" cnt, -- 告警次数\n" +
" rn -- 排名\n" +
"FROM (\n" +
" SELECT\n" +
" window_end,\n" +
" device_id,\n" +
" cnt,\n" +
" ROW_NUMBER() OVER (PARTITION BY window_end ORDER BY cnt DESC) AS rn\n" +
// 这里默认读取 v_alarm_cnt_1m;如果你在 TODO-1 改了名称,这里要同步修改
" FROM v_alarm_cnt_1m\n" +
")\n" +
"WHERE rn <= 5";
// 执行SQL,生成 Top5 结果视图
tEnv.executeSql(top5View);
// =====================================================
// 3.写入 MySQL 表
// =====================================================
// 说明:
// - 将每个窗口内告警次数前5的设备写入 MySQL
// - 可用于大屏 Top5 排行榜展示
stmtSet.addInsertSql(
// TODO-5:补全写入目标表名
"INSERT INTO ________________ " +
"SELECT dt, window_end, device_id, cnt, rn FROM v_alarm_top5_1m"
);
}
}8.3.5 主程序 RtMetricsSqlJob 实现(统一入口,一键启动)
路径:
src/main/java/com/demo/flink/RtMetricsSqlJob.java
package com.demo.flink;
import com.demo.flink.module.AlarmKpi10sModule;
import com.demo.flink.module.AlarmTop5_1mModule;
import com.demo.flink.module.OnlineKpi10sModule;
import com.demo.flink.module.TempTrendHopModule;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* Job:实时指标统一入口(模块化组合)
*
* 设计目标:只启动 1 个 Flink 作业,同时计算 4 个实时指标,并持续写入 MySQL
*
* 核心思路:
* 1)主程序统一创建 Source(Kafka 表)与 Sink(MySQL 表)
* 2)各指标模块只负责注册:View + Insert SQL(不再重复建 Source/Sink)
* 3)最后由 StatementSet.execute() 一次性提交所有 Insert(只 execute 一次)
*/
public class RtMetricsSqlJob {
public static void main(String[] args) throws Exception {
// 设置日志级别为 "ERROR",减少不必要的日志输出
// TODO-1:补全日志级别字符串
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "________________");
// =========================================================
// 1.Flink 执行环境(相当于“运行引擎”)
// - 并行度设为 1:教学演示更容易观察(日志更少、数据更直观)
// - 开启 checkpoint:流作业容错能力(断点恢复、保证一致性)
// =========================================================
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO-2:补全并行度
env.setParallelism(________________);
// TODO-3:补全 checkpoint 间隔(毫秒)
env.enableCheckpointing(________________); // 30s 一次(别太频繁,避免压力过大)
System.out.println("====== 1. Flink 执行环境初始化完成 ======");
// =========================================================
// 2.Table 环境(Flink SQL 的入口)
// 后续所有 executeSql(...) 都是向这个 SQL 环境注册表 / 视图 / 写入任务
// =========================================================
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
System.out.println("====== 2. Flink Table 环境(Flink SQL 的入口)初始化完成 ======");
// =========================================================
// 3.统一创建 Kafka Source(只建一次,所有模块共享)
// 表名:device_status
// - topic:device_status_topic
// - ts:从 event_time 解析得到事件时间
// - watermark:允许最大 5 秒乱序(教学常用配置)
// =========================================================
String createDeviceStatus =
"CREATE TABLE device_status (\n" +
" event_type STRING,\n" +
" device_id STRING,\n" +
" status INT,\n" +
" temperature DOUBLE,\n" +
" `load` DOUBLE,\n" +
" voltage DOUBLE,\n" +
" event_time STRING,\n" +
// TODO-4:补全事件时间转换表达式
" ts AS ________________,\n" +
// TODO-5:补全 watermark 乱序秒数
" WATERMARK FOR ts AS ts - INTERVAL '________________' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
// TODO-6:补全 Kafka topic 名
" 'topic' = '________________',\n" +
" 'properties.bootstrap.servers' = 'master:9092',\n" +
" 'properties.group.id' = 'rt_metrics_all_g1',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")";
tEnv.executeSql(createDeviceStatus);
System.out.println("====== 3. Kafka Source 初始化完成 ======");
// =========================================================
// 4.统一生成“告警事件流”(从状态流推导)
// 目的:不依赖 alarm_event_topic,只用 device_status 就能产生告警事件
// 输出字段:
// - device_id:设备
// - alarm_type:告警类型(根据规则判定)
// - ts:事件时间(沿用 device_status 的 ts)
// =========================================================
String createAlarmEventView =
// TODO-7:补全临时视图名
"CREATE TEMPORARY VIEW ________________ AS \n" +
"SELECT\n" +
" device_id,\n" +
" CASE\n" +
" WHEN temperature >= 80 THEN 'HIGH_TEMP'\n" +
" WHEN voltage <= 200 THEN 'LOW_VOLT'\n" +
" WHEN `load` >= 0.90 THEN 'HIGH_LOAD'\n" +
" WHEN status = 0 THEN 'DEVICE_OFFLINE'\n" +
" END AS alarm_type,\n" +
" ts\n" +
"FROM device_status\n" +
"WHERE temperature >= 80 OR voltage <= 200 OR `load` >= 0.90 OR status = 0";
tEnv.executeSql(createAlarmEventView);
System.out.println("====== 4.“告警事件流”视图生成完成 ======");
// =========================================================
// 5.统一创建 MySQL Sink(只建一次,所有模块共享)
// 说明:
// - Flink JDBC Sink 若产生更新流(update/delete),必须声明 PRIMARY KEY
// =========================================================
// 5.1 在线设备数指标(10秒滚动窗口结果写入 MySQL)
String mysqlOnline =
"CREATE TABLE mysql_kpi_online_10s (\n" +
" dt DATE,\n" +
" ts TIMESTAMP(3),\n" +
" online_count BIGINT\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
// TODO-8:补全 JDBC URL
" 'url'='________________',\n" +
" 'table-name'='rt_kpi_online_10s',\n" +
" 'username'='root',\n" +
" 'password'='123456',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
" 'sink.buffer-flush.max-rows'='100',\n" +
" 'sink.buffer-flush.interval'='2s'\n" +
")";
tEnv.executeSql(mysqlOnline);
// 5.2 告警数量指标表(10秒滚动窗口结果写入 MySQL)
String mysqlAlarm =
"CREATE TABLE mysql_kpi_alarm_10s (\n" +
" dt DATE,\n" +
" ts TIMESTAMP(3),\n" +
" alarm_count BIGINT\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
" 'table-name'='rt_kpi_alarm_10s',\n" +
" 'username'='root',\n" +
" 'password'='123456',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
" 'sink.buffer-flush.max-rows'='100',\n" +
" 'sink.buffer-flush.interval'='2s'\n" +
")";
tEnv.executeSql(mysqlAlarm);
// 5.3 温度趋势指标表(30秒窗口、每10秒输出一次结果,写入 MySQL)
String mysqlTrend =
"CREATE TABLE mysql_temp_trend_10s (\n" +
" dt DATE,\n" +
" ts TIMESTAMP(3),\n" +
" avg_temp DOUBLE,\n" +
" max_temp DOUBLE\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
" 'table-name'='rt_temp_trend_10s',\n" +
" 'username'='root',\n" +
" 'password'='123456',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
" 'sink.buffer-flush.max-rows'='100',\n" +
" 'sink.buffer-flush.interval'='2s'\n" +
")";
tEnv.executeSql(mysqlTrend);
// 5.4 告警 Top5 排行表(1分钟窗口结果写入 MySQL):Rank 会产生更新流 → 必须声明主键
String mysqlTop5 =
"CREATE TABLE mysql_alarm_top5_1m (\n" +
" dt DATE,\n" +
" window_end TIMESTAMP(3),\n" +
" device_id STRING,\n" +
" cnt BIGINT,\n" +
" rn BIGINT,\n" +
// TODO-9:补全主键字段列表
" PRIMARY KEY (________________) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
" 'table-name'='rt_alarm_top5_1m',\n" +
" 'username'='root',\n" +
" 'password'='123456',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
" 'sink.buffer-flush.max-rows'='100',\n" +
" 'sink.buffer-flush.interval'='2s'\n" +
")";
tEnv.executeSql(mysqlTop5);
System.out.println("====== 5. MySQL Sink(四个表) 创建完成 ======");
// =========================================================
// 6. 定义StatementSet,作用:收集多个 INSERT 语句(最后统一提交)
// =========================================================
StatementSet stmtSet = tEnv.createStatementSet();
System.out.println("====== 6. 定义StatementSet完成 ======");
// =========================================================
// 7.注册模块:模块只负责(View + Insert)
// - 每个模块负责:创建视图(View) + 注册写入任务(Insert)
// - 执行后:
// tEnv 中新增一个临时视图(逻辑表)
// stmtSet 中新增一条写入 MySQL 的任务
// =========================================================
OnlineKpi10sModule.register(tEnv, stmtSet);
TempTrendHopModule.register(tEnv, stmtSet);
AlarmKpi10sModule.register(tEnv, stmtSet);
// TODO-10:补全告警 Top5 模块注册语句
________________;
System.out.println("====== 7.1 在线设备数统计模块:已加入执行任务(等待执行)");
System.out.println("====== 7.2 温度趋势统计模块:已加入执行任务(等待执行)");
System.out.println("====== 7.3 告警数量统计模块:已加入执行任务(等待执行)");
System.out.println("====== 7.4 告警Top5统计模块:已加入执行任务(等待执行)");
// =========================================================
// 8.统一执行:真正提交作业(IDEA 运行时,这一行开始“常驻运行”)
// =========================================================
System.out.println("====== 8. 实时数据分析作业开始持续执行.........");
stmtSet.execute();
}
}总结:
以前:每个 Job 自己建环境、自己 execute → 资源占用大、难统一管理
现在:主程序统一建环境 + 模块只负责注册 → 更省资源、更好维护、更适合分工演示
8.3.6 IDEA运行,验收结果
(1) 选择数据库
USE rt;
(2) 推荐教学查询方式(按时间倒序)
这样更适合实时项目观察:
SELECT * FROM rt.rt_kpi_online_10s ORDER BY ts DESC LIMIT 5; SELECT * FROM rt.rt_kpi_alarm_10s ORDER BY ts DESC LIMIT 5; SELECT * FROM rt.rt_temp_trend_10s ORDER BY ts DESC LIMIT 8; SELECT * FROM rt.rt_alarm_top5_1m ORDER BY window_end DESC, rn ASC LIMIT 5;
(4) 检查各表的数据量
SELECT COUNT(*) FROM rt.rt_kpi_online_10s; SELECT COUNT(*) FROM rt.rt_kpi_alarm_10s; SELECT COUNT(*) FROM rt.rt_temp_trend_10s; SELECT COUNT(*) FROM rt.rt_alarm_top5_1m;
如果 count > 0:
✅ Flink 写入成功 ✅ Kafka 消费正常 ✅ Sink 正常
8.4 打包与部署(服务器长期运行)
8.4.3 本地/服务器打包
IDEA → Maven → flink-rt-metrics → Lifecycle → clean → package
# 运行打包程序后,生成target目录,内含 flink-rt-metrics-1.0.0.jar 包
8.4.4 上传 JAR 到服务器
复制 flink-rt-metrics-1.0.0.jar 到 master节点的 /opt/jars/
8.5 提交 Flink 作业
8.5.1 运行方式一:集群模式
1)使用 集群 模式提交Flink实时作业
# 启动 Flink 集群,启动 JobManager(作业管理器)和 TaskManager(任务管理器): start-cluster.sh # 验证集群启动: # 访问 Flink 的 Web UI 来验证集群是否正常运行,地址是 http://<jobmanager_host>:8081。 http://master:8081 # 方法1:运行 Flink 作业 flink run -c com.demo.flink.RtMetricsSqlJob /opt/jars/flink-rt-metrics-1.0.0.jar # 方法2:Flink 作业只使用最小的资源,适用于小规模测试和开发环境。 flink run -c com.demo.flink.RtMetricsSqlJob \ /opt/jars/flink-rt-metrics-1.0.0.jar \ --jobmanager.memory.process.size 512m \ --taskmanager.memory.process.size 512m \ --taskmanager.numberOfTaskSlots 1 \ --parallelism 1
Flink 任务被成功提交:

2)检查Flink 作业状态
2.1 检查作业状态
flink list

从 flink list 输出来看,作业 05ec8f6eefe9cfafb020c2e739a4d709 已经处于 运行中(RUNNING)
2.2 使用 Flink Web UI,查看作业的状态和监控 Flink 集群的健康状况
# 浏览器访问 192.168.36.100:8081

3)停止任务
流处理任务(Kafka → Flink → MySQL)
特点:
一直运行
不会自动结束
必须手动停止
第一步:查任务
flink list
会看到类似:
------------------ Running/Restarting Jobs -------------------
19.03.2026 15:17:18 : be708ed0831289e210b4f5072d37a197
👉 记住这个 JobId,如上:be708ed0831289e210b4f5072d37a197
第二步:停止任务
flink cancel JobId # 举例:flink cancel be708ed0831289e210b4f5072d37a197
8.5.2 运行模式二:flink on yarn
注意:本步骤使用 YARN per-job 模式提交实时作业
1)⚠ 提交前注意事项
1️⃣ 检查 8081 端口是否被占用
ss -tunlp | grep 8081
说明:
如果当前机器已有 Flink 集群占用 8081(如 Standalone Session 或其他 YARN 作业)
可能导致新提交的作业启动失败(端口冲突)
2️⃣ 如有占用,先关闭已有 Flink 集群
如果之前启动过 Standalone 模式:
$FLINK_HOME/bin/stop-cluster.sh
或确认是否有正在运行的 YARN 作业:
yarn application -list如需停止某个应用:
yarn application -kill <application_id>
2)使用 YARN per-job 模式提交Flink实时作业
# 使用 YARN Per-Job 模式提交 Flink 作业 # 提交 Flink 实时指标统一计算作业到 YARN 上运行 # 运行流程:本地客户端 → 向 YARN 申请资源 → 启动 Flink per-job 集群 → 运行 RtMetricsSqlJob # -t yarn-per-job :指定运行模式为 YARN 独立作业模式(每次提交都会启动一个独立的 Flink 集群) # -c :指定程序的主类(入口类) # com.demo.flink.RtMetricsSqlJob :要运行的实时多指标统一计算作业 # /opt/jars/flink-rt-metrics-1.0.0.jar :已打包好的可执行 Jar 文件路径 flink run -t yarn-per-job \ -c com.demo.flink.RtMetricsSqlJob \ /opt/jars/flink-rt-metrics-1.0.0.jar
作业成功运行(YARN 模式)
运行成功后,终端会输出如下关键信息:

Found Web Interface slave1:8081 of application 'application_1771941120543_0001'
信息解读
Flink 已经在 YARN 上成功启动了一个 per-job 集群
这个集群的 JobManager(ApplicationMaster)运行在 slave1
当前集群的 Web UI 地址为:
slave1:8081本次 YARN 应用 ID 为:
application_1771941120543_0001
含义总结
YARN 已分配资源,并成功启动 Flink 集群, 现在可以通过 8081 访问这个集群的监控页面
3)查看作业运行状态
1️⃣ 打开 Flink Web UI
浏览器访问:
http://slave1:8081
重点查看:
Jobs 页面:作业是否为 RUNNING
Checkpoints 页面:是否正常执行(如已开启)
2️⃣ 验证 MySQL 是否持续写入
在数据库中执行:
SELECT * FROM rt.rt_kpi_online_10s ORDER BY ts DESC LIMIT 5;
确认:
有最新时间数据
数据持续增加
3️⃣ 停止作业(如需)
yarn application -kill application_1771941120543_0001
查看yarn任务列表:
yarn application -list查看yarn任务web UI:
slave1:8088
8.6 验收结果
-- 查看Mysql的数据(最新结果)在变化 SELECT COUNT(*) FROM rt.rt_kpi_online_10s; SELECT COUNT(*) FROM rt.rt_kpi_alarm_10s; SELECT COUNT(*) FROM rt.rt_temp_trend_10s; SELECT COUNT(*) FROM rt.rt_alarm_top5_1m;

