实验五:Spark 批处理历史温度趋势
10.1 实验目标
通过本实验,使学生掌握基于 Spark SQL 的离线数据分析流程,理解历史数据从 Hive → ClickHouse → 大屏展示 的完整数据流转过程,具体目标包括:
从 Hive 中表
iot.ods_device_status_di读取设备历史温度明细数据;按天(dt)对历史数据进行分组统计;
计算每日平均温度(avg_temp)与最大温度(max_temp);
清空 ClickHouse 表
iot_report.dws_temp_trend_day中的数据,防止重复写入。将计算结果写入 ClickHouse 表
iot_report.dws_temp_trend_day。从 ClickHouse 中读取数据进行验证,用于后续报表分析与可视化展示。
10.3 ClickHouse 表设计
启动ClickHouse客户端
# 启动多行模式(在master中操作) clickhouse-client -m
创建ClickHouse 表
如果没有指定数据库,ClickHouse 默认会将表创建在
default数据库中。default是 ClickHouse 中的默认数据库,所有没有显式指定数据库的表都会创建在该数据库下。
-- 1.创建报表数据库(如果不存在) CREATE DATABASE IF NOT EXISTS iot_report; -- 2.创建温度趋势报表表 CREATE TABLE IF NOT EXISTS iot_report.dws_temp_trend_day ( dt Date, avg_temp Float64, max_temp Float64 ) ENGINE = MergeTree() ORDER BY dt; -- 3.选择数据库 use iot_report; -- 4.查看数据表 show tables;
启动大数据环境
# 启动 Hadoop start-dfs.sh # 在master操作 start-yarn.sh # 在slave1操作 # 启动 Hive 元数据服务, # 或者确保命令在终端关闭时继续运行。nohup hive --service metastore & hive --service metastore & # 启动 Hive 客户端 hive
10.4 完整代码
在IDEA中创建Scala项目,文件路径
src/main/java/com/demo/spark/TempTrendBatchJob
完整代码:
package com.demo.spark
import org.apache.spark.sql.SparkSession
import java.sql.DriverManager
object TempTrendBatchJob {
def main(args: Array[String]): Unit = {
// ======================================================
// 创建 SparkSession,Spark 程序的入口
// ======================================================
val spark = SparkSession.builder()
.appName("TempTrendBatchJob")
.master("local[*]") // 本地调试时保留,集群提交时需删除
.enableHiveSupport() // 开启 Hive 支持
.getOrCreate()
// 设置日志级别为 ERROR,减少日志输出
spark.sparkContext.setLogLevel("ERROR")
// ClickHouse 配置
val ckUrl = "jdbc:clickhouse://master:8123/iot_report"
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver"
val ckTable = "dws_temp_trend_day"
// ======================================================
// 从 Hive 读取数据,计算每日温度趋势
// ======================================================
println("====== 1. 从 Hive 读取并统计温度数据 ======")
spark.sql("USE iot")
val result = spark.sql(
"""
|SELECT dt,
| AVG(temperature) AS avg_temp,
| MAX(temperature) AS max_temp
|FROM ods_device_status_di
|GROUP BY dt
|ORDER BY dt
""".stripMargin)
println("====== 2. 每日温度趋势计算结果 ======")
result.show(false)
// ======================================================
// 清空 ClickHouse 表,避免重复写入
// ======================================================
println("====== 3. 清空 ClickHouse 表(避免重复写入) ======")
val conn = DriverManager.getConnection(ckUrl, "default", "")
val stmt = conn.createStatement()
stmt.execute("TRUNCATE TABLE iot_report." + ckTable)
stmt.close()
conn.close()
println("ClickHouse 表已清空")
// ======================================================
// 将计算结果写入 ClickHouse
// ======================================================
println("====== 4. 将计算结果写入 ClickHouse ======")
result.write
.format("jdbc")
.option("url", ckUrl)
.option("dbtable", ckTable)
.option("driver", ckDriver)
.mode("append")
.save()
println("====== 5. 写入完成:iot_report.dws_temp_trend_day ======")
// ======================================================
// 从 ClickHouse 读取数据,验证写入是否成功
// ======================================================
println("====== 6. 验证:从 ClickHouse 读取写入结果 ======")
val ckResult = spark.read
.format("jdbc")
.option("url", ckUrl)
.option("dbtable", s"(SELECT * FROM $ckTable ORDER BY dt) AS tmp")
.option("driver", ckDriver)
.load()
ckResult.show(false)
println("====== 任务执行完成 ======")
spark.stop() // 关闭 SparkSession
}
}-- 查看clickhouse中的dws_temp_trend_day表数据 SELECT * FROM iot_report.dws_temp_trend_day ORDER BY dt;
# 方法一:本地模式(调试用) spark-submit \ --class com.demo.spark.TempTrendBatchJob \ --master local[1] \ /opt/jars/spark-job-1.0.0.jar # 方法二:YARN client 模式(半集群模式) spark-submit \ --class com.demo.spark.TempTrendBatchJob \ --master yarn \ --deploy-mode client \ /opt/jars/spark-job-1.0.0.jar
# 方法三:YARN cluster 模式(⭐企业生产场景运行模式)
# 注意:需要把源码中的 .master("local[*]") 语句注释掉后,重新打jar包运行,否则会报错
spark-submit \
--class com.demo.spark.TempTrendBatchJob \
--master yarn \
--deploy-mode cluster \
--conf spark.sql.shuffle.partitions=1 \
/opt/jars/spark-job-1.0.0.jar
# 完全版
spark-submit \
--class com.demo.spark.TempTrendBatchJob \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=1 \
/opt/jars/spark-job-1.0.0.jar