李翔-大数据技术

Big data technology!

离线数据分析-02

离线数据分析 第二部分


3.数据迁入ClickHouse

创建数据表

(1)进入ClickHouse

# 启动客户端
clickhouse-client

image-20250409235623393

(2)在ClickHouse中创建分析库analysisdb,用于存放数据表

# 查看数据库
show databases;

# 先删除数据库
drop database if exists analysisdb

# 再建数据库
create database if not exists analysisdb

(3)切换当前数据库为刚刚创建的analysisdb

use analysisdb

(4)创建订单表Orders,该表字段包括cid(客户ID)、oid(订单ID)、purchase(下单时间)、orderStatus(订单状态)、delivery(配送完成时间),其中cid、oid、orderStatus为String类型,purchase为date类型,delivery为Int32类型,并按orderStatus分区,在每个分区内按cid和oid排序

-- 创建数据表 Orders
CREATE TABLE IF NOT EXISTS analysisdb.Orders ( \
   cid String, \
   oid String, \
   purchase Date, \
   orderStatus String, \
   delivery Int32 \
) ENGINE = MergeTree() \
PARTITION BY (orderStatus) \
ORDER BY (cid, oid);

image-20241223123335066

(5)创建订单详情表OrderItems,该表字段包括oid、itemId、pid、limitDate(配送截止日期)、price,其中oid、itemId、pid为String类型,limitDate为date类型,price为Float32类型,按limitDate分区,在每个分区内按oid排序

-- 创建数据表 OrderItems
CREATE TABLE IF NOT EXISTS analysisdb.OrderItems ( \
   oid String, \
   itemId String, \
   pid String, \
   limitDate Date, \
   price Float32 \
) ENGINE = MergeTree() \
PARTITION BY toYYYYMM(limitDate) \
ORDER BY (oid);

(6)创建产品名称表ProductInfo,该表字段包括categoryName和pid,且均为String类型,按pid排序

-- 创建数据表 ProductInfo
CREATE TABLE IF NOT EXISTS analysisdb.ProductInfo( \
categoryName String, \
pid String \
)ENGINE = MergeTree() \
ORDER BY (pid) ;

(7)查看已创建的3个数据表

show tables;

(8)创建一份名为OrdersToClickhouse的Scala Object文件,将Hive的Orders表数据存入Clickhouse数据表


实验任务4:

基于 Spark 的 Hive 的orders表数据清洗并写入 ClickHouse的 Orders


🎯实验任务描述:

本实验通过一个完整的订单数据处理案例,引导学生使用 Spark 从 Hive 表中读取原始订单数据,完成空值处理、字段去重、时间字段转换、字段重命名等清洗操作,并将结果写入 ClickHouse 数据库中的目标表。重点掌握 Spark 与 Hive 的读取集成、Spark SQL 的数据清洗能力,以及 Spark 与 ClickHouse 的 JDBC 写入流程。


📌实验步骤概览:

  1. 创建 SparkSession 并启用 Hive 支持

  2. 读取 Hive 中的 orders

  3. 进行空值过滤与去重处理

  4. 完成时间字段格式转换与日期提取

  5. 保留核心字段并进行字段重命名

  6. 配置 ClickHouse JDBC 写入参数

  7. 将处理后的数据写入 ClickHouse 中的 Orders

  8. 释放 Spark 资源,结束程序运行


✅实验要求:

1. 环境准备

  • 已安装并正确配置以下组件:

    • Apache Spark(建议支持 Hive)

    • Apache Hive,包含 orders 表及测试数据

    • ClickHouse,并预先创建好目标数据库 analysisdb 及表 Orders

  • 已添加 ClickHouse JDBC 驱动到项目pom中。


2. 编程实现要求

  • 使用 Spark 编写 Scala 程序,完成以下功能:

    • 从 Hive 表 orders 中读取数据;

    • 过滤掉 orderidcustomeridstatus 任意为空的数据;

    • customerid 去重,保留每位客户的一条订单记录;

    • PurchaseTimestamp 转换为 timestamp 类型;

    • 从时间戳中提取日期为新列 purchase

    • 删除无关字段,如 carrierdateapprovedat 等;

    • 重命名字段,如将 customerid 改为 cidorderid 改为 oid

    • 使用 JDBC 写入 ClickHouse 的 Orders 表,采用追加模式。


🧠能力提升目标:

  • 掌握 Spark 读取 Hive 表的基本方式;

  • 熟悉 Spark SQL 中的数据清洗、时间处理与列转换;

  • 掌握 JDBC 的写入方法及 ClickHouse 的连接参数配置;

  • 理解 Spark 在多数据源场景下的数据协同处理能力。


ClickHouse 表结构(Orders 表)


字段名数据类型来源字段名字段说明
cidStringcustomerid客户 ID(字段重命名)
oidStringorderid订单 ID(字段重命名)
purchaseDatepurchasetimestamp购买时间提取出的日期部分
orderStatusStringstatus订单状态(字段重命名)
deliveryInt32deliverydatetime配送所用天数(整型,原始字段)




完整代码如下

// 包名,用于组织代码
package org.example

// 引入 SparkSession
import org.apache.spark.sql.SparkSession
// 引入常用函数:引用列、格式化时间、字符串转时间戳
import org.apache.spark.sql.functions._

// 定义主对象,程序入口
object OrdersToClickhouse {
  def main(args: Array[String]): Unit = {

    // 1. 初始化 Spark 配置    
    println("启动 SparkSession , 并开启 Hive 支持...")
    val spark = SparkSession.builder()
      .appName("HiveToClickhouse") // 设置应用名称
      .master("local[*]")  // 本地运行,使用所有 CPU 核心
      .enableHiveSupport() // 开启对 Hive 的支持,才能访问 Hive 表
      .getOrCreate()
  

    // 2. 从 Hive 表中读取数据
    val hiveTable = "orders" // Hive 表名
    println(s"从 Hive 表 '$hiveTable' 读取数据 ...")
    val df = spark.table(hiveTable) // 使用 Spark SQL 读取 Hive 表
    println(s"读取完成,数据条数:${df.count()}")
    // 显示处理后的数据
    println("数据清洗完成,展示前5条记录:")
    df.show(5, truncate = false)  

    // 3. 开始数据清洗
    println("开始数据清洗...")

    // 第一步:过滤空值(删除 orderid、customerid、status 中任意字段为空的记录)
    println("第一步:过滤空值(orderid, customerid, status 不能为空)...")
    val filteredDf = df.filter(
      col("orderid").isNotNull &&
        col("customerid").isNotNull &&
        col("status").isNotNull &&
        col("customerid") =!= "Customer Id"  // 排除表头
    )
    println(s"过滤完成,剩余数据条数:${filteredDf.count()}")

    // 第二步:去重,如果一个 customerid 出现多次,只保留一条记录
    println("第二步:去重(按 customerid 去重)...")
    val deduplicatedDf = filteredDf.dropDuplicates("customerid")
    println(s"去重完成,剩余数据条数:${deduplicatedDf.count()}")

    // 第三步:转换时间格式(把字符串格式的时间数据(比如 "25/03/2024 14:30:00")转 timestamp格式[如2024-03-25 14:30:00])
    println("第三步:将 PurchaseTimestamp 字符串转换为 timestamp 类型 ...")
    val timestampFormat = "dd/MM/yyyy HH:mm:ss"  // 指定原时间字符串的格式
    val dfWithTimestamp = deduplicatedDf.withColumn(
      "PurchaseTimestamptimestamp", // 新列名
      to_timestamp(col("PurchaseTimestamp"), timestampFormat) // 转换函数
    )

    // 第四步:提取购买日期:从 timestamp 中提取日期部分,如 2023-04-10
    println("第四步:提取购买日期(purchase 字段) ...")
    val dateFormat = "yyyy-MM-dd"
    val dfWithDate = dfWithTimestamp.withColumn(
      "Purchasetdate", // 新列名
      date_format(col("PurchaseTimestamptimestamp"), dateFormat)
    )

    // 第五步:删除无用列:只保留我们需要的字段
    println("第五步:删除无用列 ...")
    val dfWithDrop = dfWithDate.drop(
      "approvedat", "carrierdate", "customerdate", "deliverydate",
      "purchasetimestamp", "PurchaseTimestamptimestamp"
    )

    // 第六步:字段重命名:使字段名更简洁或符合 ClickHouse 的目标表字段
    println("第六步:字段重命名 ...")
    val dfRenamed = dfWithDrop
      .withColumnRenamed("customerid", "cid") // 客户 ID
      .withColumnRenamed("orderid", "oid") // 订单 ID
      .withColumnRenamed("status", "orderStatus") // 订单状态
      .withColumnRenamed("deliverydatetime", "delivery") // 配送时间
      .withColumnRenamed("Purchasetdate", "purchase") // 购买日期

    // 显示处理后的数据
    println("数据清洗完成,展示前 5 条记录:")
    dfRenamed.show(5, truncate = false)

    // 4. 配置 ClickHouse 连接参数
    println("配置 ClickHouse JDBC 连接参数 ...")
    val clickhouseJdbcUrl = "jdbc:clickhouse://192.168.36.100:8123/analysisdb" // ClickHouse 数据库地址
    val clickhouseProps = new java.util.Properties()
    clickhouseProps.setProperty("user", "default") // 用户名
    clickhouseProps.setProperty("password", "")    // 密码(为空)
    clickhouseProps.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver") // JDBC 驱动

    // 5. 写入 ClickHouse 表
    println("开始写入 ClickHouse 表 'Orders' ...")
    dfRenamed.write
      .mode("append") // 追加模式写入数据
      .jdbc(clickhouseJdbcUrl, "Orders", clickhouseProps)

    println("写入 ClickHouse 完成。")
    
    // 6. 从 ClickHouse 读取刚写入的数据,显示前 5 条
    println("从 ClickHouse 读取 'Orders' 表,显示前 5 条记录 ...")
    val clickhouseDf = spark.read
      .jdbc(clickhouseJdbcUrl, "Orders", clickhouseProps)
    clickhouseDf.show(5, truncate = false)  

    // 7. 停止 Spark
    println("停止 Spark 会话 ...")
    spark.stop()
    println("程序执行完毕。")
  }
}

查看clickhouse中的生成的Orders表的数据总行数

# 切换数据库
use analysisdb

# 在 clickhouse 的 Shell中查询Orders表的数据总行数
select count(*) from analysisdb.Orders;

# 在 clickhouse 的 Shell中查询Orders表的前3行数据
select * from analysisdb.Orders limit 3;
# 运行结果
master :) select count(*) from analysisdb.Orders;

SELECT count(*)
FROM analysisdb.Orders
Query id: 786d3745-f4e9-455d-9730-6562ea9e7a47
┌─count()─┐
│   99442 │
└─────────┘
1 rows in set. Elapsed: 0.013 sec.


打包运行

cd /opt/apps/spark

./bin/spark-submit \
 --master yarn \
 --deploy-mode client \
 --class org.example.OrdersToClickhouse \
 --conf spark.hadoop.hive.metastore.uris=thrift://master:9083 \
 spark_task-1.0-jar-with-dependencies.jar


(12)创建名为OrderItemsToClickhouse的Scala对象文件,用于将名为"OrderItems"的Hive表数据清洗后存入ClickHouse的OrderItems表。


实验任务5:

基于 Spark 的 Hive 表 OrderItems订单明细数据清洗并写入 ClickHouse的 OrderItems 表实践


🎯实验任务描述:

本实验通过订单明细数据处理案例,指导学生使用 Spark 从 Hive 表 OrderItems 中读取原始数据,完成空值过滤、重复项去除、时间字段格式转换及字段重命名等数据清洗工作,最终通过 JDBC 接口将结果写入 ClickHouse 表 OrderItems。重点掌握 Spark 与 Hive 集成读取、数据清洗流程以及 Spark 写入 ClickHouse 的实践方式。


📌实验步骤概览:

  1. 创建 SparkSession 并启用 Hive 支持

  2. 读取 Hive 中的 OrderItems

  3. 进行关键字段的空值过滤

  4. 按订单及产品维度进行去重

  5. 将配送截止时间转换为 timestamp 并提取日期

  6. 保留核心字段并重命名为业务含义明确的字段名

  7. 配置 ClickHouse JDBC 连接参数

  8. 将处理结果写入 ClickHouse 的 OrderItems

  9. 释放 Spark 资源


✅实验要求:

1. 环境准备

  • 已安装并正确配置以下组件:

    • Apache Spark(需支持 Hive)

    • Apache Hive,包含 OrderItems 表及测试数据

    • ClickHouse,已创建数据库 analysisdb 和目标表 OrderItems

  • 项目中已引入 ClickHouse JDBC 驱动依赖


2. 编程实现要求

  • 使用 Spark 编写 Scala 程序,完成如下任务:

    • 读取 Hive 表 OrderItems 数据;

    • 过滤空值(orderiditemidproductidshippinglimitdate);

    • 按 (orderid, itemid, productid) 进行去重;

    • shippinglimitdate 转换为 timestamp 类型;

    • 提取日期字段生成新列 limitDate

    • 删除冗余字段(如原时间列和中间字段);

    • 重命名字段以增强语义清晰度;

    • 使用 JDBC 将数据追加写入 ClickHouse 表 OrderItems


🧠能力提升目标:

  • 熟练掌握 Spark 与 Hive 表的数据交互读取方式;

  • 提高使用 Spark SQL 进行空值处理、去重、时间转换的能力;

  • 掌握 ClickHouse JDBC 的参数配置及数据写入技巧;

  • 理解清洗流程中字段重命名对数据建模与分析的作用;

  • 提升跨平台数据传输与集成实操经验。


Hive 表结构(OrderItems 表)


ClickHouse 表结构(OrderItems 表)


字段名数据类型来源字段名字段说明
oidStringorderid订单 ID(字段重命名)
itemIdStringitemid订单项 ID(字段重命名)
pidStringproductid商品 ID(字段重命名)
limitDateDateshippinglimitdate配送截止日期(转换后的新列)
priceFloat32price商品价格(原字段保留)




完整代码如下:

// 包名,用于组织代码
package org.example

// 引入 SparkSession
import org.apache.spark.sql.SparkSession
// 引入常用函数:引用列、格式化时间、字符串转时间戳
import org.apache.spark.sql.functions._

object OrderItemsToClickhouse {
  def main(args: Array[String]): Unit = {

    // 1. 初始化 Spark 配置
    println("启动 SparkSession , 并开启 Hive 支持...")
    val spark = SparkSession.builder()
      .appName("HiveToClickhouse") // 设置应用名称
      .master("local[*]")  // 本地运行,使用所有 CPU 核心
      .enableHiveSupport() // 开启对 Hive 的支持,才能访问 Hive 表
      .getOrCreate()

    // 2. 从 Hive 表中读取数据
    val hiveTable = "OrderItems"
    println(s"从 Hive 表 '$hiveTable' 读取数据 ...")
    val df = spark.table(hiveTable)
    println(s"读取完成,数据条数:${df.count()}")
    println("数据表前5行记录:")
    df.show(5, truncate = false)

    // 3. 开始数据清洗
    println("开始数据清洗...")

    // 第一步:过滤空值(指定关键字段)
    println("第一步:过滤空值(orderid, itemid, productid不能为空)...")
    val filteredDf = df.filter(
      col("orderid").isNotNull &&
        col("itemid").isNotNull &&
        col("productid").isNotNull
    )
    println(s"过滤完成,剩余数据条数:${filteredDf.count()}")

    // 第二步:去重(按 orderid, itemid, productid 去重)
    println("第二步:去重(按 orderid, itemid, productid 去重)...")
    val deduplicatedDf = filteredDf.dropDuplicates("orderid", "itemid", "productid")
    println(s"去重完成,剩余数据条数:${deduplicatedDf.count()}")

    // 第三步:转换 shippinglimitdate 为 timestamp 类型
    println("第三步:将 ShippingLimitDate 字符串转换为 timestamp 类型 ...")
    val timestampFormat = "dd/MM/yyyy HH:mm:ss"
    val dfWithTimestamp = deduplicatedDf.withColumn(
      "date_timestamp",
      to_timestamp(col("ShippingLimitDate"), timestampFormat)
    )

    // 第四步:提取年月日(limitDate 字段)
    println("第四步:提取日期部分,生成 limitDate 字段 ...")
    val dateFormat = "yyyy-MM-dd"
    val dfWithDate = dfWithTimestamp.withColumn(
      "limitDate",
      date_format(col("date_timestamp"), dateFormat)
    )

    // 第五步:删除多余字段
    println("第五步:删除无用字段 ...")
    val dfWithDrop = dfWithDate.drop("shippinglimitdate", "date_timestamp", "freightvalue")

    // 第六步:字段重命名(更清晰)
    println("第六步:字段重命名 ...")
    val dfRenamed = dfWithDrop
      .withColumnRenamed("orderid", "oid")
      .withColumnRenamed("itemid", "itemId")
      .withColumnRenamed("productid", "pid")
      .withColumnRenamed("deliverydatetime", "delivery")

    // 显示处理后的数据
    println("数据清洗完成,展示前 5 条记录:")
    dfRenamed.show(5, truncate = false)

    // 4. 配置 ClickHouse 连接参数
    println("配置 ClickHouse JDBC 连接参数 ...")
    val clickhouseJdbcUrl = "jdbc:clickhouse://192.168.36.100:8123/analysisdb"
    val clickhouseProps = new java.util.Properties()
    clickhouseProps.setProperty("user", "default")
    clickhouseProps.setProperty("password", "")
    clickhouseProps.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver")

    // 5. 写入 ClickHouse 表
    println("开始写入 ClickHouse 表 'OrderItems' ...")
    dfRenamed.write
      .mode("append") // 追加写入
      .jdbc(clickhouseJdbcUrl, "OrderItems", clickhouseProps)

    println("写入 ClickHouse 完成。")

    // 6. 从 ClickHouse 读取刚写入的数据,显示前 5 条
    println("从 ClickHouse 读取 'OrderItems' 表,显示前 5 条记录 ...")
    val clickhouseDf = spark.read
      .jdbc(clickhouseJdbcUrl, "OrderItems", clickhouseProps)

    clickhouseDf.show(5, truncate = false)

    // 7. 停止 Spark
    println("停止 Spark 会话 ...")
    spark.stop()
    println("程序执行完毕。")
  }
}


查看clickhouse中的生成OrderItems表的数据总行数

# 查看 clickhouse 中的生成OrderItems表的数据总行数
select count(*) from analysisdb.OrderItems;

# 在 clickhouse 的 Shell中查询OrderItems表的前3行数据
select * from analysisdb.OrderItems limit 3;

运行结果

SELECT count(*)  FROM OrderItems

Query id: 126b826e-1391-4ddc-ad40-d39f8d08575a ┌─count()─┐ │  112650 │ └─────────┘ 1 rows in set. Elapsed: 0.004 sec.


(13)创建名为ProductInfoToClickhouse的Scala对象文件,用于将名为"ProductInfo"的Hive表数据清洗后存入ClickHouse的ProductInfo表。


实验任务6:

基于 Spark 的商品信息数据清洗并写入 ClickHouse 实践


实验任务描述:

本实验以商品信息表为基础,指导学生使用 Spark 从 Hive 表 productinfo 中读取原始商品数据,通过字段空值清理、去重、字段精简和重命名等步骤完成数据清洗,最终将结果通过 JDBC 接口写入 ClickHouse 表 ProductInfo。重点掌握 Spark 对字符串和结构化字段的清洗技巧,以及 ClickHouse 的高效写入流程。


实验步骤概览:

  1. 创建 SparkSession 并启用 Hive 支持

  2. 读取 Hive 中的 productinfo

  3. 对所有字段进行空值与字符串清理

  4. 根据 productid 进行去重

  5. 删除无关字段,保留核心信息

  6. 重命名关键字段,增强语义清晰度

  7. 配置 ClickHouse JDBC 连接参数

  8. 将处理结果写入 ClickHouse 的 ProductInfo

  9. 释放 Spark 资源


实验要求:

1. 环境准备

  • 已安装并正确配置以下组件:

    • Apache Spark(支持 Hive)

    • Apache Hive,包含 productinfo 表及商品测试数据

    • ClickHouse,预先创建好目标数据库 analysisdb 和表 ProductInfo

  • 项目中已引入 ClickHouse JDBC 驱动依赖


2. 编程实现要求

  • 使用 Spark 编写 Scala 程序,完成以下数据处理任务:

    • 从 Hive 表 productinfo 中读取原始数据;

    • 对所有字段进行空值处理,字符串列需 TRIM() 后非空;

    • 使用 productid 进行去重;

    • 删除冗余字段(如描述长度、尺寸、照片数量等);

    • 重命名关键字段(如 productid → pidcategoryname → categoryName);

    • 将处理结果通过 JDBC 接口写入 ClickHouse 表 ProductInfo


能力提升目标:

  • 掌握 Spark 对结构化数据的空值和非空字符串处理方法;

  • 提高字段筛选、字段清理及语义增强的实际操作能力;

  • 熟悉 Spark 与 ClickHouse 的高效连接与写入方式;

  • 培养在真实业务数据清洗中的字段优化与映射意识。


Hive 表结构(productinfo 表)


字段名数据类型说明
productidSTRING商品 ID
categorynameSTRING商品类别名称
descriptionlenghtINT描述长度(不保留)
heightcmDOUBLE商品高度(厘米)(不保留)
lengthcmDOUBLE商品长度(厘米)(不保留)
namelenghtINT名称长度(不保留)
photosqtyINT图片数量(不保留)
weightgDOUBLE商品重量(克)(不保留)
widthcmDOUBLE商品宽度(厘米)(不保留)



ClickHouse 表结构(ProductInfo 表)


字段名数据类型来源字段名说明
pidStringproductid商品 ID(重命名)
categoryNameStringcategoryname商品类别名称(重命名)




完整代码如下:

package org.example

// 引入 SparkSession
import org.apache.spark.sql.SparkSession
// 导入 Apache Spark SQL 中的 DataTypes 类,用于定义和操作数据类型
import org.apache.spark.sql.types.DataTypes

object ProductInfoToClickhouse {
  def main(args: Array[String]): Unit = {

    // 1. 初始化 Spark 配置    
    println("启动 SparkSession , 并开启 Hive 支持...")
    val spark = SparkSession.builder()
      .appName("HiveToClickhouse") // 设置应用名称
      .master("local[*]")  // 本地运行,使用所有 CPU 核心
      .enableHiveSupport() // 开启对 Hive 的支持,才能访问 Hive 表
      .getOrCreate()

    // 2. 从 Hive 表中读取数据
    val hiveTable = "productinfo"
    println(s"从 Hive 表 '$hiveTable' 读取数据 ...")
    val df= spark.table(hiveTable)
    println(s"读取完成,数据条数:${df.count()}")
    println("数据表展示前 5 条记录:")
    df.show(5, truncate = false)  

    // 3. 开始数据清洗
    println("开始数据清洗...")

    // 第一步:过滤空值(包含字符串去除空格后的非空判断)
    println("第一步:过滤空值(包括 TRIM 后非空的字符串列)...")
    val filterCondition = df.schema.fields.map { field =>
      val columnName = field.name
      val dataType = field.dataType
      if (dataType == DataTypes.StringType) {
        s"TRIM($columnName) <> '' AND $columnName IS NOT NULL"
      } else {
        s"$columnName IS NOT NULL"
      }
    }.mkString(" AND ")

    val filteredDf = df.filter(filterCondition)
    println(s"过滤完成,剩余数据条数:${filteredDf.count()}")

    // 第二步:去重(按 productid 去重)
    println("第二步:根据 productid 去重 ...")
    val deduplicatedDf = filteredDf.dropDuplicates("productid")
    println(s"去重完成,剩余数据条数:${deduplicatedDf.count()}")

    // 第三步:删除不需要的列
    println("第三步:删除无用字段 ...")
    val dfWithDrop = deduplicatedDf.drop(
      "descriptionlenght", "heightcm", "lengthcm",
      "namelenght", "photosqty", "weightg", "widthcm"
    )

    // 第四步:字段重命名(productid → pid,categoryname → categoryName)
    println("第四步:字段重命名 ...")
    val dfRenamed = dfWithDrop
      .withColumnRenamed("productid", "pid")
      .withColumnRenamed("categoryname", "categoryName")

    // 显示处理后的前5条数据
    println("数据清洗完成,展示前 5 条记录:")
    dfRenamed.show(5, truncate = false)

    // 4. 配置 ClickHouse JDBC 连接参数
    println("配置 ClickHouse JDBC 连接参数 ...")
    val clickhouseJdbcUrl = "jdbc:clickhouse://192.168.36.100:8123/analysisdb"
    val clickhouseProps = new java.util.Properties()
    clickhouseProps.setProperty("user", "default")
    clickhouseProps.setProperty("password", "")
    clickhouseProps.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver")

    // 5. 写入 ClickHouse 表
    println("开始写入 ClickHouse 表 'ProductInfo' ...")
    dfRenamed.write
      .mode("append") // 追加写入
      .jdbc(clickhouseJdbcUrl, "ProductInfo", clickhouseProps)

    println("写入 ClickHouse 完成。")
      
    // 6. 从 ClickHouse 读取刚写入的数据,显示前 5 条
    println("从 ClickHouse 读取 'OrderItems' 表,显示前 5 条记录 ...")
    val clickhouseDf = spark.read
      .jdbc(clickhouseJdbcUrl, "ProductInfo", clickhouseProps)
      
    clickhouseDf.show(5, truncate = false)      

    // 7. 停止 Spark
    println("停止 Spark 会话 ...")
    spark.stop()
    println("程序执行完毕。")
  }
}


查看clickhouse中的生成的OrderItems表的数据总行数

# 查看clickhouse中的生成的OrderItems表的数据总行数
select count(*) from ProductInfo;

运行结果

SELECT count(*) FROM ProductInfo

Query id: 8cd7b05c-3c31-4d60-ad2d-cc0d0414e39d ┌─count()─┐ │   32340 │ └─────────┘ 1 rows in set. Elapsed: 0.003 sec.


四、数据分析

1. Hive数据分析

启动环境

# 启动Hadoop
start-dfs.sh
start-yarn.sh

# 启动Hive服务
hive --service metastore &
hive --service hiveserver2 &

# 启动hive的客户端IDEA

HiveQL

实验任务7:

(1)统计分析各种支付方式的使用情况

# 选择默认数据库
use default;

# 统计分析各种支付方式的使用数量
SELECT PayType , COUNT(Oid) AS total 
FROM details  
GROUP BY PayType;
# 运行结果 
+--------------+--------+
|   paytype    | total  |
+--------------+--------+
| NULL         | 1      |
| boleto       | 19784  | 银行付款票据
| credit_card  | 76505  | 信用卡
| debit_card   | 1528   | 借记卡
| not_defined  | 3      |
| voucher      | 3866   | 代金券
+--------------+--------+

将分析结果存入pay_type_analyse中

# 统计分析各种支付方式的使用数量,将分析结果存入pay_type_analyse中
insert into pay_type_analyse
    SELECT PayType , COUNT(Oid) AS total 
    FROM Details  where PayType is not null
    GROUP BY PayType;


(2)统计分析不同地域的销售情况

image-20250411191321685

# 统计分析不同地域的销售情况  
SELECT
    U.Statename,
    SUM(P.Price) AS total
FROM
    Usertb U
        LEFT JOIN
    Details D ON U.Cid = D.Uid
        LEFT JOIN
    ProductList P ON D.Oid = P.Oid
GROUP BY
    U.Statename;
# 运行结果
+----------------------+---------------------+
|      statename       |        total        |
+----------------------+---------------------+
| Acre                 | 16108.449999999992  |
| Alagoas              | 79022.53000000004   |
| Amapá                | 12151.269999999993  |
| Amazonas             | 21455.910000000018  |
| Bahia                | 489701.570000006    |
| Ceará                | 221158.31999999803  |
| Distrito Federal     | 292844.2299999968   |
| Espírito Santo       | 262677.1299999957   |
| Goiás                | 275896.6599999962   |
| Maranhão             | 116451.51999999983  |
| MatoGrosso           | 144508.37999999925  |
| MatoGrosso do Sul    | 111570.4999999999   |
| Minas Gerais         | 1522408.3699999005  |
| Paraná               | 647553.4000000175   |
| Paraíba              | 111140.20999999985  |
| Pará                 | 173956.2199999987   |
| Pernambuco           | 256667.9599999968   |
| Piauí                | 83226.23999999996   |
| Rio Grande do Norte  | 82514.91999999995   |
| Rio Grande do Sul    | 717682.2600000211   |
| Rio de Janeiro       | 1742382.1499998374  |
| Rondônia             | 45625.37000000009   |
| Roraima              | 6958.519999999996   |
| Santa Catarina       | 498155.3700000074   |
| Sergipe              | 57116.060000000056  |
| São Paulo            | 4978608.120001085   |
| Tocantins            | 48900.790000000045  |
+----------------------+---------------------+


将分析结果存入area_sale_analyse中

# 统计分析不同地域的销售情况 , 将分析结果存入area_sale_analyse中
INSERT INTO area_sale_analyse
SELECT
    U.Statename,
    SUM(P.Price) AS total
FROM
    Usertb U
        LEFT JOIN
    Details D ON U.Cid = D.Uid
        LEFT JOIN
    ProductList P ON D.Oid = P.Oid
GROUP BY
    U.Statename;


Spark离线分析

(3)查看在不同时间段用户下单的情况


实验任务8:

基于 Spark 的订单数据时间分析实验


🎯实验任务描述:

本实验通过一个完整的订单分析案例,引导学生使用 Spark 从 Hive 表中读取订单数据,完成时间字段的解析、清洗与转换,并以小时为维度进行统计分析。重点掌握 Spark SQL 中时间字段的处理流程,提升学生对数据预处理与分组分析的能力。


📌实验步骤概览:

  1. 创建 SparkSession 并启用 Hive 支持

  2. 读取 Hive 中的 Orders 表数据

  3. PurchaseTimestamp 字段转换为标准时间类型

  4. 过滤无效时间并提取订单下单的小时信息

  5. 统计每小时的订单数量并展示结果

  6. 关闭 SparkSession


✅实验要求:

1. 环境准备

  • 安装并配置好 Spark 和 Hive。

  • Hive 中已创建并导入 Orders 表,包含订单字段 PurchaseTimestamp

  • 时间字段格式:dd/MM/yyyy HH:mm:ss


2. 编程实现要求

  • 使用 Spark 编写 Scala 程序,完成以下功能:

    • 读取 Hive 表 Orders 中的订单数据;

    • 将字符串格式的 PurchaseTimestamp 字段转换为 timestamp 类型;

    • 过滤掉转换失败(为 null)的记录;

    • 使用 hour() 函数提取下单时间中的小时部分;

    • 统计各个小时的订单数量,并以表格形式输出结果;

    • 可选:打印前 10 行的转换示例,辅助调试与理解。


🧠能力提升目标:

  • 掌握 Spark 与 Hive 表集成的查询与数据提取方式;

  • 理解 unix_timestampcast 的时间转换机制;

  • 掌握使用 Spark SQL 进行数据清洗与字段提取;

  • 能够运用聚合函数进行分组统计分析;

  • 提升日志类、订单类数据的时间字段处理能力。


完整代码如下:

package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, functions}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

object HiveAnalysis {
  def main(args: Array[String]): Unit = {

    // 1. 初始化 Spark 配置    
    println("启动 SparkSession , 并开启 Hive 支持...")
    val spark = SparkSession.builder()
      .appName("HiveToClickhouse") // 设置应用名称
      .master("local[*]")  // 本地运行,使用所有 CPU 核心
      .enableHiveSupport() // 开启对 Hive 的支持,才能访问 Hive 表
      .getOrCreate()

    // 2. 从 Hive 表读取数据
    println("3.从 Hive 表 Orders 读取数据 ...")
    val ordersDF = spark.sql("SELECT * FROM Orders")
    ordersDF.show(5, truncate = false)           // 显示前 5 条记录(不截断列内容)
    println(s"Hive中的Orders表读取完成,共有数据条数:${ordersDF.count()}")

    // 3. 将字符串时间转换为 Timestamp 类型,并删除转换失败(为 null)的记录
    println("4.将 PurchaseTimestamp 转换为 Timestamp,并清洗格式错误的记录 ...")
    val dateFormat = "dd/MM/yyyy HH:mm:ss"
    // 在 DataFrame 中新增一个列 "order_timestamp_parsed",
    // 把原来的 "PurchaseTimestamp"(字符串时间)转换为 Timestamp 类型(yyyy-MM-dd HH:mm:ss)
    val dfWithTimestamp = ordersDF
      .withColumn(
        "order_timestamp_parsed",  // 新列名
        to_timestamp(              // 内置函数:把字符串转换为 Timestamp 类型
          col("PurchaseTimestamp"),// 取出 "PurchaseTimestamp" 列的值
          dateFormat               // 按照指定的时间格式解析:dd/MM/yyyy HH:mm:ss
        )
      )

    // 4. 打印前几行时间转换示例
    println("5.前 5 行时间字段转换示例:")
    dfWithTimestamp.select("PurchaseTimestamp", "order_timestamp_parsed")
      .show(5, truncate = false)   

    // 5. 过滤掉时间转换失败的数据(null),并提取小时字段
    println("6.提取订单时间中的小时字段 ...")
    val ordersDFWithHour = dfWithTimestamp
      .filter(col("order_timestamp_parsed").isNotNull)
      .withColumn("hour", hour(col("order_timestamp_parsed")))

    // 6. 按小时统计订单数量
    println("7.按小时统计订单数量 ...")
    val orderCountByHour = ordersDFWithHour
      .groupBy("hour")
      .agg(count("*").as("order_count"))
      .orderBy("hour")

    // 7. 显示结果
    println("8.每个小时的订单数量如下:")
    orderCountByHour.show(24, truncate = false)

    // 8. 关闭 SparkSession
    println("9.关闭 SparkSession,程序执行完毕。")
    spark.stop()
  }
}
# 运行结果
每个小时的订单数量如下:
+----+-----------+
|hour|order_count|
+----+-----------+
|0   |2394       |
|1   |1170       |
|2   |510        |
|3   |272        |
|4   |206        |
|5   |188        |
|6   |502        |
|7   |1231       |
|8   |2967       |
|9   |4785       |
|10  |6177       |
|11  |6578       |
|12  |5995       |
|13  |6518       |
|14  |6569       |
|15  |6454       |
|16  |6675       |
|17  |6150       |
|18  |5769       |
|19  |5982       |
|20  |6193       |
|21  |6217       |
|22  |5816       |
|23  |4123       |
+----+-----------+
only showing top 24 rows


方法二:Hive Sql实现

-- 1. 过滤掉无效 PurchaseTimestamp 的记录,先尝试转换为 timestamp
WITH orders_with_ts AS (
  SELECT 
    *,
    unix_timestamp(PurchaseTimestamp, 'dd/MM/yyyy HH:mm:ss') AS ts
  FROM Orders
  WHERE PurchaseTimestamp IS NOT NULL
),

-- 2. 过滤掉转换失败的行(ts 为 null 或 0),并提取小时字段
orders_with_hour AS (
  SELECT 
    hour(from_unixtime(ts)) AS hour
  FROM orders_with_ts
  WHERE ts IS NOT NULL AND ts != 0
)

-- 3. 按小时统计订单数量
SELECT 
  hour,
  COUNT(*) AS order_count
FROM orders_with_hour
GROUP BY hour
ORDER BY hour;


2. Hbase数据分析

实验任务9:

Hbase查询

(1)查看订单ID为“000229ec398224ef6ca0657da4fc703e”的订单详细信息

scan 'OrderItemsHB', { FILTER => "SingleColumnValueFilter('orderinfo', 'orderid', =, 'binary:000229ec398224ef6ca0657da4fc703e')" }
# 代码注释
scan 'OrderItemsHB', \  -- 🔍 从名为 OrderItemsHB 的表中扫描所有行(就是去查表里的数据)

{ 
  FILTER => "SingleColumnValueFilter( \
    'orderinfo', \                    -- 列族名:orderinfo,相当于一个“字段分类文件夹” \
    'orderid', \                      -- 列名:orderid,就是我们要检查的字段 \
    =, \                              -- 比较方式:=,表示“等于” \
    'binary:000229ec398224ef6ca0657da4fc703e')"  -- 要匹配的值,表示只要这个字段的值正好等于这个,就返回这一行
}

# SingleColumnValueFilter 是用来查 “某一列的值是否满足某个条件” 的过滤器。
# binary: 表示要“完全一模一样”地匹配这个值。
# 用 HBase 存储的所有值其实都是二进制(字节数组),而不是字符串。
# 所以当你写 'binary:abc123',HBase 的意思是:
#  “把这个值转成字节,然后去一条条比,谁的字节完全一样就留下,其他都不要。”

image-20241223132943253

(2)列出OrdersHB表中“row10”的用户订单信息

# 按行键(rowkey)查找名为 OrdersHB 的表中,rowkey 为 row10 的那一行数据。
get 'OrdersHB', 'row10'

image-20241223133013622

(3)列出购买时间(Purchasetdate)在2016年8月1日到2016年10月1日内创建的所有订单

scan 'OrdersHB', {FILTER => "SingleColumnValueFilter('timeinfo', 'Purchasetdate', >=, 'binary:2016-08-01') AND SingleColumnValueFilter('timeinfo', 'Purchasetdate', <=, 'binary:2016-10-01')"}

image-20241223133054114


3. ClickHouse数据分析

ClickHouse SQL

切换当前数据库为刚刚创建的analysisdb

# 切换当前数据库为analysisdb
use analysisdb


查询订单表Orders

ClickHouse 表结构(Orders 表)


字段名数据类型来源字段名字段说明
cidStringcustomerid客户 ID(字段重命名)
oidStringorderid订单 ID(字段重命名)
purchaseDatepurchasetimestamp购买时间提取出的日期部分
orderStatusStringstatus订单状态(字段重命名)
deliveryInt32deliverydatetime配送所用天数(整型,原始字段)



实验任务10:

(1)统计每天的订单数量,并按从多到少的顺序进行排序

SELECT  \
  purchase AS order_date, \
  COUNT(*) AS order_count \
FROM analysisdb.Orders \
GROUP BY purchase \
ORDER BY order_count DESC;

image-20241223133652106


(2)统计每个用户的总消费金额和平均消费金额,并按平均消费金额进行排序(只显示前10行)

ClickHouse 表结构(Orders 表)


字段名数据类型来源字段名字段说明
cidStringcustomerid客户 ID(字段重命名)
oidStringorderid订单 ID(字段重命名)
purchaseDatepurchasetimestamp购买时间提取出的日期部分
orderStatusStringstatus订单状态(字段重命名)
deliveryInt32deliverydatetime配送所用天数(整型,原始字段)


ClickHouse 表结构(OrderItems 表)


字段名数据类型来源字段名字段说明
oidStringorderid订单 ID(字段重命名)
itemIdStringitemid订单项 ID(字段重命名)
pidStringproductid商品 ID(字段重命名)
limitDateDateshippinglimitdate配送截止日期(转换后的新列)
priceFloat32price商品价格(原字段保留)


# 统计每个用户的总消费金额和平均消费金额,并按平均消费金额进行排序(只显示前10行)
SELECT  \
  o.cid AS user_id, \
  SUM(oi.price) AS total_spent, \
  AVG(oi.price) AS avg_spent_per_order \
FROM analysisdb.Orders AS o \
INNER JOIN analysisdb.OrderItems AS oi ON o.oid = oi.oid \
GROUP BY o.cid \
ORDER BY total_spent DESC \
limit 10;

image-20241223134005332


(3)统计销售额最高的商品

ClickHouse 表结构(OrderItems 表)


字段名数据类型来源字段名字段说明
oidStringorderid订单 ID(字段重命名)
itemIdStringitemid订单项 ID(字段重命名)
pidStringproductid商品 ID(字段重命名)
limitDateDateshippinglimitdate配送截止日期(转换后的新列)
priceFloat32price商品价格(原字段保留)


🏁 ClickHouse 表结构(ProductInfo 表)


字段名数据类型来源字段名说明
pidStringproductid商品 ID(重命名)
categoryNameStringcategoryname商品类别名称(重命名)


# 统计销售额最高的商品
SELECT \
  p.pid AS product_id, \
  p.categoryName AS product_category, \
  SUM(oi.price) AS total_sales \
FROM analysisdb.OrderItems AS oi \
INNER JOIN analysisdb.ProductInfo AS p ON oi.pid = p.pid \
GROUP BY \
  p.pid, \
  p.categoryName \
ORDER BY total_sales DESC \
LIMIT 1;

image-20241223134127818


Spark离线分析

(4)使用Spark统计每个用户的平均消费金额

完整代码:

package org.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ClickHouseAnalysis {
 def main(args: Array[String]): Unit = {

   // ====================================================
   // 一、Spark 环境配置与初始化
   // ====================================================

   // 1.1 配置 Spark 应用基本信息(运行方式、名称、序列化方式)
   val sparkConf = new SparkConf()
     .setMaster("local[*]") // 使用本地所有CPU核心运行
     .setAppName("ClickHouseAnalysis") // 应用名称
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 提升效率

   // 1.2 创建 SparkSession(Spark 的主入口)
   val spark = SparkSession.builder()
     .config(sparkConf)
     .enableHiveSupport() // 虽然这次不用 Hive,但保留也不会影响
     .getOrCreate()

   println("SparkSession 启动成功。")

   // ====================================================
   // 二、读取 ClickHouse 中的数据表
   // ====================================================

   // 2.1 设置 ClickHouse 数据库连接地址
   val clickhouseUrl = "jdbc:clickhouse://192.168.36.100:8123/analysisdb"

   // 2.2 读取 Orders 表(订单表,包含用户和订单状态等信息)
   val ordersDF = spark.read
     .format("jdbc")
     .option("url", clickhouseUrl)
     .option("dbtable", "Orders")
     .load()

   // 2.3 读取 OrderItems 表(订单项表,包含商品价格、订单编号等)
   val orderItemsDF = spark.read
     .format("jdbc")
     .option("url", clickhouseUrl)
     .option("dbtable", "OrderItems")
     .load()

   // 2.4 打印两张表的记录数量,确认数据是否成功读取
   println(s"Orders 表记录数:${ordersDF.count()}")
   println(s"OrderItems 表记录数:${orderItemsDF.count()}")

   // ====================================================
   // 三、订单数据关联与初步转换
   // ====================================================

   // 3.1 通过订单编号(oid)将两张表连接起来,形成完整订单信息
   val joinedDF = ordersDF.join(orderItemsDF, "oid") // 使用公共字段 oid 自动去重

   // 3.2 选择用户ID和商品价格字段,准备后续统计字段
   val withTotalPriceDF = joinedDF.select(
     col("cid").as("user_id"),            // 用户 ID
     col("price").as("price_per_item")    // 商品单价
   ).withColumn(
     "order_total", col("price_per_item") // 本实验将“单价”视作“销售额”(简化处理)
   )

   // ====================================================
   // 四、用户维度的订单分析
   // ====================================================

   // 4.1 按用户分组,统计每位用户的总消费金额和订单项数量
   val userSpendingDF = withTotalPriceDF.groupBy("user_id").agg(
     sum("order_total").as("total_spent"), // 每位用户总花费
     count("*").as("order_count")          // 每位用户订单项数量
   )

   // 4.2 计算每位用户的平均消费金额
   val userAverageSpendingDF = userSpendingDF.withColumn(
     "avg_spent_per_user",
     col("total_spent") / col("order_count") // 平均消费 = 总金额 / 项数
   )

   // 4.3 按照平均消费金额从高到低排序
   val sortedUserAverageSpendingDF = userAverageSpendingDF.orderBy(desc("avg_spent_per_user"))

   // 4.4 显示前 10 位用户的消费情况(高消费用户排行)
   println("前 10 位用户的消费分析结果:")
   sortedUserAverageSpendingDF.show(10, truncate = false)

   // ====================================================
   // 五、关闭 SparkSession
   // ====================================================

   // 5.1 关闭 Spark,释放资源
   spark.stop()
   println("SparkSession 已关闭,程序执行完毕。")
 }
}

程序运行结果:

SparkSession 启动成功。
Orders 表记录数:99442
OrderItems 表记录数:112650
10 位用户的消费分析结果:
+--------------------------------+----------------+-----------+------------------+
|user_id                         |total_spent     |order_count|avg_spent_per_user|
+--------------------------------+----------------+-----------+------------------+
|c6e2731c5b391845f6800c97401a43a9|6735.0          |1          |6735.0            |
|f48d464a0baaea338cb25f816991ab1f|6729.0          |1          |6729.0            |
|3fd6777bbce08a352fddd04e4a7cc8f6|6499.0          |1          |6499.0            |
|df55c14d1476a9a3467f131269c2477f|4799.0          |1          |4799.0            |
|24bbf5fd2f2e1b359ee7de94defc4a15|4690.0          |1          |4690.0            |
|3d979689f636322c62418b6346b1c6d2|4590.0          |1          |4590.0            |
|1afc82cd60e303ef09b4ef9837c9505c|4399.8701171875 |1          |4399.8701171875   |
|35a413c7ca3c69756cb75867d6311c0d|4099.990234375  |1          |4099.990234375    |
|e9b0d0eb3015ef1c9ce6cf5b9dcbee9f|4059.0          |1          |4059.0            |
|c6695e3b1e48680db36b487419fb0398|3999.89990234375|1          |3999.89990234375  |
+--------------------------------+----------------+-----------+------------------+
only showing top 10 rows

SparkSession 已关闭,程序执行完毕。



发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1