一、实验背景(应用场景)
问题引入: 企业生产中,大量设备需要长期稳定运行。一旦发生故障,轻则生产中断,重则造成重大经济损失。因此,提前预测设备异常、防患于未然,成为现代企业的迫切需求。
解决思路: 引入机器学习技术,让计算机从历史数据中自动学习设备运行规律,从而实现故障预警。
二、教学目标
实验一:设备异常预测 使用 Spark ML 对设备历史数据进行建模,以温度、电压等运行特征为输入,训练随机森林分类模型,实现设备异常状态预测。
实验二:实时数据预测与展示 使用 Spark Streaming 对实时设备数据流进行处理,调用已训练好的预测模型完成设备异常实时识别,并将预测结果写入数据库进行展示与应用。
三、教学重难点
教学重点:
设备异常预测的重要性: 通过机器学习预测设备故障,帮助企业提前发现问题,减少停机时间和维修费用。
机器学习应用: 使用随机森林等算法进行设备异常预测,学会将设备数据转化为预测模型的输入。
数据处理和特征工程: 学会如何清洗数据、选择和处理特征,帮助模型更好地进行预测。
模型训练与评估: 学习如何训练模型并通过评估指标判断模型的准确性和效果。
实时预测与展示: 在实时数据流中使用已训练的模型进行预测,并将预测结果存储和展示。
教学难点:
模型选择与调优: 如何选择合适的机器学习模型,并调整参数以获得最佳预测效果。
数据不平衡问题: 如何处理设备异常数据较少的问题,确保模型能够准确识别异常设备。
特征工程: 从多个设备特征中提取有用的信息并进行处理,帮助模型进行精准预测。
评估指标的理解: 理解不同评估指标的含义,并选择合适的评估标准来衡量模型效果。
实时数据处理与应用: 如何在实时数据流中快速应用预测模型并保持高效和准确。
数据存储与展示: 将预测结果写入数据库并展示在大屏上,确保数据展示及时准确。
四、数据结构说明
讲清:
字段 说明 status 在线状态 temperature 温度 load 负载 voltage 电压 label 是否异常
重点讲:
label = 1 代表异常 模型的任务是学会判断“哪些特征组合容易异常”
五、随机森林基础
什么是随机森林?
随机森林(Random Forest) 是一种常用的机器学习算法,用来解决分类和回归问题。它由多个决策树组成,可以帮助我们做出更准确的预测。
分类 = 选类别;回归 = 算数字
随机森林做分类:判断是否患病、是否违约 【判断:是/否】
随机森林做回归:预测房价、预测收入、预测数值【预测:连续数值】
随机森林 = 多棵决策树一起“投票”做决定

六、实验一:使用历史数据训练模型
6.1 工作场景
本实验以设备异常预测为背景,使用随机森林算法对设备的运行数据进行预测,预测设备是否出现异常情况。数据来源于Hive,经过特征工程处理后,使用Spark进行模型训练、评估和预测,最终将预测结果写入ClickHouse数据库供后续展示与分析。
设备状态包括温度、负载、电压等多种监测指标,而异常预测是智能设备管理系统中的核心任务之一。通过准确预测异常,企业可以及时采取措施,避免设备故障带来的损失,提高运营效率。
6.2 实验目标
数据读取与清洗:从Hive中读取训练数据,了解数据的分布和结构,进行必要的数据清洗与预处理。
特征工程:将设备的状态信息(如温度、负载、电压)转化为特征向量,为机器学习模型提供输入。
模型训练:使用随机森林算法训练模型,并通过调整参数(如树的数量和深度)优化模型性能。
模型检验:对测试集数据进行预测,输出预测结果(如是否异常)以及预测的概率。
模型评估:计算模型的各项性能指标(如准确率、精确率、召回率、F1分数),评估其预测能力。
混淆矩阵分析:通过混淆矩阵分析预测的错误情况,了解哪些设备被误判为异常或正常,进而分析模型性能。
模型保存与版本管理:保存训练好的模型,进行版本管理,便于后续使用最新模型进行预测。
结果存储与展示:将预测结果写入ClickHouse数据库。
通过这些步骤,实验实现了设备异常预测的全流程,包括数据读取、模型训练、评估、预测和结果存储,最终可利用训练好的模型为将来的实时数据做预测。

完整实施的过程
6.3 启动环境
# 启动 Hadoop start-dfs.sh # 在master操作 start-yarn.sh # 在slave1操作
6.4 上传历史数据
# 在HDFS上创建数据目录 hdfs dfs -mkdir /datas # 把master上的历史数据上传到HDFS上 hdfs dfs -put /opt/datas/device_status_with_label_2026_01.csv /datas/
6.5 ClickHouse 表设计
启动ClickHouse客户端
# 启动多行模式(在master中操作) clickhouse-client -m
创建ClickHouse 表
-- 1.创建报表数据库(如果不存在) CREATE DATABASE IF NOT EXISTS iot_report; -- 2.创建预测结果表 CREATE TABLE IF NOT EXISTS iot_report.dws_device_pred_detail ( model_version String, -- 模型版本号 device_id String, -- 设备ID event_time DateTime, -- 数据时间 label Nullable(UInt8), -- 真实标签,允许为 NULL prediction UInt8, -- 预测结果 prob1 Float64, -- 异常概率 created_at DateTime -- 写入时间 ) ENGINE = MergeTree ORDER BY (device_id, event_time);
6.6 离线训练 Job
历史标注数据 → 随机森林训练与评估 → 模型存储(HDFS latest) → 离线预测结果落库(用于验证/回放)
项目路径:


完整代码:
package com.demo.spark
// ========================================
// 工具库导入区
// ========================================
// Spark SQL:处理表格数据(DataFrame)的核心工具
// - DataFrame 可以理解为"分布式的 Excel 表格"
// - SaveMode 控制写入数据库时的行为(覆盖/追加/报错)
// - SparkSession 是程序进入 Spark 世界的"大门"
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
// Spark 内置函数库,col() / lit() / cast() 等都来自这里
import org.apache.spark.sql.functions._
// Spark ML:机器学习模块
// - VectorAssembler:把多列特征"打包"成一个向量(模型只认向量)
// - RandomForestClassifier:随机森林分类器(还没训练的"空白工具")
// - RandomForestClassificationModel:训练好的随机森林模型(会预测了)
// - MulticlassClassificationEvaluator:评估模型好坏(算准确率等)
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// vector_to_array:把 Vector 类型转成普通数组,方便取概率值
import org.apache.spark.ml.functions.vector_to_array
// Java 时间工具,用来生成"模型版本号"
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
/**
* ========================================================
* 第7章:用随机森林预测设备异常
* ========================================================
*
* 【任务目标】
* 有一堆历史设备数据(温度/负载/电压 + 已知是不是异常),
* 训练一个模型,以后给它新数据,它能告诉我们"这台设备有没有异常"。
*
* 【整体流程 - 8 个步骤】
* 1. 读数据 → 从 CSV 文件加载历史设备数据(包括设备的温度、负载、电压等特征,以及标签信息)。
* 2. 特征工程 → 将数据表中的特征列(如温度、负载、电压等)打包成一个单一的特征向量,便于模型处理。
* 3. 训练模型 → 使用历史数据训练随机森林模型,学习数据中的规律,以便后续进行预测。
* 4. 模型预测 → 用训练好的模型对新的测试数据进行预测,判断设备是否异常。
* 5. 模型评估 → 评估模型在测试集上的表现,计算准确率、精确率、召回率等指标,了解模型的预测效果。
* 6. 混淆矩阵 → 分析模型预测结果的误差,通过混淆矩阵查看预测错误的类别,以及模型正确预测的情况。
* 7. 模型保存 → 把训练好的模型存到磁盘,以后能再用
* 8. 结果入库 → 将测试集的预测结果(包括设备ID、预测结果、异常概率等)写入数据库
*
* 【关键概念铺垫】
* - 监督学习:用"已知答案"的数据训练模型,以后让它对没答案的数据下判断
* - 标签(label):就是"已知答案",这里 0=正常,1=异常
* - 特征(features):用来下判断的依据,这里是温度/负载/电压等
* - 训练集 vs 测试集:用一部分数据学规律,另一部分数据考试,看学得好不好
*/
object DeviceAbnormalPredictJob {
def main(args: Array[String]): Unit = {
// =================================================================
// 0.参数配置区(把"会变的东西"集中放最前面,方便修改)
// =================================================================
// 训练数据的位置
// 定义历史数据的位置
val csvPath = "hdfs:///datas/device_status_with_label_2026_01.csv"
// 是否只用"在线设备"的数据来训练
// 离线设备数据本来就异常,容易误导模型,所以一般只看在线的
val filterOnlineOnly = true
// ClickHouse 数据库的连接参数(用于把预测结果存进去,后面接大屏)
val ckUrl = "jdbc:clickhouse://master:8123/iot_report?jdbcCompliant=false"
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver"
val ckPredTable = "dws_device_pred_detail"
// ========== 随机森林的 3 个核心参数 ==========
// 这 3 个参数会直接影响模型的准确率和速度,要重点理解:
// numTrees:森林里有多少棵决策树
// - 太少(比如 5 棵):投票不充分,容易出错
// - 太多(比如 1000 棵):效果提升不大,但训练超慢
// - 经验值:50~200 棵之间
val numTrees = 80
// maxDepth:每棵树最多能"问"多少层问题
// - 太浅(比如 3 层):树太简单,学不到复杂规律 → 欠拟合
// - 太深(比如 20 层):树死记硬背训练数据 → 过拟合,新数据预测不准
// - 经验值:5~10 层
val maxDepth = 8
// seed:随机种子
// 随机森林训练时有很多"随机"步骤(抽样、选特征),
// 设置同一个 seed,每次跑出来的结果就完全一样,方便调试和复现
val seed = 42
// 模型保存路径(HDFS 是 Hadoop 分布式文件系统,可以理解为大数据版的 U 盘)
val modelBasePath = "hdfs:///model/device_abnormal"
// 设置 HDFS 操作权限,以 root 身份读写,避免权限报错
System.setProperty("HADOOP_USER_NAME", "root")
// =================================================================
// 创建 SparkSession:启动 Spark 程序的"开机按钮"
// =================================================================
// 任何 Spark 程序的第一步,都是建一个 SparkSession 对象。
// 它代表"我和 Spark 集群之间的一次会话",后面所有操作都从它开始。
val spark = SparkSession.builder()
.appName("RandomForest-Abnormal") // 程序名称,在 Spark UI 里能看到
.master("local[*]") // local[*] = 在本机跑,用所有 CPU 核心
// 上线时改成 yarn 等集群模式
.getOrCreate() // 获取已有的或创建新的 Session
// 只打印 ERROR 级别日志,屏蔽掉 INFO/WARN 的"日志洪流"
spark.sparkContext.setLogLevel("ERROR")
// 引入隐式转换,这样才能用 $"列名" 这种简写语法
import spark.implicits._
// =================================================================
// 步骤 1:读取训练数据
// =================================================================
println("\n================= 1:读取训练数据 =================")
// spark.read 是一个"读数据的工具",通过链式调用配置读取选项
val raw = spark.read
.option("header", "true") // 第一行是列名(温度、负载...),不是数据
.option("inferSchema", "true") // 自动猜测每列的类型(数字 / 字符串 / ...)
// 不写这行,默认所有列都是字符串
.csv(csvPath) // 读取指定路径的 CSV 文件
// printSchema() 打印表的结构(每列的名字和类型)
// 用来确认"读进来的数据格式对不对"
println("【1.1 训练用的原始数据的结构(Schema)】")
raw.printSchema()
// show(n) 显示前 n 行数据,truncate=false 表示不截断长内容
println("【1.2 原始数据前5条】")
raw.show(5, truncate = false)
// ========== 数据清洗 ==========
// 真实数据经常有问题(类型不对、有空值),不能直接拿去训练。
// 这里做两件事:
// 1) 用 .cast("int") 强制把列转成正确的数据类型
// 2) 用 .na.drop() 删除任何含空值的行
val df0 = raw.select(
col("device_id"), // 设备ID(字符串)
col("event_time"), // 事件时间
col("status").cast("int").as("status"), // 在线状态(0或1)
col("temperature").cast("double").as("temperature"), // 温度(小数)
col("load").cast("double").as("load"), // 负载(小数)
col("voltage").cast("double").as("voltage"), // 电压(小数)
col("label").cast("int").as("label") // 标签:0=正常,1=异常
).na.drop()
// 根据开关决定是否过滤离线数据
// 如果 filterOnlineOnly 为 true,就只保留在线设备;否则保留全部数据。
// $"status" === 1 是 Spark 的写法,等价于 SQL 的 status = 1
val df: DataFrame =
if (filterOnlineOnly) df0.filter($"status" === 1) else df0
// ========== 检查标签分布 ==========
// 训练前一定要看正负样本比例。如果 99% 都是正常,1% 异常,
// 模型可能会偷懒"全部预测正常"也能拿 99% 准确率,但毫无用处!
// 这种情况叫"类别不平衡",需要专门处理(本代码暂不处理)。
println("【1.3 label 分布(0=正常,1=异常)】")
df.groupBy("label").count().orderBy("label").show()
// 【1.3 label 分布(0=正常,1=异常)】
// +-----+-----+
// |label|count|
// +-----+-----+
// | 0| 800|
// | 1| 200|
// +-----+-----+
// =================================================================
// 步骤 2:特征工程 —— 把表格列打包成"特征向量"
// =================================================================
println("\n================= 2:特征工程 =================")
// 把多列特征合并成一个特征向量列 features,方便 Spark ML 训练模型使用。
// 【为什么要这一步?】
// Spark ML 的所有模型(包括随机森林)都有一个硬性要求:
// 输入只能是【一列】,这一列的每个格子里装的是【一个向量】。
// 向量:一组按顺序排列的数值集合,如数组 [1.0,30.35,0.57,218.0]
// 所以即使你有 4 个特征,也不能直接喂给模型,
// 必须先把这 4 列【打包成一列】才行。这就叫"向量装配"。
// 选定要打包的特征列(就是 4 个字符串名)
val featureCols = Array("status", "temperature", "load", "voltage")
// ========== 创建 VectorAssembler "组装机" ==========
// VectorAssembler 翻译过来就是"向量组装器",
// 它专门干一件事:把多列数值合成一列向量。
//
// assembler 是变量名(可以随便起),代表"一台配置好的向量组装机"。
// 用 .setXxx() 给它"上岗培训":
val assembler = new VectorAssembler()
.setInputCols(featureCols) // 告诉它:要打包哪几列
.setOutputCol("features") // 告诉它:打包后的新列叫 "features"
// ========== 用组装机处理数据 ==========
// 第①件事:assembler.transform(df) ——【打包】
// 把 df 交给组装机,它在表的【末尾追加一列】features,原来的列一个不少。
//
// df 进去时是 7 列:
// device_id | event_time | status | temperature | load | voltage | label
//
// transform 后变成 8 列:
// device_id | event_time | status | temperature | load | voltage | label | features
// ↑↑↑↑↑↑↑↑
// [1.0,30.35,0.57,218.0]
// 注意:transform 只【加】列,不删列!
//
// 然后 .select(...) 只保留我们后面用得到的 4 列,
// 原来的 status/temperature/... 已经包在 features 里,不需要了
val data = assembler.transform(df)
.select("device_id", "event_time", "features", "label")
// 显示前 5 条,确认 features 列已经生成
// 你会看到形如 [1.0,30.35,0.57,218.0] 这样的向量,
// 顺序和 featureCols 里写的一样:[status, temperature, load, voltage]
println("【2.1 features 向量生成后前5条】")
data.show(5, truncate = false)
// =================================================================
// 步骤 3:训练随机森林模型
// =================================================================
println("\n================= 3:训练随机森林模型 =================")
// ========== 切分训练集 / 测试集 ==========
// 【为什么要切分?】
// 不能用同一份数据"训练+考试",那样模型会作弊(死记硬背)。
// 所以要留一部分数据【藏起来】,训练完再拿出来考它,看真本事。
//
// randomSplit(Array(0.8, 0.2), seed) 表示:
// - 随机把 data 拆成两份,80% 用来训练,20% 用来测试
// - seed 就是让随机分数据每次都分得一样,方便老师、同学、自己重复运行时得到相同结果。
//
// val Array(trainDF, testDF) = ... 是 Scala 的"模式匹配"语法,
// 把返回的数组直接拆给两个变量
val Array(trainDF, testDF) = data.randomSplit(Array(0.8, 0.2), seed)
println(s"【3.1 训练集数量】${trainDF.count()},【测试集数量】${testDF.count()}")
// ========== 创建一个"还没训练"的随机森林分类器 ==========
// 注意:此时 rf 是个【空模型】,还啥都不会,只是"配置好了规则"
val rf = new RandomForestClassifier()
.setLabelCol("label") // 标签列叫什么(模型要学的"答案")
.setFeaturesCol("features") // 特征列叫什么(模型用来判断的依据)
.setNumTrees(numTrees) // 用 80 棵树
.setMaxDepth(maxDepth) // 每棵树最深 8 层
.setSeed(seed) // 随机种子
// ========== 训练!这是机器学习的核心一行 ==========
// rf.fit(trainDF) 的意思是:
// "rf 这个空白分类器,你给我吃下 trainDF 这份数据,自己去学规律,
// 学完吐一个【训练好的模型】出来给我"
//
// 这一行内部做了什么?(简化版)
// 1. 从 trainDF 有放回抽 80 份子样本 (Bootstrap)
// 2. 每份子样本独立(训练)长出一棵决策树,
// 每次判断时, 只随机看部分特征, 从中选最能区分正常/异常的问题
// 3. 80 棵树训练完,组成一个"森林" → 这就是随机森林模型
//
// ⚠️ 注意区分两个动词:
// - fit = 训练,让"空白工具"通过数据学规律 → 产出"会做事的模型"
// - transform = 让"会做事的模型"对数据做处理(比如预测)
val model = rf.fit(trainDF)
println(s"✅ 3.2 模型训练完成(numTrees=$numTrees, maxDepth=$maxDepth)")
// =================================================================
// 步骤 4:模型预测
// =================================================================
println("\n================= 4:模型预测 =================")
// ========== 让训练好的模型对测试集做判断 ==========
// model.transform(testDF):把 testDF 每一行交给模型预测,
// 在testDF原表后面【追加 3 列】结果:
// - rawPrediction:原始打分(80 棵树的加权得票,未归一化)
// - probability :概率向量,如 [0.85, 0.15] 表示 85% 正常 / 15% 异常
// - prediction :最终判定(取概率大的那类),0=正常,1=异常
//
// 注意:testDF 里有 label 列,但模型预测时【不看】它,
// 这样后面才能用 label 和 prediction 公平对比,看猜得准不准。
val pred = model.transform(testDF)
// ========== 提取异常概率 ==========
// probability 是一个长度为 2 的向量,例如 [0.85, 0.15]:
// - 第 0 个数 = 预测为"正常(0)"的概率
// - 第 1 个数 = 预测为"异常(1)"的概率
//
// 我们更关心"异常的概率",所以用 vector_to_array() 把向量转成数组,
// 再用 .getItem(1) 取第 1 个元素,新增成一列叫 prob1。
// 这样以后做"高风险报警"时,可以拿 prob1 > 某阈值 来过滤
.withColumn("prob1", vector_to_array(col("probability")).getItem(1))
println("【4.1 预测结果前10条(prediction + prob1)】")
// 显示对比:label=真实答案,prediction=模型预测,prob1=模型自信度
pred.select("device_id", "event_time", "label", "prediction", "prob1", "probability")
.show(10, truncate = false)
// =================================================================
// 步骤 5:模型评估
// =================================================================
println("\n================= 5:模型评估 =================")
// ========== 定义一个【小函数】用来算各种指标 ==========
// 因为 4 个指标的算法都一样,只是名字不同,所以包装成函数复用。
// def 在 Scala 里就是"定义函数"的关键字。
def eval(metric: String): Double = {
new MulticlassClassificationEvaluator()
.setLabelCol("label") // 真实答案在 label 列
.setPredictionCol("prediction") // 预测结果在 prediction列
.setMetricName(metric) // 算哪个指标(传进来的字符串)
.evaluate(pred) // 对 pred 这份预测数据做评估
}
// ========== 4 个核心指标解释 ==========
// 假设 100 个设备,实际 30 个异常 70 个正常:
//
// 1️⃣ Accuracy(准确率)= 猜对的总数 / 总数
// - 直观,但样本不平衡时会骗人
// - 比如全猜正常也能拿 70%
//
// 2️⃣ Precision(精确率)= 猜是异常的里面,真异常的比例
// - 关心:"我说异常的,有多少是真异常?"(不冤枉好设备)
//
// 3️⃣ Recall(召回率)= 实际异常的里面,被我抓出来的比例
// - 关心:"真异常的,我抓出来了多少?"(不放过坏设备)
//
// 4️⃣ F1 = Precision 和 Recall 的调和平均
// - 综合考虑两个,适合不平衡数据
val accuracy = eval("accuracy")
val precision = eval("weightedPrecision")
val recall = eval("weightedRecall")
val f1 = eval("f1")
// f"..." 是 Scala 的格式化字符串,%.4f 表示保留 4 位小数
println(f"【5.1 Accuracy(准确率) 】 = $accuracy%.4f")
println(f"【5.2 Precision(精确率)】 = $precision%.4f")
println(f"【5.3 Recall(召回率) 】 = $recall%.4f")
println(f"【5.4 F1 】 = $f1%.4f")
// =================================================================
// 步骤 6:混淆矩阵分析
// =================================================================
println("\n================= 6:混淆矩阵分析 =================")
println("【6.1 混淆矩阵(label=真实值,prediction=预测值)】")
// ========== 什么是混淆矩阵? ==========
// 是一张 2×2 的小表,把 4 种可能的"真实 vs 预测"组合数出来:
//
// 预测=正常(0) 预测=异常(1)
// 真实=正常(0) TN 真负例(对了) FP 假正例(错把正常当异常)
// 真实=异常(1) FN 假负例(漏报) TP 真正例(对了)
//
// 通过 groupBy + count 就能把这 4 种数量分别数出来
pred.groupBy("label", "prediction")
.count()
.orderBy("label", "prediction")
.show()
// ========== 怎么读这张表? ==========
// 比如下面这样的输出:
// +-----+----------+------+
// |label|prediction|count |
// +-----+----------+------+
// | 0 | 0 |281039| ← TN:正常被正确识别(对)
// | 0 | 1 | 1285| ← FP:正常被误报为异常(错)→ 误报浪费维修人力
// | 1 | 0 | 1587| ← FN:异常被漏掉了 (错)→ 漏报后果严重!
// | 1 | 1 |134824| ← TP:异常被成功抓出 (对)
// +-----+----------+------+
//
// 工业场景下,FN(漏报)往往比 FP(误报)更致命,
// 因为漏报意味着设备真坏了没人发现 → 可能酿成事故
// =================================================================
// 步骤 7:模型保存与版本管理
// =================================================================
println("\n================= 7:模型保存与版本管理 =================")
// ========== 为什么要做版本管理? ==========
// 训练好的模型不是终点,它要被"部署"上线、被反复调用。
// 但模型会不断更新(新数据 → 新模型),需要管理多个版本:
// - 出问题了能回滚到上个版本
// - 知道哪个版本对应哪天的数据
// - 线上系统总是用 latest 这个固定别名,后台静悄悄换模型
// 用当前时间戳生成版本号,如 "rf_20260128_153045"
val ver = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"))
val modelVersion = s"rf_$ver" // s"..." 是字符串模板,$变量 会被替换
val modelPath = s"$modelBasePath/$modelVersion"
println(s"【7.1 本次模型版本号】$modelVersion")
println(s"【7.2 模型保存路径】$modelPath")
// ========== 保存模型(策略 1:版本归档) ==========
// model.write.overwrite() = 如果路径已存在则覆盖,否则会报错
// 每次训练都会留一份带时间戳的归档 → 历史可追溯
model.write.overwrite().save(modelPath)
println(s"【7.3 模型版本保存成功】$modelPath")
// ========== 保存模型(策略 2:latest 别名) ==========
// 同时再存一份到固定路径 .../latest,
// 这样线上代码只需 load(".../latest") 就能拿到最新模型,不用改代码
val latestPath = s"$modelBasePath/latest"
model.write.overwrite().save(latestPath)
println(s" latest 已更新:$latestPath(version=$modelVersion)")
// ========== 验证模型可以重新加载 ==========
// 保存了不一定就能用,要 load 回来再做一次预测验证。
// 这是"上线前自检",防止模型保存损坏导致线上故障
val loaded = RandomForestClassificationModel.load(modelPath)
val pred2 = loaded.transform(testDF)
println("【7.4 加载模型验证(前5条)】")
pred2.select("label", "prediction", "probability").show(5, truncate = false)
println(s"✅ 7.5 模型加载验证成功:$modelVersion")
// =================================================================
// 步骤 8:预测结果入库
// =================================================================
println("\n================= 8:预测结果入库 =================")
// ========== 为什么要入库? ==========
// 模型在 Spark 里跑出来的 pred 只存在内存里,程序一停就没了。
// 业务大屏需要长期、稳定地查询预测结果,所以要存进数据库(ClickHouse)。
val createdAt = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
// ========== 选出要存的列,并给它们改成数据库表对应的格式 ==========
// lit(xxx) = "literal" 字面量,把固定值变成一列(每行都填一样的值)
// 比如 lit(modelVersion) 就是给每行都填上当前模型版本号
val predOut = pred.select(
lit(modelVersion).as("model_version"), // 模型版本号(每行都一样)
col("device_id"), // 设备ID
col("event_time"), // 事件时间
col("label").cast("int").as("label"), // 真实标签
col("prediction").cast("int").as("prediction"),// 模型预测
col("prob1").cast("double").as("prob1"), // 异常概率
lit(createdAt).as("created_at") // 入库时间
)
println("【8.1 入库数据预览(前10条)】")
predOut.show(10, truncate = false)
// ========== 通过 JDBC 写入 ClickHouse ==========
// SaveMode.Append = 追加模式(不删原表数据,只往后接)
// 其他可选:Overwrite(覆盖)、ErrorIfExists(报错)、Ignore(忽略)
predOut.write
.format("jdbc") // 用 JDBC 协议写
.option("url", ckUrl) // 数据库地址
.option("driver", ckDriver) // 驱动类
.option("dbtable", ckPredTable) // 写入哪张表
.mode(SaveMode.Append) // 追加模式
.save() // 执行写入
println(s"✅ 8.2 预测结果已写入 ClickHouse:$ckPredTable(model_version=$modelVersion)")
// ========== 关闭 Spark 会话,释放资源 ==========
// 不写也行,程序结束自动会关。但显式关闭是好习惯,
// 尤其在长时间运行的任务里,可以及时释放集群资源给别人用
spark.stop()
}
}6.7 验证结果
查看Clickhouse数据表
-- 查看 event_time 最新的10条预测记录 SELECT * FROM iot_report.dws_device_pred_detail ORDER BY event_time DESC LIMIT 10;
-- 查看结果
┌─model_version──────┬─device_id──┬──────────event_time─┬─label─┬─prediction─┬────────────────prob1─┬──────────created_at─┐
│ rf_20260215_192856 │ device-001 │ 2026-01-30 23:59:00 │ 0 │ 0 │ 0.013446288363634786 │ 2026-02-15 19:29:08 │
│ rf_20260215_190927 │ device-011 │ 2026-01-30 23:59:00 │ 1 │ 1 │ 0.950403529104434 │ 2026-02-15 19:09:40 │
│ rf_20260215_192856 │ device-005 │ 2026-01-30 23:59:00 │ 1 │ 1 │ 1 │ 2026-02-15 19:29:08 │
│ rf_20260215_190927 │ device-001 │ 2026-01-30 23:59:00 │ 0 │ 0 │ 0.013446288363634786 │ 2026-02-15 19:09:40 │
│ rf_20260215_192856 │ device-011 │ 2026-01-30 23:59:00 │ 1 │ 1 │ 0.950403529104434 │ 2026-02-15 19:29:08 │
│ rf_20260215_191430 │ device-023 │ 2026-01-30 23:59:00 │ 0 │ 0 │ 0.006801822455131763 │ 2026-02-15 19:14:43 │
│ rf_20260215_192856 │ device-015 │ 2026-01-30 23:59:00 │ 0 │ 0 │ 0.008066635693123252 │ 2026-02-15 19:29:08 │
│ rf_20260215_191923 │ device-011 │ 2026-01-30 23:59:00 │ 1 │ 1 │ 0.950403529104434 │ 2026-02-15 19:19:34 │
│ rf_20260215_191923 │ device-001 │ 2026-01-30 23:59:00 │ 0 │ 0 │ 0.013446288363634786 │ 2026-02-15 19:19:34 │
│ rf_20260215_191923 │ device-023 │ 2026-01-30 23:59:00 │ 0 │ 0 │ 0.006801822455131763 │ 2026-02-15 19:19:34 │
└────────────────────┴────────────┴─────────────────────┴───────┴────────────┴──────────────────────┴─────────────────────┘
解释:
一、字段含义解释
字段 含义 model_version 使用的模型版本号 device_id 设备编号 event_time 设备数据时间 label 真实标签(0=正常,1=异常) prediction 模型预测结果 prob1 预测为异常的概率 created_at 预测结果写入数据库时间 二、从整体上看这 10 条数据
① 同一个时间点的数据
👉 这些记录来自同一个设备数据时间(2026-01-30 23:59:00),属于同一批测试数据,而不是实时数据。
② 存在多个模型版本
👉 不同模型版本分别对同一批数据进行了预测,因此同一设备会出现多条记录。
三、逐条逻辑解释
1️⃣ 正常设备示例(device-001)
👉 真实是正常,模型预测也为正常,异常概率仅 1.3%,判断非常稳定。
2️⃣ 高风险异常示例(device-011)
👉 真实是异常,模型预测为异常,异常概率达 95%,模型判断较有信心。
3️⃣ 极端异常示例(device-005)
👉 所有决策树都投票为异常,因此模型给出 100% 的异常概率。
4️⃣ 低风险正常示例(device-023)
👉 真实是正常,模型预测为正常,异常概率不足 1%,属于非常安全的样本。
-- 查看异常概率在 0.95~0.99 区间的前10条记录(按概率从高到低排序) SELECT * FROM iot_report.dws_device_pred_detail WHERE prob1 BETWEEN 0.95 AND 0.99 ORDER BY prob1 DESC LIMIT 10;