一、实验背景
在工业物联网(IoT)场景中,工厂里的每台设备都会源源不断地把自己的状态数据(温度、负载、电压等)发送到数据中心。这些数据量非常大,普通的 Excel 根本打不开,必须用 Spark 这种分布式计算工具来处理。
本实验是"随机森林预测设备异常"项目的第一步:把历史数据从 CSV 文件读进 Spark,看看数据长什么样、有没有问题。只有先把数据搞清楚,后面的训练模型才有意义。
二、实验目标
完成本实验后,学生应当能够:
在 IDEA 中创建一个 Spark Maven 项目并配置依赖;
启动一个 SparkSession,并理解它的作用;
使用
spark.read.csv()读取本地 CSV 文件;看懂
printSchema()输出的表结构信息;用
cast()做类型转换、用na.drop()删除空值;用
groupBy().count()统计标签分布;把清洗好的 DataFrame 保存为 Parquet 文件,供后续实验使用。
三、实验环境
| 项目 | 配置 |
|---|---|
| 操作系统 | Windows / Linux / macOS |
| JDK | 1.8 |
| Scala | 2.12.x |
| Spark | 3.3.0 |
| 开发工具 | IntelliJ IDEA + Maven |
| 数据文件 | /bigdata1/datas/device_status_with_label_2026_01.csv |
四、启动环境与数据准备
任务:把历史数据device_status_with_label_2026_01.csv 复制到HDFS
# 在master上启动HDFS start-dfs.sh # 在Slave1启动Yarn start-yarn.sh # 在HDFS上创建数据目录 hdfs dfs -mkdir /datas # 把master上的历史数据上传到HDFS上 hdfs dfs -put /opt/datas/device_status_with_label_2026_01.csv /datas/
五、完整代码(一次性复制运行)
5.1 项目准备
第 1 步:在 IDEA 中创建 Maven 项目
File → New → Project → 选 Maven(不勾 archetype)
GroupId:
com.demo.spark,ArtifactId:spark-rf-labJDK 选 1.8(一定要选 1.8,不要选高版本)
点 Finish
第 2 步:添加 Scala 框架支持
项目根目录右键 → Add Framework Support → 勾选 Scala → 选 Scala 2.12.x SDK → OK
在
src/main下右键 → New → Directory → 命名scala右键
scala目录 → Mark Directory as → Sources Root(图标变蓝)在
scala下右键 → New → Package → 命名com.demo.spark
第 3 步:创建 Scala Object 文件
在 com.demo.spark 包上右键 → New → Scala Class → 选 Object → 命名 Lab1_DataExplore。
5.2 完整代码
把下面的代码完整复制到 Lab1_DataExplore.scala 文件中(完成代码填空):
package com.demo.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object Lab1_DataExplore {
def main(args: Array[String]): Unit = {
// ========== 参数配置区 ==========
// CSV 数据文件路径
val csvPath = "hdfs:///datas/device_status_with_label_2026_01.csv"
// 清洗后数据保存路径(实验二要用,不要删!)
val outputPath = "hdfs:///datas/lab1_clean_data"
// 是否只用在线设备的数据(true=只看在线,false=全部)
val filterOnlineOnly = true
// ========== 启动 SparkSession ==========
val spark = SparkSession.builder()
.appName("Lab1-DataExplore")
.master("local[*]")
.getOrCreate()
// 设置日志层级为"ERROR"
spark.sparkContext.setLogLevel("ERROR")
// 隐式转换:启用 $"col" 语法 和 Dataset 编码器
import spark.implicits._
println("\n========== 步骤 1:读取原始 CSV 数据 ==========")
val raw = spark.read
.option("header", "true") // 首行作为列名
.option("inferSchema", "true") // 自动推断每列类型
.csv(csvPath)
println("【1.1 数据 Schema(表结构)】")
raw.printSchema()
println("【1.2 前 5 行数据预览】")
raw.show(5, truncate = false)
println(s"【1.3 总行数】${raw.count()}")
println("\n========== 步骤 2:类型转换与清洗 ==========")
val df0 = raw.select(
col("device_id"),
col("event_time"),
col("status").cast("int").as("status"), // ★ cast:强制类型转换,as 重命名列
col("temperature").cast("double").as("temperature"),
col("load").cast("double").as("load"),
col("voltage").cast("double").as("voltage"),
col("label").cast("int").as("label")
).na.drop() // ★ na.drop():删除任意列含 null 的整行
println(s"【2.1 清洗后行数】${df0.count()}")
println("【2.2 清洗后前 3 行】")
df0.show(3, truncate = false)
println("\n========== 步骤 3:过滤离线设备 ==========")
val df: DataFrame =
if (filterOnlineOnly)
df0.filter($"status" === 1)
else
df0
println(s"【3.1 当前过滤模式】filterOnlineOnly = $filterOnlineOnly")
println(s"【3.2 当前数据量】${df.count()}")
println("\n========== 步骤 4:标签分布统计 ==========")
println("【4.1 标签分布(0=正常,1=异常)】")
df.groupBy("label")
.count() // 聚合动作,生成新 DataFrame(含 label, count 两列)
.orderBy("label")
.show()
println("\n========== 步骤 5:保存清洗结果 ==========")
df.write
.mode("overwrite")
.parquet(outputPath) // ★ Parquet = 列式存储,压缩比高,实验二直接读取
println(s"✅ 数据已保存到 $outputPath")
println(" 实验二将直接读取这个文件,千万不要删除!")
// ========== 关闭 SparkSession ==========
spark.stop()
}
}5.3 运行程序
点击 main 方法旁边的绿色三角图标 → Run,等待程序执行完成(约 30 秒—1 分钟)。
如果一切正常,控制台会依次输出 5 个步骤的结果。下面我们逐步骤讲解每个输出代表什么。
六、分步骤讲解
步骤 1:读取原始 CSV 数据
这段代码做什么:通过 spark.read.csv() 把 CSV 文件读进 Spark,得到一个 DataFrame(可以理解为分布式的 Excel 表格)。
关键代码解读:
.option("header", "true"):告诉 Spark 第一行是列名,不是数据;.option("inferSchema", "true"):让 Spark 自动猜测每列的数据类型,否则所有列默认都是字符串。
预期控制台输出:
========== 步骤 1:读取原始 CSV 数据 ==========
【1.1 数据 Schema(表结构)】
root
|-- event_type: string (nullable = true)
|-- device_id: string (nullable = true)
|-- status: integer (nullable = true)
|-- temperature: double (nullable = true)
|-- load: double (nullable = true)
|-- voltage: double (nullable = true)
|-- event_time: string (nullable = true)
|-- label: integer (nullable = true)
【1.2 前 5 行数据预览】
+-------------+----------+------+-----------+----+-------+-------------------+-----+
|event_type |device_id |status|temperature|load|voltage|event_time |label|
+-------------+----------+------+-----------+----+-------+-------------------+-----+
|device_status|device-001|1 |30.35 |0.57|218.0 |2026-01-01 00:00:00|0 |
|device_status|device-002|1 |31.89 |0.94|228.2 |2026-01-01 00:00:00|1 |
|device_status|device-003|1 |32.2 |0.8 |210.4 |2026-01-01 00:00:00|0 |
|device_status|device-004|1 |47.47 |0.77|219.6 |2026-01-01 00:00:00|1 |
|device_status|device-005|1 |50.11 |0.74|219.6 |2026-01-01 00:00:00|1 |
+-------------+----------+------+-----------+----+-------+-------------------+-----+
only showing top 5 rows
【1.3 总行数】2160000
输出含义:
root表示根节点,下面 7 个字段是 CSV 的 7 列;每列括号里string/integer/double是它的数据类型;show(5)输出的表格中,每一行就是一条设备记录;1500000表示这份 CSV 一共有 150 万条数据。
如果输出不对:
| 现象 | 原因 |
|---|---|
报 FileNotFoundException | CSV 路径写错,注意 file:/// 是三个斜杠 |
| 所有列都是 string 类型 | 没加 inferSchema 选项 |
| 总行数为 0 | CSV 文件是空的,或者只有表头没有数据 |
思考题 1:观察 1.1 输出,device_id 是 string 类型,但 status 是 integer。如果不加 .option("inferSchema", "true"),所有列默认都会是什么类型?
答案:如果不加
.option("inferSchema", "true"),所有列默认都会被当成 string 字符串类型。
步骤 2:类型转换与清洗
这段代码做什么:用 cast() 把列强制转成正确的类型,再用 na.drop() 删掉所有含空值的行。
为什么要这一步:虽然步骤 1 自动推断了类型,但实际数据有时候会有问题(比如温度列里偶尔混入字符串"N/A")。cast 配合 na.drop 能把这些脏数据清理干净。
预期控制台输出:
========== 步骤 2:类型转换与清洗 ==========
【2.1 清洗后行数】2160000
【2.2 清洗后前 3 行】
+----------+-------------------+------+-----------+----+-------+-----+
|device_id |event_time |status|temperature|load|voltage|label|
+----------+-------------------+------+-----------+----+-------+-----+
|device-001|2026-01-01 00:00:00|1 |30.35 |0.57|218.0 |0 |
|device-002|2026-01-01 00:00:00|1 |31.89 |0.94|228.2 |1 |
|device-003|2026-01-01 00:00:00|1 |32.2 |0.8 |210.4 |0 |
+----------+-------------------+------+-----------+----+-------+-----+
only showing top 3 rows
输出含义:
清洗后行数没变——没有空值 ;
前 3 行的内容看起来和步骤 1 一样,但底层数据类型已经是我们指定的标准类型了。
如果输出不对:
| 现象 | 原因 |
|---|---|
| 清洗后行数和原始一样 | 数据没有空值(正常情况) |
| 清洗后行数大幅减少 | 数据质量太差,需要回头检查 CSV 文件 |
报 cast 类型不匹配 | 某列里有非数字字符,先用 filter 排除 |
思考题 2:label 字段表示设备是否异常,只可能是 0 或 1。为什么这里要 cast("int") 而不是 cast("double")?提示:考虑后续模型训练时分类器对标签类型的要求。
答案:因为
label是分类标签,只表示两类:0=正常,1=异常。 分类模型通常要求标签是整数类型,所以用int更合适;如果用double,会变成0.0、1.0,不如整数标签清晰。
步骤 3:过滤离线设备
这段代码做什么:根据 filterOnlineOnly 这个开关,决定要不要只保留 status=1(在线)的设备数据。
为什么要过滤:离线设备的温度、负载等数据通常都是 0 或异常值,会污染训练集。如果不过滤,模型会学到"看到 0 就预测异常"这种没用的规律。
预期控制台输出:
========== 步骤 3:过滤离线设备 ==========
【3.1 当前过滤模式】filterOnlineOnly = true
【3.2 当前数据量】2095196
输出含义:
过滤后还剩2095196条数据——这些就是被过滤掉的离线设备记录。
如果输出不对:
| 现象 | 原因 |
|---|---|
| 过滤后行数没变化 | 数据中所有设备都在线,或者 status 列识别有问题 |
| 过滤后行数为 0 | 所有 status 都不是 1,检查 CSV 数据 |
思考题 3:如果某天工厂大停电,所有设备都离线了 1 小时,这一小时内 status 全是 0。如果不过滤就拿去训练,模型会得出什么"错误的规律"?
答案:模型可能会错误地学到:只要 status=0,或者温度、负载、电压为 0,就判断为异常。但这其实只是设备离线或停电造成的,并不一定代表设备真的故障。
步骤 4:标签分布统计
这段代码做什么:用 groupBy("label").count() 统计正常和异常各有多少条。
为什么要这一步:训练前必须看正负样本比例。如果 99% 都是正常、1% 异常,模型会偷懒"全部预测正常"也能拿 99% 准确率,但毫无用处。这种叫"类别不平衡"问题。
预期控制台输出:
========== 步骤 4:标签分布统计 ==========
【4.1 标签分布(0=正常,1=异常)】
+-----+-------+
|label| count|
+-----+-------+
| 0|1414416|
| 1| 680780|
+-----+-------+
输出含义:
label=0有 1414416条,是正常设备数据;label=1有 680780 条,是异常设备数据;正常 : 异常 ≈ 2 : 1,正常与异常比例约为 2.08:1,异常样本占比约 32.5%。
如果输出不对:
| 现象 | 原因 |
|---|---|
| 只有 label=0 一行 | 数据中没有异常样本,无法做分类 |
| 比例严重失衡(如 100:1) | 需要采样或调权重,本实验不处理 |
思考题 4:如果 label=0 有 1000 条、label=1 只有 10 条,模型直接预测"全部正常"也能拿 99% 准确率。这说明哪个评估指标在这种情况下"会骗人"?(提示:这个问题在实验三会详细回答)
答案:会骗人的评估指标是:Accuracy,准确率;因为模型全部预测正常也能得到很高准确率,但它完全识别不出异常设备。
步骤 5:保存清洗结果
这段代码做什么:把清洗好的 DataFrame 保存为 Parquet 格式,供实验二使用。
为什么用 Parquet 不用 CSV:Parquet 是一种列式存储格式,相比 CSV 有三个优势:
保留类型:CSV 重新读还是字符串,Parquet 重读直接是 int / double;
读写更快:列式存储比行式存储快几倍;
占用空间小:自动压缩,文件比 CSV 小一半。
预期控制台输出:
========== 步骤 5:保存清洗结果 ==========
✅ 数据已保存到 file:///tmp/lab1_clean_data
实验二将直接读取这个文件,千万不要删除!
验证保存成功:在系统终端执行(Linux/Mac):
ls /tmp/lab1_clean_data
或在 Windows 资源管理器打开 D:\tmp\lab1_clean_data,应该能看到类似下面的文件:
_SUCCESS
part-00000-xxxx-c000.snappy.parquet
part-00001-xxxx-c000.snappy.parquet
...
如果输出不对:
| 现象 | 原因 |
|---|---|
报 Path already exists | 加 .mode("overwrite") 或先手动删除目录 |
Windows 写 /tmp 失败 | 改成 file:///D:/tmp/lab1_clean_data |
| 看不到 part-xxx 文件 | 检查保存路径,可能写到别的盘了 |
思考题 5:Parquet 保存出来不是一个文件,而是一个目录里有好几个 part-xxxxx 文件。为什么 Spark 要这样保存?(提示:从"分布式"这个词去想)
答案:因为 Spark 是分布式计算,数据会被分成多个分区,由多个任务并行处理和保存。 所以保存 Parquet 时,不是生成一个大文件,而是生成一个目录,里面有多个
part-xxxxx文件。这样可以提高读写速度,也方便后续并行读取。
七、本实验产出(衔接实验二)
本实验运行成功后,会在 /tmp/lab1_clean_data 目录生成 Parquet 格式的清洗数据。
实验二将直接读取这个目录,因此:
⚠️ 不要删除
/tmp/lab1_clean_data;⚠️ 不要重启电脑导致
/tmp被清空(如果你是 Linux 用户);建议把整个目录备份一份到自己的 U 盘或网盘,应对意外。
下次实验课开始前,请先确认这个目录还在,并且 filterOnlineOnly = true