提交材料清单
截图1:在slave1验证Kafka实时数据流动
截图2:DESC 查询表结构
截图3:执行Flink后,MySQL 查询到写入数据
截图4:在浏览器访问接口返回正确 JSON 数据
截图5:浏览器能正确显示图表
项目上传:分别导出 Flink 项目、前后端项目的 Zip(压缩包)上传
一、Flink 环境启动
【题目要求】
启动 ZooKeeper 集群;
启动 Kafka 服务;
启动 Flink 集群;
运行设备数据模拟器,确认 Kafka Topic 有数据流入。
【提交截图】
截图1:在slave1验证Kafka实时数据流动的截图
【操作步骤】
# 在 master/slave1/slave2 分别执行,启动 ZooKeeper zkServer.sh start # 检查 ZooKeeper 状态(需看到 Mode: leader 或 follower) zkServer.sh status # 在 master/slave1/slave2 分别执行,启动 Kafka kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties # 启动 Flink 集群 start-cluster.sh # 运行数据模拟器,持续向 Kafka 写入设备状态数据 python /opt/datas/01_device_status_sim.py | \ kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic # 在slave1,验证实时数据流动 kafka-console-consumer.sh --bootstrap-server master:9092 --topic device_status_topic
二、MySQL 数据库初始化
【题目要求】
创建数据库
bigdata1/bigdata2/bigdata3;创建表
rt_kpi_e,用于存储 Flink 实时统计结果;查询表结构,验证创建成功。
【提交截图】
截图2:DESC 查询表结构的截图
【操作步骤】
-- 进入数据库 mysql -uroot -p123456 -- 删除数据库 DROP DATABASE IF EXISTS bigdata1; -- 创建数据库【注意:按班级建数据库名称:bigdata1/bigdata2/bigdata3】 CREATE DATABASE IF NOT EXISTS bigdata1 CHARACTER SET utf8mb4; -- 选择数据库 USE bigdata1; -- 创建 Flink 写入的结果表 DROP TABLE IF EXISTS rt_kpi_e; CREATE TABLE rt_kpi_e ( window_start DATETIME NOT NULL COMMENT '窗口开始时间', high_load_count INT NOT NULL COMMENT '高负载设备数(负载>0.8)', PRIMARY KEY (window_start) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; -- 查询表结构 DESC rt_kpi_e;
三、Flink 实时统计任务
【题目要求】
在 IDEA 中创建 Maven 项目
flink-kpi-job,导入考场提供的pom.xml;创建
FlinkJobE.java;路径:src/main/java/com/exam/flink/FlinkJobE.java5 秒滚动窗口内统计负载 > 0.82 且在线(status=1)的设备数,写入 MySQL;
等待约 30 秒后,查询 MySQL 的
rt_kpi_e表,验证数据写入成功。
【提交截图】
截图3:执行Flink后,MySQL 查询到写入数据的截图
【操作步骤】
【FlinkJobE.java】
package com.exam.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkJobE {
static final String KAFKA_BROKERS = "master:9092";
static final String MYSQL_URL =
"jdbc:mysql://master:3306/bigdata1" +
"?useSSL=false&serverTimezone=Asia/Shanghai";
static final String MYSQL_PASS = "123456";
public static void main(String[] args) throws Exception {
// 步骤1:屏蔽 Flink 内部日志,只输出 ERROR 级别
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "error");
// 步骤2:创建流执行环境,并行度设为 1
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 步骤3:在流环境上构建 Table/SQL 环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 步骤4:注册 Kafka 源表,event_time 转换为 TIMESTAMP 并声明 5 秒水位线
tEnv.executeSql(
"CREATE TABLE device_status (" +
" device_id STRING," +
" status INT," +
" temperature DOUBLE," +
" `load` DOUBLE," +
" voltage DOUBLE," +
" event_time STRING," +
" ts AS TO_TIMESTAMP(event_time)," +
" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'device_status_topic'," +
" 'properties.bootstrap.servers' = '" + KAFKA_BROKERS + "'," +
" 'properties.group.id' = 'exam_group_e'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'format' = 'json'" +
")"
);
// 步骤5:注册 MySQL 结果表,window_start 作为逻辑主键支持 upsert
tEnv.executeSql(
"CREATE TABLE rt_kpi_e (" +
" window_start TIMESTAMP(3)," +
" high_load_count BIGINT," +
" PRIMARY KEY (window_start) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = '" + MYSQL_URL + "'," +
" 'table-name' = 'rt_kpi_e'," +
" 'username' = 'root'," +
" 'password' = '" + MYSQL_PASS + "'" +
")"
);
// 步骤6:5 秒滚动窗口内统计负载 > 0.82 且在线(status=1)的设备数,写入 MySQL
tEnv.executeSql(
"INSERT INTO rt_kpi_e " +
"SELECT " +
" TUMBLE_START(ts, INTERVAL '5' SECOND) AS window_start, " +
" COUNT(CASE WHEN `load` > 0.82 AND status = 1 THEN device_id END) AS high_load_count " +
"FROM device_status " +
"GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)"
);
System.out.println("✅ Flink 任务已启动,正在写入 MySQL...");
System.out.println(" 请等待约 30 秒后查询 MySQL 验证结果。");
}
}MySQL 验证命令:
USE bigdata1; SELECT * FROM rt_kpi_e ORDER BY window_start DESC LIMIT 10;
四、Spring Boot 接口开发
【题目要求】
创建 Spring Boot 项目,名称为
Backend,并导入pom.xml文件;配置
application.properties参数,连接数据库;创建类
KpiControllerE,路径为:src/main/java/com/exam/backend/controller/KpiControllerE.java使用 Spring Boot 创建接口
/kpi-e,请求方式为GET;在接口中通过 JDBC 查询 MySQL 表
rt_kpi_e;查询字段包括:
window_start、high_load_count;将查询结果以 JSON 列表形式返回。
【提交截图】
截图4:在浏览器访问接口返回正确 JSON 数据截图
【操作步骤】
application.properties 配置文件:
# ===== 服务器配置 ===== server.address=0.0.0.0 server.port=8080 # MySQL 配置【注意数据库更改班级】 spring.datasource.url=jdbc:mysql://master:3306/bigdata1?useSSL=false&serverTimezone=Asia/Shanghai spring.datasource.username=root spring.datasource.password=123456 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
【KpiControllerE】
package com.exam.backend.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.sql.*;
import java.util.*;
// @RestController:标识这是一个 REST 控制器,方法返回值会直接序列化成 JSON 返回给前端
@RestController
public class KpiControllerE {
// @GetMapping:映射 HTTP GET 请求,访问路径为 /kpi-e
@GetMapping("/kpi-e")
public List<Map<String, Object>> getKpiE() {
// 用于存放最终返回的结果集,每条记录是一个 Map(键值对)
List<Map<String, Object>> result = new ArrayList<>();
try {
// 加载 MySQL JDBC 驱动类
Class.forName("com.mysql.cj.jdbc.Driver");
// 建立数据库连接:连接 master 节点的 bigdata1 库,关闭 SSL,设置时区为上海
Connection conn = DriverManager.getConnection(
"jdbc:mysql://master:3306/bigdata1?useSSL=false&serverTimezone=Asia/Shanghai",
"root", "123456"); // 数据库账号 root / 密码 123456
// 创建 Statement 对象,用于执行 SQL 语句
// 取最新 8 条,再按时间正序排列
Statement stmt = conn.createStatement();
// 执行查询:
// 内层子查询按 window_start 倒序取最新 8 条
// 外层再按 window_start 正序排列,保证返回结果按时间从早到晚
ResultSet rs = stmt.executeQuery(
"SELECT window_start, high_load_count FROM (" +
" SELECT window_start, high_load_count" +
" FROM rt_kpi_e ORDER BY window_start DESC LIMIT 8" +
") t ORDER BY window_start ASC");
// 遍历结果集,逐行读取数据
while (rs.next()) {
// LinkedHashMap 保证字段插入顺序(即 JSON 中字段顺序)
Map<String, Object> map = new LinkedHashMap<>();
map.put("window_start", rs.getString("window_start")); // 窗口开始时间
map.put("high_load_count", rs.getInt("high_load_count")); // 高负载计数值
result.add(map); // 把当前行加入结果集
}
// 依次关闭资源,释放数据库连接(注意:异常时不会执行,存在泄漏风险)
rs.close();
stmt.close();
conn.close();
} catch (Exception e) {
// 捕获所有异常并打印堆栈信息(生产环境建议用日志框架记录)
e.printStackTrace();
}
// 返回结果集,Spring 自动转成 JSON
return result;
}
}【接口测试】
浏览器访问 http://localhost:8080/kpi-e
五、Vue + ECharts 数据可视化
【题目要求】
创建 Vue 项目,名称为
Frontend;创建组件
components/ChartE.vue和主页面App.vue,配置代理文件vite.config.js;从接口
/api/kpi-e获取数据;展示最近 8 条统计记录中每个时间窗口的高负载设备数量;
使用 Vue3 + ECharts 展示折线图,并在数值超过 5 的数据点上标注红色警告点;
网页标签页标题为"数据大屏";
【提交截图】
截图5:浏览器能正确显示图表截图
【操作步骤】
1. 创建 Vue 项目
# 切换到 D 盘 D: # 进入代码存放文件夹【注意切换班级】 cd D:\bigdata1\test # 创建 Vue3 项目 npm create vite@latest Frontend -- --template vue # 进入项目并安装依赖 cd Frontend npm install # 安装前后端通信库和图表库 npm i axios echarts
2. 编写代码
【vite.config.js】
import { defineConfig } from 'vite'
import vue from '@vitejs/plugin-vue'
export default defineConfig({
plugins: [vue()],
server: {
port: 5173,
proxy: {
'/api': {
target: 'http://localhost:8080',
changeOrigin: true,
rewrite: path => path.replace(/^\/api/, '')
}
}
}
})【App.vue】
<template>
<div class="chart-wrapper">
<h1>数据可视化大屏</h1>
<h2>操作员:Admin</h2>
<ChartE />
</div>
</template>
<script setup>
import ChartE from './components/ChartE.vue'
</script>
<style>
body {
background-color: #f0f2f5;
margin: 0;
padding: 20px;
font-family: "Microsoft YaHei", sans-serif;
}
.chart-wrapper {
max-width: 1400px;
margin: 40px auto;
padding: 20px;
}
</style>【components/ChartE.vue】
<template>
<!-- 图表容器,ref 用于让 ECharts 挂载到这个 DOM 上 -->
<div ref="chartRef" class="chart-container"></div>
</template>
<script setup>
import { ref, onMounted, onUnmounted } from 'vue'
import * as echarts from 'echarts' // 引入 ECharts 图表库
import axios from 'axios' // 引入 HTTP 请求库
const chartRef = ref() // 图表 DOM 引用,对应模板里的 ref="chartRef"
let chart = null // ECharts 实例,初始为空,首次渲染时创建
let timer = null // 定时器句柄,用于定时刷新数据,卸载时需清除
// 异常阈值:高负载设备数超过此值时标注红色警告点
const ALARM_THRESHOLD = 6
// 渲染图表数据(async 因为内部要 await 请求接口)
const renderChart = async () => {
// 请求后端实时数据,_t 传当前时间戳,每次都不同,用于防止浏览器缓存
const res = await axios.get('/api/kpi-e', { params: { _t: Date.now() } })
const data = res.data // 后端返回的数组,每项形如 { window_start, high_load_count }
// X 轴数据:从每条记录里取出时间窗口,组成数组
const xData = data.map(item => item.window_start)
// Y 轴数据:从每条记录里取出高负载设备数量,组成数组
const yData = data.map(item => item.high_load_count)
// 找出超过阈值的点,生成红色标注(三步链式处理)
const markPoints = data
// ① 先把每条数据转成 { 下标, 值 },下标用于后面定位到 X 轴的第几个点
.map((item, index) => ({
index,
value: item.high_load_count
}))
// ② 过滤:只保留值 >= 阈值的点(即需要告警的点)
.filter(p => p.value >= ALARM_THRESHOLD)
// ③ 把每个告警点转成 ECharts 标注点所需的格式
.map(p => ({
coord: [p.index, p.value], // 标注坐标:[X 轴第几个点, Y 值]
value: p.value, // 标注上显示的数值
itemStyle: { color: '#E74C3C' }, // 标注颜色(红色)
symbol: 'pin', // 标注形状:图钉
symbolSize: 36, // 标注大小
label: { color: '#fff', fontSize: 11 } // 标注内文字样式(白字)
}))
// 如果 chart 还没创建(为 null),就执行下面创建实例的代码
// 只初始化一次图表实例,避免每次刷新都重复创建(否则会内存泄漏/报错)
if (!chart) {
chart = echarts.init(chartRef.value)
}
// 设置图表配置项(setOption 默认合并模式,重复调用会更新已有图表)
chart.setOption({
title: {
text: '高负载设备数实时趋势', // 主标题
subtext: '统计周期:每 5 秒 | 阈值:负载 > 0.82 | 红色标注:超过 5 台', // 副标题,说明统计规则
left: 'center', // 标题水平居中
top: 10, // 标题距顶部 10px
subtextStyle: { fontSize: 12, color: '#888' } // 副标题样式:小号灰字
},
// 控制图表绘图区位置(四周留白),避免标题和坐标轴文字重叠
grid: {
top: 80, // 顶部留白,给标题腾位置
left: 70, // 左侧留白,给 Y 轴名称和刻度腾位置
right: 30,
bottom: 80 // 底部留白,给倾斜的 X 轴文字腾位置
},
tooltip: { trigger: 'axis' }, // 提示框:鼠标悬停时按坐标轴触发,显示该时间点数据
xAxis: {
type: 'category', // 类目轴(X 轴是离散的时间窗口,不是连续数值)
data: xData, // X 轴刻度数据
axisLabel: {
rotate: 35, // X 轴文字倾斜 35°,防止时间文字相互重叠
fontSize: 11,
interval: 0 // 强制显示所有 X 轴标签(默认会自动隐藏一部分)
}
},
yAxis: {
type: 'value', // 数值轴(Y 轴是连续数量)
name: '高负载设备数(台)', // Y 轴名称
nameLocation: 'end', // 名称显示在轴的末端(顶部)
nameGap: 10, // 名称与轴的间距
nameTextStyle: { fontSize: 12, color: '#555' }, // 名称样式
min: 0 // Y 轴从 0 开始,避免数据被压缩、趋势失真
},
series: [{
type: 'line', // 折线图
name: '高负载设备数', // 系列名称(用于 tooltip/图例)
data: yData, // Y 轴对应的数据
smooth: true, // 平滑折线(曲线而非直线连接)
itemStyle: { color: '#2980B9' }, // 折线/数据点颜色(蓝色)
areaStyle: { opacity: 0.12 }, // 折线下方填充区域,透明度 0.12(淡淡阴影)
label: {
show: true, // 显示每个数据点上方的数值标签
position: 'top',
fontSize: 11
},
// 标注点:把上面算出的告警点(红色图钉)标在折线上
markPoint: {
data: markPoints
},
// 标注线:在 Y = 阈值处画一条横向警戒线
markLine: {
silent: true, // 静默:鼠标悬停不响应,不弹 tooltip
lineStyle: { color: '#E74C3C', type: 'dashed' }, // 线样式:红色虚线
data: [{ yAxis: ALARM_THRESHOLD }], // 线的位置:Y 轴等于阈值处
label: { formatter: '警戒线 {c} 台', color: '#E74C3C' } // 线上文字,{c} 自动替换为阈值
}
}]
})
}
onMounted(() => {
renderChart() // 组件挂载后立即渲染一次,避免首屏空白
timer = setInterval(renderChart, 5000) // 每 5 秒调用一次 renderChart,实现实时刷新
})
onUnmounted(() => {
clearInterval(timer) // 组件卸载时清除定时器,防止内存泄漏和无效请求
chart?.dispose() // 销毁图表实例,释放占用的资源(?. 防止 chart 为 null 报错)
})
</script>
<style scoped>
.chart-container {
width: 100%; /* 容器宽度占满父级 */
height: 400px; /* 图表高度固定 400px */
min-width: 700px; /* 最小宽度,防止图表过窄导致 X 轴文字挤在一起 */
background: #fff; /* 白色背景 */
border: 1px solid #e0e0e0; /* 浅灰色边框 */
border-radius: 15px; /* 圆角效果 */
padding: 10px; /* 内边距,让图表不贴边 */
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.05); /* 轻微阴影,增加层次感 */
}
</style>3. 启动 Vue 项目
npm run dev
浏览器访问 http://localhost:5173,查看折线图并截图。
六、项目提交
截图
导出的 Flink 项目压缩包;
导出的后端项目压缩包;
导出的前端项目压缩包;