李翔-大数据技术

Big data technology!

第7章-实验一:模型训练

第7章 设备异常预测实训 — 基于 Spark ML 的随机森林应用


一、实验背景(应用场景)

问题引入: 企业生产中,大量设备需要长期稳定运行。一旦发生故障,轻则生产中断,重则造成重大经济损失。因此,提前预测设备异常、防患于未然,成为现代企业的迫切需求。

解决思路: 引入机器学习技术,让计算机从历史数据中自动学习设备运行规律,从而实现故障预警。


二、教学目标

实验一:设备异常预测 使用 Spark ML 对设备历史数据进行建模,以温度、电压等运行特征为输入,训练随机森林分类模型,实现设备异常状态预测。

实验二:实时数据预测与展示 使用 Spark Streaming 对实时设备数据流进行处理,调用已训练好的预测模型完成设备异常实时识别,并将预测结果写入数据库进行展示与应用。


三、教学重难点

教学重点:

  1. 设备异常预测的重要性 通过机器学习预测设备故障,帮助企业提前发现问题,减少停机时间和维修费用。

  2. 机器学习应用 使用随机森林等算法进行设备异常预测,学会将设备数据转化为预测模型的输入。

  3. 数据处理和特征工程 学会如何清洗数据、选择和处理特征,帮助模型更好地进行预测。

  4. 模型训练与评估 学习如何训练模型并通过评估指标判断模型的准确性和效果。

  5. 实时预测与展示 在实时数据流中使用已训练的模型进行预测,并将预测结果存储和展示。


教学难点:

  1. 模型选择与调优 如何选择合适的机器学习模型,并调整参数以获得最佳预测效果。

  2. 数据不平衡问题 如何处理设备异常数据较少的问题,确保模型能够准确识别异常设备。

  3. 特征工程 从多个设备特征中提取有用的信息并进行处理,帮助模型进行精准预测。

  4. 评估指标的理解 理解不同评估指标的含义,并选择合适的评估标准来衡量模型效果。

  5. 实时数据处理与应用 如何在实时数据流中快速应用预测模型并保持高效和准确。

  6. 数据存储与展示 将预测结果写入数据库并展示在大屏上,确保数据展示及时准确。



四、数据结构说明

讲清:

字段说明
status在线状态
temperature温度
load负载
voltage电压
label是否异常

重点讲:

label = 1 代表异常 模型的任务是学会判断“哪些特征组合容易异常”



五、随机森林基础

什么是随机森林?

随机森林(Random Forest) 是一种常用的机器学习算法,用来解决分类和回归问题。它由多个决策树组成,可以帮助我们做出更准确的预测。

分类 = 选类别;回归 = 算数字

随机森林做分类:判断是否患病、是否违约 【判断:是/否】

随机森林做回归:预测房价、预测收入、预测数值【预测:连续数值】

随机森林 = 多棵决策树一起“投票”做决定


img


六、实验一:使用历史数据训练模型

6.1 工作场景

本实验以设备异常预测为背景,使用随机森林算法对设备的运行数据进行预测,预测设备是否出现异常情况。数据来源于Hive,经过特征工程处理后,使用Spark进行模型训练、评估和预测,最终将预测结果写入ClickHouse数据库供后续展示与分析。

设备状态包括温度、负载、电压等多种监测指标,而异常预测是智能设备管理系统中的核心任务之一。通过准确预测异常,企业可以及时采取措施,避免设备故障带来的损失,提高运营效率。

6.2 实验目标

  1. 数据读取与清洗:从Hive中读取训练数据,了解数据的分布和结构,进行必要的数据清洗与预处理。

  2. 特征工程:将设备的状态信息(如温度、负载、电压)转化为特征向量,为机器学习模型提供输入。

  3. 模型训练:使用随机森林算法训练模型,并通过调整参数(如树的数量和深度)优化模型性能。

  4. 模型检验:对测试集数据进行预测,输出预测结果(如是否异常)以及预测的概率。

  5. 模型评估:计算模型的各项性能指标(如准确率、精确率、召回率、F1分数),评估其预测能力。

  6. 混淆矩阵分析:通过混淆矩阵分析预测的错误情况,了解哪些设备被误判为异常或正常,进而分析模型性能。

  7. 模型保存与版本管理:保存训练好的模型,进行版本管理,便于后续使用最新模型进行预测。

  8. 结果存储与展示:将预测结果写入ClickHouse数据库。

通过这些步骤,实验实现了设备异常预测的全流程,包括数据读取、模型训练、评估、预测和结果存储,最终可利用训练好的模型为将来的实时数据做预测。


9021568c-a374-480b-bbb1-755a490a4334


完整实施的过程

6.3 启动环境

# 启动 Hadoop
start-dfs.sh   # 在master操作
start-yarn.sh  # 在slave1操作


6.4 上传历史数据

# 在HDFS上创建数据目录
hdfs dfs -mkdir  /datas

# 把master上的历史数据上传到HDFS上
hdfs dfs -put /opt/datas/device_status_with_label_2026_01.csv /datas/


6.5 ClickHouse 表设计

启动ClickHouse客户端

# 启动多行模式(在master中操作)
clickhouse-client -m

创建ClickHouse 表

-- 1.创建报表数据库(如果不存在)
CREATE DATABASE IF NOT EXISTS iot_report;

-- 2.创建预测结果表
CREATE TABLE IF NOT EXISTS iot_report.dws_device_pred_detail
(
    model_version String,        -- 模型版本号
    device_id String,            -- 设备ID
    event_time DateTime,         -- 数据时间
    label Nullable(UInt8),       -- 真实标签,允许为 NULL
    prediction UInt8,            -- 预测结果
    prob1 Float64,               -- 异常概率
    created_at DateTime          -- 写入时间
)
ENGINE = MergeTree
ORDER BY (device_id, event_time);

6.6 离线训练 Job

历史标注数据 → 随机森林训练与评估 → 模型存储(HDFS latest) → 离线预测结果落库(用于验证/回放)

项目路径:

image.pngimage-20260215165502790

完整代码:

package com.demo.spark

// ========================================
// 工具库导入区
// ========================================

// Spark SQL:处理表格数据(DataFrame)的核心工具
//   - DataFrame 可以理解为"分布式的 Excel 表格"
//   - SaveMode 控制写入数据库时的行为(覆盖/追加/报错)
//   - SparkSession 是程序进入 Spark 世界的"大门"
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Spark 内置函数库,col() / lit() / cast() 等都来自这里
import org.apache.spark.sql.functions._

// Spark ML:机器学习模块
//   - VectorAssembler:把多列特征"打包"成一个向量(模型只认向量)
//   - RandomForestClassifier:随机森林分类器(还没训练的"空白工具")
//   - RandomForestClassificationModel:训练好的随机森林模型(会预测了)
//   - MulticlassClassificationEvaluator:评估模型好坏(算准确率等)
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// vector_to_array:把 Vector 类型转成普通数组,方便取概率值
import org.apache.spark.ml.functions.vector_to_array

// Java 时间工具,用来生成"模型版本号"
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

/**
 * ========================================================
 * 第7章:用随机森林预测设备异常
 * ========================================================
 *
 * 【任务目标】
 *   有一堆历史设备数据(温度/负载/电压 + 已知是不是异常),
 *   训练一个模型,以后给它新数据,它能告诉我们"这台设备有没有异常"。
 *
 * 【整体流程 - 8 个步骤】
 *   1. 读数据      → 从 CSV 文件加载历史设备数据(包括设备的温度、负载、电压等特征,以及标签信息)。
 *   2. 特征工程    → 将数据表中的特征列(如温度、负载、电压等)打包成一个单一的特征向量,便于模型处理。
 *   3. 训练模型    → 使用历史数据训练随机森林模型,学习数据中的规律,以便后续进行预测。
 *   4. 模型预测    → 用训练好的模型对新的测试数据进行预测,判断设备是否异常。
 *   5. 模型评估    → 评估模型在测试集上的表现,计算准确率、精确率、召回率等指标,了解模型的预测效果。
 *   6. 混淆矩阵    → 分析模型预测结果的误差,通过混淆矩阵查看预测错误的类别,以及模型正确预测的情况。
 *   7. 模型保存    → 把训练好的模型存到磁盘,以后能再用
 *   8. 结果入库    → 将测试集的预测结果(包括设备ID、预测结果、异常概率等)写入数据库
 *
 * 【关键概念铺垫】
 *   - 监督学习:用"已知答案"的数据训练模型,以后让它对没答案的数据下判断
 *   - 标签(label):就是"已知答案",这里 0=正常,1=异常
 *   - 特征(features):用来下判断的依据,这里是温度/负载/电压等
 *   - 训练集 vs 测试集:用一部分数据学规律,另一部分数据考试,看学得好不好
 */
object DeviceAbnormalPredictJob {

  def main(args: Array[String]): Unit = {

    // =================================================================
    // 0.参数配置区(把"会变的东西"集中放最前面,方便修改)
    // =================================================================

    // 训练数据的位置
    // 定义历史数据的位置
    val csvPath = "hdfs:///datas/device_status_with_label_2026_01.csv"

    // 是否只用"在线设备"的数据来训练
    // 离线设备数据本来就异常,容易误导模型,所以一般只看在线的
    val filterOnlineOnly = true

    // ClickHouse 数据库的连接参数(用于把预测结果存进去,后面接大屏)
    val ckUrl = "jdbc:clickhouse://master:8123/iot_report?jdbcCompliant=false"
    val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver"
    val ckPredTable = "dws_device_pred_detail"

    // ========== 随机森林的 3 个核心参数 ==========
    // 这 3 个参数会直接影响模型的准确率和速度,要重点理解:

    // numTrees:森林里有多少棵决策树
    //   - 太少(比如 5 棵):投票不充分,容易出错
    //   - 太多(比如 1000 棵):效果提升不大,但训练超慢
    //   - 经验值:50~200 棵之间
    val numTrees = 80

    // maxDepth:每棵树最多能"问"多少层问题
    //   - 太浅(比如 3 层):树太简单,学不到复杂规律 → 欠拟合
    //   - 太深(比如 20 层):树死记硬背训练数据 → 过拟合,新数据预测不准
    //   - 经验值:5~10 层
    val maxDepth = 8

    // seed:随机种子
    //   随机森林训练时有很多"随机"步骤(抽样、选特征),
    //   设置同一个 seed,每次跑出来的结果就完全一样,方便调试和复现
    val seed = 42

    // 模型保存路径(HDFS 是 Hadoop 分布式文件系统,可以理解为大数据版的 U 盘)
    val modelBasePath = "hdfs:///model/device_abnormal"

    // 设置 HDFS 操作权限,以 root 身份读写,避免权限报错
    System.setProperty("HADOOP_USER_NAME", "root")

    // =================================================================
    // 创建 SparkSession:启动 Spark 程序的"开机按钮"
    // =================================================================
    // 任何 Spark 程序的第一步,都是建一个 SparkSession 对象。
    // 它代表"我和 Spark 集群之间的一次会话",后面所有操作都从它开始。
    val spark = SparkSession.builder()
      .appName("RandomForest-Abnormal")  // 程序名称,在 Spark UI 里能看到
      .master("local[*]")                // local[*] = 在本机跑,用所有 CPU 核心
                                         // 上线时改成 yarn 等集群模式
      .getOrCreate()                     // 获取已有的或创建新的 Session

    // 只打印 ERROR 级别日志,屏蔽掉 INFO/WARN 的"日志洪流"
    spark.sparkContext.setLogLevel("ERROR")

    // 引入隐式转换,这样才能用 $"列名" 这种简写语法
    import spark.implicits._

    // =================================================================
    // 步骤 1:读取训练数据
    // =================================================================
    println("\n================= 1:读取训练数据 =================")

    // spark.read 是一个"读数据的工具",通过链式调用配置读取选项
    val raw = spark.read
      .option("header", "true")        // 第一行是列名(温度、负载...),不是数据
      .option("inferSchema", "true")   // 自动猜测每列的类型(数字 / 字符串 / ...)
                                       // 不写这行,默认所有列都是字符串
      .csv(csvPath)                    // 读取指定路径的 CSV 文件

    // printSchema() 打印表的结构(每列的名字和类型)
    // 用来确认"读进来的数据格式对不对"
    println("【1.1 训练用的原始数据的结构(Schema)】")
    raw.printSchema()

    // show(n) 显示前 n 行数据,truncate=false 表示不截断长内容
    println("【1.2 原始数据前5条】")
    raw.show(5, truncate = false)

    // ========== 数据清洗 ==========
    // 真实数据经常有问题(类型不对、有空值),不能直接拿去训练。
    // 这里做两件事:
    //   1) 用 .cast("int") 强制把列转成正确的数据类型
    //   2) 用 .na.drop() 删除任何含空值的行
    val df0 = raw.select(
      col("device_id"),                                       // 设备ID(字符串)
      col("event_time"),                                      // 事件时间
      col("status").cast("int").as("status"),                 // 在线状态(0或1)
      col("temperature").cast("double").as("temperature"),    // 温度(小数)
      col("load").cast("double").as("load"),                  // 负载(小数)
      col("voltage").cast("double").as("voltage"),            // 电压(小数)
      col("label").cast("int").as("label")                    // 标签:0=正常,1=异常
    ).na.drop()

    // 根据开关决定是否过滤离线数据
    // 如果 filterOnlineOnly 为 true,就只保留在线设备;否则保留全部数据。  
    // $"status" === 1 是 Spark 的写法,等价于 SQL 的 status = 1
    val df: DataFrame =
      if (filterOnlineOnly) df0.filter($"status" === 1) else df0

    // ========== 检查标签分布 ==========
    // 训练前一定要看正负样本比例。如果 99% 都是正常,1% 异常,
    // 模型可能会偷懒"全部预测正常"也能拿 99% 准确率,但毫无用处!
    // 这种情况叫"类别不平衡",需要专门处理(本代码暂不处理)。
    println("【1.3 label 分布(0=正常,1=异常)】")
    df.groupBy("label").count().orderBy("label").show()
 
    // 【1.3 label 分布(0=正常,1=异常)】
    //  +-----+-----+
    //  |label|count|
    //  +-----+-----+
    //  |    0|  800|
    //  |    1|  200|
    //  +-----+-----+  

    // =================================================================
    // 步骤 2:特征工程 —— 把表格列打包成"特征向量"
    // =================================================================
    println("\n================= 2:特征工程 =================")
    // 把多列特征合并成一个特征向量列 features,方便 Spark ML 训练模型使用。  

    // 【为什么要这一步?】
    // Spark ML 的所有模型(包括随机森林)都有一个硬性要求:
    // 输入只能是【一列】,这一列的每个格子里装的是【一个向量】。
    // 向量:一组按顺序排列的数值集合,如数组 [1.0,30.35,0.57,218.0]
    // 所以即使你有 4 个特征,也不能直接喂给模型,
    // 必须先把这 4 列【打包成一列】才行。这就叫"向量装配"。

    // 选定要打包的特征列(就是 4 个字符串名)
    val featureCols = Array("status", "temperature", "load", "voltage")

    // ========== 创建 VectorAssembler "组装机" ==========
    // VectorAssembler 翻译过来就是"向量组装器",
    // 它专门干一件事:把多列数值合成一列向量。
    //
    // assembler 是变量名(可以随便起),代表"一台配置好的向量组装机"。
    // 用 .setXxx() 给它"上岗培训":
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)   // 告诉它:要打包哪几列
      .setOutputCol("features")    // 告诉它:打包后的新列叫 "features"

    // ========== 用组装机处理数据 ==========
    // 第①件事:assembler.transform(df) ——【打包】
    //   把 df 交给组装机,它在表的【末尾追加一列】features,原来的列一个不少。
    //
    //   df 进去时是 7 列:
    //     device_id | event_time | status | temperature | load | voltage | label
    //
    //   transform 后变成 8 列:
    //     device_id | event_time | status | temperature | load | voltage | label | features
    //                                                                              ↑↑↑↑↑↑↑↑
    //                                                                          [1.0,30.35,0.57,218.0]
    //   注意:transform 只【加】列,不删列!
    //
    // 然后 .select(...) 只保留我们后面用得到的 4 列,
    // 原来的 status/temperature/... 已经包在 features 里,不需要了
    val data = assembler.transform(df)
      .select("device_id", "event_time", "features", "label")

    // 显示前 5 条,确认 features 列已经生成
    // 你会看到形如 [1.0,30.35,0.57,218.0] 这样的向量,
    // 顺序和 featureCols 里写的一样:[status, temperature, load, voltage]
    println("【2.1 features 向量生成后前5条】")
    data.show(5, truncate = false)

    // =================================================================
    // 步骤 3:训练随机森林模型
    // =================================================================
    println("\n================= 3:训练随机森林模型 =================")

    // ========== 切分训练集 / 测试集 ==========
    // 【为什么要切分?】
    // 不能用同一份数据"训练+考试",那样模型会作弊(死记硬背)。
    // 所以要留一部分数据【藏起来】,训练完再拿出来考它,看真本事。
    //
    // randomSplit(Array(0.8, 0.2), seed) 表示:
    //   - 随机把 data 拆成两份,80% 用来训练,20% 用来测试
    //   - seed 就是让随机分数据每次都分得一样,方便老师、同学、自己重复运行时得到相同结果。
    //
    // val Array(trainDF, testDF) = ... 是 Scala 的"模式匹配"语法,
    // 把返回的数组直接拆给两个变量
 
    val Array(trainDF, testDF) = data.randomSplit(Array(0.8, 0.2), seed)

    println(s"【3.1 训练集数量】${trainDF.count()},【测试集数量】${testDF.count()}")

    // ========== 创建一个"还没训练"的随机森林分类器 ==========
    // 注意:此时 rf 是个【空模型】,还啥都不会,只是"配置好了规则"
    val rf = new RandomForestClassifier()
      .setLabelCol("label")          // 标签列叫什么(模型要学的"答案")
      .setFeaturesCol("features")    // 特征列叫什么(模型用来判断的依据)
      .setNumTrees(numTrees)         // 用 80 棵树
      .setMaxDepth(maxDepth)         // 每棵树最深 8 层
      .setSeed(seed)                 // 随机种子

    // ========== 训练!这是机器学习的核心一行 ==========
    // rf.fit(trainDF) 的意思是:
    // "rf 这个空白分类器,你给我吃下 trainDF 这份数据,自己去学规律,
    //  学完吐一个【训练好的模型】出来给我"
    //
    // 这一行内部做了什么?(简化版)
    //   1. 从 trainDF 有放回抽 80 份子样本 (Bootstrap)
    //   2. 每份子样本独立(训练)长出一棵决策树,
    //      每次判断时, 只随机看部分特征, 从中选最能区分正常/异常的问题
    //   3. 80 棵树训练完,组成一个"森林" → 这就是随机森林模型
    //
    // ⚠️ 注意区分两个动词:
    //   - fit  = 训练,让"空白工具"通过数据学规律 → 产出"会做事的模型"
    //   - transform = 让"会做事的模型"对数据做处理(比如预测)
    val model = rf.fit(trainDF)
    println(s"✅ 3.2 模型训练完成(numTrees=$numTrees, maxDepth=$maxDepth)")

    // =================================================================
    // 步骤 4:模型预测
    // =================================================================
    println("\n================= 4:模型预测 =================")

    // ========== 让训练好的模型对测试集做判断 ==========
    // model.transform(testDF):把 testDF 每一行交给模型预测,
    // 在testDF原表后面【追加 3 列】结果:
    //   - rawPrediction:原始打分(80 棵树的加权得票,未归一化)
    //   - probability  :概率向量,如 [0.85, 0.15] 表示 85% 正常 / 15% 异常
    //   - prediction   :最终判定(取概率大的那类),0=正常,1=异常
    //
    // 注意:testDF 里有 label 列,但模型预测时【不看】它,
    // 这样后面才能用 label 和 prediction 公平对比,看猜得准不准。
    val pred = model.transform(testDF)
      // ========== 提取异常概率 ==========
      // probability 是一个长度为 2 的向量,例如 [0.85, 0.15]:
      //   - 第 0 个数 = 预测为"正常(0)"的概率
      //   - 第 1 个数 = 预测为"异常(1)"的概率
      //
      // 我们更关心"异常的概率",所以用 vector_to_array() 把向量转成数组,
      // 再用 .getItem(1) 取第 1 个元素,新增成一列叫 prob1。
      // 这样以后做"高风险报警"时,可以拿 prob1 > 某阈值 来过滤
      .withColumn("prob1", vector_to_array(col("probability")).getItem(1))

    println("【4.1 预测结果前10条(prediction + prob1)】")
    // 显示对比:label=真实答案,prediction=模型预测,prob1=模型自信度
    pred.select("device_id", "event_time", "label", "prediction", "prob1", "probability")
      .show(10, truncate = false)

    // =================================================================
    // 步骤 5:模型评估
    // =================================================================
    println("\n================= 5:模型评估 =================")

    // ========== 定义一个【小函数】用来算各种指标 ==========
    // 因为 4 个指标的算法都一样,只是名字不同,所以包装成函数复用。
    // def 在 Scala 里就是"定义函数"的关键字。
    def eval(metric: String): Double = {
      new MulticlassClassificationEvaluator()
        .setLabelCol("label")            // 真实答案在 label 列
        .setPredictionCol("prediction")  // 预测结果在 prediction列
        .setMetricName(metric)           // 算哪个指标(传进来的字符串)
        .evaluate(pred)                  // 对 pred 这份预测数据做评估
    }

    // ========== 4 个核心指标解释 ==========
    // 假设 100 个设备,实际 30 个异常 70 个正常:
    //
    // 1️⃣ Accuracy(准确率)= 猜对的总数 / 总数
    //    - 直观,但样本不平衡时会骗人
    //    - 比如全猜正常也能拿 70%
    //
    // 2️⃣ Precision(精确率)= 猜是异常的里面,真异常的比例
    //    - 关心:"我说异常的,有多少是真异常?"(不冤枉好设备)
    //
    // 3️⃣ Recall(召回率)= 实际异常的里面,被我抓出来的比例
    //    - 关心:"真异常的,我抓出来了多少?"(不放过坏设备)
    //
    // 4️⃣ F1 = Precision 和 Recall 的调和平均
    //    - 综合考虑两个,适合不平衡数据
    val accuracy  = eval("accuracy")
    val precision = eval("weightedPrecision")
    val recall    = eval("weightedRecall")
    val f1        = eval("f1")

    // f"..."  是 Scala 的格式化字符串,%.4f 表示保留 4 位小数
    println(f"【5.1 Accuracy(准确率) 】 = $accuracy%.4f")
    println(f"【5.2 Precision(精确率)】 = $precision%.4f")
    println(f"【5.3 Recall(召回率)  】 = $recall%.4f")
    println(f"【5.4 F1                 】 = $f1%.4f")

    // =================================================================
    // 步骤 6:混淆矩阵分析
    // =================================================================
    println("\n================= 6:混淆矩阵分析 =================")
    println("【6.1 混淆矩阵(label=真实值,prediction=预测值)】")

    // ========== 什么是混淆矩阵? ==========
    // 是一张 2×2 的小表,把 4 种可能的"真实 vs 预测"组合数出来:
    //
    //                    预测=正常(0)         预测=异常(1)
    //   真实=正常(0)    TN 真负例(对了)      FP 假正例(错把正常当异常)
    //   真实=异常(1)    FN 假负例(漏报)      TP 真正例(对了)
    //
    // 通过 groupBy + count 就能把这 4 种数量分别数出来
    pred.groupBy("label", "prediction")
      .count()
      .orderBy("label", "prediction")
      .show()

    // ========== 怎么读这张表? ==========
    // 比如下面这样的输出:
    //   +-----+----------+------+
    //   |label|prediction|count |
    //   +-----+----------+------+
    //   |  0  |    0     |281039|  ← TN:正常被正确识别(对)
    //   |  0  |    1     |  1285|  ← FP:正常被误报为异常(错)→ 误报浪费维修人力
    //   |  1  |    0     |  1587|  ← FN:异常被漏掉了      (错)→ 漏报后果严重!
    //   |  1  |    1     |134824|  ← TP:异常被成功抓出    (对)
    //   +-----+----------+------+
    //
    // 工业场景下,FN(漏报)往往比 FP(误报)更致命,
    // 因为漏报意味着设备真坏了没人发现 → 可能酿成事故

    // =================================================================
    // 步骤 7:模型保存与版本管理
    // =================================================================
    println("\n================= 7:模型保存与版本管理 =================")

    // ========== 为什么要做版本管理? ==========
    // 训练好的模型不是终点,它要被"部署"上线、被反复调用。
    // 但模型会不断更新(新数据 → 新模型),需要管理多个版本:
    //   - 出问题了能回滚到上个版本
    //   - 知道哪个版本对应哪天的数据
    //   - 线上系统总是用 latest 这个固定别名,后台静悄悄换模型

    // 用当前时间戳生成版本号,如 "rf_20260128_153045"
    val ver = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"))
    val modelVersion = s"rf_$ver"        // s"..." 是字符串模板,$变量 会被替换
    val modelPath = s"$modelBasePath/$modelVersion"

    println(s"【7.1 本次模型版本号】$modelVersion")
    println(s"【7.2 模型保存路径】$modelPath")

    // ========== 保存模型(策略 1:版本归档) ==========
    // model.write.overwrite() = 如果路径已存在则覆盖,否则会报错
    // 每次训练都会留一份带时间戳的归档 → 历史可追溯
    model.write.overwrite().save(modelPath)
    println(s"【7.3 模型版本保存成功】$modelPath")

    // ========== 保存模型(策略 2:latest 别名) ==========
    // 同时再存一份到固定路径 .../latest,
    // 这样线上代码只需 load(".../latest") 就能拿到最新模型,不用改代码
    val latestPath = s"$modelBasePath/latest"
    model.write.overwrite().save(latestPath)
    println(s"   latest 已更新:$latestPath(version=$modelVersion)")

    // ========== 验证模型可以重新加载 ==========
    // 保存了不一定就能用,要 load 回来再做一次预测验证。
    // 这是"上线前自检",防止模型保存损坏导致线上故障
    val loaded = RandomForestClassificationModel.load(modelPath)
    val pred2 = loaded.transform(testDF)

    println("【7.4 加载模型验证(前5条)】")
    pred2.select("label", "prediction", "probability").show(5, truncate = false)

    println(s"✅ 7.5 模型加载验证成功:$modelVersion")

    // =================================================================
    // 步骤 8:预测结果入库 
    // =================================================================
    println("\n================= 8:预测结果入库 =================")

    // ========== 为什么要入库? ==========
    // 模型在 Spark 里跑出来的 pred 只存在内存里,程序一停就没了。
    // 业务大屏需要长期、稳定地查询预测结果,所以要存进数据库(ClickHouse)。

    val createdAt = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))

    // ========== 选出要存的列,并给它们改成数据库表对应的格式 ==========
    // lit(xxx) = "literal" 字面量,把固定值变成一列(每行都填一样的值)
    // 比如 lit(modelVersion) 就是给每行都填上当前模型版本号
    val predOut = pred.select(
      lit(modelVersion).as("model_version"),         // 模型版本号(每行都一样)
      col("device_id"),                              // 设备ID
      col("event_time"),                             // 事件时间
      col("label").cast("int").as("label"),          // 真实标签
      col("prediction").cast("int").as("prediction"),// 模型预测
      col("prob1").cast("double").as("prob1"),       // 异常概率
      lit(createdAt).as("created_at")                // 入库时间
    )

    println("【8.1 入库数据预览(前10条)】")
    predOut.show(10, truncate = false)

    // ========== 通过 JDBC 写入 ClickHouse ==========
    // SaveMode.Append = 追加模式(不删原表数据,只往后接)
    //   其他可选:Overwrite(覆盖)、ErrorIfExists(报错)、Ignore(忽略)
    predOut.write
      .format("jdbc")                          // 用 JDBC 协议写
      .option("url", ckUrl)                    // 数据库地址
      .option("driver", ckDriver)              // 驱动类
      .option("dbtable", ckPredTable)          // 写入哪张表
      .mode(SaveMode.Append)                   // 追加模式
      .save()                                  // 执行写入

    println(s"✅ 8.2 预测结果已写入 ClickHouse:$ckPredTable(model_version=$modelVersion)")

    // ========== 关闭 Spark 会话,释放资源 ==========
    // 不写也行,程序结束自动会关。但显式关闭是好习惯,
    // 尤其在长时间运行的任务里,可以及时释放集群资源给别人用
    spark.stop()
  }
}


6.7 验证结果

查看Clickhouse数据表

-- 查看 event_time 最新的10条预测记录
SELECT *
FROM iot_report.dws_device_pred_detail
ORDER BY event_time DESC
LIMIT 10;


-- 查看结果
┌─model_version──────┬─device_id──┬──────────event_time─┬─label─┬─prediction─┬────────────────prob1─┬──────────created_at─┐
│ rf_20260215_192856 │ device-0012026-01-30 23:59:00 │     0 │          00.0134462883636347862026-02-15 19:29:08
│ rf_20260215_190927 │ device-0112026-01-30 23:59:00 │     1 │          1 │    0.9504035291044342026-02-15 19:09:40
│ rf_20260215_192856 │ device-0052026-01-30 23:59:00 │     1 │          1 │                    12026-02-15 19:29:08
│ rf_20260215_190927 │ device-0012026-01-30 23:59:00 │     0 │          00.0134462883636347862026-02-15 19:09:40
│ rf_20260215_192856 │ device-0112026-01-30 23:59:00 │     1 │          1 │    0.9504035291044342026-02-15 19:29:08
│ rf_20260215_191430 │ device-0232026-01-30 23:59:00 │     0 │          00.0068018224551317632026-02-15 19:14:43
│ rf_20260215_192856 │ device-0152026-01-30 23:59:00 │     0 │          00.0080666356931232522026-02-15 19:29:08
│ rf_20260215_191923 │ device-0112026-01-30 23:59:00 │     1 │          1 │    0.9504035291044342026-02-15 19:19:34
│ rf_20260215_191923 │ device-0012026-01-30 23:59:00 │     0 │          00.0134462883636347862026-02-15 19:19:34
│ rf_20260215_191923 │ device-0232026-01-30 23:59:00 │     0 │          00.0068018224551317632026-02-15 19:19:34
└────────────────────┴────────────┴─────────────────────┴───────┴────────────┴──────────────────────┴─────────────────────┘

解释:

一、字段含义解释

字段含义
model_version使用的模型版本号
device_id设备编号
event_time设备数据时间
label真实标签(0=正常,1=异常)
prediction模型预测结果
prob1预测为异常的概率
created_at预测结果写入数据库时间

二、从整体上看这 10 条数据

① 同一个时间点的数据

👉 这些记录来自同一个设备数据时间(2026-01-30 23:59:00),属于同一批测试数据,而不是实时数据。

② 存在多个模型版本

👉 不同模型版本分别对同一批数据进行了预测,因此同一设备会出现多条记录。


三、逐条逻辑解释

1️⃣ 正常设备示例(device-001)

👉 真实是正常,模型预测也为正常,异常概率仅 1.3%,判断非常稳定。

2️⃣ 高风险异常示例(device-011)

👉 真实是异常,模型预测为异常,异常概率达 95%,模型判断较有信心。

3️⃣ 极端异常示例(device-005)

👉 所有决策树都投票为异常,因此模型给出 100% 的异常概率。

4️⃣ 低风险正常示例(device-023)

👉 真实是正常,模型预测为正常,异常概率不足 1%,属于非常安全的样本。

-- 查看异常概率在 0.95~0.99 区间的前10条记录(按概率从高到低排序)
SELECT *
FROM iot_report.dws_device_pred_detail
WHERE prob1 BETWEEN 0.95 AND 0.99
ORDER BY prob1 DESC
LIMIT 10;


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1