李翔-大数据技术

Big data technology!

离线分析实验07 - Spark 实时异常检测与告警入库【2026-4-28】

拓展实验七:Spark 实时异常检测与告警入库


12.1 实验目标

  1. 掌握 Spark 实时读取 Kafka 数据的基本流程

  2. 学会用 when 条件判断实现异常检测

  3. 实现将告警数据实时写入 ClickHouse


12.2 数据流动说明

Kafka → Spark(解析JSON → 判断异常) → ClickHouse

12.3 ClickHouse 建表

-- 启动clickhouse
clickhouse-client -m

-- 创建表
CREATE TABLE IF NOT EXISTS iot_report.dwd_device_alert_detail
(
    device_id   String,
    alert_type  String,    -- 注释:TEMP_HIGH / LOAD_HIGH / VOLTAGE_ABNORMAL
    alert_level String,    -- 注释:HIGH / MEDIUM
    temperature Float64,
    load        Float64,
    voltage     Float64,
    alert_msg   String,
    alert_time  DateTime,
    dt          Date
) ENGINE = MergeTree()
PARTITION BY dt
ORDER BY (device_id, alert_time);




12.4 告警规则


告警类型触发条件级别
TEMP_HIGH温度 > 80°CHIGH
LOAD_HIGH负载 > 90%MEDIUM
VOLTAGE_ABNORMAL电压 < 180V 或 > 260VMEDIUM


同时满足多个条件时,只命中第一个满足的规则。


完整的代码

package com.demo.spark

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

/**
 * 学生实验:Kafka → Spark Structured Streaming → ClickHouse 实时告警
 *
 * 实验目标:
 *   1. 掌握 Spark Structured Streaming 读取 Kafka 数据
 *   2. 掌握 JSON 字段解析与类型转换
 *   3. 掌握 when/otherwise 条件判断生成告警
 *   4. 掌握 foreachBatch 写入 ClickHouse
 *
 * 共 40 个 TODO 填空任务,请将每个 _________ 替换为正确代码
 */
object KafkaToClickHouseAlertJob {

  def main(args: Array[String]): Unit = {

    // ==================== 配置区 ====================
    // TODO 1: 填写 Kafka 服务器地址(host:port 格式)
    val kafkaServers = "_________"

    // TODO 2: 填写要订阅的 Kafka 主题名
    val kafkaTopic   = "_________"

    // TODO 3: 填写 ClickHouse 的 JDBC 连接地址(库名 iot_report)
    val ckUrl        = "_________"

    // TODO 4: 填写 ClickHouse 的 JDBC 驱动类全限定名
    val ckDriver     = "_________"

    // TODO 5: 填写要写入的 ClickHouse 目标表名
    val ckTable      = "_________"
    // ================================================

    // TODO 6: 设置 HADOOP_USER_NAME 系统属性为 root(用于 HDFS checkpoint 写入权限)
    System.setProperty("_________", "_________")

    // TODO 7: 创建 SparkSession,链式调用的起点方法
    val spark = SparkSession._________()
      // TODO 8: 设置应用名称为 "KafkaToClickHouseAlertJob"
      ._________("KafkaToClickHouseAlertJob")
      // TODO 9: 设置运行模式为本机所有核心
      .master("_________")
      // TODO 10: 设置时区为上海,避免时间相差 8 小时
      .config("spark.sql.session.timeZone", "_________")
      // TODO 11: 获取或创建 SparkSession 实例
      ._________()

    // TODO 12: 把日志级别调到 ERROR,屏蔽冗余 INFO
    spark.sparkContext.setLogLevel("_________")


    // ============== 第一步:从 Kafka 读取流数据 ==============
    // TODO 13: 使用流式读取 API(提示:与普通 read 不同,会持续监听)
    val rawDF = spark._________
      // TODO 14: 指定数据源格式为 kafka
      .format("_________")
      .option("kafka.bootstrap.servers", kafkaServers)
      // TODO 15: 订阅主题,参数名是什么?
      .option("_________", kafkaTopic)
      // TODO 16: 只消费最新数据,不读历史,offset 应该填什么?
      .option("startingOffsets", "_________")
      .option("failOnDataLoss", "false")
      .load()
      // TODO 17: 把 Kafka 的 value(二进制)转成字符串列 json
      .selectExpr("_________ AS json")


    // ============== 第二步:解析 JSON 字段 ==============
    val deviceDF = rawDF
      // TODO 18: 用 get_json_object 提取 device_id 字段
      .withColumn("device_id",   get_json_object(col("json"), "_________"))
      // TODO 19: 提取 status 并转为 int 类型
      .withColumn("status",      get_json_object(col("json"), "$.status").cast("_________"))
      .withColumn("temperature", get_json_object(col("json"), "$.temperature").cast("double"))
      .withColumn("load",        get_json_object(col("json"), "$.load").cast("double"))
      .withColumn("voltage",     get_json_object(col("json"), "$.voltage").cast("double"))
      // TODO 20: 添加 alert_time 列,值为当前时间戳的内置函数
      .withColumn("alert_time",  _________())
      // TODO 21: 添加 dt 列,值为当前日期的内置函数(按天分区用)
      .withColumn("dt",          _________())
      // TODO 22: 过滤掉 device_id 为空的脏数据
      .filter(col("device_id")._________)


    // ============== 第三步:生成告警字段 ==============
    val alertDF = deviceDF

      // 告警类型判断:温度>45→TEMP_HIGH,负载>0.9→LOAD_HIGH,电压异常→VOLTAGE_ABNORMAL
      .withColumn("alert_type",
        // TODO 23: 条件判断的起始函数(类似 SQL 的 CASE WHEN)
        _________(col("temperature") > 45, "TEMP_HIGH")
          // TODO 24: 第二个条件:负载大于 0.9
          ._________(col("load") > 0.9, "LOAD_HIGH")
          // TODO 25: 第三个条件:电压 > 235 或 < 200(使用 or 连接)
          .when(col("voltage") > 235 _________ col("voltage") < 200, "VOLTAGE_ABNORMAL"))

      // 告警级别:温度超标→HIGH,其他→MEDIUM
      .withColumn("alert_level",
        when(col("temperature") > 45, "HIGH")
          // TODO 26: 都不满足时返回 MEDIUM 的方法名
          ._________("MEDIUM"))

      // 拼接告警消息文本
      .withColumn("alert_msg",
        // TODO 27: status 等于 0 的判断(DataFrame API 中三等号)
        when(col("status") _________ 0, lit("离线"))
          // TODO 28: 用 concat 拼接固定文字和数值列,固定文字用什么函数包装?
          .when(col("temperature") > 45, concat(_________("温度过高:"), col("temperature")))
          .when(col("load") > 0.9, concat(lit("负载过高:"), col("load")))
          .otherwise(concat(lit("电压异常:"), col("voltage"))))

      // TODO 29: 过滤掉正常数据,保留 alert_type 不为 null 的行
      .filter(col("alert_type")._________)

      // TODO 30: 选出最终要写入 ClickHouse 的 9 个字段(按建表顺序)
      ._________("device_id", "alert_type", "alert_level",
        "temperature", "load", "voltage",
        "alert_msg", "alert_time", "dt")


    // ============== 第四步:写入 ClickHouse ==============
    // TODO 31: 启动流式写出 API
    alertDF._________
      // TODO 32: 输出模式(告警只增不改,应该用哪种?append/update/complete)
      .outputMode("_________")
      // TODO 33: 设置 checkpoint 路径,用于故障恢复
      .option("_________", "hdfs://master:9000/spark/checkpoint/device_alert")
      // TODO 34: 每 5 秒触发一次微批处理
      .trigger(Trigger._________("5 seconds"))
      // TODO 35: 自定义每批处理逻辑的方法名
      ._________ { (batchDF: DataFrame, batchId: Long) =>

        // TODO 36: 判断这批数据是否非空(统计行数 > 0)
        if (batchDF._________ > 0) {

          println(s"批次 $batchId:${batchDF.count()} 条告警")
          batchDF.show(false)

          // 把告警数据通过 JDBC 写入 ClickHouse
          batchDF.write
            // TODO 37: 写入格式为 jdbc
            .format("_________")
            .option("url", ckUrl)
            // TODO 38: 指定目标表名的参数键
            .option("_________", ckTable)
            .option("user", "default")
            .option("driver", ckDriver)
            // ClickHouse 不支持事务,必须设为 NONE
            .option("isolationLevel", "NONE")
            // TODO 39: 写入模式为追加
            .mode("_________")
            .save()
        }
      }
      .start()
      // TODO 40: 让流任务持续运行不退出的方法
      ._________()
  }
}


参考源码:

image.png


12.6 启动环境

(1)启动Hadoop

# 启动 Hadoop
start-dfs.sh   # 在master操作
start-yarn.sh  # 在slave1操作

(2)启动Kafka

# 在master/slave1/slave2启动 ZooKeeper
zkServer.sh start

# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status

# 在master/slave1/slave2启动 Kafka
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties

(3)写入状态流数据到 Kafka

# 在master节点中操作【新开窗口】
cd /opt/datas
# 启动设备状态模拟脚本,并将数据写入 Kafka
python 01_device_status_sim.py | \
kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic

(4)验证实时数据

# 从 device_status_topic 主题中实时消费(查看)Kafka中的设备状态数据流
kafka-console-consumer.sh --bootstrap-server master:9092 \
--topic device_status_topic


12.6 IDEA运行代码


12.7 验证结果

# 在clickhouse中查看写入的告警数据
clickhouse-client --query \
 "SELECT device_id, alert_type, alert_level, alert_msg, alert_time
  FROM iot_report.dwd_device_alert_detail
  ORDER BY alert_time DESC LIMIT 10"


12.8 打包运行

# 方法一:本地模式(调试用)
spark-submit \
 --class com.demo.spark.KafkaToClickHouseAlertJob \
 --master local[1] \
 /opt/jars/spark-job-1.0.0.jar


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1