离线数据分析 第二部分
创建数据表
(1)进入ClickHouse
# 启动客户端
clickhouse-client
(2)在ClickHouse中创建分析库analysisdb,用于存放数据表
# 查看数据库
show databases;
# 先删除数据库
drop database if exists analysisdb
# 再建数据库
create database if not exists analysisdb
(3)切换当前数据库为刚刚创建的analysisdb
use analysisdb
(4)创建订单表Orders,该表字段包括cid(客户ID)、oid(订单ID)、purchase(下单时间)、orderStatus(订单状态)、delivery(配送完成时间),其中cid、oid、orderStatus为String类型,purchase为date类型,delivery为Int32类型,并按orderStatus分区,在每个分区内按cid和oid排序
-- 创建数据表 Orders
CREATE TABLE IF NOT EXISTS analysisdb.Orders ( \
cid String, \
oid String, \
purchase Date, \
orderStatus String, \
delivery Int32 \
) ENGINE = MergeTree() \
PARTITION BY (orderStatus) \
ORDER BY (cid, oid);
(5)创建订单详情表OrderItems,该表字段包括oid、itemId、pid、limitDate(配送截止日期)、price,其中oid、itemId、pid为String类型,limitDate为date类型,price为Float32类型,按limitDate分区,在每个分区内按oid排序
-- 创建数据表 OrderItems
CREATE TABLE IF NOT EXISTS analysisdb.OrderItems ( \
oid String, \
itemId String, \
pid String, \
limitDate Date, \
price Float32 \
) ENGINE = MergeTree() \
PARTITION BY toYYYYMM(limitDate) \
ORDER BY (oid);
(6)创建产品名称表ProductInfo,该表字段包括categoryName和pid,且均为String类型,按pid排序
-- 创建数据表 ProductInfo
CREATE TABLE IF NOT EXISTS analysisdb.ProductInfo( \
categoryName String, \
pid String \
)ENGINE = MergeTree() \
ORDER BY (pid) ;
(7)查看已创建的3个数据表
show tables;
(8)创建一份名为OrdersToClickhouse的Scala Object文件,将Hive的Orders表数据存入Clickhouse数据表
实验任务4:
基于 Spark 的 Hive 的orders表数据清洗并写入 ClickHouse的 Orders 表
🎯实验任务描述:
本实验通过一个完整的订单数据处理案例,引导学生使用 Spark 从 Hive 表中读取原始订单数据,完成空值处理、字段去重、时间字段转换、字段重命名等清洗操作,并将结果写入 ClickHouse 数据库中的目标表。重点掌握 Spark 与 Hive 的读取集成、Spark SQL 的数据清洗能力,以及 Spark 与 ClickHouse 的 JDBC 写入流程。
📌实验步骤概览:
创建 SparkSession 并启用 Hive 支持
读取 Hive 中的
orders表进行空值过滤与去重处理
完成时间字段格式转换与日期提取
保留核心字段并进行字段重命名
配置 ClickHouse JDBC 写入参数
将处理后的数据写入 ClickHouse 中的
Orders表释放 Spark 资源,结束程序运行
✅实验要求:
1. 环境准备
已安装并正确配置以下组件:
Apache Spark(建议支持 Hive)
Apache Hive,包含
orders表及测试数据ClickHouse,并预先创建好目标数据库
analysisdb及表Orders已添加 ClickHouse JDBC 驱动到项目pom中。
2. 编程实现要求
使用 Spark 编写 Scala 程序,完成以下功能:
从 Hive 表
orders中读取数据;过滤掉
orderid、customerid、status任意为空的数据;按
customerid去重,保留每位客户的一条订单记录;将
PurchaseTimestamp转换为timestamp类型;从时间戳中提取日期为新列
purchase;删除无关字段,如
carrierdate、approvedat等;重命名字段,如将
customerid改为cid、orderid改为oid;使用 JDBC 写入 ClickHouse 的
Orders表,采用追加模式。
🧠能力提升目标:
掌握 Spark 读取 Hive 表的基本方式;
熟悉 Spark SQL 中的数据清洗、时间处理与列转换;
掌握 JDBC 的写入方法及 ClickHouse 的连接参数配置;
理解 Spark 在多数据源场景下的数据协同处理能力。
ClickHouse 表结构(Orders 表)
| 字段名 | 数据类型 | 来源字段名 | 字段说明 |
|---|---|---|---|
cid | String | customerid | 客户 ID(字段重命名) |
oid | String | orderid | 订单 ID(字段重命名) |
purchase | Date | purchasetimestamp | 购买时间提取出的日期部分 |
orderStatus | String | status | 订单状态(字段重命名) |
delivery | Int32 | deliverydatetime | 配送所用天数(整型,原始字段) |
完整代码如下
// 包名,用于组织代码
package org.example
// 引入 SparkSession
import org.apache.spark.sql.SparkSession
// 引入常用函数:引用列、格式化时间、字符串转时间戳
import org.apache.spark.sql.functions._
// 定义主对象,程序入口
object OrdersToClickhouse {
def main(args: Array[String]): Unit = {
// 1. 初始化 Spark 配置
println("启动 SparkSession , 并开启 Hive 支持...")
val spark = SparkSession.builder()
.appName("HiveToClickhouse") // 设置应用名称
.master("local[*]") // 本地运行,使用所有 CPU 核心
.enableHiveSupport() // 开启对 Hive 的支持,才能访问 Hive 表
.getOrCreate()
// 2. 从 Hive 表中读取数据
val hiveTable = "orders" // Hive 表名
println(s"从 Hive 表 '$hiveTable' 读取数据 ...")
val df = spark.table(hiveTable) // 使用 Spark SQL 读取 Hive 表
println(s"读取完成,数据条数:${df.count()}")
// 显示处理后的数据
println("数据清洗完成,展示前5条记录:")
df.show(5, truncate = false)
// 3. 开始数据清洗
println("开始数据清洗...")
// 第一步:过滤空值(删除 orderid、customerid、status 中任意字段为空的记录)
println("第一步:过滤空值(orderid, customerid, status 不能为空)...")
val filteredDf = df.filter(
col("orderid").isNotNull &&
col("customerid").isNotNull &&
col("status").isNotNull &&
col("customerid") =!= "Customer Id" // 排除表头
)
println(s"过滤完成,剩余数据条数:${filteredDf.count()}")
// 第二步:去重,如果一个 customerid 出现多次,只保留一条记录
println("第二步:去重(按 customerid 去重)...")
val deduplicatedDf = filteredDf.dropDuplicates("customerid")
println(s"去重完成,剩余数据条数:${deduplicatedDf.count()}")
// 第三步:转换时间格式(把字符串格式的时间数据(比如 "25/03/2024 14:30:00")转 timestamp格式[如2024-03-25 14:30:00])
println("第三步:将 PurchaseTimestamp 字符串转换为 timestamp 类型 ...")
val timestampFormat = "dd/MM/yyyy HH:mm:ss" // 指定原时间字符串的格式
val dfWithTimestamp = deduplicatedDf.withColumn(
"PurchaseTimestamptimestamp", // 新列名
to_timestamp(col("PurchaseTimestamp"), timestampFormat) // 转换函数
)
// 第四步:提取购买日期:从 timestamp 中提取日期部分,如 2023-04-10
println("第四步:提取购买日期(purchase 字段) ...")
val dateFormat = "yyyy-MM-dd"
val dfWithDate = dfWithTimestamp.withColumn(
"Purchasetdate", // 新列名
date_format(col("PurchaseTimestamptimestamp"), dateFormat)
)
// 第五步:删除无用列:只保留我们需要的字段
println("第五步:删除无用列 ...")
val dfWithDrop = dfWithDate.drop(
"approvedat", "carrierdate", "customerdate", "deliverydate",
"purchasetimestamp", "PurchaseTimestamptimestamp"
)
// 第六步:字段重命名:使字段名更简洁或符合 ClickHouse 的目标表字段
println("第六步:字段重命名 ...")
val dfRenamed = dfWithDrop
.withColumnRenamed("customerid", "cid") // 客户 ID
.withColumnRenamed("orderid", "oid") // 订单 ID
.withColumnRenamed("status", "orderStatus") // 订单状态
.withColumnRenamed("deliverydatetime", "delivery") // 配送时间
.withColumnRenamed("Purchasetdate", "purchase") // 购买日期
// 显示处理后的数据
println("数据清洗完成,展示前 5 条记录:")
dfRenamed.show(5, truncate = false)
// 4. 配置 ClickHouse 连接参数
println("配置 ClickHouse JDBC 连接参数 ...")
val clickhouseJdbcUrl = "jdbc:clickhouse://192.168.36.100:8123/analysisdb" // ClickHouse 数据库地址
val clickhouseProps = new java.util.Properties()
clickhouseProps.setProperty("user", "default") // 用户名
clickhouseProps.setProperty("password", "") // 密码(为空)
clickhouseProps.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver") // JDBC 驱动
// 5. 写入 ClickHouse 表
println("开始写入 ClickHouse 表 'Orders' ...")
dfRenamed.write
.mode("append") // 追加模式写入数据
.jdbc(clickhouseJdbcUrl, "Orders", clickhouseProps)
println("写入 ClickHouse 完成。")
// 6. 从 ClickHouse 读取刚写入的数据,显示前 5 条
println("从 ClickHouse 读取 'Orders' 表,显示前 5 条记录 ...")
val clickhouseDf = spark.read
.jdbc(clickhouseJdbcUrl, "Orders", clickhouseProps)
clickhouseDf.show(5, truncate = false)
// 7. 停止 Spark
println("停止 Spark 会话 ...")
spark.stop()
println("程序执行完毕。")
}
}查看clickhouse中的生成的Orders表的数据总行数
# 切换数据库 use analysisdb # 在 clickhouse 的 Shell中查询Orders表的数据总行数 select count(*) from analysisdb.Orders; # 在 clickhouse 的 Shell中查询Orders表的前3行数据 select * from analysisdb.Orders limit 3;
# 运行结果 master :) select count(*) from analysisdb.Orders; SELECT count(*) FROM analysisdb.Orders Query id: 786d3745-f4e9-455d-9730-6562ea9e7a47 ┌─count()─┐ │ 99442 │ └─────────┘ 1 rows in set. Elapsed: 0.013 sec.
打包运行
cd /opt/apps/spark ./bin/spark-submit \ --master yarn \ --deploy-mode client \ --class org.example.OrdersToClickhouse \ --conf spark.hadoop.hive.metastore.uris=thrift://master:9083 \ spark_task-1.0-jar-with-dependencies.jar
(12)创建名为OrderItemsToClickhouse的Scala对象文件,用于将名为"OrderItems"的Hive表数据清洗后存入ClickHouse的OrderItems表。
实验任务5:
基于 Spark 的 Hive 表 OrderItems订单明细数据清洗并写入 ClickHouse的 OrderItems 表实践
🎯实验任务描述:
本实验通过订单明细数据处理案例,指导学生使用 Spark 从 Hive 表 OrderItems 中读取原始数据,完成空值过滤、重复项去除、时间字段格式转换及字段重命名等数据清洗工作,最终通过 JDBC 接口将结果写入 ClickHouse 表 OrderItems。重点掌握 Spark 与 Hive 集成读取、数据清洗流程以及 Spark 写入 ClickHouse 的实践方式。
📌实验步骤概览:
创建 SparkSession 并启用 Hive 支持
读取 Hive 中的
OrderItems表进行关键字段的空值过滤
按订单及产品维度进行去重
将配送截止时间转换为 timestamp 并提取日期
保留核心字段并重命名为业务含义明确的字段名
配置 ClickHouse JDBC 连接参数
将处理结果写入 ClickHouse 的
OrderItems表释放 Spark 资源
✅实验要求:
1. 环境准备
已安装并正确配置以下组件:
Apache Spark(需支持 Hive)
Apache Hive,包含
OrderItems表及测试数据ClickHouse,已创建数据库
analysisdb和目标表OrderItems项目中已引入 ClickHouse JDBC 驱动依赖
2. 编程实现要求
使用 Spark 编写 Scala 程序,完成如下任务:
读取 Hive 表
OrderItems数据;过滤空值(
orderid、itemid、productid、shippinglimitdate);按 (
orderid,itemid,productid) 进行去重;将
shippinglimitdate转换为 timestamp 类型;提取日期字段生成新列
limitDate;删除冗余字段(如原时间列和中间字段);
重命名字段以增强语义清晰度;
使用 JDBC 将数据追加写入 ClickHouse 表
OrderItems。
🧠能力提升目标:
熟练掌握 Spark 与 Hive 表的数据交互读取方式;
提高使用 Spark SQL 进行空值处理、去重、时间转换的能力;
掌握 ClickHouse JDBC 的参数配置及数据写入技巧;
理解清洗流程中字段重命名对数据建模与分析的作用;
提升跨平台数据传输与集成实操经验。
Hive 表结构(OrderItems 表)
ClickHouse 表结构(OrderItems 表)
| 字段名 | 数据类型 | 来源字段名 | 字段说明 |
|---|---|---|---|
oid | String | orderid | 订单 ID(字段重命名) |
itemId | String | itemid | 订单项 ID(字段重命名) |
pid | String | productid | 商品 ID(字段重命名) |
limitDate | Date | shippinglimitdate | 配送截止日期(转换后的新列) |
price | Float32 | price | 商品价格(原字段保留) |
完整代码如下:
// 包名,用于组织代码
package org.example
// 引入 SparkSession
import org.apache.spark.sql.SparkSession
// 引入常用函数:引用列、格式化时间、字符串转时间戳
import org.apache.spark.sql.functions._
object OrderItemsToClickhouse {
def main(args: Array[String]): Unit = {
// 1. 初始化 Spark 配置
println("启动 SparkSession , 并开启 Hive 支持...")
val spark = SparkSession.builder()
.appName("HiveToClickhouse") // 设置应用名称
.master("local[*]") // 本地运行,使用所有 CPU 核心
.enableHiveSupport() // 开启对 Hive 的支持,才能访问 Hive 表
.getOrCreate()
// 2. 从 Hive 表中读取数据
val hiveTable = "OrderItems"
println(s"从 Hive 表 '$hiveTable' 读取数据 ...")
val df = spark.table(hiveTable)
println(s"读取完成,数据条数:${df.count()}")
println("数据表前5行记录:")
df.show(5, truncate = false)
// 3. 开始数据清洗
println("开始数据清洗...")
// 第一步:过滤空值(指定关键字段)
println("第一步:过滤空值(orderid, itemid, productid不能为空)...")
val filteredDf = df.filter(
col("orderid").isNotNull &&
col("itemid").isNotNull &&
col("productid").isNotNull
)
println(s"过滤完成,剩余数据条数:${filteredDf.count()}")
// 第二步:去重(按 orderid, itemid, productid 去重)
println("第二步:去重(按 orderid, itemid, productid 去重)...")
val deduplicatedDf = filteredDf.dropDuplicates("orderid", "itemid", "productid")
println(s"去重完成,剩余数据条数:${deduplicatedDf.count()}")
// 第三步:转换 shippinglimitdate 为 timestamp 类型
println("第三步:将 ShippingLimitDate 字符串转换为 timestamp 类型 ...")
val timestampFormat = "dd/MM/yyyy HH:mm:ss"
val dfWithTimestamp = deduplicatedDf.withColumn(
"date_timestamp",
to_timestamp(col("ShippingLimitDate"), timestampFormat)
)
// 第四步:提取年月日(limitDate 字段)
println("第四步:提取日期部分,生成 limitDate 字段 ...")
val dateFormat = "yyyy-MM-dd"
val dfWithDate = dfWithTimestamp.withColumn(
"limitDate",
date_format(col("date_timestamp"), dateFormat)
)
// 第五步:删除多余字段
println("第五步:删除无用字段 ...")
val dfWithDrop = dfWithDate.drop("shippinglimitdate", "date_timestamp", "freightvalue")
// 第六步:字段重命名(更清晰)
println("第六步:字段重命名 ...")
val dfRenamed = dfWithDrop
.withColumnRenamed("orderid", "oid")
.withColumnRenamed("itemid", "itemId")
.withColumnRenamed("productid", "pid")
.withColumnRenamed("deliverydatetime", "delivery")
// 显示处理后的数据
println("数据清洗完成,展示前 5 条记录:")
dfRenamed.show(5, truncate = false)
// 4. 配置 ClickHouse 连接参数
println("配置 ClickHouse JDBC 连接参数 ...")
val clickhouseJdbcUrl = "jdbc:clickhouse://192.168.36.100:8123/analysisdb"
val clickhouseProps = new java.util.Properties()
clickhouseProps.setProperty("user", "default")
clickhouseProps.setProperty("password", "")
clickhouseProps.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver")
// 5. 写入 ClickHouse 表
println("开始写入 ClickHouse 表 'OrderItems' ...")
dfRenamed.write
.mode("append") // 追加写入
.jdbc(clickhouseJdbcUrl, "OrderItems", clickhouseProps)
println("写入 ClickHouse 完成。")
// 6. 从 ClickHouse 读取刚写入的数据,显示前 5 条
println("从 ClickHouse 读取 'OrderItems' 表,显示前 5 条记录 ...")
val clickhouseDf = spark.read
.jdbc(clickhouseJdbcUrl, "OrderItems", clickhouseProps)
clickhouseDf.show(5, truncate = false)
// 7. 停止 Spark
println("停止 Spark 会话 ...")
spark.stop()
println("程序执行完毕。")
}
}查看clickhouse中的生成OrderItems表的数据总行数
# 查看 clickhouse 中的生成OrderItems表的数据总行数 select count(*) from analysisdb.OrderItems; # 在 clickhouse 的 Shell中查询OrderItems表的前3行数据 select * from analysisdb.OrderItems limit 3;
运行结果
SELECT count(*) FROM OrderItems
Query id: 126b826e-1391-4ddc-ad40-d39f8d08575a ┌─count()─┐ │ 112650 │ └─────────┘ 1 rows in set. Elapsed: 0.004 sec.
(13)创建名为ProductInfoToClickhouse的Scala对象文件,用于将名为"ProductInfo"的Hive表数据清洗后存入ClickHouse的ProductInfo表。
实验任务6:
基于 Spark 的商品信息数据清洗并写入 ClickHouse 实践
实验任务描述:
本实验以商品信息表为基础,指导学生使用 Spark 从 Hive 表 productinfo 中读取原始商品数据,通过字段空值清理、去重、字段精简和重命名等步骤完成数据清洗,最终将结果通过 JDBC 接口写入 ClickHouse 表 ProductInfo。重点掌握 Spark 对字符串和结构化字段的清洗技巧,以及 ClickHouse 的高效写入流程。
实验步骤概览:
创建 SparkSession 并启用 Hive 支持
读取 Hive 中的
productinfo表对所有字段进行空值与字符串清理
根据
productid进行去重删除无关字段,保留核心信息
重命名关键字段,增强语义清晰度
配置 ClickHouse JDBC 连接参数
将处理结果写入 ClickHouse 的
ProductInfo表释放 Spark 资源
实验要求:
1. 环境准备
已安装并正确配置以下组件:
Apache Spark(支持 Hive)
Apache Hive,包含
productinfo表及商品测试数据ClickHouse,预先创建好目标数据库
analysisdb和表ProductInfo项目中已引入 ClickHouse JDBC 驱动依赖
2. 编程实现要求
使用 Spark 编写 Scala 程序,完成以下数据处理任务:
从 Hive 表
productinfo中读取原始数据;对所有字段进行空值处理,字符串列需
TRIM()后非空;使用
productid进行去重;删除冗余字段(如描述长度、尺寸、照片数量等);
重命名关键字段(如
productid → pid,categoryname → categoryName);将处理结果通过 JDBC 接口写入 ClickHouse 表
ProductInfo。
能力提升目标:
掌握 Spark 对结构化数据的空值和非空字符串处理方法;
提高字段筛选、字段清理及语义增强的实际操作能力;
熟悉 Spark 与 ClickHouse 的高效连接与写入方式;
培养在真实业务数据清洗中的字段优化与映射意识。
Hive 表结构(productinfo 表)
| 字段名 | 数据类型 | 说明 |
|---|---|---|
productid | STRING | 商品 ID |
categoryname | STRING | 商品类别名称 |
descriptionlenght | INT | 描述长度(不保留) |
heightcm | DOUBLE | 商品高度(厘米)(不保留) |
lengthcm | DOUBLE | 商品长度(厘米)(不保留) |
namelenght | INT | 名称长度(不保留) |
photosqty | INT | 图片数量(不保留) |
weightg | DOUBLE | 商品重量(克)(不保留) |
widthcm | DOUBLE | 商品宽度(厘米)(不保留) |
ClickHouse 表结构(ProductInfo 表)
| 字段名 | 数据类型 | 来源字段名 | 说明 |
|---|---|---|---|
pid | String | productid | 商品 ID(重命名) |
categoryName | String | categoryname | 商品类别名称(重命名) |
完整代码如下:
package org.example
// 引入 SparkSession
import org.apache.spark.sql.SparkSession
// 导入 Apache Spark SQL 中的 DataTypes 类,用于定义和操作数据类型
import org.apache.spark.sql.types.DataTypes
object ProductInfoToClickhouse {
def main(args: Array[String]): Unit = {
// 1. 初始化 Spark 配置
println("启动 SparkSession , 并开启 Hive 支持...")
val spark = SparkSession.builder()
.appName("HiveToClickhouse") // 设置应用名称
.master("local[*]") // 本地运行,使用所有 CPU 核心
.enableHiveSupport() // 开启对 Hive 的支持,才能访问 Hive 表
.getOrCreate()
// 2. 从 Hive 表中读取数据
val hiveTable = "productinfo"
println(s"从 Hive 表 '$hiveTable' 读取数据 ...")
val df= spark.table(hiveTable)
println(s"读取完成,数据条数:${df.count()}")
println("数据表展示前 5 条记录:")
df.show(5, truncate = false)
// 3. 开始数据清洗
println("开始数据清洗...")
// 第一步:过滤空值(包含字符串去除空格后的非空判断)
println("第一步:过滤空值(包括 TRIM 后非空的字符串列)...")
val filterCondition = df.schema.fields.map { field =>
val columnName = field.name
val dataType = field.dataType
if (dataType == DataTypes.StringType) {
s"TRIM($columnName) <> '' AND $columnName IS NOT NULL"
} else {
s"$columnName IS NOT NULL"
}
}.mkString(" AND ")
val filteredDf = df.filter(filterCondition)
println(s"过滤完成,剩余数据条数:${filteredDf.count()}")
// 第二步:去重(按 productid 去重)
println("第二步:根据 productid 去重 ...")
val deduplicatedDf = filteredDf.dropDuplicates("productid")
println(s"去重完成,剩余数据条数:${deduplicatedDf.count()}")
// 第三步:删除不需要的列
println("第三步:删除无用字段 ...")
val dfWithDrop = deduplicatedDf.drop(
"descriptionlenght", "heightcm", "lengthcm",
"namelenght", "photosqty", "weightg", "widthcm"
)
// 第四步:字段重命名(productid → pid,categoryname → categoryName)
println("第四步:字段重命名 ...")
val dfRenamed = dfWithDrop
.withColumnRenamed("productid", "pid")
.withColumnRenamed("categoryname", "categoryName")
// 显示处理后的前5条数据
println("数据清洗完成,展示前 5 条记录:")
dfRenamed.show(5, truncate = false)
// 4. 配置 ClickHouse JDBC 连接参数
println("配置 ClickHouse JDBC 连接参数 ...")
val clickhouseJdbcUrl = "jdbc:clickhouse://192.168.36.100:8123/analysisdb"
val clickhouseProps = new java.util.Properties()
clickhouseProps.setProperty("user", "default")
clickhouseProps.setProperty("password", "")
clickhouseProps.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver")
// 5. 写入 ClickHouse 表
println("开始写入 ClickHouse 表 'ProductInfo' ...")
dfRenamed.write
.mode("append") // 追加写入
.jdbc(clickhouseJdbcUrl, "ProductInfo", clickhouseProps)
println("写入 ClickHouse 完成。")
// 6. 从 ClickHouse 读取刚写入的数据,显示前 5 条
println("从 ClickHouse 读取 'OrderItems' 表,显示前 5 条记录 ...")
val clickhouseDf = spark.read
.jdbc(clickhouseJdbcUrl, "ProductInfo", clickhouseProps)
clickhouseDf.show(5, truncate = false)
// 7. 停止 Spark
println("停止 Spark 会话 ...")
spark.stop()
println("程序执行完毕。")
}
}查看clickhouse中的生成的OrderItems表的数据总行数
# 查看clickhouse中的生成的OrderItems表的数据总行数 select count(*) from ProductInfo;
运行结果
SELECT count(*) FROM ProductInfo
Query id: 8cd7b05c-3c31-4d60-ad2d-cc0d0414e39d ┌─count()─┐ │ 32340 │ └─────────┘ 1 rows in set. Elapsed: 0.003 sec.
四、数据分析
1. Hive数据分析
启动环境
# 启动Hadoop start-dfs.sh start-yarn.sh # 启动Hive服务 hive --service metastore & hive --service hiveserver2 & # 启动hive的客户端IDEA
HiveQL
实验任务7:
(1)统计分析各种支付方式的使用情况
# 选择默认数据库 use default; # 统计分析各种支付方式的使用数量 SELECT PayType , COUNT(Oid) AS total FROM details GROUP BY PayType;
# 运行结果 +--------------+--------+ | paytype | total | +--------------+--------+ | NULL | 1 | | boleto | 19784 | 银行付款票据 | credit_card | 76505 | 信用卡 | debit_card | 1528 | 借记卡 | not_defined | 3 | | voucher | 3866 | 代金券 +--------------+--------+
将分析结果存入pay_type_analyse中
# 统计分析各种支付方式的使用数量,将分析结果存入pay_type_analyse中 insert into pay_type_analyse SELECT PayType , COUNT(Oid) AS total FROM Details where PayType is not null GROUP BY PayType;
(2)统计分析不同地域的销售情况

# 统计分析不同地域的销售情况 SELECT U.Statename, SUM(P.Price) AS total FROM Usertb U LEFT JOIN Details D ON U.Cid = D.Uid LEFT JOIN ProductList P ON D.Oid = P.Oid GROUP BY U.Statename;
# 运行结果 +----------------------+---------------------+ | statename | total | +----------------------+---------------------+ | Acre | 16108.449999999992 | | Alagoas | 79022.53000000004 | | Amapá | 12151.269999999993 | | Amazonas | 21455.910000000018 | | Bahia | 489701.570000006 | | Ceará | 221158.31999999803 | | Distrito Federal | 292844.2299999968 | | Espírito Santo | 262677.1299999957 | | Goiás | 275896.6599999962 | | Maranhão | 116451.51999999983 | | MatoGrosso | 144508.37999999925 | | MatoGrosso do Sul | 111570.4999999999 | | Minas Gerais | 1522408.3699999005 | | Paraná | 647553.4000000175 | | Paraíba | 111140.20999999985 | | Pará | 173956.2199999987 | | Pernambuco | 256667.9599999968 | | Piauí | 83226.23999999996 | | Rio Grande do Norte | 82514.91999999995 | | Rio Grande do Sul | 717682.2600000211 | | Rio de Janeiro | 1742382.1499998374 | | Rondônia | 45625.37000000009 | | Roraima | 6958.519999999996 | | Santa Catarina | 498155.3700000074 | | Sergipe | 57116.060000000056 | | São Paulo | 4978608.120001085 | | Tocantins | 48900.790000000045 | +----------------------+---------------------+
将分析结果存入area_sale_analyse中
# 统计分析不同地域的销售情况 , 将分析结果存入area_sale_analyse中 INSERT INTO area_sale_analyse SELECT U.Statename, SUM(P.Price) AS total FROM Usertb U LEFT JOIN Details D ON U.Cid = D.Uid LEFT JOIN ProductList P ON D.Oid = P.Oid GROUP BY U.Statename;
Spark离线分析
(3)查看在不同时间段用户下单的情况
实验任务8:
基于 Spark 的订单数据时间分析实验
🎯实验任务描述:
本实验通过一个完整的订单分析案例,引导学生使用 Spark 从 Hive 表中读取订单数据,完成时间字段的解析、清洗与转换,并以小时为维度进行统计分析。重点掌握 Spark SQL 中时间字段的处理流程,提升学生对数据预处理与分组分析的能力。
📌实验步骤概览:
创建 SparkSession 并启用 Hive 支持
读取 Hive 中的
Orders表数据将
PurchaseTimestamp字段转换为标准时间类型过滤无效时间并提取订单下单的小时信息
统计每小时的订单数量并展示结果
关闭 SparkSession
✅实验要求:
1. 环境准备
安装并配置好 Spark 和 Hive。
Hive 中已创建并导入
Orders表,包含订单字段PurchaseTimestamp。时间字段格式:
dd/MM/yyyy HH:mm:ss。
2. 编程实现要求
使用 Spark 编写 Scala 程序,完成以下功能:
读取 Hive 表
Orders中的订单数据;将字符串格式的
PurchaseTimestamp字段转换为timestamp类型;过滤掉转换失败(为 null)的记录;
使用
hour()函数提取下单时间中的小时部分;统计各个小时的订单数量,并以表格形式输出结果;
可选:打印前 10 行的转换示例,辅助调试与理解。
🧠能力提升目标:
掌握 Spark 与 Hive 表集成的查询与数据提取方式;
理解
unix_timestamp和cast的时间转换机制;掌握使用 Spark SQL 进行数据清洗与字段提取;
能够运用聚合函数进行分组统计分析;
提升日志类、订单类数据的时间字段处理能力。
完整代码如下:
package org.example
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, functions}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
object HiveAnalysis {
def main(args: Array[String]): Unit = {
// 1. 初始化 Spark 配置
println("启动 SparkSession , 并开启 Hive 支持...")
val spark = SparkSession.builder()
.appName("HiveToClickhouse") // 设置应用名称
.master("local[*]") // 本地运行,使用所有 CPU 核心
.enableHiveSupport() // 开启对 Hive 的支持,才能访问 Hive 表
.getOrCreate()
// 2. 从 Hive 表读取数据
println("3.从 Hive 表 Orders 读取数据 ...")
val ordersDF = spark.sql("SELECT * FROM Orders")
ordersDF.show(5, truncate = false) // 显示前 5 条记录(不截断列内容)
println(s"Hive中的Orders表读取完成,共有数据条数:${ordersDF.count()}")
// 3. 将字符串时间转换为 Timestamp 类型,并删除转换失败(为 null)的记录
println("4.将 PurchaseTimestamp 转换为 Timestamp,并清洗格式错误的记录 ...")
val dateFormat = "dd/MM/yyyy HH:mm:ss"
// 在 DataFrame 中新增一个列 "order_timestamp_parsed",
// 把原来的 "PurchaseTimestamp"(字符串时间)转换为 Timestamp 类型(yyyy-MM-dd HH:mm:ss)
val dfWithTimestamp = ordersDF
.withColumn(
"order_timestamp_parsed", // 新列名
to_timestamp( // 内置函数:把字符串转换为 Timestamp 类型
col("PurchaseTimestamp"),// 取出 "PurchaseTimestamp" 列的值
dateFormat // 按照指定的时间格式解析:dd/MM/yyyy HH:mm:ss
)
)
// 4. 打印前几行时间转换示例
println("5.前 5 行时间字段转换示例:")
dfWithTimestamp.select("PurchaseTimestamp", "order_timestamp_parsed")
.show(5, truncate = false)
// 5. 过滤掉时间转换失败的数据(null),并提取小时字段
println("6.提取订单时间中的小时字段 ...")
val ordersDFWithHour = dfWithTimestamp
.filter(col("order_timestamp_parsed").isNotNull)
.withColumn("hour", hour(col("order_timestamp_parsed")))
// 6. 按小时统计订单数量
println("7.按小时统计订单数量 ...")
val orderCountByHour = ordersDFWithHour
.groupBy("hour")
.agg(count("*").as("order_count"))
.orderBy("hour")
// 7. 显示结果
println("8.每个小时的订单数量如下:")
orderCountByHour.show(24, truncate = false)
// 8. 关闭 SparkSession
println("9.关闭 SparkSession,程序执行完毕。")
spark.stop()
}
}# 运行结果 每个小时的订单数量如下: +----+-----------+ |hour|order_count| +----+-----------+ |0 |2394 | |1 |1170 | |2 |510 | |3 |272 | |4 |206 | |5 |188 | |6 |502 | |7 |1231 | |8 |2967 | |9 |4785 | |10 |6177 | |11 |6578 | |12 |5995 | |13 |6518 | |14 |6569 | |15 |6454 | |16 |6675 | |17 |6150 | |18 |5769 | |19 |5982 | |20 |6193 | |21 |6217 | |22 |5816 | |23 |4123 | +----+-----------+ only showing top 24 rows
方法二:Hive Sql实现
-- 1. 过滤掉无效 PurchaseTimestamp 的记录,先尝试转换为 timestamp WITH orders_with_ts AS ( SELECT *, unix_timestamp(PurchaseTimestamp, 'dd/MM/yyyy HH:mm:ss') AS ts FROM Orders WHERE PurchaseTimestamp IS NOT NULL ), -- 2. 过滤掉转换失败的行(ts 为 null 或 0),并提取小时字段 orders_with_hour AS ( SELECT hour(from_unixtime(ts)) AS hour FROM orders_with_ts WHERE ts IS NOT NULL AND ts != 0 ) -- 3. 按小时统计订单数量 SELECT hour, COUNT(*) AS order_count FROM orders_with_hour GROUP BY hour ORDER BY hour;
2. Hbase数据分析
实验任务9:
Hbase查询
(1)查看订单ID为“000229ec398224ef6ca0657da4fc703e”的订单详细信息
scan 'OrderItemsHB', { FILTER => "SingleColumnValueFilter('orderinfo', 'orderid', =, 'binary:000229ec398224ef6ca0657da4fc703e')" }# 代码注释
scan 'OrderItemsHB', \ -- 🔍 从名为 OrderItemsHB 的表中扫描所有行(就是去查表里的数据)
{
FILTER => "SingleColumnValueFilter( \
'orderinfo', \ -- 列族名:orderinfo,相当于一个“字段分类文件夹” \
'orderid', \ -- 列名:orderid,就是我们要检查的字段 \
=, \ -- 比较方式:=,表示“等于” \
'binary:000229ec398224ef6ca0657da4fc703e')" -- 要匹配的值,表示只要这个字段的值正好等于这个,就返回这一行
}
# SingleColumnValueFilter 是用来查 “某一列的值是否满足某个条件” 的过滤器。
# binary: 表示要“完全一模一样”地匹配这个值。
# 用 HBase 存储的所有值其实都是二进制(字节数组),而不是字符串。
# 所以当你写 'binary:abc123',HBase 的意思是:
# “把这个值转成字节,然后去一条条比,谁的字节完全一样就留下,其他都不要。”
(2)列出OrdersHB表中“row10”的用户订单信息
# 按行键(rowkey)查找名为 OrdersHB 的表中,rowkey 为 row10 的那一行数据。 get 'OrdersHB', 'row10'
(3)列出购买时间(Purchasetdate)在2016年8月1日到2016年10月1日内创建的所有订单
scan 'OrdersHB', {FILTER => "SingleColumnValueFilter('timeinfo', 'Purchasetdate', >=, 'binary:2016-08-01') AND SingleColumnValueFilter('timeinfo', 'Purchasetdate', <=, 'binary:2016-10-01')"}
3. ClickHouse数据分析
ClickHouse SQL
切换当前数据库为刚刚创建的analysisdb
# 切换当前数据库为analysisdb use analysisdb
查询订单表Orders
ClickHouse 表结构(Orders 表)
| 字段名 | 数据类型 | 来源字段名 | 字段说明 |
|---|---|---|---|
cid | String | customerid | 客户 ID(字段重命名) |
oid | String | orderid | 订单 ID(字段重命名) |
purchase | Date | purchasetimestamp | 购买时间提取出的日期部分 |
orderStatus | String | status | 订单状态(字段重命名) |
delivery | Int32 | deliverydatetime | 配送所用天数(整型,原始字段) |
实验任务10:
(1)统计每天的订单数量,并按从多到少的顺序进行排序
SELECT \ purchase AS order_date, \ COUNT(*) AS order_count \ FROM analysisdb.Orders \ GROUP BY purchase \ ORDER BY order_count DESC;
(2)统计每个用户的总消费金额和平均消费金额,并按平均消费金额进行排序(只显示前10行)
ClickHouse 表结构(Orders 表)
| 字段名 | 数据类型 | 来源字段名 | 字段说明 |
|---|---|---|---|
cid | String | customerid | 客户 ID(字段重命名) |
oid | String | orderid | 订单 ID(字段重命名) |
purchase | Date | purchasetimestamp | 购买时间提取出的日期部分 |
orderStatus | String | status | 订单状态(字段重命名) |
delivery | Int32 | deliverydatetime | 配送所用天数(整型,原始字段) |
ClickHouse 表结构(OrderItems 表)
| 字段名 | 数据类型 | 来源字段名 | 字段说明 |
|---|---|---|---|
oid | String | orderid | 订单 ID(字段重命名) |
itemId | String | itemid | 订单项 ID(字段重命名) |
pid | String | productid | 商品 ID(字段重命名) |
limitDate | Date | shippinglimitdate | 配送截止日期(转换后的新列) |
price | Float32 | price | 商品价格(原字段保留) |
# 统计每个用户的总消费金额和平均消费金额,并按平均消费金额进行排序(只显示前10行) SELECT \ o.cid AS user_id, \ SUM(oi.price) AS total_spent, \ AVG(oi.price) AS avg_spent_per_order \ FROM analysisdb.Orders AS o \ INNER JOIN analysisdb.OrderItems AS oi ON o.oid = oi.oid \ GROUP BY o.cid \ ORDER BY total_spent DESC \ limit 10;
(3)统计销售额最高的商品
ClickHouse 表结构(OrderItems 表)
| 字段名 | 数据类型 | 来源字段名 | 字段说明 |
|---|---|---|---|
oid | String | orderid | 订单 ID(字段重命名) |
itemId | String | itemid | 订单项 ID(字段重命名) |
pid | String | productid | 商品 ID(字段重命名) |
limitDate | Date | shippinglimitdate | 配送截止日期(转换后的新列) |
price | Float32 | price | 商品价格(原字段保留) |
🏁 ClickHouse 表结构(ProductInfo 表)
| 字段名 | 数据类型 | 来源字段名 | 说明 |
|---|---|---|---|
pid | String | productid | 商品 ID(重命名) |
categoryName | String | categoryname | 商品类别名称(重命名) |
# 统计销售额最高的商品 SELECT \ p.pid AS product_id, \ p.categoryName AS product_category, \ SUM(oi.price) AS total_sales \ FROM analysisdb.OrderItems AS oi \ INNER JOIN analysisdb.ProductInfo AS p ON oi.pid = p.pid \ GROUP BY \ p.pid, \ p.categoryName \ ORDER BY total_sales DESC \ LIMIT 1;

Spark离线分析
(4)使用Spark统计每个用户的平均消费金额
完整代码:
package org.example
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object ClickHouseAnalysis {
def main(args: Array[String]): Unit = {
// ====================================================
// 一、Spark 环境配置与初始化
// ====================================================
// 1.1 配置 Spark 应用基本信息(运行方式、名称、序列化方式)
val sparkConf = new SparkConf()
.setMaster("local[*]") // 使用本地所有CPU核心运行
.setAppName("ClickHouseAnalysis") // 应用名称
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 提升效率
// 1.2 创建 SparkSession(Spark 的主入口)
val spark = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport() // 虽然这次不用 Hive,但保留也不会影响
.getOrCreate()
println("SparkSession 启动成功。")
// ====================================================
// 二、读取 ClickHouse 中的数据表
// ====================================================
// 2.1 设置 ClickHouse 数据库连接地址
val clickhouseUrl = "jdbc:clickhouse://192.168.36.100:8123/analysisdb"
// 2.2 读取 Orders 表(订单表,包含用户和订单状态等信息)
val ordersDF = spark.read
.format("jdbc")
.option("url", clickhouseUrl)
.option("dbtable", "Orders")
.load()
// 2.3 读取 OrderItems 表(订单项表,包含商品价格、订单编号等)
val orderItemsDF = spark.read
.format("jdbc")
.option("url", clickhouseUrl)
.option("dbtable", "OrderItems")
.load()
// 2.4 打印两张表的记录数量,确认数据是否成功读取
println(s"Orders 表记录数:${ordersDF.count()}")
println(s"OrderItems 表记录数:${orderItemsDF.count()}")
// ====================================================
// 三、订单数据关联与初步转换
// ====================================================
// 3.1 通过订单编号(oid)将两张表连接起来,形成完整订单信息
val joinedDF = ordersDF.join(orderItemsDF, "oid") // 使用公共字段 oid 自动去重
// 3.2 选择用户ID和商品价格字段,准备后续统计字段
val withTotalPriceDF = joinedDF.select(
col("cid").as("user_id"), // 用户 ID
col("price").as("price_per_item") // 商品单价
).withColumn(
"order_total", col("price_per_item") // 本实验将“单价”视作“销售额”(简化处理)
)
// ====================================================
// 四、用户维度的订单分析
// ====================================================
// 4.1 按用户分组,统计每位用户的总消费金额和订单项数量
val userSpendingDF = withTotalPriceDF.groupBy("user_id").agg(
sum("order_total").as("total_spent"), // 每位用户总花费
count("*").as("order_count") // 每位用户订单项数量
)
// 4.2 计算每位用户的平均消费金额
val userAverageSpendingDF = userSpendingDF.withColumn(
"avg_spent_per_user",
col("total_spent") / col("order_count") // 平均消费 = 总金额 / 项数
)
// 4.3 按照平均消费金额从高到低排序
val sortedUserAverageSpendingDF = userAverageSpendingDF.orderBy(desc("avg_spent_per_user"))
// 4.4 显示前 10 位用户的消费情况(高消费用户排行)
println("前 10 位用户的消费分析结果:")
sortedUserAverageSpendingDF.show(10, truncate = false)
// ====================================================
// 五、关闭 SparkSession
// ====================================================
// 5.1 关闭 Spark,释放资源
spark.stop()
println("SparkSession 已关闭,程序执行完毕。")
}
}
程序运行结果:
SparkSession 启动成功。
Orders 表记录数:99442
OrderItems 表记录数:112650
前 10 位用户的消费分析结果:
+--------------------------------+----------------+-----------+------------------+
|user_id |total_spent |order_count|avg_spent_per_user|
+--------------------------------+----------------+-----------+------------------+
|c6e2731c5b391845f6800c97401a43a9|6735.0 |1 |6735.0 |
|f48d464a0baaea338cb25f816991ab1f|6729.0 |1 |6729.0 |
|3fd6777bbce08a352fddd04e4a7cc8f6|6499.0 |1 |6499.0 |
|df55c14d1476a9a3467f131269c2477f|4799.0 |1 |4799.0 |
|24bbf5fd2f2e1b359ee7de94defc4a15|4690.0 |1 |4690.0 |
|3d979689f636322c62418b6346b1c6d2|4590.0 |1 |4590.0 |
|1afc82cd60e303ef09b4ef9837c9505c|4399.8701171875 |1 |4399.8701171875 |
|35a413c7ca3c69756cb75867d6311c0d|4099.990234375 |1 |4099.990234375 |
|e9b0d0eb3015ef1c9ce6cf5b9dcbee9f|4059.0 |1 |4059.0 |
|c6695e3b1e48680db36b487419fb0398|3999.89990234375|1 |3999.89990234375 |
+--------------------------------+----------------+-----------+------------------+
only showing top 10 rows
SparkSession 已关闭,程序执行完毕。