李翔-大数据技术

Big data technology!

7.2 实验二_特征工程与模型训练【2026-5-5】

实验二:特征工程与随机森林模型训练

一、实验背景

实验一中我们已经把原始 CSV 数据清洗成了一份干净的 DataFrame,并保存在了 /tmp/lab1_clean_data 目录。但这些"表格数据"还不能直接喂给机器学习模型——模型只认"特征向量"。

本实验要做两件大事:

  1. 特征工程:把多列特征"打包"成一列向量,让模型能吃下去;

  2. 模型训练:用随机森林算法从历史数据中学规律,得到一个会预测设备异常的模型。

特征向量可以简单理解为:

把一条设备记录中能用来判断结果的多个特征,按顺序装进一个数字列表里,打包成一个“数字数组”。

比如原来一条设备数据是表格形式:

statustemperatureloadvoltage
147.50.77219.6

机器学习模型不想一列一列地看,它更喜欢这种形式:

[1, 47.5, 0.77, 219.6]

这个 [1, 47.5, 0.77, 219.6] 就叫特征向量


二、实验目标

完成本实验后,学生应当能够:

  1. 在原项目中加入 Spark MLlib 依赖;

  2. 加载实验一保存的 Parquet 数据;

  3. 使用 VectorAssembler 把多列特征装配成一列向量;

  4. randomSplit() 把数据切分为训练集和测试集,并理解为什么必须切分;

  5. 配置 RandomForestClassifier 的三个核心参数(numTrees、maxDepth、seed);

  6. 区分 fit()transform() 两个动词的本质区别;

  7. 用训练好的模型对测试集做预测,并提取异常概率;

  8. 把训练好的模型保存到本地,供实验三加载使用。

三、实验环境


项目配置
操作系统Windows / Linux / macOS
JDK1.8
Scala2.12.x
Spark3.3.0(含 spark-mllib)
开发工具IntelliJ IDEA + Maven(沿用实验一的项目
输入数据/datas/lab1_clean_data(实验一产出的 Parquet 目录)




四、启动环境

# 在master上启动HDFS
start-dfs.sh

# 在Slave1启动Yarn
start-yarn.sh

# 在HDFS上检查数据:
⚠️ 前置检查:开始本实验前,请确认 `/datas/lab1_clean_data` 
目录存在且有 `part-xxxxx.snappy.parquet` 文件。如果没有,请先把实验一重新跑一遍。

五、完整代码(一次性复制运行)

5.1 项目准备

第 1 步:检查 pom.xml 中是否有 MLlib 依赖

pom.xml<dependencies> 标签内追加以下依赖(已有的 spark-core 和 spark-sql 不动):

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-mllib_2.12</artifactId>
   <version>3.3.0</version>
</dependency>


第 2 步:创建 Scala Object 文件

com.demo.spark 包上右键 → New → Scala Class → 选 Object → 命名 Lab2_TrainModel

注意:是在实验一同一个项目里新建文件,不要新建项目,避免重复配置依赖。


5.2 完整代码

把下面的代码完整复制Lab2_TrainModel.scala 文件中,并且完成Todo的填空:

package com.demo.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.functions.vector_to_array

object Lab2_TrainModel {

  def main(args: Array[String]): Unit = {

    // ========== 参数配置区 ==========
    val inputPath = "hdfs:///datas/lab1_clean_data"  // 实验一产出的 Parquet 数据
    val modelPath = "hdfs:///datas/lab2_rf_model"    // 训练完成的模型保存路径(实验三使用)

    // 随机森林核心超参数
    val numTrees = 80   // 树的数量:越多越稳定,但训练越慢
    val maxDepth = 8    // 树的最大深度:越深拟合越强,过深易过拟合
    val seed     = 42   // 随机种子:固定后每次 split / 训练结果可复现

    // ========== 启动 SparkSession ==========
    val spark = SparkSession.builder()
      .appName("Lab2-TrainModel")
      .master("local[*]")  // 本地模式,使用全部 CPU 核
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // ★ 必须在 SparkSession 之后导入,启用 $"col" 语法 和 Dataset 隐式编码器
    import spark.implicits._

    // ========== 步骤 1:加载实验一的清洗数据 ==========
    println(s"========== 步骤 1:加载实验一的清洗数据 ==========")
    // ★ read.parquet:直接读取列式存储文件,schema 自动还原,无需 inferSchema
    val df = spark.read.parquet(inputPath)

    println(s"【1.1 数据行数】: ${df.count()}")
    println(s"【1.2 数据 Schema】:")
    df.printSchema()
    println(s"【1.3 前 3 行预览】:")
    df.show(3, truncate = false)

    // ========== 步骤 2:特征向量装配 ==========
    println(s"========== 步骤 2:特征向量装配 ==========")
    
    // 定语数组
    val featureCols = Array("status", "temperature", "load", "voltage")

    // ★ VectorAssembler:ML Pipeline 的特征预处理器
    //   将多个数值列"打包"成一个 DenseVector 列,RandomForest 只接受向量输入
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)  // 指定要合并的输入列
      .setOutputCol("features")   // 输出列名,约定俗成叫 "features"

    // ★ transform:Transformer 的核心方法,不训练,只做列变换(Transformation)
    val data = assembler.transform(df)
      .select("device_id", "event_time", "features", "label")  // 只保留后续需要的列

    println(s"【2.1 features 向量生成后前 5 条】:")
    data.show(5, truncate = false)

    // ========== 步骤 3:切分训练集与测试集 ==========
    println(s"========== 步骤 3:切分训练集与测试集 ==========")
    // ★ randomSplit:按比例随机切分 DataFrame;seed 固定保证可复现
    //   返回 Array[DataFrame],用模式匹配直接解构赋值
    val Array(trainDF, testDF) = data.randomSplit(Array(0.8, 0.2), seed)

    println(s"【3.1 训练集行数:】${trainDF.count()}")
    println(s"【3.2 测试集行数:】${testDF.count()}")
    println(s"【3.3 训练集标签分布】")
    trainDF.groupBy("label").count().orderBy("label").show()
    println(s"【3.4 测试集标签分布】")
    testDF.groupBy("label").count().orderBy("label").show()

    // ========== 步骤 4:训练随机森林模型 ==========
    println(s"========== 步骤 4:训练随机森林模型 ==========")
    // ★ RandomForestClassifier:Estimator(估计器),需要调用 fit() 才会训练
    //   通过链式 setter 配置超参数(Builder 风格)
    val rf = new RandomForestClassifier()
      .setLabelCol("label")        // 标签列:0=正常,1=异常
      .setFeaturesCol("features")  // 特征列:VectorAssembler 的输出
      .setNumTrees(numTrees)       // 决策树棵数
      .setMaxDepth(maxDepth)       // 单棵树最大深度
      .setSeed(seed)               // 随机种子,影响 bootstrap 采样

    // 记录程序开始运行的时间,单位是毫秒  
    val startTime = System.currentTimeMillis()

    // fit() 会读取训练集 trainDF,开始真正的模型训练
    // 输入:训练数据 DataFrame
    // 输出:训练好的随机森林分类模型
    val model = rf.fit(trainDF)

    val elapsed = (System.currentTimeMillis() - startTime) / 1000.0

    println(s"【4.1 模型训练完成】")
    println(f"训练耗时:$elapsed%.2f 秒,实际生成树数量:${model.getNumTrees}")

    // ========== 步骤 5:预测测试集 ==========
    println(s"\n========== 步骤 5:用模型对测试集做预测 ==========")

	// 使用训练好的模型,对测试集 testDF 进行预测
    // transform() 会给 DataFrame 自动新增预测结果相关的列
    // 常见新增列:
    // rawPrediction:原始预测分数
    // probability:预测为各类别的概率
    // prediction:最终预测类别,0 表示正常,1 表示异常
    val pred = model.transform(testDF)
      // ★ vector_to_array:将 probability 向量转为数组,再用 getItem(1) 取下标 1(异常概率)
      // probability 是一个长度为 2 的向量,例如 [0.85, 0.15]:
      .withColumn("prob1", vector_to_array(col("probability")).getItem(1))

    println(s"【5.1 预测结果(前 10 条)】")
    pred.select("device_id", "label", "prediction", "prob1", "probability")
      .show(10, truncate = false)

    // ========== 步骤 6:筛选高风险设备 ==========
    println(s"========== 步骤 6:筛选高风险设备 ==========")

    // $"prob1" 依赖 import spark.implicits._;> 是 Column 的大于比较运算符
    val highRisk = pred.filter($"prob1" > 0.8)

    println(s"【6.1 高风险设备数(prob1 > 0.8)】:${highRisk.count()}")
    println(s"【6.2 高风险设备样例(前 10 条)】")
    highRisk.select("device_id", "label", "prediction", "prob1")
      .show(10, truncate = false)

    // ========== 步骤 7:保存模型 ==========
    println(s"========== 步骤 7:保存模型 ==========")

    // ★ model.write.overwrite().save():将模型序列化为 Parquet + JSON 元数据写入磁盘
    //   overwrite() 等价于 DataFrameWriter 的 mode("overwrite"),目标路径已存在时覆盖
    model.write.overwrite().save(modelPath)

    println(s"模型已保存到 $modelPath,实验三将直接加载,勿删除!")

    // ========== 关闭 SparkSession ==========
    spark.stop()  // 释放所有 Executor 和 Driver 资源
  }
}



5.3 运行程序

点击 main 方法旁边的绿色三角图标 → Run,等待程序执行完成(约 1—3 分钟,取决于数据量和机器性能)。

如果一切正常,控制台会依次输出 7 个步骤的结果。下面我们逐步骤讲解每个输出代表什么。


六、分步骤讲解

步骤 1:加载实验一的清洗数据

这段代码做什么:用 spark.read.parquet() 把实验一保存的 Parquet 数据读回 DataFrame。

为什么读 Parquet 比读 CSV 简单

  • 不需要 .option("header", "true")——Parquet 自带表头;

  • 不需要 .option("inferSchema", "true")——Parquet 自带类型;

  • 不需要 cast()na.drop()——数据已经在实验一清洗过了。

预期控制台输出

========== 步骤 1:加载实验一的清洗数据 ==========
【1.1 数据行数】: 2095196
【1.2 数据 Schema】:
root
|-- device_id: string (nullable = true)
|-- event_time: string (nullable = true)
|-- status: integer (nullable = true)
|-- temperature: double (nullable = true)
|-- load: double (nullable = true)
|-- voltage: double (nullable = true)
|-- label: integer (nullable = true)

【1.3 前 3 行预览】
+----------+-------------------+------+-----------+----+-------+-----+
|device_id |event_time         |status|temperature|load|voltage|label|
+----------+-------------------+------+-----------+----+-------+-----+
|device-026|2026-01-16 10:29:00|1     |29.15      |0.59|222.5  |0    |
|device-027|2026-01-16 10:29:00|1     |30.01      |0.65|227.6  |0    |
|device-028|2026-01-16 10:29:00|1     |33.42      |0.57|226.4  |0    |
+----------+-------------------+------+-----------+----+-------+-----+
only showing top 3 rows

输出含义

  • 行数 2095196 应该和实验一步骤 3 输出的"过滤后数据量"完全一致;

  • Schema 中所有列的类型都是正确的(int、double 而不是 string);

  • 前 3 行不再有(status=0 的离线设备),证明实验一的过滤生效了。


思考题 1:实验一的产出是 Parquet 格式而不是 CSV,为什么这种格式更适合作为中间结果?(提示:从读取速度、是否保留类型、文件大小三个角度思考)

答案:Parquet 更适合作为中间结果,因为它读取速度快能保留字段类型,而且文件体积更小。再次读取时,不需要重新设置表头、推断类型或清洗数据。


步骤 2:特征向量装配

这段代码做什么:用 VectorAssembler 把 4 列特征(status、temperature、load、voltage)打包成一列向量 features

为什么必须装配:Spark MLlib 中所有模型都有一个硬性要求——输入只能是一列,且这一列的每个格子里装的是一个向量。打个比方:模型像工厂传送带上的机器,只有一个进料口,所有原料必须先打包成一个箱子才能进去。VectorAssembler 就是这台"打包机"。

关键代码解读

  • setInputCols(featureCols):告诉打包机要打包哪几列;

  • setOutputCol("features"):告诉打包机打包后的新列叫什么;

  • assembler.transform(df):执行打包,原表的所有列都保留,末尾追加一列 features

预期控制台输出

========== 步骤 2:特征向量装配 ==========
【2.1 features 向量生成后前 5 条】
+----------+-------------------+----------------------+-----+
|device_id |event_time         |features              |label|
+----------+-------------------+----------------------+-----+
|device-026|2026-01-16 10:29:00|[1.0,29.15,0.59,222.5]|0    |
|device-027|2026-01-16 10:29:00|[1.0,30.01,0.65,227.6]|0    |
|device-028|2026-01-16 10:29:00|[1.0,33.42,0.57,226.4]|0    |
|device-029|2026-01-16 10:29:00|[1.0,34.69,0.89,210.5]|1    |
|device-030|2026-01-16 10:29:00|[1.0,35.67,0.76,218.4]|0    |
+----------+-------------------+----------------------+-----+
only showing top 5 rows

输出含义

  • features 列里是 [1.0,29.15,0.59,222.5] 这种向量,4 个数字的顺序和 featureCols 数组里写的顺序完全一致

  • 第 1 个数 = status,第 2 个数 = temperature,第 3 个数 = load,第 4 个数 = voltage;

  • 原来的 4 列分散数据(status / temperature / load / voltage)已经合并到了一列里。


思考题 2:观察输出,第 1 行的 features = [1.0, 30.35, 0.57, 218.0],请手工分解:向量中第 1、2、3、4 个位置分别对应原数据的哪一列?

答案:第 1 个位置:status;第 2 个位置:temperature;第 3 个位置:load;第 4 个位置:voltage


步骤 3:切分训练集与测试集

这段代码做什么:把数据按 8:2 随机切分成训练集(学规律)和测试集(藏起来考试用)。

为什么必须切分:不能用同一份数据"训练 + 考试",否则模型可以靠死记硬背得高分,但遇到新数据就抓瞎。所以要留一部分数据藏起来,训练完再拿出来考它,看真本事。

关键代码解读

  • randomSplit(Array(0.8, 0.2), seed):按 80%、20% 随机切分;

  • seed = 42 是随机种子,固定后每次切出来的训练集和测试集都一样,方便调试和复现;

  • val Array(trainDF, testDF) = ... 是 Scala 的"模式匹配"语法,把返回的数组直接拆成两个变量。

预期控制台输出

========== 步骤 3:切分训练集与测试集 ==========
【3.1 训练集行数:】1676445
【3.2 测试集行数:】418751
【3.3 训练集标签分布】
+-----+-------+
|label|  count|
+-----+-------+
|    0|1131489|
|    1| 544956|
+-----+-------+

【3.4 测试集标签分布】
+-----+------+
|label| count|
+-----+------+
|    0|282927|
|    1|135824|
+-----+------+

输出含义

  • 训练集  + 测试集 ≈  2095196 (实验一过滤后总量),证明切分总量正确;

  • 训练集和测试集的标签比例都接近 2:1,与原始数据比例一致——这说明 randomSplit 的随机性是均匀的,没有把异常样本都分到一边去。


思考题 3:如果把 seed 从 42 改成其他数字(比如 100),训练集和测试集的总行数会变吗?具体内容会变吗?为什么?

答案:总行数基本不变,还是会按 8:2 左右切分。但训练集和测试集里的具体数据会变

因为 seed 控制随机切分的结果,换了 seed,就相当于换了一种随机分法。


步骤 4:训练随机森林模型

这段代码做什么:创建一个空白的随机森林分类器,配置好参数,然后用 fit(trainDF) 训练它。

关于 fit 这个动词(重要!)

机器学习里最容易混淆的两个动词:

  • fit(训练):让"空白工具"通过数据学规律 → 产出"会做事的模型";

  • transform(变换):让"会做事的模型"对数据做处理(比如预测)。

简单说:fit 是学习,transform 是干活。

3 个核心参数


参数含义太小太大
numTrees森林里有多少棵树投票不充分容易出错训练超慢
maxDepth每棵树最深问几层问题学不到复杂规律死记硬背过拟合
seed随机种子————


预期控制台输出

========== 步骤 4:训练随机森林模型 ==========
【4.1 模型训练完成】
训练耗时:31.87 秒,实际生成树数量:80

输出含义

  • 训练耗时取决于电脑性能,一般是 30 秒—2 分钟

  • "实际生成树的数量"应该和你设置的 numTrees 一致;

  • 没有报错就说明训练成功,模型对象 model 已经"会判断异常"了。


思考题 4:如果把 numTrees 从 80 改成 5,模型可能会出现什么问题?如果改成 1000 呢?

答案:如果 numTrees 改成 5,树太少,投票不充分,模型可能不稳定、准确率下降。 如果改成 1000,树很多,模型可能更稳定,但训练速度会明显变慢,占用资源也更多。


步骤 5:用模型对测试集做预测

这段代码做什么:用训练好的模型对测试集做预测,并提取异常概率。

关于 transform 这个动词:上一步用 fit 训练出了模型,这一步用 transform 让模型干活——读入测试数据,输出预测结果。

模型输出的 3 列

  • prediction:最终判定(0=正常,1=异常);

  • probability:长度为 2 的概率向量 [正常概率, 异常概率],例如 [0.85, 0.15] 表示 85% 概率正常、15% 概率异常;

  • rawPrediction:原始打分,一般用不上。

为什么要单独提取 prob1probability 是一个向量,操作起来麻烦。我们用 vector_to_array 把它转成数组,再 getItem(1) 取第 2 个元素("异常概率"),存到一个新列 prob1 里,方便后续筛选高风险设备。

预期控制台输出

========== 步骤 5:用模型对测试集做预测 ==========
【5.1 预测结果(前 10 条)】
+----------+-----+----------+--------------------+-----------------------------------------+
|device_id |label|prediction|prob1               |probability                              |
+----------+-----+----------+--------------------+-----------------------------------------+
|device-001|1    |1.0       |0.9297361576750062  |[0.07026384232499391,0.9297361576750062] |
|device-001|0    |0.0       |0.005195099952609918|[0.9948049000473901,0.005195099952609918]|
|device-001|0    |0.0       |0.004994138018897672|[0.9950058619811024,0.004994138018897672]|
|device-001|0    |0.0       |0.005689677771895455|[0.9943103222281046,0.005689677771895455]|
|device-001|0    |1.0       |0.9079884349771054  |[0.09201156502289463,0.9079884349771054] |
|device-001|0    |0.0       |0.006296494439502669|[0.9937035055604972,0.006296494439502669]|
|device-001|1    |1.0       |1.0                 |[0.0,1.0]                                |
|device-001|0    |0.0       |0.005359600517045561|[0.9946403994829545,0.005359600517045561]|
|device-001|0    |0.0       |0.006766155219800312|[0.9932338447801996,0.006766155219800312]|
|device-001|1    |1.0       |1.0                 |[0.0,1.0]                                |
+----------+-----+----------+--------------------+-----------------------------------------+
only showing top 10 rows

输出含义

  • 第 1 行:真实 label=0,模型 prediction=0.0,预测对了;prob1=0.0432 说明模型很有把握是正常;

  • 第 7 行:真实 label=1,但模型 prediction=0.0,这是一条预测错的,prob1=0.4382 说明模型有点犹豫(差点猜对);

  • probability 中两个数加起来一定是 1.0,prob1 就是其中第 2 个数。


思考题 5:第 7 行 label=1, prediction=0 是模型预测错了。这种"实际异常但被判定为正常"的错误叫什么名字?(提示:实验三会详细讲,这里先思考)

答案: 漏报 / 假阴性 / False Negative;意思是:设备实际上异常,但模型没有识别出来。


步骤 6:筛选高风险设备

这段代码做什么:找出模型认为"很可能异常"的设备(prob1 > 0.8)。

为什么要这一步:在生产环境里,不是所有 prediction=1 都要立刻派人去维修——模型有时候只是"轻度怀疑"。我们一般会设一个高阈值(比如 0.8),只对高度怀疑的设备发报警,避免误报浪费人力。

预期控制台输出

========== 步骤 6:筛选高风险设备 ==========
【6.1 高风险设备数(prob1 > 0.8)】:135824
【6.2 高风险设备样例(前 10 条)】
+----------+-----+----------+------------------+
|device_id |label|prediction|prob1             |
+----------+-----+----------+------------------+
|device-001|1    |1.0       |0.9297361576750062|
|device-001|0    |1.0       |0.9079884349771054|
|device-001|1    |1.0       |1.0               |
|device-001|1    |1.0       |1.0               |
|device-001|1    |1.0       |1.0               |
|device-001|1    |1.0       |0.9598024735650206|
|device-001|1    |1.0       |1.0               |
|device-001|1    |1.0       |1.0               |
|device-001|1    |1.0       |0.9825015824280433|
|device-001|1    |1.0       |1.0               |
+----------+-----+----------+------------------+
only showing top 10 rows

输出含义

  • 高风险设备约 4.9 万条,占测试集的 17% 左右;

  • 这些设备的 prediction 都是 1.0(高分一定预测为异常);

  • 大部分 label 也是 1(说明模型判断准确),但偶尔会有 label=0 的(如第 8 行 D0512),这是"误报"。


思考题 6:如果把高风险阈值从 0.8 调低到 0.5,会让"被报警的设备数"变多还是变少?这样做有什么坏处?

答案:把阈值从 0.8 调低到 0.5,被报警的设备数会变多

坏处是:模型会把更多设备判为高风险,可能带来更多误报,浪费检查和维修人力。


步骤 7:保存模型

这段代码做什么:把训练好的模型保存到本地,供实验三使用。

为什么要保存:模型在内存里,程序一停就没了。如果不保存,每次想用模型都要重新训练几分钟,太浪费时间。保存后,实验三可以直接 load() 加载,几秒钟就能用。

预期控制台输出

========== 步骤 7:保存模型 ==========
模型已保存到 hdfs:///tmp/lab2_rf_model,实验三将直接加载,勿删除!

验证保存成功:在系统终端执行(Linux/Mac):

ls /tmp/lab2_rf_model

应该能看到 3 个子目录:

data/             # 保存模型真正训练出来的内容,例如随机森林中树的结构和判断规则,相当于“模型本体”。
metadata/         # 保存模型的基本信息,例如模型类型、参数、版本等,相当于“说明书”
treesMetadata/    # 保存随机森林中每棵树的元信息,例如树的编号、结构信息等,相当于“树的索引说明”。

思考题 7:保存出来的不是一个 .model 文件,而是一个目录里有 datametadatatreesMetadata 三个子目录。结合"随机森林由 80 棵树组成"思考,这 3 个子目录里分别可能存了什么?

答案:保存模型时生成的是一个目录,不是单个文件。三个子目录大致作用是:

metadata:保存模型说明信息,例如模型类型、参数、版本等。

data:保存模型真正学到的内容,例如随机森林中每棵树的结构和判断规则。

treesMetadata:保存每棵树的元信息,例如树的编号、结构信息等。



七、本实验产出(衔接实验三)

本实验运行成功后,会在 hdfs:///datas/lab2_rf_model 目录生成训练好的随机森林模型。

实验三将直接加载这个模型,因此:下次实验课开始前,请先确认这两个目录都还在。


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1