李翔-大数据技术

Big data technology!

练习题C

《大数据综合项目实训》练习 C


提交材料清单

  1. 截图1:在slave1验证Kafka实时数据流动

  2. 截图2:DESC 查询表结构

  3. 截图3:执行Flink后,MySQL 查询到写入数据

  4. 截图4:在浏览器访问接口返回正确 JSON 数据

  5. 截图5:浏览器能正确显示图表

  6. 项目上传:分别导出 Flink 项目、前后端项目的 Zip(压缩包)上传


一、Flink 环境启动

【题目要求】

  • 启动 ZooKeeper 集群;

  • 启动 Kafka 服务;

  • 启动 Flink 集群;

  • 运行设备数据模拟器,确认 Kafka Topic 有数据流入。

【提交截图】

  • 截图1:在slave1验证Kafka实时数据流动的截图

【操作步骤】

# 在 master/slave1/slave2 分别执行,启动 ZooKeeper
zkServer.sh start

# 检查 ZooKeeper 状态(需看到 Mode: leader 或 follower)
zkServer.sh status

# 在 master/slave1/slave2 分别执行,启动 Kafka
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties

# 启动 Flink 集群
start-cluster.sh

# 运行数据模拟器,持续向 Kafka 写入设备状态数据
python /opt/datas/01_device_status_sim.py | \
  kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic

# 在slave1,验证实时数据流动
kafka-console-consumer.sh --bootstrap-server master:9092 --topic device_status_topic

二、MySQL 数据库初始化

【题目要求】

  • 创建数据库 bigdata1/bigdata2/bigdata3

  • 创建表 rt_kpi_c,用于存储 Flink 实时统计结果;

  • 查询表结构,验证创建成功。

【提交截图】

  • 截图2:DESC 查询表结构的截图

【操作步骤】

-- 进入数据库
mysql -uroot -p123456

-- 删除数据库
DROP DATABASE IF EXISTS bigdata1;

-- 创建数据库【注意:按班级建数据库名称:bigdata1/bigdata2/bigdata3】
CREATE DATABASE IF NOT EXISTS bigdata1
  CHARACTER SET utf8mb4;

-- 选择数据库
USE bigdata1;

-- 创建 Flink 写入的结果表
DROP TABLE IF EXISTS rt_kpi_c;

CREATE TABLE rt_kpi_c (
    window_start       DATETIME  NOT NULL  COMMENT '窗口开始时间',
    low_volt_count     INT       NOT NULL  COMMENT '低电压设备数',
    PRIMARY KEY (window_start)
) ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4;

-- 查询表结构
DESC rt_kpi_c;

三、Flink 实时统计任务

【题目要求】

  • 在 IDEA 中创建 Maven 项目 flink-kpi-job,导入考场提供的 pom.xml

  • 创建 FlinkJobC.java ;路径:src/main/java/com/exam/flink/FlinkJobC.java

  • 5 秒滚动窗口内统计电压 < 212V 且在线(status=1)的设备数,写入 MySQL;

  • 等待约 30 秒后,查询 MySQL 的 rt_kpi_c 表,验证数据写入成功。

【提交截图】

  • 截图3:执行Flink后,MySQL 查询到写入数据的截图

【操作步骤】

FlinkJobC.java

package com.exam.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkJobC {

    static final String KAFKA_BROKERS = "master:9092";
    static final String MYSQL_URL     =
            "jdbc:mysql://master:3306/bigdata1" +
            "?useSSL=false&serverTimezone=Asia/Shanghai";
    static final String MYSQL_PASS    = "123456";

    public static void main(String[] args) throws Exception {

        // 步骤1:屏蔽 Flink 内部日志,只输出 ERROR 级别
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "error");

        // 步骤2:创建流执行环境,并行度设为 1
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 步骤3:在流环境上构建 Table/SQL 环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 步骤4:注册 Kafka 源表,event_time 转换为 TIMESTAMP 并声明 5 秒水位线
        tEnv.executeSql(
                "CREATE TABLE device_status (" +
                "  device_id    STRING," +
                "  status       INT," +
                "  temperature  DOUBLE," +
                "  `load`       DOUBLE," +
                "  voltage      DOUBLE," +
                "  event_time   STRING," +
                "  ts AS TO_TIMESTAMP(event_time)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector'                    = 'kafka'," +
                "  'topic'                        = 'device_status_topic'," +
                "  'properties.bootstrap.servers' = '" + KAFKA_BROKERS + "'," +
                "  'properties.group.id'          = 'exam_group_c'," +
                "  'scan.startup.mode'            = 'latest-offset'," +
                "  'format'                       = 'json'" +
                ")"
        );

        // 步骤5:注册 MySQL 结果表,window_start 作为逻辑主键支持 upsert
        tEnv.executeSql(
                "CREATE TABLE rt_kpi_c (" +
                "  window_start    TIMESTAMP(3)," +
                "  low_volt_count  BIGINT," +
                "  PRIMARY KEY (window_start) NOT ENFORCED" +
                ") WITH (" +
                "  'connector'  = 'jdbc'," +
                "  'url'        = '" + MYSQL_URL + "'," +
                "  'table-name' = 'rt_kpi_c'," +
                "  'username'   = 'root'," +
                "  'password'   = '" + MYSQL_PASS + "'" +
                ")"
        );

        // 步骤6:5 秒滚动窗口内统计电压 < 212V 且在线(status=1)的设备数,写入 MySQL
        tEnv.executeSql(
                "INSERT INTO rt_kpi_c " +
                "SELECT " +
                "  TUMBLE_START(ts, INTERVAL '5' SECOND) AS window_start, " +
                "  COUNT(CASE WHEN voltage < 212 AND status = 1 THEN device_id END) AS low_volt_count " +
                "FROM device_status " +
                "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)"
        );

        System.out.println("FlinkJobC 已启动:低电压设备数实时统计");
        System.out.println("统计口径: 每 5 秒统计 voltage < 212 且 status = 1 的设备数");
        System.out.println("写入 MySQL 表: bigdata1.rt_kpi_c");
        System.out.println("请等待约 30 秒后查询 MySQL 验证结果。");
    }
}

MySQL 验证命令:

USE bigdata1;

SELECT * FROM rt_kpi_c ORDER BY window_start DESC LIMIT 10;



四、Spring Boot 接口开发

【题目要求】

  • 创建 Spring Boot 项目,名称为 Backend,并导入 pom.xml 文件;

  • 配置 application.properties 参数,连接数据库;

  • 创建类 KpiControllerC,路径为:src/main/java/com/exam/backend/controller/KpiControllerC.java

  • 使用 Spring Boot 创建接口 /kpi-c,请求方式为 GET

  • 在接口中通过 JDBC 查询 MySQL 表 rt_kpi_c

  • 查询字段包括:window_startlow_volt_count

  • 将查询结果以 JSON 列表形式返回。

【提交截图】

  • 截图4:在浏览器访问接口返回正确 JSON 数据截图

【操作步骤】

application.properties 配置文件:

# ===== 服务器配置 =====
server.address=0.0.0.0
server.port=8080

# MySQL 配置【注意数据库更改班级】
spring.datasource.url=jdbc:mysql://master:3306/bigdata1?useSSL=false&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

KpiControllerC

package com.exam.backend.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.sql.*;
import java.util.*;

// @RestController:标识这是一个 REST 控制器,方法返回值会直接序列化成 JSON 返回给前端
@RestController
public class KpiControllerC {

    // @GetMapping:映射 HTTP GET 请求,访问路径为 /kpi-c
    @GetMapping("/kpi-c")
    public List<Map<String, Object>> getKpiC() {
        // 用于存放最终返回的结果集,每条记录是一个 Map(键值对)
        List<Map<String, Object>> result = new ArrayList<>();
        try {
            // 加载 MySQL JDBC 驱动类
            Class.forName("com.mysql.cj.jdbc.Driver");

            // 建立数据库连接:连接 master 节点的 bigdata1 库,关闭 SSL,设置时区为上海
            Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://master:3306/bigdata1?useSSL=false&serverTimezone=Asia/Shanghai",
                    "root", "123456"); // 数据库账号 root / 密码 123456

            // 创建 Statement 对象,用于执行 SQL 语句
            // 取最新 8 条,再按时间正序排列
            Statement stmt = conn.createStatement();

            // 执行查询:
            //   内层子查询按 window_start 倒序取最新 8 条
            //   外层再按 window_start 正序排列,保证返回结果按时间从早到晚
            ResultSet rs = stmt.executeQuery(
                    "SELECT window_start, low_volt_count FROM (" +
                    "  SELECT window_start, low_volt_count" +
                    "  FROM rt_kpi_c ORDER BY window_start DESC LIMIT 8" +
                    ") t ORDER BY window_start ASC");

            // 遍历结果集,逐行读取数据
            while (rs.next()) {
                // LinkedHashMap 保证字段插入顺序(即 JSON 中字段顺序)
                Map<String, Object> map = new LinkedHashMap<>();
                map.put("window_start",   rs.getString("window_start"));  // 窗口开始时间
                map.put("low_volt_count", rs.getInt("low_volt_count"));   // 低电压计数值
                result.add(map); // 把当前行加入结果集
            }

            // 依次关闭资源,释放数据库连接(注意:异常时不会执行,存在泄漏风险)
            rs.close();
            stmt.close();
            conn.close();
        } catch (Exception e) {
            // 捕获所有异常并打印堆栈信息(生产环境建议用日志框架记录)
            e.printStackTrace();
        }
        // 返回结果集,Spring 自动转成 JSON
        return result;
    }
}

【接口测试】

浏览器访问 http://localhost:8080/kpi-c



五、Vue + ECharts 数据可视化

【题目要求】

  • 创建 Vue 项目,名称为 Frontend

  • 创建组件 components/ChartC.vue 和主页面 App.vue,配置代理文件 vite.config.js

  • 使用 Vue3 + ECharts 展示面积图;

  • 从接口 /api/kpi-c 获取数据;

  • 展示最近 8 条统计记录中每个时间窗口的低电压设备数量;

  • 网页标签页标题为"数据大屏";

【提交截图】

  • 截图5:浏览器能正确显示图表截图

【操作步骤】

1. 创建 Vue 项目

# 切换到 D 盘
D:

# 进入代码存放文件夹【注意切换班级】
cd D:\bigdata1\test

# 创建 Vue3 项目
npm create vite@latest Frontend -- --template vue

# 进入项目并安装依赖
cd Frontend
npm install

# 安装前后端通信库和图表库
npm i axios echarts

2. 编写代码

vite.config.js
import { defineConfig } from 'vite'
import vue from '@vitejs/plugin-vue'

export default defineConfig({
  plugins: [vue()],
  server: {
    port: 5173,
    proxy: {
      '/api': {
        target: 'http://localhost:8080',
        changeOrigin: true,
        rewrite: path => path.replace(/^\/api/, '')
      }
    }
  }
})
App.vue
<template>
  <div class="chart-wrapper">
    <h1>数据可视化大屏</h1>
    <h2>操作员:Admin</h2>
    <ChartC />
  </div>
</template>

<script setup>
import ChartC from './components/ChartC.vue'
</script>

<style>
body {
  background-color: #f0f2f5;
  margin: 0;
  padding: 20px;
  font-family: "Microsoft YaHei", sans-serif;
}

.chart-wrapper {
  max-width: 1400px;
  margin: 40px auto;
  padding: 20px;
}
</style>
components/ChartC.vue
<template>
  <!-- 图表容器,ref 用于让 ECharts 挂载到这个 DOM 上 -->
  <div ref="chartRef" class="chart-container"></div>
</template>

<script setup>
import { ref, onMounted, onUnmounted } from 'vue'
import * as echarts from 'echarts'   // 引入 ECharts 图表库
import axios from 'axios'            // 引入 HTTP 请求库

const chartRef = ref()     // 图表 DOM 引用,对应模板里的 ref="chartRef"
let chart = null           // ECharts 实例,初始为空,首次渲染时创建
let timer = null           // 定时器句柄,用于定时刷新数据,卸载时需清除

// 渲染图表数据(async 因为内部要 await 请求接口)
const renderChart = async () => {
  // 请求后端实时数据,_t 传当前时间戳,每次都不同,用于防止浏览器缓存
  const res = await axios.get('/api/kpi-c', { params: { _t: Date.now() } })
  const data = res.data   // 后端返回的数组,每项形如 { window_start, low_volt_count }

  // X 轴数据:从每条记录里取出时间窗口,组成数组
  const xData = data.map(item => item.window_start)
  // Y 轴数据:从每条记录里取出低电压设备数量,组成数组
  const yData = data.map(item => item.low_volt_count)

  // 如果 chart 还没创建(为 null),就执行下面创建实例的代码
  // 只初始化一次图表实例,避免每次刷新都重复创建(否则会内存泄漏/报错)
  if (!chart) {
    chart = echarts.init(chartRef.value)
  }

  // 设置图表配置项(setOption 默认合并模式,重复调用会更新已有图表)
  chart.setOption({
    title: {
      text: '低电压设备数实时趋势',                          // 主标题
      subtext: '统计周期:每 5 秒 | 阈值:电压 < 212V',      // 副标题,说明统计规则
      left: 'center',                                     // 标题水平居中
      top: 10,                                            // 标题距顶部 10px
      subtextStyle: { fontSize: 12, color: '#888' }       // 副标题样式:小号灰字
    },
    // 控制图表绘图区位置(四周留白),避免标题和坐标轴文字重叠
    grid: {
      top: 80,        // 顶部留白,给标题腾位置
      left: 70,       // 左侧留白,给 Y 轴名称和刻度腾位置
      right: 30,
      bottom: 80      // 底部留白,给倾斜的 X 轴文字腾位置
    },
    tooltip: { trigger: 'axis' }, // 提示框:鼠标悬停时按坐标轴触发,显示该时间点数据
    xAxis: {
      type: 'category',           // 类目轴(X 轴是离散的时间窗口,不是连续数值)
      data: xData,                // X 轴刻度数据
      axisLabel: {
        rotate: 35,       // X 轴文字倾斜 35°,防止时间文字相互重叠
        fontSize: 11,
        interval: 0       // 强制显示所有 X 轴标签(默认会自动隐藏一部分)
      }
    },
    yAxis: {
      type: 'value',                                  // 数值轴(Y 轴是连续数量)
      name: '低电压设备数(台)',                       // Y 轴名称
      nameLocation: 'end',                            // 名称显示在轴的末端(顶部)
      nameGap: 10,                                    // 名称与轴的间距
      nameTextStyle: { fontSize: 12, color: '#555' }, // 名称样式
      min: 0              // Y 轴从 0 开始,避免数据被压缩、趋势失真
    },
    series: [{
      type: 'line',                       // 折线图
      name: '低电压设备数',                 // 系列名称(用于 tooltip/图例)
      data: yData,                        // Y 轴对应的数据
      smooth: true,                       // 平滑折线(曲线而非直线连接)
      itemStyle: { color: '#8E44AD' },    // 折线/数据点颜色(紫色)
      // 折线下方填充区域:使用线性渐变,从上到下由深变浅
      areaStyle: {
        color: {
          type: 'linear',                 // 线性渐变
          x: 0, y: 0, x2: 0, y2: 1,       // 渐变方向:从顶部(0,0)到底部(0,1),即垂直向下
          colorStops: [
            { offset: 0, color: 'rgba(142,68,173,0.4)' },   // 顶部:紫色,透明度 0.4(较深)
            { offset: 1, color: 'rgba(142,68,173,0.02)' }   // 底部:紫色,透明度 0.02(几乎透明)
          ]
        }
      },
      label: {
        show: true,                       // 显示每个数据点上方的数值标签
        position: 'top',                  // 标签位置在点的上方
        fontSize: 11
      }
    }]
  })
}

onMounted(() => {
  renderChart()                           // 组件挂载后立即渲染一次,避免首屏空白
  timer = setInterval(renderChart, 5000)  // 每 5 秒调用一次 renderChart,实现实时刷新
})

onUnmounted(() => {
  clearInterval(timer)                    // 组件卸载时清除定时器,防止内存泄漏和无效请求
  chart?.dispose()                        // 销毁图表实例,释放占用的资源(?. 防止 chart 为 null 报错)
})
</script>

<style scoped>
.chart-container {
  width: 100%;                               /* 容器宽度占满父级 */
  height: 400px;                             /* 图表高度固定 400px */
  min-width: 700px;                          /* 最小宽度,防止图表过窄导致 X 轴文字挤在一起 */
  background: #fff;                          /* 白色背景 */
  border: 1px solid #e0e0e0;                 /* 浅灰色边框 */
  border-radius: 15px;                       /* 圆角效果 */
  padding: 10px;                             /* 内边距,让图表不贴边 */
  box-shadow: 0 2px 8px rgba(0, 0, 0, 0.05); /* 轻微阴影,增加层次感 */
}
</style>

3. 启动 Vue 项目

npm run dev

浏览器访问 http://localhost:5173,查看面积图并截图。

六、项目提交

  • 截图

  • 导出的 Flink 项目压缩包;

  • 导出的后端项目压缩包;

  • 导出的前端项目压缩包;



发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1