李翔-大数据技术

Big data technology!

7.1 实验一:数据读取与探索【2026-5-5】

实验一:设备数据读取与探索


一、实验背景

在工业物联网(IoT)场景中,工厂里的每台设备都会源源不断地把自己的状态数据(温度、负载、电压等)发送到数据中心。这些数据量非常大,普通的 Excel 根本打不开,必须用 Spark 这种分布式计算工具来处理。

本实验是"随机森林预测设备异常"项目的第一步:把历史数据从 CSV 文件读进 Spark,看看数据长什么样、有没有问题。只有先把数据搞清楚,后面的训练模型才有意义。

二、实验目标

完成本实验后,学生应当能够:

  1. 在 IDEA 中创建一个 Spark Maven 项目并配置依赖;

  2. 启动一个 SparkSession,并理解它的作用;

  3. 使用 spark.read.csv() 读取本地 CSV 文件;

  4. 看懂 printSchema() 输出的表结构信息;

  5. cast() 做类型转换、用 na.drop() 删除空值;

  6. groupBy().count() 统计标签分布;

  7. 把清洗好的 DataFrame 保存为 Parquet 文件,供后续实验使用。

三、实验环境


项目配置
操作系统Windows / Linux / macOS
JDK1.8
Scala2.12.x
Spark3.3.0
开发工具IntelliJ IDEA + Maven
数据文件/bigdata1/datas/device_status_with_label_2026_01.csv




四、启动环境与数据准备

任务:把历史数据device_status_with_label_2026_01.csv 复制到HDFS

# 在master上启动HDFS
start-dfs.sh

# 在Slave1启动Yarn
start-yarn.sh

# 在HDFS上创建数据目录
hdfs dfs -mkdir  /datas

# 把master上的历史数据上传到HDFS上
hdfs dfs -put /opt/datas/device_status_with_label_2026_01.csv /datas/


五、完整代码(一次性复制运行)

5.1 项目准备

第 1 步:在 IDEA 中创建 Maven 项目

  1. File → New → Project → 选 Maven(不勾 archetype)

  2. GroupId:com.demo.spark,ArtifactId:spark-rf-lab

  3. JDK 选 1.8(一定要选 1.8,不要选高版本)

  4. 点 Finish

第 2 步:添加 Scala 框架支持

  1. 项目根目录右键 → Add Framework Support → 勾选 Scala → 选 Scala 2.12.x SDK → OK

  2. src/main 下右键 → New → Directory → 命名 scala

  3. 右键 scala 目录 → Mark Directory as → Sources Root(图标变蓝)

  4. scala 下右键 → New → Package → 命名 com.demo.spark

第 3 步:创建 Scala Object 文件

com.demo.spark 包上右键 → New → Scala Class → 选 Object → 命名 Lab1_DataExplore


5.2 完整代码

把下面的代码完整复制Lab1_DataExplore.scala 文件中(完成代码填空):

package com.demo.spark

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object Lab1_DataExplore {

  def main(args: Array[String]): Unit = {

    // ========== 参数配置区 ==========
    // CSV 数据文件路径
    val csvPath = "hdfs:///datas/device_status_with_label_2026_01.csv"
    // 清洗后数据保存路径(实验二要用,不要删!)
    val outputPath = "hdfs:///datas/lab1_clean_data"
    // 是否只用在线设备的数据(true=只看在线,false=全部)
    val filterOnlineOnly = true

    // ========== 启动 SparkSession ==========
    val spark = SparkSession.builder()
      .appName("Lab1-DataExplore")
      .master("local[*]")
      .getOrCreate()
      
    // 设置日志层级为"ERROR"
    spark.sparkContext.setLogLevel("ERROR")
    
    // 隐式转换:启用 $"col" 语法 和 Dataset 编码器
    import spark.implicits._

    println("\n========== 步骤 1:读取原始 CSV 数据 ==========")

    val raw = spark.read
      .option("header", "true")        // 首行作为列名
      .option("inferSchema", "true")   // 自动推断每列类型
      .csv(csvPath)

    println("【1.1 数据 Schema(表结构)】")
    raw.printSchema()

    println("【1.2 前 5 行数据预览】")
    raw.show(5, truncate = false)

    println(s"【1.3 总行数】${raw.count()}")

    println("\n========== 步骤 2:类型转换与清洗 ==========")

    val df0 = raw.select(
      col("device_id"),
      col("event_time"),
      col("status").cast("int").as("status"),       // ★ cast:强制类型转换,as 重命名列
      col("temperature").cast("double").as("temperature"),
      col("load").cast("double").as("load"),
      col("voltage").cast("double").as("voltage"),
      col("label").cast("int").as("label")
    ).na.drop()   // ★ na.drop():删除任意列含 null 的整行

    println(s"【2.1 清洗后行数】${df0.count()}")
    println("【2.2 清洗后前 3 行】")
    df0.show(3, truncate = false)

    println("\n========== 步骤 3:过滤离线设备 ==========")

    val df: DataFrame =
      if (filterOnlineOnly) 
         df0.filter($"status" === 1) 
      else 
         df0

    println(s"【3.1 当前过滤模式】filterOnlineOnly = $filterOnlineOnly")
    println(s"【3.2 当前数据量】${df.count()}")

    println("\n========== 步骤 4:标签分布统计 ==========")

    println("【4.1 标签分布(0=正常,1=异常)】")
    df.groupBy("label")
      .count()            // 聚合动作,生成新 DataFrame(含 label, count 两列)
      .orderBy("label")
      .show()

    println("\n========== 步骤 5:保存清洗结果 ==========")

    df.write
      .mode("overwrite")
      .parquet(outputPath)  // ★ Parquet = 列式存储,压缩比高,实验二直接读取

    println(s"✅ 数据已保存到 $outputPath")
    println("   实验二将直接读取这个文件,千万不要删除!")

    // ========== 关闭 SparkSession ==========
    spark.stop()
  }
}



5.3 运行程序

点击 main 方法旁边的绿色三角图标 → Run,等待程序执行完成(约 30 秒—1 分钟)。

如果一切正常,控制台会依次输出 5 个步骤的结果。下面我们逐步骤讲解每个输出代表什么。



六、分步骤讲解

步骤 1:读取原始 CSV 数据

这段代码做什么:通过 spark.read.csv() 把 CSV 文件读进 Spark,得到一个 DataFrame(可以理解为分布式的 Excel 表格)。

关键代码解读

  • .option("header", "true"):告诉 Spark 第一行是列名,不是数据;

  • .option("inferSchema", "true"):让 Spark 自动猜测每列的数据类型,否则所有列默认都是字符串。

预期控制台输出

========== 步骤 1:读取原始 CSV 数据 ==========
【1.1 数据 Schema(表结构)】
root
|-- event_type: string (nullable = true)
|-- device_id: string (nullable = true)
|-- status: integer (nullable = true)
|-- temperature: double (nullable = true)
|-- load: double (nullable = true)
|-- voltage: double (nullable = true)
|-- event_time: string (nullable = true)
|-- label: integer (nullable = true)

【1.2 前 5 行数据预览】
+-------------+----------+------+-----------+----+-------+-------------------+-----+
|event_type   |device_id |status|temperature|load|voltage|event_time         |label|
+-------------+----------+------+-----------+----+-------+-------------------+-----+
|device_status|device-001|1     |30.35      |0.57|218.0  |2026-01-01 00:00:00|0    |
|device_status|device-002|1     |31.89      |0.94|228.2  |2026-01-01 00:00:00|1    |
|device_status|device-003|1     |32.2       |0.8 |210.4  |2026-01-01 00:00:00|0    |
|device_status|device-004|1     |47.47      |0.77|219.6  |2026-01-01 00:00:00|1    |
|device_status|device-005|1     |50.11      |0.74|219.6  |2026-01-01 00:00:00|1    |
+-------------+----------+------+-----------+----+-------+-------------------+-----+
only showing top 5 rows

【1.3 总行数】2160000

输出含义

  • root 表示根节点,下面 7 个字段是 CSV 的 7 列;每列括号里 string/integer/double 是它的数据类型;

  • show(5) 输出的表格中,每一行就是一条设备记录;

  • 1500000 表示这份 CSV 一共有 150 万条数据。

如果输出不对


现象原因
FileNotFoundExceptionCSV 路径写错,注意 file:/// 是三个斜杠
所有列都是 string 类型没加 inferSchema 选项
总行数为 0CSV 文件是空的,或者只有表头没有数据


思考题 1:观察 1.1 输出,device_id 是 string 类型,但 status 是 integer。如果不加 .option("inferSchema", "true"),所有列默认都会是什么类型?

答案:如果不加 .option("inferSchema", "true"),所有列默认都会被当成 string 字符串类型


步骤 2:类型转换与清洗

这段代码做什么:用 cast() 把列强制转成正确的类型,再用 na.drop() 删掉所有含空值的行。

为什么要这一步:虽然步骤 1 自动推断了类型,但实际数据有时候会有问题(比如温度列里偶尔混入字符串"N/A")。cast 配合 na.drop 能把这些脏数据清理干净。

预期控制台输出

========== 步骤 2:类型转换与清洗 ==========
【2.1 清洗后行数】2160000
【2.2 清洗后前 3 行】
+----------+-------------------+------+-----------+----+-------+-----+
|device_id |event_time         |status|temperature|load|voltage|label|
+----------+-------------------+------+-----------+----+-------+-----+
|device-001|2026-01-01 00:00:00|1     |30.35      |0.57|218.0  |0    |
|device-002|2026-01-01 00:00:00|1     |31.89      |0.94|228.2  |1    |
|device-003|2026-01-01 00:00:00|1     |32.2       |0.8 |210.4  |0    |
+----------+-------------------+------+-----------+----+-------+-----+
only showing top 3 rows

输出含义

  • 清洗后行数没变——没有空值 ;

  • 前 3 行的内容看起来和步骤 1 一样,但底层数据类型已经是我们指定的标准类型了。

如果输出不对


现象原因
清洗后行数和原始一样数据没有空值(正常情况)
清洗后行数大幅减少数据质量太差,需要回头检查 CSV 文件
cast 类型不匹配某列里有非数字字符,先用 filter 排除


思考题 2label 字段表示设备是否异常,只可能是 0 或 1。为什么这里要 cast("int") 而不是 cast("double")?提示:考虑后续模型训练时分类器对标签类型的要求。

答案:因为 label 是分类标签,只表示两类:0=正常1=异常 分类模型通常要求标签是整数类型,所以用 int 更合适;如果用 double,会变成 0.01.0,不如整数标签清晰。


步骤 3:过滤离线设备

这段代码做什么:根据 filterOnlineOnly 这个开关,决定要不要只保留 status=1(在线)的设备数据。

为什么要过滤:离线设备的温度、负载等数据通常都是 0 或异常值,会污染训练集。如果不过滤,模型会学到"看到 0 就预测异常"这种没用的规律。

预期控制台输出

========== 步骤 3:过滤离线设备 ==========
【3.1 当前过滤模式】filterOnlineOnly = true
【3.2 当前数据量】2095196

输出含义

  • 过滤后还剩2095196条数据——这些就是被过滤掉的离线设备记录。

如果输出不对


现象原因
过滤后行数没变化数据中所有设备都在线,或者 status 列识别有问题
过滤后行数为 0所有 status 都不是 1,检查 CSV 数据


思考题 3:如果某天工厂大停电,所有设备都离线了 1 小时,这一小时内 status 全是 0。如果不过滤就拿去训练,模型会得出什么"错误的规律"?

答案:模型可能会错误地学到:只要 status=0,或者温度、负载、电压为 0,就判断为异常。但这其实只是设备离线或停电造成的,并不一定代表设备真的故障。


步骤 4:标签分布统计

这段代码做什么:用 groupBy("label").count() 统计正常和异常各有多少条。

为什么要这一步:训练前必须看正负样本比例。如果 99% 都是正常、1% 异常,模型会偷懒"全部预测正常"也能拿 99% 准确率,但毫无用处。这种叫"类别不平衡"问题。

预期控制台输出

========== 步骤 4:标签分布统计 ==========
【4.1 标签分布(0=正常,1=异常)】
+-----+-------+
|label|  count|
+-----+-------+
|    0|1414416|
|    1| 680780|
+-----+-------+

输出含义

  • label=0 有 1414416条,是正常设备数据;

  • label=1 有  680780  条,是异常设备数据;

  • 正常 : 异常 ≈ 2 : 1,正常与异常比例约为 2.08:1,异常样本占比约 32.5%。

如果输出不对


现象原因
只有 label=0 一行数据中没有异常样本,无法做分类
比例严重失衡(如 100:1)需要采样或调权重,本实验不处理


思考题 4:如果 label=0 有 1000 条、label=1 只有 10 条,模型直接预测"全部正常"也能拿 99% 准确率。这说明哪个评估指标在这种情况下"会骗人"?(提示:这个问题在实验三会详细回答)

答案:会骗人的评估指标是:Accuracy,准确率;因为模型全部预测正常也能得到很高准确率,但它完全识别不出异常设备。


步骤 5:保存清洗结果

这段代码做什么:把清洗好的 DataFrame 保存为 Parquet 格式,供实验二使用。

为什么用 Parquet 不用 CSV:Parquet 是一种列式存储格式,相比 CSV 有三个优势:

  1. 保留类型:CSV 重新读还是字符串,Parquet 重读直接是 int / double;

  2. 读写更快:列式存储比行式存储快几倍;

  3. 占用空间小:自动压缩,文件比 CSV 小一半。

预期控制台输出

========== 步骤 5:保存清洗结果 ==========
✅ 数据已保存到 file:///tmp/lab1_clean_data
  实验二将直接读取这个文件,千万不要删除!

验证保存成功:在系统终端执行(Linux/Mac):

ls /tmp/lab1_clean_data

或在 Windows 资源管理器打开 D:\tmp\lab1_clean_data,应该能看到类似下面的文件:

_SUCCESS
part-00000-xxxx-c000.snappy.parquet
part-00001-xxxx-c000.snappy.parquet
...

如果输出不对


现象原因
Path already exists.mode("overwrite") 或先手动删除目录
Windows 写 /tmp 失败改成 file:///D:/tmp/lab1_clean_data
看不到 part-xxx 文件检查保存路径,可能写到别的盘了


思考题 5:Parquet 保存出来不是一个文件,而是一个目录里有好几个 part-xxxxx 文件。为什么 Spark 要这样保存?(提示:从"分布式"这个词去想)

答案:因为 Spark 是分布式计算,数据会被分成多个分区,由多个任务并行处理和保存。 所以保存 Parquet 时,不是生成一个大文件,而是生成一个目录,里面有多个 part-xxxxx 文件。这样可以提高读写速度,也方便后续并行读取。



七、本实验产出(衔接实验二)

本实验运行成功后,会在 /tmp/lab1_clean_data 目录生成 Parquet 格式的清洗数据。

实验二将直接读取这个目录,因此:

  • ⚠️ 不要删除 /tmp/lab1_clean_data

  • ⚠️ 不要重启电脑导致 /tmp 被清空(如果你是 Linux 用户);

  • 建议把整个目录备份一份到自己的 U 盘或网盘,应对意外。

下次实验课开始前,请先确认这个目录还在,并且 filterOnlineOnly = true 的版本是最后一次运行的结果。


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1