李翔-大数据技术

Big data technology!

Flink实时分析完整实验【2026-4-9】

1. 启动实验环境

1.1 启动Kafka

# 新建一个master窗口,在master/slave1/slave2启动 ZooKeeper
zkServer.sh start

# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status

# 在master/slave1/slave2启动 Kafka
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties

#  检查Topic主题device_status_topic是否存在
kafka-topics.sh --bootstrap-server master:9092 --list

# 如果主题不存在,执行下面的命令创建
# 在 Kafka 集群中创建名为 device_status_topic 的主题,设置3个分区用于并行处理,副本数为1(单机环境)
kafka-topics.sh --bootstrap-server master:9092 --create \
  --topic device_status_topic \
  --partitions 3 --replication-factor 1


1.2 写入状态流数据到 Kafka

# 在master节点中操作【新开窗口】
cd /opt/datas
# 启动设备状态模拟脚本,并将数据写入 Kafka
python 01_device_status_sim.py | \
kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic

2.MySQL 先建库建表

启动 Mysql :

# 进入 mysql 客户端
mysql -uroot -p123456


创建数据库与表:

-- 创建数据库 rt(如果不存在则创建),并设置默认字符集为 utf8mb4(支持中文)【用于存放实时指标结果表】
CREATE DATABASE IF NOT EXISTS rt DEFAULT CHARSET utf8mb4;

-- 选择数据库
USE rt;


创建数据表:

--------------------------------------------------
-- 创建表1:10秒粒度的在线设备数量(KPI)
--------------------------------------------------
CREATE TABLE IF NOT EXISTS rt.rt_kpi_online_10s (
  dt DATE,               -- 日期
  ts DATETIME,           -- 时间戳(精确到秒)
  online_count BIGINT,   -- 在线数
  PRIMARY KEY (ts)       -- 主键:时间唯一
);

--------------------------------------------------
-- 表2:10秒粒度的温度趋势指标(KPI)
--------------------------------------------------
CREATE TABLE IF NOT EXISTS rt.rt_temp_trend_10s (
  dt DATE,              -- 日期
  ts DATETIME,          -- 时间戳(精确到秒)
  avg_temp DOUBLE,      -- 平均温度
  max_temp DOUBLE,      -- 最高温度
  PRIMARY KEY (ts)      -- 主键:时间唯一
);

--------------------------------------------------
-- 表3:10秒粒度的告警数量(KPI)
--------------------------------------------------
CREATE TABLE IF NOT EXISTS rt.rt_kpi_alarm_10s (
  dt DATE,              -- 日期
  ts DATETIME,          -- 时间戳(精确到秒)
  alarm_count BIGINT,   -- 告警数量
  PRIMARY KEY (ts)      -- 主键:时间唯一
);

--------------------------------------------------
-- 表4:1分钟粒度的告警 Top5 排行
--------------------------------------------------
CREATE TABLE IF NOT EXISTS rt.rt_alarm_top5_1m (
  dt DATE,                          -- 日期
  window_end DATETIME,              -- 窗口结束时间(统计时间点)
  device_id VARCHAR(64),            -- 设备ID
  cnt BIGINT,                       -- 告警次数(该设备1分钟内触发多少次)
  rn BIGINT,                        -- 排名(Top N,第几名)
  PRIMARY KEY (dt, window_end, rn)  -- 主键:同一时间窗口下排名唯一
);


查看创建好的表:

-- 查看创建的表
use rt;
show tables;

8.3 代码设计

本方案基于前面“单指标 Job 独立运行”的实现方式进行升级, 通过统一入口程序 + 指标模块注册机制,实现多指标统一计算与统一提交, 更贴近企业真实生产环境的实时计算架构。


下载POM文件

imgflink-pom.zip



一、模块命名规范

1)推荐命名

  • 主程序入口:

    • RtMetricsSqlJob:统一入口,一键启动所有指标

  • 指标模块:Module(不可直接运行,只负责 SQL 注册)

    • OnlineKpi10sModule

    • TempTrendHopModule

    • AlarmKpi10sModule

    • AlarmTop5_1mModule

📌 解释:

Job = 能跑的主程序入口 Module = 插件模块,只负责把 SQL 注册进去


二、项目结构

src/main/java/com/demo/flink/
 ├─ RtMetricsSqlJob.java                 # ✅ 主程序入口
 └─ module/
    ├─ OnlineKpi10sModule.java           # ✅ 在线KPI模块
    ├─ TempTrendHopModule.java           # ✅ 温度趋势模块
    ├─ AlarmKpi10sModule.java            # ✅ 告警KPI模块
    └─ AlarmTop5_1mModule.java           # ✅ Top5模块

说明:放到 module/ 子包里,项目结构更清爽。


三、主程序职责

主程序 RtMetricsSqlJob 作为统一实时指标作业入口, 在同一个 Flink 作业中完成全部四个指标计算与写入。

主要完成以下 5 件事:

1)创建 Kafka Source 表(device_status / v_alarm_event,仅创建一次) 2)创建 MySQL Sink 表(4 张指标表,仅创建一次) 3)创建 StatementSet,用于统一提交多个 INSERT 4)调用各指标模块的 register() 方法 5)统一 execute(),实现多指标并行计算



8.3.1 在线设备数模块(OnlineKpi10sModule)

路径:src/main/java/com/demo/flink/module/OnlineKpi10sModule.java

package com.demo.flink.module;

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 模块:在线设备数 KPI(10秒)
 * 作用:
 * ① 计算每10秒在线设备数量
 * ② 将结果写入 MySQL 表
 *
 * 注意:
 * - 这里只负责“计算逻辑 + 写入逻辑”
 * - Kafka源表、MySQL表由主程序统一创建
 */
public class OnlineKpi10sModule {

    public static void register(StreamTableEnvironment tEnv, StatementSet stmtSet) {

        // =====================================================
        // 1.创建临时视图:计算在线设备数(10秒窗口)
        // =====================================================
        // 说明:
        // - 使用 TUMBLE 滚动窗口(每10秒统计一次)
        // - 只统计 status = 1(在线设备)
        // - COUNT(DISTINCT device_id):去重统计设备数
        String viewSql =
                "CREATE TEMPORARY VIEW v_kpi_online_10s AS \n" +
                        "SELECT\n" +
                        "  CAST(window_end AS DATE) AS dt,            -- 日期(用于分区)\n" +
                        "  window_end AS ts,                          -- 窗口结束时间\n" +
                        "  COUNT(DISTINCT device_id) AS online_count  -- 在线设备数\n" +
                        "FROM TABLE(\n" +
                        "  TUMBLE(TABLE device_status, DESCRIPTOR(ts), INTERVAL '10' SECOND)\n" +
                        ")\n" +
                        "WHERE status = 1\n" +
                        "GROUP BY window_start, window_end";

        // 执行SQL,生成临时视图(只在当前会话有效)
        tEnv.executeSql(viewSql);

        // =====================================================
        // 2.写入 MySQL 表
        // =====================================================
        // 说明:
        // - 将计算结果插入 mysql_kpi_online_10s 表
        // - StatementSet 可以一次提交多个任务(推荐做法)
        stmtSet.addInsertSql(
                "INSERT INTO mysql_kpi_online_10s " +
                        "SELECT dt, ts, online_count FROM v_kpi_online_10s"
        );
    }
}


8.3.2 温度趋势模块(TempTrendHopModule)

路径:src/main/java/com/demo/flink/module/TempTrendHopModule.java

package com.demo.flink.module;

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 模块:温度趋势(30秒窗口,每10秒滑动)
 * 作用:
 * ① 计算温度平均值和最大值
 * ② 形成连续的温度变化趋势
 *
 * 注意:
 * - 使用滑动窗口(HOP)
 * - Source / Sink 由主程序统一创建
 */
public class TempTrendHopModule {

    public static void register(StreamTableEnvironment tEnv, StatementSet stmtSet) {

        // =====================================================
        // 1.创建临时视图:温度趋势计算
        // =====================================================
        // 说明:
        // - 窗口大小:30秒
        // - 滑动步长:10秒(每10秒输出一次结果)
        // - AVG:平均温度
        // - MAX:最高温度
        String viewSql =
                "CREATE TEMPORARY VIEW v_temp_trend_hop AS \n" +
                        "SELECT\n" +
                        "  CAST(window_end AS DATE) AS dt,     -- 日期\n" +
                        "  window_end AS ts,                   -- 窗口结束时间\n" +
                        "  AVG(temperature) AS avg_temp,       -- 平均温度\n" +
                        "  MAX(temperature) AS max_temp        -- 最高温度\n" +
                        "FROM TABLE(\n" +
                        "  HOP(TABLE device_status, DESCRIPTOR(ts),\n" +
                        "      INTERVAL '10' SECOND, INTERVAL '30' SECOND)\n" +
                        ")\n" +
                        "GROUP BY window_start, window_end";

        // 执行SQL,生成临时视图
        tEnv.executeSql(viewSql);


        // =====================================================
        // 2.写入 MySQL 表
        // =====================================================
        // 说明:
        // - 每10秒写入一次温度趋势数据
        // - 用于前端折线图展示
        stmtSet.addInsertSql(
                "INSERT INTO mysql_temp_trend_10s " +
                        "SELECT dt, ts, avg_temp, max_temp FROM v_temp_trend_hop"
        );
    }
}


8.3.3 告警数模块(AlarmKpi10sModule)

路径:src/main/java/com/demo/flink/module/AlarmKpi10sModule.java

package com.demo.flink.module;

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 模块:告警数量 KPI(10秒)
 * 作用:
 * ① 统计每10秒内的告警数量
 * ② 反映系统异常情况
 *
 * 注意:
 * - 数据来源是 alarm_event(已过滤出的异常数据)
 * - Source / Sink 由主程序统一创建
 */
public class AlarmKpi10sModule {

    public static void register(StreamTableEnvironment tEnv, StatementSet stmtSet) {

        // =====================================================
        // 1.创建临时视图:告警数量统计
        // =====================================================
        // 说明:
        // - 使用 TUMBLE 滚动窗口(每10秒统计一次)
        // - COUNT(*):统计告警总数
        String viewSql =
                "CREATE TEMPORARY VIEW v_kpi_alarm_10s AS \n" +
                        "SELECT\n" +
                        "  CAST(window_end AS DATE) AS dt,   -- 日期\n" +
                        "  window_end AS ts,                 -- 窗口结束时间\n" +
                        "  COUNT(*) AS alarm_count           -- 告警数量\n" +
                        "FROM TABLE(\n" +
                        "  TUMBLE(TABLE v_alarm_event, DESCRIPTOR(ts), INTERVAL '10' SECOND)\n" +
                        ")\n" +
                        "GROUP BY window_start, window_end";

        // 执行SQL,生成临时视图
        tEnv.executeSql(viewSql);


        // =====================================================
        // 2.写入 MySQL 表
        // =====================================================
        // 说明:
        // - 每10秒输出一次告警统计结果
        // - 可用于大屏告警指标展示
        stmtSet.addInsertSql(
                "INSERT INTO mysql_kpi_alarm_10s " +
                        "SELECT dt, ts, alarm_count FROM v_kpi_alarm_10s"
        );
    }
}

8.3.4 告警Top5模块(AlarmTop5_1mModule)

路径:src/main/java/com/demo/flink/module/AlarmTop5_1mModule.java

package com.demo.flink.module;

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 模块:告警 Top5(1分钟窗口)
 * 作用:
 * ① 统计每台设备1分钟内的告警次数
 * ② 找出每个时间窗口内告警次数最多的前5台设备
 *
 * 注意:
 * - 本模块分两步完成:先统计,再排名
 * - Source / Sink 由主程序统一创建
 */
public class AlarmTop5_1mModule {

    public static void register(StreamTableEnvironment tEnv, StatementSet stmtSet) {

        // =====================================================
        // 1.创建临时视图:统计每台设备每1分钟内的告警次数
        // =====================================================
        // 说明:
        // - 使用 TUMBLE 滚动窗口,窗口大小为1分钟
        // - 按 device_id 分组
        // - COUNT(*) 表示该设备在这个窗口内触发了多少次告警
        String cntView =
                "CREATE TEMPORARY VIEW v_alarm_cnt_1m AS \n" +
                        "SELECT\n" +
                        "  window_end,                     -- 窗口结束时间\n" +
                        "  device_id,                      -- 设备编号\n" +
                        "  COUNT(*) AS cnt                 -- 告警次数\n" +
                        "FROM TABLE(\n" +
                        "  TUMBLE(TABLE v_alarm_event, DESCRIPTOR(ts), INTERVAL '1' MINUTE)\n" +
                        ")\n" +
                        "GROUP BY window_start, window_end, device_id";

        // 执行SQL,生成“每台设备每分钟告警次数”视图
        tEnv.executeSql(cntView);


        // =====================================================
        // 2.创建临时视图:对每个窗口内的设备进行排名,取前5名
        // =====================================================
        // 说明:
        // - ROW_NUMBER():给每个窗口内的设备按告警次数排序编号
        // - PARTITION BY window_end:每个窗口单独排名
        // - ORDER BY cnt DESC:按告警次数从大到小排序
        // - WHERE rn <= 5:只保留前5名
        String top5View =
                "CREATE TEMPORARY VIEW v_alarm_top5_1m AS \n" +
                        "SELECT\n" +
                        "  CAST(window_end AS DATE) AS dt,   -- 日期\n" +
                        "  window_end,                      -- 窗口结束时间\n" +
                        "  device_id,                       -- 设备编号\n" +
                        "  cnt,                             -- 告警次数\n" +
                        "  rn                               -- 排名\n" +
                        "FROM (\n" +
                        "  SELECT\n" +
                        "    window_end,\n" +
                        "    device_id,\n" +
                        "    cnt,\n" +
                        "    ROW_NUMBER() OVER (PARTITION BY window_end ORDER BY cnt DESC) AS rn\n" +
                        "  FROM v_alarm_cnt_1m\n" +
                        ")\n" +
                        "WHERE rn <= 5";

        // 执行SQL,生成 Top5 结果视图
        tEnv.executeSql(top5View);

        // =====================================================
        // 3.写入 MySQL 表
        // =====================================================
        // 说明:
        // - 将每个窗口内告警次数前5的设备写入 MySQL
        // - 可用于大屏 Top5 排行榜展示
        stmtSet.addInsertSql(
                "INSERT INTO mysql_alarm_top5_1m " +
                        "SELECT dt, window_end, device_id, cnt, rn FROM v_alarm_top5_1m"
        );
    }
}


8.3.5 主程序 RtMetricsSqlJob 实现(统一入口,一键启动)

路径:src/main/java/com/demo/flink/RtMetricsSqlJob.java

package com.demo.flink;

import com.demo.flink.module.AlarmKpi10sModule;
import com.demo.flink.module.AlarmTop5_1mModule;
import com.demo.flink.module.OnlineKpi10sModule;
import com.demo.flink.module.TempTrendHopModule;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Job:实时指标统一入口(模块化组合)
 *
 * 设计目标:只启动 1 个 Flink 作业,同时计算 4 个实时指标,并持续写入 MySQL
 *
 * 核心思路:
 * 1)主程序统一创建 Source(Kafka 表)与 Sink(MySQL 表)
 * 2)各指标模块只负责注册:View + Insert SQL(不再重复建 Source/Sink)
 * 3)最后由 StatementSet.execute() 一次性提交所有 Insert(只 execute 一次)
 */
public class RtMetricsSqlJob {

    public static void main(String[] args) throws Exception {

        // 设置日志级别为 "ERROR",减少不必要的日志输出
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "error");

        // =========================================================
        // 1.Flink 执行环境(相当于“运行引擎”)
        //    - 并行度设为 1:教学演示更容易观察(日志更少、数据更直观)
        //    - 开启 checkpoint:流作业容错能力(断点恢复、保证一致性)
        // =========================================================
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(30000); // 30s 一次(别太频繁,避免压力过大)

        System.out.println("====== 1. Flink 执行环境初始化完成 ======");

        // =========================================================
        // 2.Table 环境(Flink SQL 的入口)
        //    后续所有 executeSql(...) 都是向这个 SQL 环境注册表 / 视图 / 写入任务
        // =========================================================
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        System.out.println("====== 2. Flink  Table 环境(Flink SQL 的入口)初始化完成 ======");

        // =========================================================
        // 3.统一创建 Kafka Source(只建一次,所有模块共享)
        //    表名:device_status
        //    - topic:device_status_topic
        //    - ts:从 event_time 解析得到事件时间
        //    - watermark:允许最大 5 秒乱序(教学常用配置)
        // =========================================================
        String createDeviceStatus =
                "CREATE TABLE device_status (\n" +
                        "  event_type STRING,\n" +
                        "  device_id STRING,\n" +
                        "  status INT,\n" +
                        "  temperature DOUBLE,\n" +
                        "  `load` DOUBLE,\n" +
                        "  voltage DOUBLE,\n" +
                        "  event_time STRING,\n" +
                        "  ts AS TO_TIMESTAMP(event_time),\n" +
                        "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'device_status_topic',\n" +
                        "  'properties.bootstrap.servers' = 'master:9092',\n" +
                        "  'properties.group.id' = 'rt_metrics_all_g1',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json',\n" +
                        "  'json.ignore-parse-errors' = 'true'\n" +
                        ")";
        tEnv.executeSql(createDeviceStatus);

        System.out.println("====== 3. Kafka Source 初始化完成 ======");

        // =========================================================
        // 4.统一生成“告警事件流”(从状态流推导)
        //     目的:不依赖 alarm_event_topic,只用 device_status 就能产生告警事件
        //     输出字段:
        //       - device_id:设备
        //       - alarm_type:告警类型(根据规则判定)
        //       - ts:事件时间(沿用 device_status 的 ts)
        // =========================================================
        String createAlarmEventView =
                "CREATE TEMPORARY VIEW v_alarm_event AS \n" +
                        "SELECT\n" +
                        "  device_id,\n" +
                        "  CASE\n" +
                        "    WHEN temperature >= 80 THEN 'HIGH_TEMP'\n" +
                        "    WHEN voltage <= 200 THEN 'LOW_VOLT'\n" +
                        "    WHEN `load` >= 0.90 THEN 'HIGH_LOAD'\n" +
                        "    WHEN status = 0 THEN 'DEVICE_OFFLINE'\n" +
                        "  END AS alarm_type,\n" +
                        "  ts\n" +
                        "FROM device_status\n" +
                        "WHERE temperature >= 80 OR voltage <= 200 OR `load` >= 0.90 OR status = 0";
        tEnv.executeSql(createAlarmEventView);
 
        System.out.println("====== 4.“告警事件流”视图生成完成 ======");

        // =========================================================
        // 5.统一创建 MySQL Sink(只建一次,所有模块共享)
        //    说明:
        //    - Flink JDBC Sink 若产生更新流(update/delete),必须声明 PRIMARY KEY
        // =========================================================

        // 5.1 在线设备数指标(10秒滚动窗口结果写入 MySQL)
        String mysqlOnline =
                "CREATE TABLE mysql_kpi_online_10s (\n" +
                        "  dt DATE,\n" +
                        "  ts TIMESTAMP(3),\n" +
                        "  online_count BIGINT\n" +
                        ") WITH (\n" +
                        "  'connector'='jdbc',\n" +
                        "  'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
                        "  'table-name'='rt_kpi_online_10s',\n" +
                        "  'username'='root',\n" +
                        "  'password'='123456',\n" +
                        "  'driver'='com.mysql.cj.jdbc.Driver',\n" +
                        "  'sink.buffer-flush.max-rows'='100',\n" +
                        "  'sink.buffer-flush.interval'='2s'\n" +
                        ")";
        tEnv.executeSql(mysqlOnline);

        // 5.2 告警数量指标表(10秒滚动窗口结果写入 MySQL)
        String mysqlAlarm =
                "CREATE TABLE mysql_kpi_alarm_10s (\n" +
                        "  dt DATE,\n" +
                        "  ts TIMESTAMP(3),\n" +
                        "  alarm_count BIGINT\n" +
                        ") WITH (\n" +
                        "  'connector'='jdbc',\n" +
                        "  'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
                        "  'table-name'='rt_kpi_alarm_10s',\n" +
                        "  'username'='root',\n" +
                        "  'password'='123456',\n" +
                        "  'driver'='com.mysql.cj.jdbc.Driver',\n" +
                        "  'sink.buffer-flush.max-rows'='100',\n" +
                        "  'sink.buffer-flush.interval'='2s'\n" +
                        ")";
        tEnv.executeSql(mysqlAlarm);

        // 5.3 温度趋势指标表(30秒窗口、每10秒输出一次结果,写入 MySQL)
        String mysqlTrend =
                "CREATE TABLE mysql_temp_trend_10s (\n" +
                        "  dt DATE,\n" +
                        "  ts TIMESTAMP(3),\n" +
                        "  avg_temp DOUBLE,\n" +
                        "  max_temp DOUBLE\n" +
                        ") WITH (\n" +
                        "  'connector'='jdbc',\n" +
                        "  'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
                        "  'table-name'='rt_temp_trend_10s',\n" +
                        "  'username'='root',\n" +
                        "  'password'='123456',\n" +
                        "  'driver'='com.mysql.cj.jdbc.Driver',\n" +
                        "  'sink.buffer-flush.max-rows'='100',\n" +
                        "  'sink.buffer-flush.interval'='2s'\n" +
                        ")";
        tEnv.executeSql(mysqlTrend);

        // 5.4 告警 Top5 排行表(1分钟窗口结果写入 MySQL):Rank 会产生更新流 → 必须声明主键
        String mysqlTop5 =
                "CREATE TABLE mysql_alarm_top5_1m (\n" +
                        "  dt DATE,\n" +
                        "  window_end TIMESTAMP(3),\n" +
                        "  device_id STRING,\n" +
                        "  cnt BIGINT,\n" +
                        "  rn BIGINT,\n" +
                        "  PRIMARY KEY (dt, window_end, rn) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        "  'connector'='jdbc',\n" +
                        "  'url'='jdbc:mysql://master:3306/rt?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
                        "  'table-name'='rt_alarm_top5_1m',\n" +
                        "  'username'='root',\n" +
                        "  'password'='123456',\n" +
                        "  'driver'='com.mysql.cj.jdbc.Driver',\n" +
                        "  'sink.buffer-flush.max-rows'='100',\n" +
                        "  'sink.buffer-flush.interval'='2s'\n" +
                        ")";
        tEnv.executeSql(mysqlTop5);

        System.out.println("====== 5. MySQL Sink(四个表) 创建完成 ======");

        // =========================================================
        // 6. 定义StatementSet,作用:收集多个 INSERT 语句(最后统一提交)
        // =========================================================
        StatementSet stmtSet = tEnv.createStatementSet();
 
        System.out.println("====== 6. 定义StatementSet完成 ======");

        // =========================================================
        // 7.注册模块:模块只负责(View + Insert)
        //    - 每个模块负责:创建视图(View) + 注册写入任务(Insert)
        //    - 执行后:
        //        tEnv 中新增一个临时视图(逻辑表)
        //        stmtSet 中新增一条写入 MySQL 的任务
        // =========================================================
        OnlineKpi10sModule.register(tEnv, stmtSet);
        TempTrendHopModule.register(tEnv, stmtSet);
        AlarmKpi10sModule.register(tEnv, stmtSet);
        AlarmTop5_1mModule.register(tEnv, stmtSet);
 
        System.out.println("====== 7.1 在线设备数统计模块:已加入执行任务(等待执行)");
        System.out.println("====== 7.2 温度趋势统计模块:已加入执行任务(等待执行)");
  System.out.println("====== 7.3 告警数量统计模块:已加入执行任务(等待执行)");
  System.out.println("====== 7.4 告警Top5统计模块:已加入执行任务(等待执行)");

        // =========================================================
        // 8.统一执行:真正提交作业(IDEA 运行时,这一行开始“常驻运行”)
        // =========================================================
        System.out.println("====== 8. 实时数据分析作业开始持续执行.........");
        stmtSet.execute();
    }
}

总结:

  • 以前:每个 Job 自己建环境、自己 execute → 资源占用大、难统一管理

  • 现在:主程序统一建环境 + 模块只负责注册 → 更省资源、更好维护、更适合分工演示


8.3.6 IDEA运行,验收结果


(1) 选择数据库

USE rt;

(2) 推荐教学查询方式(按时间倒序)

这样更适合实时项目观察:

SELECT * FROM rt.rt_kpi_online_10s
ORDER BY ts DESC
LIMIT 5;

SELECT * FROM rt.rt_kpi_alarm_10s
ORDER BY ts DESC
LIMIT 5;

SELECT * FROM rt.rt_temp_trend_10s
ORDER BY ts DESC
LIMIT 8;

SELECT * FROM rt.rt_alarm_top5_1m
ORDER BY window_end DESC, rn ASC
LIMIT 5;



(4) 检查各表的数据量

SELECT COUNT(*) FROM rt.rt_kpi_online_10s;
SELECT COUNT(*) FROM rt.rt_kpi_alarm_10s;
SELECT COUNT(*) FROM rt.rt_temp_trend_10s;
SELECT COUNT(*) FROM rt.rt_alarm_top5_1m;

如果 count > 0:

✅ Flink 写入成功 ✅ Kafka 消费正常 ✅ Sink 正常



8.4 打包与部署(服务器长期运行)


8.4.3 本地/服务器打包

IDEA → Maven → flink-rt-metrics → Lifecycle → clean → package

# 运行打包程序后,生成target目录,内含 flink-rt-metrics-1.0.0.jar 包

8.4.4 上传 JAR 到服务器

复制 flink-rt-metrics-1.0.0.jar 到 master节点的 /opt/jars/


8.5 提交 Flink 作业

8.5.1  运行方式一:集群模式

1)使用 集群 模式提交Flink实时作业

# 启动 Flink 集群,启动 JobManager(作业管理器)和 TaskManager(任务管理器):
start-cluster.sh

# 验证集群启动:
# 访问 Flink 的 Web UI 来验证集群是否正常运行,地址是 http://<jobmanager_host>:8081。
http://master:8081

# 方法1:运行 Flink 作业
flink run -c com.demo.flink.RtMetricsSqlJob /opt/jars/flink-rt-metrics-1.0.0.jar

# 方法2:Flink 作业只使用最小的资源,适用于小规模测试和开发环境。
flink run -c com.demo.flink.RtMetricsSqlJob \
  /opt/jars/flink-rt-metrics-1.0.0.jar \
  --jobmanager.memory.process.size 512m \
  --taskmanager.memory.process.size 512m \
  --taskmanager.numberOfTaskSlots 1 \
  --parallelism 1


Flink 任务被成功提交:

image-20260409094013653

2)检查Flink 作业状态

2.1 检查作业状态

flink list

image-20260409094029580

flink list 输出来看,作业 05ec8f6eefe9cfafb020c2e739a4d709 已经处于 运行中RUNNING


2.2 使用 Flink Web UI,查看作业的状态和监控 Flink 集群的健康状况

# 浏览器访问
192.168.36.100:8081

image-20260409094047315

3)停止任务

流处理任务(Kafka → Flink → MySQL)

特点:

  • 一直运行

  • 不会自动结束

  • 必须手动停止

第一步:查任务

flink list

会看到类似:

------------------ Running/Restarting Jobs -------------------
19.03.2026 15:17:18 : be708ed0831289e210b4f5072d37a197

👉 记住这个 JobId,如上:be708ed0831289e210b4f5072d37a197

第二步:停止任务

flink cancel JobId
# 举例:flink cancel be708ed0831289e210b4f5072d37a197  


8.5.2 运行模式二:flink on yarn

注意:本步骤使用 YARN per-job 模式提交实时作业


1)⚠ 提交前注意事项

1️⃣ 检查 8081 端口是否被占用

ss -tunlp | grep 8081

说明:

  • 如果当前机器已有 Flink 集群占用 8081(如 Standalone Session 或其他 YARN 作业)

  • 可能导致新提交的作业启动失败(端口冲突)


2️⃣ 如有占用,先关闭已有 Flink 集群

如果之前启动过 Standalone 模式:

$FLINK_HOME/bin/stop-cluster.sh

或确认是否有正在运行的 YARN 作业:

yarn application -list

如需停止某个应用:

yarn application -kill <application_id>


2)使用 YARN per-job 模式提交Flink实时作业

# 使用 YARN Per-Job 模式提交 Flink 作业
# 提交 Flink 实时指标统一计算作业到 YARN 上运行
# 运行流程:本地客户端 → 向 YARN 申请资源 → 启动 Flink per-job 集群 → 运行 RtMetricsSqlJob

# -t yarn-per-job      :指定运行模式为 YARN 独立作业模式(每次提交都会启动一个独立的 Flink 集群)
# -c                   :指定程序的主类(入口类)
# com.demo.flink.RtMetricsSqlJob :要运行的实时多指标统一计算作业
# /opt/jars/flink-rt-metrics-1.0.0.jar :已打包好的可执行 Jar 文件路径

flink run -t yarn-per-job \
  -c com.demo.flink.RtMetricsSqlJob \
  /opt/jars/flink-rt-metrics-1.0.0.jar

作业成功运行(YARN 模式)

运行成功后,终端会输出如下关键信息:

image-20260409094115997

Found Web Interface slave1:8081 of application 'application_1771941120543_0001'

信息解读

  • Flink 已经在 YARN 上成功启动了一个 per-job 集群

  • 这个集群的 JobManager(ApplicationMaster)运行在 slave1

  • 当前集群的 Web UI 地址为:slave1:8081

  • 本次 YARN 应用 ID 为:application_1771941120543_0001

含义总结

  • YARN 已分配资源,并成功启动 Flink 集群, 现在可以通过 8081 访问这个集群的监控页面


3)查看作业运行状态

1️⃣ 打开 Flink Web UI

浏览器访问:

http://slave1:8081

重点查看:

  • Jobs 页面:作业是否为 RUNNING

  • Checkpoints 页面:是否正常执行(如已开启)


2️⃣ 验证 MySQL 是否持续写入

在数据库中执行:

SELECT * FROM rt.rt_kpi_online_10s ORDER BY ts DESC LIMIT 5;

确认:

  • 有最新时间数据

  • 数据持续增加


3️⃣ 停止作业(如需)

yarn application -kill application_1771941120543_0001

查看yarn任务列表:yarn application -list

查看yarn任务web UI:slave1:8088


8.6 验收结果

-- 查看Mysql的数据(最新结果)在变化
SELECT COUNT(*) FROM rt.rt_kpi_online_10s;
SELECT COUNT(*) FROM rt.rt_kpi_alarm_10s;
SELECT COUNT(*) FROM rt.rt_temp_trend_10s;
SELECT COUNT(*) FROM rt.rt_alarm_top5_1m;

image-20260409094135462




发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1