李翔-大数据技术

Big data technology!

第7章-实验二:利用模型实时预测

第7章 设备异常预测实训 — 基于 Spark ML 的随机森林应用


实验二:利用模型实时预测

7.1 工作场景

本实验模拟一个设备运行状态监控系统,结合实时数据流和机器学习模型进行异常检测。系统实时接收设备数据(如温度、负载、电压等),通过Kafka进行数据流传输,Spark进行数据处理与预测,最终将预测结果存储在ClickHouse数据库中以供大屏展示。

该系统的核心目标是实时预测设备的异常状态,通过预测结果帮助企业提前识别潜在的设备故障,降低故障风险并提高设备的可靠性。

7.2 实验目标

  1. 数据流读取与处理:通过Spark从Kafka实时读取设备状态数据,并进行预处理,转换为结构化数据。

  2. 特征工程:将多个输入特征(如设备状态、温度、负载、电压等)合并为一个特征向量,供机器学习模型使用。

  3. 实时预测:加载最新的训练好的随机森林模型,并实时对每批数据进行预测,输出异常概率。前提:实验一经过训练已经把最新模型覆盖保存到 hdfs:///model/device_abnormal/latest

  4. 结果存储与展示:将预测结果(设备ID、预测结果、异常概率等)实时写入ClickHouse数据库 iot_report.dws_device_pred_detail,供大屏展示。

  5. 性能优化与稳定性:使用foreachBatch进行批处理,以优化数据的写入和处理性能,同时确保数据不丢失,通过Checkpoint记录处理进度。

9021568c-a374-480b-bbb1-755a490a4334


路线:Kafka → Spark Streaming → 模型加载 → ClickHouse → 大屏



7.3 启动环境

# 启动 Hadoop
start-dfs.sh
start-yarn.sh

# 在master/slave1/slave2启动 ZooKeeper
zkServer.sh start

# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status

# 在master/slave1/slave2启动 Kafka
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties

# 写入实时状态流数据到 Kafka
# 1)在master节点中操作【新开窗口】
cd /opt/datas
# 2)启动设备状态模拟脚本,并将数据写入 Kafka
python 01_device_status_sim.py | \
kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic


7.4 实时预测 Job

package com.demo.spark

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.functions.vector_to_array

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

/**
 * 实时预测任务(给大屏用):
 *
 * 整个流程就像一条流水线:
 * Kafka(设备一直在发数据) → Spark(接住数据处理) → 用训练好的模型预测 → 结果写到 ClickHouse → 大屏展示
 *
 * Kafka 发过来的数据长这样(一条JSON):
 * {"status":1,"load":0.51,"event_type":"device_status","event_time":"2026-02-16 22:04:37","voltage":216.7,"device_id":"device-039","temperature":26.42}
 *
 * 字段含义:
 *   device_id:设备编号(哪台机器)
 *   status:设备状态(0关机/1开机)
 *   temperature:温度
 *   load:负载(机器忙不忙)
 *   voltage:电压
 *   event_time:这条数据是什么时候产生的
 */
object DevicePredictStreamJob {

  def main(args: Array[String]): Unit = {

    // =========================
    // 0)先把要用的"地址、路径"配置好,方便后面直接用
    // =========================
    val kafkaServers = "master:9092"          // Kafka 服务器地址(数据从这里来)
    val topic = "device_status_topic"         // Kafka 的"频道"名,我们订阅这个频道

    // 模型存放的位置(在HDFS分布式文件系统上)
    // latest 表示"最新版"——训练任务每次训练完都会把新模型覆盖到这里
    val latestModelPath = "hdfs:///model/device_abnormal/latest"

    // ClickHouse 数据库的连接信息(预测结果要写到这里,大屏从这里读)
    val ckUrl = "jdbc:clickhouse://master:8123/iot_report?jdbcCompliant=false"
    val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver"
    val ckPredTable = "dws_device_pred_detail" // 表名:存预测结果的表(大屏查这张表)

    // 检查点路径(记账本,记录"我处理到哪条数据了")
    // 作用:程序万一挂了,重启后能接着之前的进度继续,不会重复处理也不会漏处理
    val checkpointPath = "hdfs:///ckpt/device_abnormal_predict_kafka"

    // 设置HDFS用户名(告诉HDFS:我是root用户,我有权限读写)
    System.setProperty("HADOOP_USER_NAME", "root")

    // =========================
    // 1)创建 SparkSession —— 相当于"启动Spark引擎"
    // 后面所有Spark操作都要通过这个对象来做
    // =========================
    val spark = SparkSession.builder()
      .appName("DeviceAbnormalPredictKafkaStreamJob") // 给作业起个名字(方便在监控页面找到它)
      .master("local[*]")   // local[*] 表示在本机运行,用所有CPU核心(适合学习和调试)
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")  // 只打印错误日志,屏幕不会被刷屏
    import spark.implicits._

    // ==========================
    // 2)开始接收 Kafka 数据流
    // 注意:这里不是一次性读完,而是"持续监听",有新数据就处理
    // ==========================
    val streamRaw = spark.readStream
      .format("kafka")                                    // 数据源类型是 Kafka
      .option("kafka.bootstrap.servers", kafkaServers)    // Kafka 在哪
      .option("subscribe", topic)                         // 订阅哪个频道
      .option("startingOffsets", "latest")                // 从"程序启动后产生的新数据"开始读(旧的不要)
      .option("failOnDataLoss", "false")                  // 中间丢点数据不报错,继续跑
      .load()

    println("====== 1.正在读取 Kafka 实时数据流...")

    // Kafka 发过来的是二进制数据,先转成普通字符串才能看懂
    val jsonDF = streamRaw.selectExpr("CAST(value AS STRING) AS json")

    // ==========================
    // 3)拆JSON —— 把一整串JSON拆成一个个字段
    // 比如把 {"device_id":"device-039","temperature":26.42,...}
    // 拆成单独的列:device_id 列、temperature 列、...
    // ==========================
    val streamDF = jsonDF
      .withColumn("event_type", get_json_object(col("json"), "$.event_type"))
      .withColumn("device_id", get_json_object(col("json"), "$.device_id"))
      .withColumn("status", get_json_object(col("json"), "$.status").cast("int"))           // 转成整数
      .withColumn("temperature", get_json_object(col("json"), "$.temperature").cast("double")) // 转成小数
      .withColumn("load", get_json_object(col("json"), "$.load").cast("double"))
      .withColumn("voltage", get_json_object(col("json"), "$.voltage").cast("double"))
      // 时间字符串 "2026-02-16 22:04:37" 转成时间类型,这样后面能按时间查询
      .withColumn("event_time", to_timestamp(get_json_object(col("json"), "$.event_time"), "yyyy-MM-dd HH:mm:ss"))
      .drop("json") // 原始JSON列已经拆完了,不需要了,删掉省空间
      // 数据清洗:这几个字段缺一不可,只要有一个是空的就把整条记录丢掉
      // 类似于"考试身份证、准考证缺一不可,缺了就不让进考场"
      .na.drop(Seq("device_id", "status", "temperature", "load", "voltage", "event_time"))


    println("====== 2.数据转换完成,准备进行特征工程...")

    // =========================
    // 4)特征工程 —— 把多个字段"打包"成模型能看懂的格式
    //
    // 模型只认一种输入格式:一个叫 features 的"向量"(可以理解为一个打包好的数组)
    // 所以要把 status、temperature、load、voltage 这4个字段塞进一个向量里
    //
    // ⚠️ 重要:顺序必须和训练时完全一样!
    // 训练时是 [status, temperature, load, voltage],预测时也必须是这个顺序
    // 顺序错了模型就会"张冠李戴",把温度当电压看,预测肯定不准
    // =========================
    val assembler = new VectorAssembler()
      .setInputCols(Array("status", "temperature", "load", "voltage"))  // 输入:这4个字段
      .setOutputCol("features")                                          // 输出:打包成一个叫 features 的列

    val featStream = assembler.transform(streamDF)
      .select("device_id", "event_time", "features") // 后面预测只需要这3列,其它不要了

    println("====== 3.特征工程完成,开始进行实时预测...")

    // =========================
    // 加载训练好的模型 —— 只加载一次!
    //
    // 为什么放在循环外面?
    // 模型文件存在HDFS上,每次加载都要走网络,很慢(就像每次做菜都要去超市买锅一样)
    // 所以一开始就把模型"放到内存里",后面每批数据都用这个模型,速度快很多
    // =========================
    val model = RandomForestClassificationModel.load(latestModelPath)
    println("====== 4.模型加载完成,准备开始接收数据进行预测...")

    // =========================
    // 5)核心处理逻辑:每来一批数据就预测一次,然后写到数据库
    //
    // 流式数据是源源不断的,Spark会把它切成"一小批一小批"来处理
    // (比如每5秒攒一批,这批可能有几百条数据)
    // foreachBatch 的意思就是:每来一批,就执行一次下面的代码
    // =========================
    val query = featStream.writeStream
      .outputMode("append")                              // 追加模式:新数据接在后面,不覆盖旧的
      .option("checkpointLocation", checkpointPath)      // 用前面配的"记账本"记录进度
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // batchDF:这一批的数据
        // batchId:这是第几批(0、1、2、3...)

        // 如果这批一条数据都没有(比如设备暂时没发数据),就跳过,什么都不做
        if (!batchDF.isEmpty) {

          // 5.1 用模型预测:输入特征 → 输出"是否异常"
          val pred = model.transform(batchDF)
            // 模型预测后会多一列 probability,值是一个向量,长这样:[正常概率, 异常概率]
            // 比如 [0.8, 0.2] 表示"80%概率正常,20%概率异常"
            // 我们关心的是异常概率(下标1的那个值),把它单独拿出来变成一列
            .withColumn("prob1", vector_to_array(col("probability")).getItem(1))

          // 5.2 整理要写入数据库的字段
          // 取当前系统时间作为"入库时间",格式化成 "2026-05-15 10:30:25" 这种样子
          val createdAt = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))

          val out = pred.select(
            lit("latest").as("model_version"),                // 模型版本:写死 "latest"(表示用的是最新模型)
            col("device_id"),                                 // 设备编号
            col("event_time"),                                // 数据原本的时间
            lit(null).cast("int").as("label"),                // 真实标签:实时数据当下不知道是不是真异常,先空着占个位
            col("prediction").cast("int").as("prediction"),   // 预测结果:0=正常,1=异常
            col("prob1").cast("double").as("prob1"),          // 异常概率:0~1之间,越接近1越危险
            lit(createdAt).as("created_at")                   // 入库时间:这批是几点几分写进去的
          )

          out.cache()              // 缓存到内存:下面要用两次(算count + 写库),缓存了就不用重新计算
          val cnt = out.count()    // 数一下这批有多少条数据(只是为了打印日志)

          // 5.3 写入 ClickHouse 数据库 —— 大屏就是从这张表读数据展示的
          out.write
            .format("jdbc")                       // 用 JDBC 方式连数据库
            .option("url", ckUrl)                 // 数据库地址
            .option("driver", ckDriver)           // 驱动(就是连接数据库的"翻译官")
            .option("dbtable", ckPredTable)       // 写到哪张表
            .mode("append")                       // 追加模式:新数据接在后面,不会清空原有数据
            .save()

          out.unpersist()  // 用完释放内存,不然内存会被占满

          // 打印日志,方便看到任务在正常跑
          println(s"第 $batchId 批数据已写入 ClickHouse,包含 $cnt 条记录")
        }
      }
      .start()

    println("====== 5.实时预测作业开始,等待数据处理...")

    // 让程序一直跑下去(不会自己退出),除非手动停止或者出错
    // 否则程序一启动就立刻结束了,根本来不及处理数据
    query.awaitTermination()
  }
}



7.5 验证结果

-- 在 ClickHouse 查最新 5 条(按写入时间), 实时数据在持续写入数据库
SELECT *
FROM iot_report.dws_device_pred_detail
ORDER BY created_at DESC
LIMIT 5;

运行结果:

┌─model_version─┬─device_id──┬──────────event_time─┬─label─┬─prediction─┬────────────────prob1─┬──────────created_at─┐
│ latest        │ device-050 │ 2026-04-30 15:37:57 │  ᴺᵁᴸᴸ │          0 │ 0.009223852744338843 │ 2026-04-30 15:38:04 │
│ latest        │ device-007 │ 2026-04-30 15:38:02 │  ᴺᵁᴸᴸ │          1 │   0.9037160693542747 │ 2026-04-30 15:38:04 │
│ latest        │ device-008 │ 2026-04-30 15:38:02 │  ᴺᵁᴸᴸ │          0 │ 0.007730614385868521 │ 2026-04-30 15:38:04 │
│ latest        │ device-006 │ 2026-04-30 15:38:02 │  ᴺᵁᴸᴸ │          0 │ 0.007307107142851261 │ 2026-04-30 15:38:04 │
│ latest        │ device-009 │ 2026-04-30 15:38:02 │  ᴺᵁᴸᴸ │          0 │  0.00798883423233388 │ 2026-04-30 15:38:04 │
└───────────────┴────────────┴─────────────────────┴───────┴────────────┴──────────────────────┴──────────────

NULL 写入正常label 列全是 NULL,符合预期——实时流没有真实标签,占位而已。


7.6 打包与部署

1)本地/服务器打包

IDEA → Maven → spark-ods-ingest →  clean → package

# 运行打包程序后,生成target目录,内含 spark-job-1.0.0.jar 包

2)上传 Jar包 到服务器

复制 spark-job-1.0.0.jar 到 master:/opt/jars/

3) 执行Jar包

# 方法一:本地模式(调试用,在本地模式运行 Spark,并使用 1 个线程)
spark-submit \
  --class com.demo.spark.DevicePredictStreamJob \
  --master local[1] \
  /opt/jars/spark-job-1.0.0.jar
# 方法二:YARN client 模式(半集群模式)
#
# 运行机制:
#   Driver(司令部)在你提交命令的这台机器上运行,
#   Executor(干活的工人)在 YARN 集群的其他节点上运行。
#   适合开发调试:能在当前终端直接看到日志输出。
#
# ⚠️ 注意:必须先把源码中的 .master("local[*]") 注释掉,再重新打jar包!
# 原因:代码里的 .master() 优先级高于 spark-submit 的 --master 参数,会把它覆盖掉。
# 后果:程序不会报错,但实际上变成在本机单机运行,YARN 集群根本没参与。
spark-submit \
  --class com.demo.spark.DevicePredictStreamJob \
  --master yarn \
  --deploy-mode client \
  /opt/jars/spark-job-1.0.0.jar
# 方法三:YARN cluster 模式(⭐企业生产场景运行模式)
#
# 运行机制:
#   Driver(司令部)和 Executor(工人)全部在 YARN 集群里运行,
#   你的本机只负责"提交任务"这一下,提交完就可以关掉终端了。
#   适合生产环境:任务交给集群自己管,本机断网/关机都不影响作业运行。
#
# ⚠️ 注意:必须先把源码中的 .master("local[*]") 注释掉,再重新打jar包!
# 原因:代码里的 .master() 优先级高于 spark-submit 的 --master 参数,会把它覆盖掉。
# 后果:Driver 被分配到 YARN 容器里运行,却又被代码强制要求以 local 模式启动,
#       两者冲突 → 任务直接报错失败。
spark-submit \
  --class com.demo.spark.DevicePredictStreamJob \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 512m \
  --executor-memory 512m \
  --executor-cores 1 \
  --num-executors 1 \
  --conf spark.sql.shuffle.partitions=1 \
  /opt/jars/spark-job-1.0.0.jar

# 参数说明(为什么这些值这么设):
#   --driver-memory 512m       Driver(司令部)分配的内存,512m 是教学演示值,生产建议 ≥ 2g
#   --executor-memory 512m     每个 Executor(工人)分配的内存
#   --executor-cores 1         每个 Executor 用几个 CPU 核心
#   --num-executors 1          总共启动几个 Executor(工人数量)
#   --conf spark.sql.shuffle.partitions=1   shuffle 阶段并行度,设为 1 仅适合教学演示,
#                                            生产环境通常用默认 200 或按数据量调整




发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1