李翔-大数据技术

Big data technology!

离线分析实验05 - Spark 批处理历史温度趋势【2026-4-21】

实验五:Spark 批处理历史温度趋势


10.1 实验目标

通过本实验,使学生掌握基于 Spark SQL 的离线数据分析流程,理解历史数据从 Hive → ClickHouse → 大屏展示 的完整数据流转过程,具体目标包括:

  1. 从 Hive 中表 iot.ods_device_status_di 读取设备历史温度明细数据;

  2. 按天(dt)对历史数据进行分组统计;

  3. 计算每日平均温度(avg_temp)与最大温度(max_temp);

  4. 清空 ClickHouseiot_report.dws_temp_trend_day 中的数据,防止重复写入。

  5. 将计算结果写入 ClickHouseiot_report.dws_temp_trend_day

  6. ClickHouse 中读取数据进行验证,用于后续报表分析与可视化展示。


10.3 ClickHouse 表设计

启动ClickHouse客户端

# 启动多行模式(在master中操作)
clickhouse-client -m


创建ClickHouse 表

如果没有指定数据库,ClickHouse 默认会将表创建在 default 数据库中。default 是 ClickHouse 中的默认数据库,所有没有显式指定数据库的表都会创建在该数据库下。

-- 1.创建报表数据库(如果不存在)
CREATE DATABASE IF NOT EXISTS iot_report;

-- 2.创建温度趋势报表表
CREATE TABLE IF NOT EXISTS iot_report.dws_temp_trend_day
(
    dt Date,
    avg_temp Float64,
    max_temp Float64
)
ENGINE = MergeTree()
ORDER BY dt;

-- 3.选择数据库
use iot_report;

-- 4.查看数据表
show tables;


启动大数据环境

# 启动 Hadoop
start-dfs.sh   # 在master操作
start-yarn.sh  # 在slave1操作

# 启动 Hive 元数据服务,
# 或者确保命令在终端关闭时继续运行。nohup hive --service metastore &
hive --service metastore &

# 启动 Hive 客户端
hive



10.4 完整代码

在IDEA中创建Scala项目,文件路径

src/main/java/com/demo/spark/TempTrendBatchJob

完整代码:

package com.demo.spark

import org.apache.spark.sql.SparkSession
import java.sql.DriverManager

object TempTrendBatchJob {

  def main(args: Array[String]): Unit = {

    // ======================================================
    // 创建 SparkSession,Spark 程序的入口
    // ======================================================
    val spark = SparkSession.builder()
      .appName("TempTrendBatchJob")
      .master("local[*]")  // 本地调试时保留,集群提交时需删除
      .enableHiveSupport()  // 开启 Hive 支持
      .getOrCreate()

    // 设置日志级别为 ERROR,减少日志输出
    spark.sparkContext.setLogLevel("ERROR")
      
    // ClickHouse 配置
    val ckUrl = "jdbc:clickhouse://master:8123/iot_report"
    val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver"
    val ckTable = "dws_temp_trend_day"  

    // ======================================================
    // 从 Hive 读取数据,计算每日温度趋势
    // ======================================================
    println("====== 1. 从 Hive 读取并统计温度数据 ======")
    spark.sql("USE iot")

    val result = spark.sql(
      """
        |SELECT dt,
        |       AVG(temperature) AS avg_temp,
        |       MAX(temperature) AS max_temp
        |FROM ods_device_status_di
        |GROUP BY dt
        |ORDER BY dt
      """.stripMargin)

    println("====== 2. 每日温度趋势计算结果 ======")
    result.show(false)

    // ======================================================
    // 清空 ClickHouse 表,避免重复写入
    // ======================================================
    println("====== 3. 清空 ClickHouse 表(避免重复写入) ======")
    val conn = DriverManager.getConnection(ckUrl, "default", "")  
    val stmt = conn.createStatement()
    stmt.execute("TRUNCATE TABLE iot_report." + ckTable)
    stmt.close()
    conn.close()
    println("ClickHouse 表已清空")

    // ======================================================
    // 将计算结果写入 ClickHouse
    // ======================================================
    println("====== 4. 将计算结果写入 ClickHouse ======")
    result.write
      .format("jdbc")
      .option("url", ckUrl)
      .option("dbtable", ckTable)
      .option("driver", ckDriver)
      .mode("append")
      .save()

    println("====== 5. 写入完成:iot_report.dws_temp_trend_day ======")

    // ======================================================
    // 从 ClickHouse 读取数据,验证写入是否成功
    // ======================================================
    println("====== 6. 验证:从 ClickHouse 读取写入结果 ======")
    val ckResult = spark.read
      .format("jdbc")
      .option("url", ckUrl)
      .option("dbtable", s"(SELECT * FROM $ckTable ORDER BY dt) AS tmp")
      .option("driver", ckDriver)
      .load()

    ckResult.show(false)

    println("====== 任务执行完成 ======")
    spark.stop()  // 关闭 SparkSession
  }
}



10.5 运行代码,验证数据

-- 查看clickhouse中的dws_temp_trend_day表数据
SELECT * FROM iot_report.dws_temp_trend_day ORDER BY dt;



10.6 打包与部署(服务器长期运行)

# 方法一:本地模式(调试用)
spark-submit \
  --class com.demo.spark.TempTrendBatchJob \
  --master local[1] \
  /opt/jars/spark-job-1.0.0.jar
  
  
# 方法二:YARN client 模式(半集群模式)
spark-submit \
  --class com.demo.spark.TempTrendBatchJob \
  --master yarn \
  --deploy-mode client \
  /opt/jars/spark-job-1.0.0.jar
# 方法三:YARN cluster 模式(⭐企业生产场景运行模式)
# 注意:需要把源码中的 .master("local[*]") 语句注释掉后,重新打jar包运行,否则会报错
spark-submit \
  --class com.demo.spark.TempTrendBatchJob \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.sql.shuffle.partitions=1 \
  /opt/jars/spark-job-1.0.0.jar
  
  
# 完全版
spark-submit \
  --class com.demo.spark.TempTrendBatchJob \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 512m \
  --executor-memory 512m \
  --executor-cores 1 \
  --num-executors 1 \
  --conf spark.sql.shuffle.partitions=1 \
  /opt/jars/spark-job-1.0.0.jar


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1