一、实验背景
实验一中我们已经把原始 CSV 数据清洗成了一份干净的 DataFrame,并保存在了 /tmp/lab1_clean_data 目录。但这些"表格数据"还不能直接喂给机器学习模型——模型只认"特征向量"。
本实验要做两件大事:
特征工程:把多列特征"打包"成一列向量,让模型能吃下去;
模型训练:用随机森林算法从历史数据中学规律,得到一个会预测设备异常的模型。
特征向量可以简单理解为:
把一条设备记录中能用来判断结果的多个特征,按顺序装进一个数字列表里,打包成一个“数字数组”。比如原来一条设备数据是表格形式:
status temperature load voltage 1 47.5 0.77 219.6 机器学习模型不想一列一列地看,它更喜欢这种形式:
[1, 47.5, 0.77, 219.6]这个
[1, 47.5, 0.77, 219.6]就叫特征向量。
二、实验目标
完成本实验后,学生应当能够:
在原项目中加入 Spark MLlib 依赖;
加载实验一保存的 Parquet 数据;
使用
VectorAssembler把多列特征装配成一列向量;用
randomSplit()把数据切分为训练集和测试集,并理解为什么必须切分;配置
RandomForestClassifier的三个核心参数(numTrees、maxDepth、seed);区分
fit()和transform()两个动词的本质区别;用训练好的模型对测试集做预测,并提取异常概率;
把训练好的模型保存到本地,供实验三加载使用。
三、实验环境
| 项目 | 配置 |
|---|---|
| 操作系统 | Windows / Linux / macOS |
| JDK | 1.8 |
| Scala | 2.12.x |
| Spark | 3.3.0(含 spark-mllib) |
| 开发工具 | IntelliJ IDEA + Maven(沿用实验一的项目) |
| 输入数据 | /datas/lab1_clean_data(实验一产出的 Parquet 目录) |
四、启动环境
# 在master上启动HDFS start-dfs.sh # 在Slave1启动Yarn start-yarn.sh # 在HDFS上检查数据: ⚠️ 前置检查:开始本实验前,请确认 `/datas/lab1_clean_data` 目录存在且有 `part-xxxxx.snappy.parquet` 文件。如果没有,请先把实验一重新跑一遍。
五、完整代码(一次性复制运行)
5.1 项目准备
第 1 步:检查 pom.xml 中是否有 MLlib 依赖
在 pom.xml 的 <dependencies> 标签内追加以下依赖(已有的 spark-core 和 spark-sql 不动):
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>3.3.0</version>
</dependency>
第 2 步:创建 Scala Object 文件
在 com.demo.spark 包上右键 → New → Scala Class → 选 Object → 命名 Lab2_TrainModel。
注意:是在实验一同一个项目里新建文件,不要新建项目,避免重复配置依赖。
5.2 完整代码
把下面的代码完整复制到 Lab2_TrainModel.scala 文件中,并且完成Todo的填空:
package com.demo.spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.functions.vector_to_array
object Lab2_TrainModel {
def main(args: Array[String]): Unit = {
// ========== 参数配置区 ==========
val inputPath = "hdfs:///datas/lab1_clean_data" // 实验一产出的 Parquet 数据
val modelPath = "hdfs:///datas/lab2_rf_model" // 训练完成的模型保存路径(实验三使用)
// 随机森林核心超参数
val numTrees = 80 // 树的数量:越多越稳定,但训练越慢
val maxDepth = 8 // 树的最大深度:越深拟合越强,过深易过拟合
val seed = 42 // 随机种子:固定后每次 split / 训练结果可复现
// ========== 启动 SparkSession ==========
val spark = SparkSession.builder()
.appName("Lab2-TrainModel")
.master("local[*]") // 本地模式,使用全部 CPU 核
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
// ★ 必须在 SparkSession 之后导入,启用 $"col" 语法 和 Dataset 隐式编码器
import spark.implicits._
// ========== 步骤 1:加载实验一的清洗数据 ==========
println(s"========== 步骤 1:加载实验一的清洗数据 ==========")
// ★ read.parquet:直接读取列式存储文件,schema 自动还原,无需 inferSchema
val df = spark.read.parquet(inputPath)
println(s"【1.1 数据行数】: ${df.count()}")
println(s"【1.2 数据 Schema】:")
df.printSchema()
println(s"【1.3 前 3 行预览】:")
df.show(3, truncate = false)
// ========== 步骤 2:特征向量装配 ==========
println(s"========== 步骤 2:特征向量装配 ==========")
// 定语数组
val featureCols = Array("status", "temperature", "load", "voltage")
// ★ VectorAssembler:ML Pipeline 的特征预处理器
// 将多个数值列"打包"成一个 DenseVector 列,RandomForest 只接受向量输入
val assembler = new VectorAssembler()
.setInputCols(featureCols) // 指定要合并的输入列
.setOutputCol("features") // 输出列名,约定俗成叫 "features"
// ★ transform:Transformer 的核心方法,不训练,只做列变换(Transformation)
val data = assembler.transform(df)
.select("device_id", "event_time", "features", "label") // 只保留后续需要的列
println(s"【2.1 features 向量生成后前 5 条】:")
data.show(5, truncate = false)
// ========== 步骤 3:切分训练集与测试集 ==========
println(s"========== 步骤 3:切分训练集与测试集 ==========")
// ★ randomSplit:按比例随机切分 DataFrame;seed 固定保证可复现
// 返回 Array[DataFrame],用模式匹配直接解构赋值
val Array(trainDF, testDF) = data.randomSplit(Array(0.8, 0.2), seed)
println(s"【3.1 训练集行数:】${trainDF.count()}")
println(s"【3.2 测试集行数:】${testDF.count()}")
println(s"【3.3 训练集标签分布】")
trainDF.groupBy("label").count().orderBy("label").show()
println(s"【3.4 测试集标签分布】")
testDF.groupBy("label").count().orderBy("label").show()
// ========== 步骤 4:训练随机森林模型 ==========
println(s"========== 步骤 4:训练随机森林模型 ==========")
// ★ RandomForestClassifier:Estimator(估计器),需要调用 fit() 才会训练
// 通过链式 setter 配置超参数(Builder 风格)
val rf = new RandomForestClassifier()
.setLabelCol("label") // 标签列:0=正常,1=异常
.setFeaturesCol("features") // 特征列:VectorAssembler 的输出
.setNumTrees(numTrees) // 决策树棵数
.setMaxDepth(maxDepth) // 单棵树最大深度
.setSeed(seed) // 随机种子,影响 bootstrap 采样
// 记录程序开始运行的时间,单位是毫秒
val startTime = System.currentTimeMillis()
// fit() 会读取训练集 trainDF,开始真正的模型训练
// 输入:训练数据 DataFrame
// 输出:训练好的随机森林分类模型
val model = rf.fit(trainDF)
val elapsed = (System.currentTimeMillis() - startTime) / 1000.0
println(s"【4.1 模型训练完成】")
println(f"训练耗时:$elapsed%.2f 秒,实际生成树数量:${model.getNumTrees}")
// ========== 步骤 5:预测测试集 ==========
println(s"\n========== 步骤 5:用模型对测试集做预测 ==========")
// 使用训练好的模型,对测试集 testDF 进行预测
// transform() 会给 DataFrame 自动新增预测结果相关的列
// 常见新增列:
// rawPrediction:原始预测分数
// probability:预测为各类别的概率
// prediction:最终预测类别,0 表示正常,1 表示异常
val pred = model.transform(testDF)
// ★ vector_to_array:将 probability 向量转为数组,再用 getItem(1) 取下标 1(异常概率)
// probability 是一个长度为 2 的向量,例如 [0.85, 0.15]:
.withColumn("prob1", vector_to_array(col("probability")).getItem(1))
println(s"【5.1 预测结果(前 10 条)】")
pred.select("device_id", "label", "prediction", "prob1", "probability")
.show(10, truncate = false)
// ========== 步骤 6:筛选高风险设备 ==========
println(s"========== 步骤 6:筛选高风险设备 ==========")
// $"prob1" 依赖 import spark.implicits._;> 是 Column 的大于比较运算符
val highRisk = pred.filter($"prob1" > 0.8)
println(s"【6.1 高风险设备数(prob1 > 0.8)】:${highRisk.count()}")
println(s"【6.2 高风险设备样例(前 10 条)】")
highRisk.select("device_id", "label", "prediction", "prob1")
.show(10, truncate = false)
// ========== 步骤 7:保存模型 ==========
println(s"========== 步骤 7:保存模型 ==========")
// ★ model.write.overwrite().save():将模型序列化为 Parquet + JSON 元数据写入磁盘
// overwrite() 等价于 DataFrameWriter 的 mode("overwrite"),目标路径已存在时覆盖
model.write.overwrite().save(modelPath)
println(s"模型已保存到 $modelPath,实验三将直接加载,勿删除!")
// ========== 关闭 SparkSession ==========
spark.stop() // 释放所有 Executor 和 Driver 资源
}
}5.3 运行程序
点击 main 方法旁边的绿色三角图标 → Run,等待程序执行完成(约 1—3 分钟,取决于数据量和机器性能)。
如果一切正常,控制台会依次输出 7 个步骤的结果。下面我们逐步骤讲解每个输出代表什么。
六、分步骤讲解
步骤 1:加载实验一的清洗数据
这段代码做什么:用 spark.read.parquet() 把实验一保存的 Parquet 数据读回 DataFrame。
为什么读 Parquet 比读 CSV 简单:
不需要
.option("header", "true")——Parquet 自带表头;不需要
.option("inferSchema", "true")——Parquet 自带类型;不需要
cast()和na.drop()——数据已经在实验一清洗过了。
预期控制台输出:
========== 步骤 1:加载实验一的清洗数据 ==========
【1.1 数据行数】: 2095196
【1.2 数据 Schema】:
root
|-- device_id: string (nullable = true)
|-- event_time: string (nullable = true)
|-- status: integer (nullable = true)
|-- temperature: double (nullable = true)
|-- load: double (nullable = true)
|-- voltage: double (nullable = true)
|-- label: integer (nullable = true)
【1.3 前 3 行预览】
+----------+-------------------+------+-----------+----+-------+-----+
|device_id |event_time |status|temperature|load|voltage|label|
+----------+-------------------+------+-----------+----+-------+-----+
|device-026|2026-01-16 10:29:00|1 |29.15 |0.59|222.5 |0 |
|device-027|2026-01-16 10:29:00|1 |30.01 |0.65|227.6 |0 |
|device-028|2026-01-16 10:29:00|1 |33.42 |0.57|226.4 |0 |
+----------+-------------------+------+-----------+----+-------+-----+
only showing top 3 rows
输出含义:
行数
2095196应该和实验一步骤 3 输出的"过滤后数据量"完全一致;Schema 中所有列的类型都是正确的(int、double 而不是 string);
前 3 行不再有(status=0 的离线设备),证明实验一的过滤生效了。
思考题 1:实验一的产出是 Parquet 格式而不是 CSV,为什么这种格式更适合作为中间结果?(提示:从读取速度、是否保留类型、文件大小三个角度思考)
答案:Parquet 更适合作为中间结果,因为它读取速度快、能保留字段类型,而且文件体积更小。再次读取时,不需要重新设置表头、推断类型或清洗数据。
步骤 2:特征向量装配
这段代码做什么:用 VectorAssembler 把 4 列特征(status、temperature、load、voltage)打包成一列向量 features。
为什么必须装配:Spark MLlib 中所有模型都有一个硬性要求——输入只能是一列,且这一列的每个格子里装的是一个向量。打个比方:模型像工厂传送带上的机器,只有一个进料口,所有原料必须先打包成一个箱子才能进去。VectorAssembler 就是这台"打包机"。
关键代码解读:
setInputCols(featureCols):告诉打包机要打包哪几列;setOutputCol("features"):告诉打包机打包后的新列叫什么;assembler.transform(df):执行打包,原表的所有列都保留,末尾追加一列 features。
预期控制台输出:
========== 步骤 2:特征向量装配 ==========
【2.1 features 向量生成后前 5 条】
+----------+-------------------+----------------------+-----+
|device_id |event_time |features |label|
+----------+-------------------+----------------------+-----+
|device-026|2026-01-16 10:29:00|[1.0,29.15,0.59,222.5]|0 |
|device-027|2026-01-16 10:29:00|[1.0,30.01,0.65,227.6]|0 |
|device-028|2026-01-16 10:29:00|[1.0,33.42,0.57,226.4]|0 |
|device-029|2026-01-16 10:29:00|[1.0,34.69,0.89,210.5]|1 |
|device-030|2026-01-16 10:29:00|[1.0,35.67,0.76,218.4]|0 |
+----------+-------------------+----------------------+-----+
only showing top 5 rows
输出含义:
features列里是[1.0,29.15,0.59,222.5]这种向量,4 个数字的顺序和featureCols数组里写的顺序完全一致;第 1 个数 = status,第 2 个数 = temperature,第 3 个数 = load,第 4 个数 = voltage;
原来的 4 列分散数据(status / temperature / load / voltage)已经合并到了一列里。
思考题 2:观察输出,第 1 行的 features = [1.0, 30.35, 0.57, 218.0],请手工分解:向量中第 1、2、3、4 个位置分别对应原数据的哪一列?
答案:第 1 个位置:status;第 2 个位置:temperature;第 3 个位置:load;第 4 个位置:voltage
步骤 3:切分训练集与测试集
这段代码做什么:把数据按 8:2 随机切分成训练集(学规律)和测试集(藏起来考试用)。
为什么必须切分:不能用同一份数据"训练 + 考试",否则模型可以靠死记硬背得高分,但遇到新数据就抓瞎。所以要留一部分数据藏起来,训练完再拿出来考它,看真本事。
关键代码解读:
randomSplit(Array(0.8, 0.2), seed):按 80%、20% 随机切分;seed = 42是随机种子,固定后每次切出来的训练集和测试集都一样,方便调试和复现;val Array(trainDF, testDF) = ...是 Scala 的"模式匹配"语法,把返回的数组直接拆成两个变量。
预期控制台输出:
========== 步骤 3:切分训练集与测试集 ==========
【3.1 训练集行数:】1676445
【3.2 测试集行数:】418751
【3.3 训练集标签分布】
+-----+-------+
|label| count|
+-----+-------+
| 0|1131489|
| 1| 544956|
+-----+-------+
【3.4 测试集标签分布】
+-----+------+
|label| count|
+-----+------+
| 0|282927|
| 1|135824|
+-----+------+
输出含义:
训练集 + 测试集 ≈
2095196(实验一过滤后总量),证明切分总量正确;训练集和测试集的标签比例都接近 2:1,与原始数据比例一致——这说明
randomSplit的随机性是均匀的,没有把异常样本都分到一边去。
思考题 3:如果把 seed 从 42 改成其他数字(比如 100),训练集和测试集的总行数会变吗?具体内容会变吗?为什么?
答案:总行数基本不变,还是会按 8:2 左右切分。但训练集和测试集里的具体数据会变。
因为
seed控制随机切分的结果,换了 seed,就相当于换了一种随机分法。
步骤 4:训练随机森林模型
这段代码做什么:创建一个空白的随机森林分类器,配置好参数,然后用 fit(trainDF) 训练它。
关于 fit 这个动词(重要!):
机器学习里最容易混淆的两个动词:
fit(训练):让"空白工具"通过数据学规律 → 产出"会做事的模型";
transform(变换):让"会做事的模型"对数据做处理(比如预测)。
简单说:fit 是学习,transform 是干活。
3 个核心参数:
| 参数 | 含义 | 太小 | 太大 |
|---|---|---|---|
| numTrees | 森林里有多少棵树 | 投票不充分容易出错 | 训练超慢 |
| maxDepth | 每棵树最深问几层问题 | 学不到复杂规律 | 死记硬背过拟合 |
| seed | 随机种子 | —— | —— |
预期控制台输出:
========== 步骤 4:训练随机森林模型 ==========
【4.1 模型训练完成】
训练耗时:31.87 秒,实际生成树数量:80
输出含义:
训练耗时取决于电脑性能,一般是 30 秒—2 分钟;
"实际生成树的数量"应该和你设置的
numTrees一致;没有报错就说明训练成功,模型对象
model已经"会判断异常"了。
思考题 4:如果把 numTrees 从 80 改成 5,模型可能会出现什么问题?如果改成 1000 呢?
答案:如果
numTrees改成 5,树太少,投票不充分,模型可能不稳定、准确率下降。 如果改成 1000,树很多,模型可能更稳定,但训练速度会明显变慢,占用资源也更多。
步骤 5:用模型对测试集做预测
这段代码做什么:用训练好的模型对测试集做预测,并提取异常概率。
关于 transform 这个动词:上一步用 fit 训练出了模型,这一步用 transform 让模型干活——读入测试数据,输出预测结果。
模型输出的 3 列:
prediction:最终判定(0=正常,1=异常);probability:长度为 2 的概率向量[正常概率, 异常概率],例如[0.85, 0.15]表示 85% 概率正常、15% 概率异常;rawPrediction:原始打分,一般用不上。
为什么要单独提取 prob1:probability 是一个向量,操作起来麻烦。我们用 vector_to_array 把它转成数组,再 getItem(1) 取第 2 个元素("异常概率"),存到一个新列 prob1 里,方便后续筛选高风险设备。
预期控制台输出:
========== 步骤 5:用模型对测试集做预测 ==========
【5.1 预测结果(前 10 条)】
+----------+-----+----------+--------------------+-----------------------------------------+
|device_id |label|prediction|prob1 |probability |
+----------+-----+----------+--------------------+-----------------------------------------+
|device-001|1 |1.0 |0.9297361576750062 |[0.07026384232499391,0.9297361576750062] |
|device-001|0 |0.0 |0.005195099952609918|[0.9948049000473901,0.005195099952609918]|
|device-001|0 |0.0 |0.004994138018897672|[0.9950058619811024,0.004994138018897672]|
|device-001|0 |0.0 |0.005689677771895455|[0.9943103222281046,0.005689677771895455]|
|device-001|0 |1.0 |0.9079884349771054 |[0.09201156502289463,0.9079884349771054] |
|device-001|0 |0.0 |0.006296494439502669|[0.9937035055604972,0.006296494439502669]|
|device-001|1 |1.0 |1.0 |[0.0,1.0] |
|device-001|0 |0.0 |0.005359600517045561|[0.9946403994829545,0.005359600517045561]|
|device-001|0 |0.0 |0.006766155219800312|[0.9932338447801996,0.006766155219800312]|
|device-001|1 |1.0 |1.0 |[0.0,1.0] |
+----------+-----+----------+--------------------+-----------------------------------------+
only showing top 10 rows
输出含义:
第 1 行:真实 label=0,模型 prediction=0.0,预测对了;prob1=0.0432 说明模型很有把握是正常;
第 7 行:真实 label=1,但模型 prediction=0.0,这是一条预测错的,prob1=0.4382 说明模型有点犹豫(差点猜对);
probability中两个数加起来一定是 1.0,prob1 就是其中第 2 个数。
思考题 5:第 7 行 label=1, prediction=0 是模型预测错了。这种"实际异常但被判定为正常"的错误叫什么名字?(提示:实验三会详细讲,这里先思考)
答案: 漏报 / 假阴性 / False Negative;意思是:设备实际上异常,但模型没有识别出来。
步骤 6:筛选高风险设备
这段代码做什么:找出模型认为"很可能异常"的设备(prob1 > 0.8)。
为什么要这一步:在生产环境里,不是所有 prediction=1 都要立刻派人去维修——模型有时候只是"轻度怀疑"。我们一般会设一个高阈值(比如 0.8),只对高度怀疑的设备发报警,避免误报浪费人力。
预期控制台输出:
========== 步骤 6:筛选高风险设备 ==========
【6.1 高风险设备数(prob1 > 0.8)】:135824
【6.2 高风险设备样例(前 10 条)】
+----------+-----+----------+------------------+
|device_id |label|prediction|prob1 |
+----------+-----+----------+------------------+
|device-001|1 |1.0 |0.9297361576750062|
|device-001|0 |1.0 |0.9079884349771054|
|device-001|1 |1.0 |1.0 |
|device-001|1 |1.0 |1.0 |
|device-001|1 |1.0 |1.0 |
|device-001|1 |1.0 |0.9598024735650206|
|device-001|1 |1.0 |1.0 |
|device-001|1 |1.0 |1.0 |
|device-001|1 |1.0 |0.9825015824280433|
|device-001|1 |1.0 |1.0 |
+----------+-----+----------+------------------+
only showing top 10 rows
输出含义:
高风险设备约 4.9 万条,占测试集的 17% 左右;
这些设备的
prediction都是 1.0(高分一定预测为异常);大部分 label 也是 1(说明模型判断准确),但偶尔会有 label=0 的(如第 8 行 D0512),这是"误报"。
思考题 6:如果把高风险阈值从 0.8 调低到 0.5,会让"被报警的设备数"变多还是变少?这样做有什么坏处?
答案:把阈值从
0.8调低到0.5,被报警的设备数会变多。坏处是:模型会把更多设备判为高风险,可能带来更多误报,浪费检查和维修人力。
步骤 7:保存模型
这段代码做什么:把训练好的模型保存到本地,供实验三使用。
为什么要保存:模型在内存里,程序一停就没了。如果不保存,每次想用模型都要重新训练几分钟,太浪费时间。保存后,实验三可以直接 load() 加载,几秒钟就能用。
预期控制台输出:
========== 步骤 7:保存模型 ==========
模型已保存到 hdfs:///tmp/lab2_rf_model,实验三将直接加载,勿删除!
验证保存成功:在系统终端执行(Linux/Mac):
ls /tmp/lab2_rf_model
应该能看到 3 个子目录:
data/ # 保存模型真正训练出来的内容,例如随机森林中树的结构和判断规则,相当于“模型本体”。
metadata/ # 保存模型的基本信息,例如模型类型、参数、版本等,相当于“说明书”
treesMetadata/ # 保存随机森林中每棵树的元信息,例如树的编号、结构信息等,相当于“树的索引说明”。
思考题 7:保存出来的不是一个 .model 文件,而是一个目录里有 data、metadata、treesMetadata 三个子目录。结合"随机森林由 80 棵树组成"思考,这 3 个子目录里分别可能存了什么?
答案:保存模型时生成的是一个目录,不是单个文件。三个子目录大致作用是:
metadata:保存模型说明信息,例如模型类型、参数、版本等。
data:保存模型真正学到的内容,例如随机森林中每棵树的结构和判断规则。
treesMetadata:保存每棵树的元信息,例如树的编号、结构信息等。
七、本实验产出(衔接实验三)
本实验运行成功后,会在 hdfs:///datas/lab2_rf_model 目录生成训练好的随机森林模型。
实验三将直接加载这个模型