实验二:利用模型实时预测
7.1 工作场景
本实验模拟一个设备运行状态监控系统,结合实时数据流和机器学习模型进行异常检测。系统实时接收设备数据(如温度、负载、电压等),通过Kafka进行数据流传输,Spark进行数据处理与预测,最终将预测结果存储在ClickHouse数据库中以供大屏展示。
该系统的核心目标是实时预测设备的异常状态,通过预测结果帮助企业提前识别潜在的设备故障,降低故障风险并提高设备的可靠性。
7.2 实验目标
数据流读取与处理:通过Spark从Kafka实时读取设备状态数据,并进行预处理,转换为结构化数据。
特征工程:将多个输入特征(如设备状态、温度、负载、电压等)合并为一个特征向量,供机器学习模型使用。
实时预测:加载最新的训练好的随机森林模型,并实时对每批数据进行预测,输出异常概率。前提:实验一经过训练已经把最新模型覆盖保存到
hdfs:///model/device_abnormal/latest结果存储与展示:将预测结果(设备ID、预测结果、异常概率等)实时写入ClickHouse数据库
iot_report.dws_device_pred_detail,供大屏展示。性能优化与稳定性:使用
foreachBatch进行批处理,以优化数据的写入和处理性能,同时确保数据不丢失,通过Checkpoint记录处理进度。

路线:Kafka → Spark Streaming → 模型加载 → ClickHouse → 大屏
7.3 启动环境
# 启动 Hadoop start-dfs.sh start-yarn.sh # 在master/slave1/slave2启动 ZooKeeper zkServer.sh start # 在master/slave1/slave2检查 ZooKeeper状态 zkServer.sh status # 在master/slave1/slave2启动 Kafka kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties # 写入实时状态流数据到 Kafka # 1)在master节点中操作【新开窗口】 cd /opt/datas # 2)启动设备状态模拟脚本,并将数据写入 Kafka python 01_device_status_sim.py | \ kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic
7.4 实时预测 Job
package com.demo.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.functions.vector_to_array
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
/**
* 实时预测任务(给大屏用):
*
* 整个流程就像一条流水线:
* Kafka(设备一直在发数据) → Spark(接住数据处理) → 用训练好的模型预测 → 结果写到 ClickHouse → 大屏展示
*
* Kafka 发过来的数据长这样(一条JSON):
* {"status":1,"load":0.51,"event_type":"device_status","event_time":"2026-02-16 22:04:37","voltage":216.7,"device_id":"device-039","temperature":26.42}
*
* 字段含义:
* device_id:设备编号(哪台机器)
* status:设备状态(0关机/1开机)
* temperature:温度
* load:负载(机器忙不忙)
* voltage:电压
* event_time:这条数据是什么时候产生的
*/
object DevicePredictStreamJob {
def main(args: Array[String]): Unit = {
// =========================
// 0)先把要用的"地址、路径"配置好,方便后面直接用
// =========================
val kafkaServers = "master:9092" // Kafka 服务器地址(数据从这里来)
val topic = "device_status_topic" // Kafka 的"频道"名,我们订阅这个频道
// 模型存放的位置(在HDFS分布式文件系统上)
// latest 表示"最新版"——训练任务每次训练完都会把新模型覆盖到这里
val latestModelPath = "hdfs:///model/device_abnormal/latest"
// ClickHouse 数据库的连接信息(预测结果要写到这里,大屏从这里读)
val ckUrl = "jdbc:clickhouse://master:8123/iot_report?jdbcCompliant=false"
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver"
val ckPredTable = "dws_device_pred_detail" // 表名:存预测结果的表(大屏查这张表)
// 检查点路径(记账本,记录"我处理到哪条数据了")
// 作用:程序万一挂了,重启后能接着之前的进度继续,不会重复处理也不会漏处理
val checkpointPath = "hdfs:///ckpt/device_abnormal_predict_kafka"
// 设置HDFS用户名(告诉HDFS:我是root用户,我有权限读写)
System.setProperty("HADOOP_USER_NAME", "root")
// =========================
// 1)创建 SparkSession —— 相当于"启动Spark引擎"
// 后面所有Spark操作都要通过这个对象来做
// =========================
val spark = SparkSession.builder()
.appName("DeviceAbnormalPredictKafkaStreamJob") // 给作业起个名字(方便在监控页面找到它)
.master("local[*]") // local[*] 表示在本机运行,用所有CPU核心(适合学习和调试)
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR") // 只打印错误日志,屏幕不会被刷屏
import spark.implicits._
// ==========================
// 2)开始接收 Kafka 数据流
// 注意:这里不是一次性读完,而是"持续监听",有新数据就处理
// ==========================
val streamRaw = spark.readStream
.format("kafka") // 数据源类型是 Kafka
.option("kafka.bootstrap.servers", kafkaServers) // Kafka 在哪
.option("subscribe", topic) // 订阅哪个频道
.option("startingOffsets", "latest") // 从"程序启动后产生的新数据"开始读(旧的不要)
.option("failOnDataLoss", "false") // 中间丢点数据不报错,继续跑
.load()
println("====== 1.正在读取 Kafka 实时数据流...")
// Kafka 发过来的是二进制数据,先转成普通字符串才能看懂
val jsonDF = streamRaw.selectExpr("CAST(value AS STRING) AS json")
// ==========================
// 3)拆JSON —— 把一整串JSON拆成一个个字段
// 比如把 {"device_id":"device-039","temperature":26.42,...}
// 拆成单独的列:device_id 列、temperature 列、...
// ==========================
val streamDF = jsonDF
.withColumn("event_type", get_json_object(col("json"), "$.event_type"))
.withColumn("device_id", get_json_object(col("json"), "$.device_id"))
.withColumn("status", get_json_object(col("json"), "$.status").cast("int")) // 转成整数
.withColumn("temperature", get_json_object(col("json"), "$.temperature").cast("double")) // 转成小数
.withColumn("load", get_json_object(col("json"), "$.load").cast("double"))
.withColumn("voltage", get_json_object(col("json"), "$.voltage").cast("double"))
// 时间字符串 "2026-02-16 22:04:37" 转成时间类型,这样后面能按时间查询
.withColumn("event_time", to_timestamp(get_json_object(col("json"), "$.event_time"), "yyyy-MM-dd HH:mm:ss"))
.drop("json") // 原始JSON列已经拆完了,不需要了,删掉省空间
// 数据清洗:这几个字段缺一不可,只要有一个是空的就把整条记录丢掉
// 类似于"考试身份证、准考证缺一不可,缺了就不让进考场"
.na.drop(Seq("device_id", "status", "temperature", "load", "voltage", "event_time"))
println("====== 2.数据转换完成,准备进行特征工程...")
// =========================
// 4)特征工程 —— 把多个字段"打包"成模型能看懂的格式
//
// 模型只认一种输入格式:一个叫 features 的"向量"(可以理解为一个打包好的数组)
// 所以要把 status、temperature、load、voltage 这4个字段塞进一个向量里
//
// ⚠️ 重要:顺序必须和训练时完全一样!
// 训练时是 [status, temperature, load, voltage],预测时也必须是这个顺序
// 顺序错了模型就会"张冠李戴",把温度当电压看,预测肯定不准
// =========================
val assembler = new VectorAssembler()
.setInputCols(Array("status", "temperature", "load", "voltage")) // 输入:这4个字段
.setOutputCol("features") // 输出:打包成一个叫 features 的列
val featStream = assembler.transform(streamDF)
.select("device_id", "event_time", "features") // 后面预测只需要这3列,其它不要了
println("====== 3.特征工程完成,开始进行实时预测...")
// =========================
// 加载训练好的模型 —— 只加载一次!
//
// 为什么放在循环外面?
// 模型文件存在HDFS上,每次加载都要走网络,很慢(就像每次做菜都要去超市买锅一样)
// 所以一开始就把模型"放到内存里",后面每批数据都用这个模型,速度快很多
// =========================
val model = RandomForestClassificationModel.load(latestModelPath)
println("====== 4.模型加载完成,准备开始接收数据进行预测...")
// =========================
// 5)核心处理逻辑:每来一批数据就预测一次,然后写到数据库
//
// 流式数据是源源不断的,Spark会把它切成"一小批一小批"来处理
// (比如每5秒攒一批,这批可能有几百条数据)
// foreachBatch 的意思就是:每来一批,就执行一次下面的代码
// =========================
val query = featStream.writeStream
.outputMode("append") // 追加模式:新数据接在后面,不覆盖旧的
.option("checkpointLocation", checkpointPath) // 用前面配的"记账本"记录进度
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// batchDF:这一批的数据
// batchId:这是第几批(0、1、2、3...)
// 如果这批一条数据都没有(比如设备暂时没发数据),就跳过,什么都不做
if (!batchDF.isEmpty) {
// 5.1 用模型预测:输入特征 → 输出"是否异常"
val pred = model.transform(batchDF)
// 模型预测后会多一列 probability,值是一个向量,长这样:[正常概率, 异常概率]
// 比如 [0.8, 0.2] 表示"80%概率正常,20%概率异常"
// 我们关心的是异常概率(下标1的那个值),把它单独拿出来变成一列
.withColumn("prob1", vector_to_array(col("probability")).getItem(1))
// 5.2 整理要写入数据库的字段
// 取当前系统时间作为"入库时间",格式化成 "2026-05-15 10:30:25" 这种样子
val createdAt = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
val out = pred.select(
lit("latest").as("model_version"), // 模型版本:写死 "latest"(表示用的是最新模型)
col("device_id"), // 设备编号
col("event_time"), // 数据原本的时间
lit(null).cast("int").as("label"), // 真实标签:实时数据当下不知道是不是真异常,先空着占个位
col("prediction").cast("int").as("prediction"), // 预测结果:0=正常,1=异常
col("prob1").cast("double").as("prob1"), // 异常概率:0~1之间,越接近1越危险
lit(createdAt).as("created_at") // 入库时间:这批是几点几分写进去的
)
out.cache() // 缓存到内存:下面要用两次(算count + 写库),缓存了就不用重新计算
val cnt = out.count() // 数一下这批有多少条数据(只是为了打印日志)
// 5.3 写入 ClickHouse 数据库 —— 大屏就是从这张表读数据展示的
out.write
.format("jdbc") // 用 JDBC 方式连数据库
.option("url", ckUrl) // 数据库地址
.option("driver", ckDriver) // 驱动(就是连接数据库的"翻译官")
.option("dbtable", ckPredTable) // 写到哪张表
.mode("append") // 追加模式:新数据接在后面,不会清空原有数据
.save()
out.unpersist() // 用完释放内存,不然内存会被占满
// 打印日志,方便看到任务在正常跑
println(s"第 $batchId 批数据已写入 ClickHouse,包含 $cnt 条记录")
}
}
.start()
println("====== 5.实时预测作业开始,等待数据处理...")
// 让程序一直跑下去(不会自己退出),除非手动停止或者出错
// 否则程序一启动就立刻结束了,根本来不及处理数据
query.awaitTermination()
}
}7.5 验证结果
-- 在 ClickHouse 查最新 5 条(按写入时间), 实时数据在持续写入数据库 SELECT * FROM iot_report.dws_device_pred_detail ORDER BY created_at DESC LIMIT 5;
运行结果:
┌─model_version─┬─device_id──┬──────────event_time─┬─label─┬─prediction─┬────────────────prob1─┬──────────created_at─┐
│ latest │ device-050 │ 2026-04-30 15:37:57 │ ᴺᵁᴸᴸ │ 0 │ 0.009223852744338843 │ 2026-04-30 15:38:04 │
│ latest │ device-007 │ 2026-04-30 15:38:02 │ ᴺᵁᴸᴸ │ 1 │ 0.9037160693542747 │ 2026-04-30 15:38:04 │
│ latest │ device-008 │ 2026-04-30 15:38:02 │ ᴺᵁᴸᴸ │ 0 │ 0.007730614385868521 │ 2026-04-30 15:38:04 │
│ latest │ device-006 │ 2026-04-30 15:38:02 │ ᴺᵁᴸᴸ │ 0 │ 0.007307107142851261 │ 2026-04-30 15:38:04 │
│ latest │ device-009 │ 2026-04-30 15:38:02 │ ᴺᵁᴸᴸ │ 0 │ 0.00798883423233388 │ 2026-04-30 15:38:04 │
└───────────────┴────────────┴─────────────────────┴───────┴────────────┴──────────────────────┴──────────────
NULL 写入正常:
label列全是NULL,符合预期——实时流没有真实标签,占位而已。
7.6 打包与部署
1)本地/服务器打包
IDEA → Maven → spark-ods-ingest → clean → package
# 运行打包程序后,生成target目录,内含 spark-job-1.0.0.jar 包
2)上传 Jar包 到服务器
复制 spark-job-1.0.0.jar 到 master:/opt/jars/
3) 执行Jar包
# 方法一:本地模式(调试用,在本地模式运行 Spark,并使用 1 个线程) spark-submit \ --class com.demo.spark.DevicePredictStreamJob \ --master local[1] \ /opt/jars/spark-job-1.0.0.jar
# 方法二:YARN client 模式(半集群模式)
#
# 运行机制:
# Driver(司令部)在你提交命令的这台机器上运行,
# Executor(干活的工人)在 YARN 集群的其他节点上运行。
# 适合开发调试:能在当前终端直接看到日志输出。
#
# ⚠️ 注意:必须先把源码中的 .master("local[*]") 注释掉,再重新打jar包!
# 原因:代码里的 .master() 优先级高于 spark-submit 的 --master 参数,会把它覆盖掉。
# 后果:程序不会报错,但实际上变成在本机单机运行,YARN 集群根本没参与。
spark-submit \
--class com.demo.spark.DevicePredictStreamJob \
--master yarn \
--deploy-mode client \
/opt/jars/spark-job-1.0.0.jar# 方法三:YARN cluster 模式(⭐企业生产场景运行模式)
#
# 运行机制:
# Driver(司令部)和 Executor(工人)全部在 YARN 集群里运行,
# 你的本机只负责"提交任务"这一下,提交完就可以关掉终端了。
# 适合生产环境:任务交给集群自己管,本机断网/关机都不影响作业运行。
#
# ⚠️ 注意:必须先把源码中的 .master("local[*]") 注释掉,再重新打jar包!
# 原因:代码里的 .master() 优先级高于 spark-submit 的 --master 参数,会把它覆盖掉。
# 后果:Driver 被分配到 YARN 容器里运行,却又被代码强制要求以 local 模式启动,
# 两者冲突 → 任务直接报错失败。
spark-submit \
--class com.demo.spark.DevicePredictStreamJob \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=1 \
/opt/jars/spark-job-1.0.0.jar
# 参数说明(为什么这些值这么设):
# --driver-memory 512m Driver(司令部)分配的内存,512m 是教学演示值,生产建议 ≥ 2g
# --executor-memory 512m 每个 Executor(工人)分配的内存
# --executor-cores 1 每个 Executor 用几个 CPU 核心
# --num-executors 1 总共启动几个 Executor(工人数量)
# --conf spark.sql.shuffle.partitions=1 shuffle 阶段并行度,设为 1 仅适合教学演示,
# 生产环境通常用默认 200 或按数据量调整