(Kafka → Hive ODS 明细层)
8.1 实验目标
本实验完成:
✅ 使用 Spark Structured Streaming 实时读取 Kafka 数据流
✅ 实时解析 JSON 数据,提取设备状态、温度、负载等信息
✅ 将处理后的数据写入 Hive ODS 分区表,支持按日期分区存储
✅ 为后续的历史数据分析和实时监控提供数据支持,确保数据处理的准确性与实时性
8.2 数据入库链路说明
8.3 启动实验环境
(1)启动大数据环境
# 启动 Hadoop start-dfs.sh # 在master操作 start-yarn.sh # 在slave1操作 # 启动 Hive 元数据服务 hive --service metastore & # 启动 Hive 客户端 hive
(2)确认 Hive 表已创建
USE iot; SHOW TABLES;
确保存在:
ods_device_status_di
8.4 实施步骤
(1)创建 Spark 作业
在IDEA中创建 spark-ods-ingest 项目
创建 scala 模块文件路径:
src/main/scala/com/demo/spark/KafkaToHiveDeviceStatusJob.scala
方法:
右键 com.demo.spark → New → Scala Class
会弹出创建窗口,选择:
Object
然后输入文件名:
KafkaToHiveDeviceStatusJob
(2) 复制 hive-site.xml 文件到 IntelliJ IDEA 中
在 IntelliJ IDEA 中,将配置好的 hive-site.xml 文件复制到IDEA项目的 resources 目录下
目的:确保 Spark 能够读取 Hive 配置信息并正确连接到 Hive。
(3)复制 core-site.xml和 hdfs-site.xml 文件到 IntelliJ IDEA 中
同样地,将Hadoop的配置文件 core-site.xml 和 hdfs-site.xml 文件复制到IDEA项目的 resources 目录下。
目的:这些配置文件是在IDEA中运行项目时, Spark 连接到 HDFS 所必需的,它们提供了 HDFS 的核心配置信息,包括 NameNode 的地址和文件系统的基本配置。因为下面的代码把检查点设置到了HDFS路径上
(4)完整代码
package com.demo.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
/**
* 功能:实时读取Kafka设备数据 → 解析清洗 → 写入Hive分区表
* 技术:Spark 实时流计算
* 流程:Kafka → Spark → Hive
*/
object KafkaToHiveDeviceStatusJob {
def main(args: Array[String]): Unit = {
// ======================================================
// 1. 创建Spark程序入口
// ======================================================
// 设置HDFS操作用户,避免权限错误
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder()
.appName("KafkaToHiveDeviceStatusJob")
.master("local[*]") // 本地运行,集群运行需删除
.enableHiveSupport() // 开启Hive支持
.config("spark.sql.session.timeZone", "Asia/Shanghai")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.getOrCreate()
// 简化日志输出
spark.sparkContext.setLogLevel("ERROR")
println("Spark 启动成功,开始连接 Kafka...")
// ======================================================
// 2. 读取Kafka实时数据流
// ======================================================
val kafkaDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "master:9092")
.option("subscribe", "device_status_topic")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
// 将Kafka数据转为字符串
val jsonDf = kafkaDf.selectExpr("CAST(value AS STRING) AS json")
// ======================================================
// 3. 解析JSON并清洗数据
// ======================================================
val parsedDf = jsonDf
// 从JSON中提取字段
.withColumn("event_type", get_json_object(col("json"), "$.event_type"))
.withColumn("device_id", get_json_object(col("json"), "$.device_id"))
.withColumn("status", get_json_object(col("json"), "$.status").cast("int"))
.withColumn("temperature", get_json_object(col("json"), "$.temperature").cast("double"))
.withColumn("load", get_json_object(col("json"), "$.load").cast("double"))
.withColumn("voltage", get_json_object(col("json"), "$.voltage").cast("double"))
.withColumn("event_time", get_json_object(col("json"), "$.event_time"))
// 生成时间字段与分区字段
.withColumn("ts", to_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss"))
.withColumn("dt", date_format(col("ts"), "yyyy-MM-dd"))
// 过滤空值脏数据
.filter(col("device_id").isNotNull && col("ts").isNotNull)
// 最终要写入Hive的字段
.select("event_type", "device_id", "status", "temperature",
"load", "voltage", "event_time", "ts", "dt")
// ======================================================
// 4. 实时写入Hive(每10秒一批)
// ======================================================
val query = parsedDf.writeStream
.outputMode("append")
.trigger(Trigger.ProcessingTime("10 seconds"))
.option("checkpointLocation", "hdfs:///ck/kafka_to_hive_device_status")
// 批量处理每一批数据
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
val total = batchDF.count()
println(s"\n批次 $batchId 数据量:$total 条")
// 打印预览数据
batchDF.show(6, truncate = false)
// 写入Hive
batchDF.write.mode("append").insertInto("iot.ods_device_status_di")
println(s"批次 $batchId 写入 Hive 成功\n")
}
.start()
println("流任务已启动,每10秒处理一批数据")
// 让程序持续运行
query.awaitTermination()
}
}8.5 启动Kafka实时采集数据
(1)启动Kafka
# 在master/slave1/slave2启动 ZooKeeper zkServer.sh start # 在master/slave1/slave2检查 ZooKeeper状态 zkServer.sh status # 在master/slave1/slave2启动 Kafka kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties
(2)写入状态流数据到 Kafka
# 在master节点中操作【新开窗口】 cd /opt/datas # 启动设备状态模拟脚本,并将数据写入 Kafka python 01_device_status_sim.py | \ kafka-console-producer.sh --broker-list master:9092 --topic device_status_topic
8.6 在IDEA运行作业
8.7 结果验证
在Hive的shell中运行
1️⃣ 查看 Hive 数据
# 查看最新的5条数据;(由于 ORDER BY 需要进行全局排序,Hive 会启动 MapReduce 作业来完成排序操作。) SELECT * FROM iot.ods_device_status_di ORDER BY ts DESC LIMIT 5;
2️⃣ 查看数据表分区
SHOW PARTITIONS iot.ods_device_status_di;
若出现:
hive (default)> SHOW PARTITIONS iot.ods_device_status_di; OK partition dt=2026-01-01 dt=2026-01-02 dt=2026-01-03 dt=2026-01-04 ............. dt=2026-04-20
最后一行看到今日的日期,说明数据入湖成功。
8.8 打包与部署(服务器长期运行)
1)本地/服务器打包
注意:打jar包前需要把代码中的
.master("local[*]")删除或注释掉,否则Spark 的设计是:代码里显式设置的参数优先级高于命令行参数。所以命令行传
--master yarn完全不起作用。
IDEA → Maven → spark-ods-ingest → clean → package # 运行打包程序后,生成target目录,内含 spark-ods-ingest-1.0.0.jar 包
2)上传 JAR 到服务器
复制 spark-ods-ingest-1.0.0.jar 到 master:/opt/jars/
3)执行任务
# 方法一:本地模式(调试用)
spark-submit \
--class com.demo.spark.KafkaToHiveDeviceStatusJob \
--master local[1] \
/opt/jars/spark-job-1.0.0.jar
# 方法二:YARN client 模式(半集群模式)
# 注意:需要把源码中的 .master("local[*]") 语句注释掉后,重新打jar包运行,否则会报错
spark-submit \
--class com.demo.spark.KafkaToHiveDeviceStatusJob \
--master yarn \
--deploy-mode client \
/opt/jars/spark-job-1.0.0.jar# 方法三:YARN cluster 模式(⭐企业生产场景运行模式)
# 注意:需要把源码中的 .master("local[*]") 语句注释掉后,重新打jar包运行,否则会报错
spark-submit \
--class com.demo.spark.KafkaToHiveDeviceStatusJob \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=1 \
/opt/jars/spark-job-1.0.0.jar