李翔-大数据技术

Big data technology!

离线分析实验06 - Spark 实时消费 Kafka 结果入库【2026-4-27】

拓展实验六:Spark 实时消费 Kafka 结果入库


补充实验:此实验的数据没有呈现在数据大屏


11.1 实验目标

1.掌握 Spark 实时流计算的基本开发流程,理解 Structured Streaming 微批处理机制。

2.学会使用 Spark 实时消费 Kafka 中的设备 JSON 数据,并完成字段解析与数据过滤。

3.掌握基于流数据的实时聚合统计,实时统计:平均温度、最高温度、平均负载、平均电压

4.实现实时计算结果通过 JDBC 写入 ClickHouse,为大数据大屏提供标准统计数据源。

5.理解 Kafka → Spark → ClickHouse 实时数仓架构,提升大数据综合实战能力。


和前两张图的对比:



第一张第二张第三张(这张)
数据源KafkaHiveKafka
目的地HiveClickHouseClickHouse
模式流处理批处理流处理
触发间隔10秒手动运行5秒
用途存原始数据离线报表实时大屏



11.2 数据流动说明

设备数据 → Kafka 消息缓存 → Spark 实时解析与聚合统计 → ClickHouse 结果入库,形成完整的实时数仓入库链路


11.3 ClickHouse 表设计

-- 1.启动多行模式(在master中操作)
clickhouse-client -m

-- 2.检查 iot_report 数据库是否存在
show databases;

-- 3.选择数据库
use iot_report;

-- 4.创建实时设备统计报表
CREATE TABLE IF NOT EXISTS iot_report.dws_device_real_report
(
    device_id   String,
    dt          Date,
    avg_temp    Float64,
    max_temp    Float64,
    avg_load    Float64,
    avg_voltage Float64
) ENGINE = MergeTree()
PARTITION BY dt
ORDER BY (device_id, dt);

-- 5.查看数据表
show tables;



11.4 完整代码

完成代码填空

package com.demo.spark

// TODO 1: 导入 DataFrame 和 SparkSession(填写正确的类名)
import org.apache.spark.sql.{__________, SparkSession}
import org.apache.spark.sql.functions._
// TODO 2: 导入流式触发器 Trigger(填写正确的类名)
import org.apache.spark.sql.streaming.__________

object KafkaToClickHouseRealTimeJob {

  def main(args: Array[String]): Unit = {

    // ========================== 统一配置 ==========================
    val kafkaServers = "master:9092"
    val kafkaTopic   = "device_status_topic"
    val ckUrl        = "jdbc:clickhouse://master:8123/iot_report"
    val ckDriver     = "com.clickhouse.jdbc.ClickHouseDriver"
    val ckTable      = "dws_device_real_report"
    // =============================================================

    // TODO 3: 设置 HADOOP_USER_NAME 系统属性 key(填写环境变量名)
    System.setProperty("__________", "root")

    // TODO 4: 创建 SparkSession 的入口类(填写类名)
    val spark = __________.builder()
      // TODO 5: 设置应用名称为 "KafkaToClickHouseRealTime"
      .appName("__________")
      // TODO 6: 设置运行模式为本地所有核心
      .master("__________")
      // TODO 7: 设置时区为 "Asia/Shanghai",避免时间差 8 小时
      .config("spark.sql.session.timeZone", "__________")
      // TODO 8: 补全获取或创建 SparkSession 的方法名
      .__________()

    // TODO 9: 设置日志级别屏蔽 INFO,只保留 ERROR 日志
    spark.sparkContext.setLogLevel("__________")
    println("Spark 启动成功,开始实时处理...")


    // ==========================
    // 2. 读取 Kafka 实时数据
    // ==========================

    // TODO 10: 持续读取数据应使用 readStream 而非 read,填写正确方法名
    val kafkaDF = spark.__________
      // TODO 11: 数据源类型设置为 kafka
      .format("__________")
      // TODO 12: 配置 Kafka 服务器地址,填写正确的 option key
      .option("__________", kafkaServers)
      // TODO 13: 订阅 Kafka Topic,填写正确的 option key
      .option("__________", kafkaTopic)
      // TODO 14: 启动时只读取最新数据,不处理历史积压数据(填写偏移量策略)
      .option("startingOffsets", "__________")
      // TODO 15: 数据丢失时不报错继续运行(填写 true 或 false)
      .option("failOnDataLoss", "__________")
      .load()

    // TODO 16: 把 Kafka 的二进制 value 字段转为字符串,列名为 json
    val jsonDF = kafkaDF.selectExpr("__________")


    // ==========================
    // 3. 解析 JSON(已增加 voltage)
    // ==========================

    val deviceDF = jsonDF
      // TODO 17: 从 json 列中提取 device_id 字段(补全 JSON 路径,以 $ 开头)
      .withColumn("device_id",   get_json_object(col("json"), "__________"))
      // TODO 18: 提取 temperature 字段并转为 double 类型(补全 JSON 路径)
      .withColumn("temperature", get_json_object(col("json"), "__________").cast("double"))
      .withColumn("load",        get_json_object(col("json"), "$.load").cast("double"))
      // TODO 19: 提取 voltage 字段并转为 double 类型(补全 cast 的目标类型)
      .withColumn("voltage",     get_json_object(col("json"), "$.voltage").cast("__________"))
      // TODO 20: 新增 dt 列,值为当前日期,用于按天分区(使用内置函数)
      .withColumn("dt",          __________)
      // TODO 21: 过滤掉 device_id 为 null 的脏数据(填写判断方法名)
      .filter(col("device_id").__________)


    // ==========================
    // 4. 实时统计(按设备 + 日期分组)
    // ==========================

    // TODO 22: 按 device_id 和 dt 进行分组聚合(填写分组方法名)
    val reportDF = deviceDF
      .__________("device_id", "dt")
      .agg(
        // TODO 23: 计算今日平均温度,列名为 avg_temp(填写聚合函数)
        __________("temperature").as("avg_temp"),
        // TODO 24: 计算今日最高温度,列名为 max_temp(填写聚合函数)
        __________("temperature").as("max_temp"),
        avg("load").as("avg_load"),
        // TODO 25: 计算今日平均电压,列名为 avg_voltage(填写聚合函数)
        __________("voltage").as("avg_voltage")
      )


    // ==========================
    // 5. 实时写入 ClickHouse
    // ==========================

    val query = reportDF.writeStream
      // TODO 26: 聚合统计场景使用 update 模式,只输出有变化的行(不能用 complete/append)
      .outputMode("__________")
      // TODO 27: 补全 checkpoint 配置的 option key 名称
      .option("__________", "hdfs://master:9000/spark/checkpoint/device_realtime")
      // TODO 28: 每 5 秒触发一次批处理(带时间单位,如 "5 seconds")
      .trigger(Trigger.ProcessingTime("__________"))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>

        if (batchDF.count() > 0) {
          println(s"批次 $batchId 实时统计结果:")
          batchDF.show(false)

          batchDF.write
            .format("jdbc")
            .option("url", ckUrl)
            .option("dbtable", ckTable)
            .option("user", "default")
            .option("driver", ckDriver)
            // TODO 29: 一批写入 1000 行,提升写入性能(填写 option 的值)
            .option("batchsize", __________)
            // TODO 30: ClickHouse 不支持事务,isolationLevel 必须设为 NONE
            .option("isolationLevel", "__________")
            .mode("append")
            .save()

          println("实时数据已写入 ClickHouse!")
        }
      }
      .start()

    query.awaitTermination()
    spark.stop()
  }
}

参考源代码

image.png

11.5 启动环境

(1)启动Hadoop

# 启动 Hadoop
start-dfs.sh   # 在master操作
start-yarn.sh  # 在slave1操作


(2)启动Kafka

# 在master/slave1/slave2启动 ZooKeeper
zkServer.sh start

# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status

# 在master/slave1/slave2启动 Kafka
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties

(3)写入状态流数据到 Kafka

# 在master节点中操作【新开窗口】
cd /opt/datas
# 启动设备状态模拟脚本,并将数据写入 Kafka
python 01_device_status_sim.py | \
kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic

(4)验证实时数据

# 从 device_status_topic 主题中实时消费(查看)Kafka中的设备状态数据流
kafka-console-consumer.sh --bootstrap-server master:9092 \
--topic device_status_topic


11.6 IDEA运行代码


11.7 验证结果

-- 查看clickhouse中的dws_temp_trend_day表数据
-- 实时统计:平均温度、最高温度、平均负载、平均电压
SELECT * FROM iot_report.dws_device_real_report ORDER BY dt;


11.8 打包运行

# 方法一:本地模式(调试用)
spark-submit \
  --class com.demo.spark.KafkaToClickHouseRealTimeJob \
  --master local[1] \
  /opt/jars/spark-job-1.0.0.jar




发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1