李翔-大数据技术

Big data technology!

离线分析实验04-Spark 实时数据入湖【2026-4-20】

实验四:Spark 实时数据入湖

(Kafka → Hive ODS 明细层)


8.1 实验目标

本实验完成:

使用 Spark Structured Streaming 实时读取 Kafka 数据流

实时解析 JSON 数据,提取设备状态、温度、负载等信息

将处理后的数据写入 Hive ODS 分区表,支持按日期分区存储

为后续的历史数据分析和实时监控提供数据支持,确保数据处理的准确性与实时性


8.2 数据入库链路说明


8.3 启动实验环境

(1)启动大数据环境

# 启动 Hadoop
start-dfs.sh   # 在master操作
start-yarn.sh  # 在slave1操作

# 启动 Hive 元数据服务
hive --service metastore &

# 启动 Hive 客户端
hive

(2)确认 Hive 表已创建

USE iot;

SHOW TABLES;

确保存在:

ods_device_status_di


8.4 实施步骤


(1)创建 Spark 作业

在IDEA中创建 spark-ods-ingest 项目

创建 scala 模块文件路径:

src/main/scala/com/demo/spark/KafkaToHiveDeviceStatusJob.scala

方法:

右键 com.demo.spark → New → Scala Class

会弹出创建窗口,选择:

Object

然后输入文件名:

KafkaToHiveDeviceStatusJob


(2) 复制 hive-site.xml 文件到 IntelliJ IDEA 中

在 IntelliJ IDEA 中,将配置好的 hive-site.xml 文件复制到IDEA项目的 resources 目录下

目的:确保 Spark 能够读取 Hive 配置信息并正确连接到 Hive。


(3)复制 core-site.xmlhdfs-site.xml 文件到 IntelliJ IDEA 中

同样地,将Hadoop的配置文件 core-site.xmlhdfs-site.xml 文件复制到IDEA项目的 resources 目录下。

目的:这些配置文件是在IDEA中运行项目时, Spark 连接到 HDFS 所必需的,它们提供了 HDFS 的核心配置信息,包括 NameNode 的地址和文件系统的基本配置。因为下面的代码把检查点设置到了HDFS路径上



(4)完整代码

package com.demo.spark

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

/**
 * 功能:实时读取Kafka设备数据 → 解析清洗 → 写入Hive分区表
 * 技术:Spark 实时流计算
 * 流程:Kafka → Spark → Hive
 */
object KafkaToHiveDeviceStatusJob {

  def main(args: Array[String]): Unit = {

    // ======================================================
    // 1. 创建Spark程序入口
    // ======================================================
    // 设置HDFS操作用户,避免权限错误
    System.setProperty("HADOOP_USER_NAME", "root")

    val spark = SparkSession.builder()
      .appName("KafkaToHiveDeviceStatusJob")
      .master("local[*]")        // 本地运行,集群运行需删除
      .enableHiveSupport()       // 开启Hive支持
      .config("spark.sql.session.timeZone", "Asia/Shanghai")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .getOrCreate()

    // 简化日志输出
    spark.sparkContext.setLogLevel("ERROR")
    println("Spark 启动成功,开始连接 Kafka...")

    // ======================================================
    // 2. 读取Kafka实时数据流
    // ======================================================
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "master:9092")
      .option("subscribe", "device_status_topic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .load()

    // 将Kafka数据转为字符串
    val jsonDf = kafkaDf.selectExpr("CAST(value AS STRING) AS json")

    // ======================================================
    // 3. 解析JSON并清洗数据
    // ======================================================
    val parsedDf = jsonDf
      // 从JSON中提取字段
      .withColumn("event_type",  get_json_object(col("json"), "$.event_type"))
      .withColumn("device_id",   get_json_object(col("json"), "$.device_id"))
      .withColumn("status",      get_json_object(col("json"), "$.status").cast("int"))
      .withColumn("temperature", get_json_object(col("json"), "$.temperature").cast("double"))
      .withColumn("load",        get_json_object(col("json"), "$.load").cast("double"))
      .withColumn("voltage",     get_json_object(col("json"), "$.voltage").cast("double"))
      .withColumn("event_time",  get_json_object(col("json"), "$.event_time"))
      // 生成时间字段与分区字段
      .withColumn("ts",  to_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss"))
      .withColumn("dt",  date_format(col("ts"), "yyyy-MM-dd"))
      // 过滤空值脏数据
      .filter(col("device_id").isNotNull && col("ts").isNotNull)
      // 最终要写入Hive的字段
      .select("event_type", "device_id", "status", "temperature",
        "load", "voltage", "event_time", "ts", "dt")

    // ======================================================
    // 4. 实时写入Hive(每10秒一批)
    // ======================================================
    val query = parsedDf.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .option("checkpointLocation", "hdfs:///ck/kafka_to_hive_device_status")
      // 批量处理每一批数据
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        val total = batchDF.count()
        println(s"\n批次 $batchId 数据量:$total 条")
        
        // 打印预览数据
        batchDF.show(6, truncate = false)
        
        // 写入Hive
        batchDF.write.mode("append").insertInto("iot.ods_device_status_di")
        println(s"批次 $batchId 写入 Hive 成功\n")
      }
      .start()

    println("流任务已启动,每10秒处理一批数据")
    // 让程序持续运行
    query.awaitTermination()
  }
}

8.5 启动Kafka实时采集数据

(1)启动Kafka

# 在master/slave1/slave2启动 ZooKeeper
zkServer.sh start

# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status

# 在master/slave1/slave2启动 Kafka
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties


(2)写入状态流数据到 Kafka

# 在master节点中操作【新开窗口】
cd /opt/datas
# 启动设备状态模拟脚本,并将数据写入 Kafka
python 01_device_status_sim.py | \
kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic



8.6 在IDEA运行作业


8.7 结果验证

在Hive的shell中运行

1️⃣ 查看 Hive 数据

# 查看最新的5条数据;(由于 ORDER BY 需要进行全局排序,Hive 会启动 MapReduce 作业来完成排序操作。)
SELECT * FROM iot.ods_device_status_di 
ORDER BY ts DESC 
LIMIT 5;

2️⃣ 查看数据表分区

SHOW PARTITIONS iot.ods_device_status_di;

若出现:

hive (default)> SHOW PARTITIONS iot.ods_device_status_di;
OK
partition
dt=2026-01-01
dt=2026-01-02
dt=2026-01-03
dt=2026-01-04
.............
dt=2026-04-20

最后一行看到今日的日期,说明数据入湖成功。


8.8 打包与部署(服务器长期运行)

1)本地/服务器打包

注意:打jar包前需要把代码中的 .master("local[*]") 删除或注释掉,否则

Spark 的设计是:代码里显式设置的参数优先级高于命令行参数。所以命令行传 --master yarn 完全不起作用。

IDEA → Maven → spark-ods-ingest →  clean → package

# 运行打包程序后,生成target目录,内含 spark-ods-ingest-1.0.0.jar 包

2)上传 JAR 到服务器

复制 spark-ods-ingest-1.0.0.jar 到 master:/opt/jars/

3)执行任务

# 方法一:本地模式(调试用)
spark-submit \
  --class com.demo.spark.KafkaToHiveDeviceStatusJob \
  --master local[1] \
  /opt/jars/spark-job-1.0.0.jar

# 方法二:YARN client 模式(半集群模式)
# 注意:需要把源码中的 .master("local[*]") 语句注释掉后,重新打jar包运行,否则会报错
spark-submit \
  --class com.demo.spark.KafkaToHiveDeviceStatusJob \
  --master yarn \
  --deploy-mode client \
  /opt/jars/spark-job-1.0.0.jar

# 方法三:YARN cluster 模式(⭐企业生产场景运行模式)
# 注意:需要把源码中的 .master("local[*]") 语句注释掉后,重新打jar包运行,否则会报错
spark-submit \
  --class com.demo.spark.KafkaToHiveDeviceStatusJob \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 512m \
  --executor-memory 512m \
  --executor-cores 1 \
  --num-executors 1 \
  --conf spark.sql.shuffle.partitions=1 \
  /opt/jars/spark-job-1.0.0.jar


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1