一、基本要求
(1)熟练掌握Scala编程语言;
(2)深入理解Hadoop、Hive、HBase、Spark等组件的原理和使用方法;
(3)能够使用Hive构建数据仓库,设计合理的数据模型,创建和管理数据表,以及进行数据导入和导出操作;
(4)熟练掌握HBase的存储机制和数据访问方式,能够设计高效的HBase表结构,实现数据的快速读写;
(5)熟练使用Spark进行数据处理和分析,包括数据清洗、转换、聚合等操作;
(6)能够从全局的角度思考问题,关注数据的整体性和一致性;
(7)具备清晰的逻辑思维能力,能够理解和分析复杂的数据处理流程,以及设计合理的数据处理和分析方案。
二、前置环境设置
1.三台集群机
2.Java环境
3.Hadoop集群环境部署
4.系统防火墙关闭
5.Hbase环境
6.Hive环境
7.ClickHouse环境
8.Spark环境
9.Windows环境下的Hadoop
10.IDEA
三、数据存储
启动Hive环境
# 在master上操作
start-dfs.sh
start-yarn.sh
hive --service metastore &
hive --service hiveserver2 &
beeline -u jdbc:hive2://master:10000 -n root
启动Hbase环境
# 在三个节点都启动Zookeeper
zkServer.sh start
# 在master节点启动HBase服务
start-hbase.sh
# 在master节点启动Hbase
hbase shell
1. 构建Hive仓库
数据上传
(1)准备本地数据文件
(2)在master节点创建/opt/datas/data目录,用于存放本地离线电商数据
mkdir -p /opt/datas/data
(3)上传的本地“电商离线数据”至master节点/opt/datas/data目录下
# 把目录7 Brazillian E - commerce(1)下的文件上传到至/opt/datas/data目录下
注:Customers为客户信息表,Geodata为地理位置表,Orders为订单表,Order_items为订单详情表,Payments为支付信息表,ProductID为商品信息表,Product_name为商品分类表,Product_reviews为商品评论表,Sheet1为地理位置及代码。
(4)在HDFS中创建目录/hive/orderItems和/hive/orders(需提前启动Hadoop环境),分别用于存放Order_items和Orders数据
hdfs dfs -mkdir -p /hive/orderItems
hdfs dfs -mkdir -p /hive/orders
(5)将orders订单表和order_items订单详情表数据上传至HDFS对应目录下
hdfs dfs -put /opt/datas/data/order_items.csv /hive/orderItems
hdfs dfs -put /opt/datas/data/Orders.csv /hive/orders
构建Hive数仓表
(6)在Hive中创建多个外部表来装载本地数据或映射HDFS上的原始数据
| 序号 | 表名 | 中文注释 | 表类型 |
|---|---|---|---|
| 1 | Customers | 客户信息表 | 外部表 |
| 2 | Geodata | 地理位置表 | 外部表 |
| 3 | Payments | 支付信息表 | 外部表 |
| 4 | ProductName | 商品分类表 | 外部表 |
| 5 | ProductReviews | 商品评论表 | 外部表 |
| 6 | ProductInfo | 商品信息表 | 外部表 |
| 7 | StateCode | 地理位置及代码表 | 外部表 |
| 8 | Orders | 订单表 | 外部表 |
| 9 | OrderItems | 订单详情表 | 外部表 |
| 10 | pay_type_analyse | 支付类型分析表 | 内部表 |
| 11 | area_sale_analyse | 区域销售分析表 | 内部表 |
# 启动 IDEA 去创建下面的数据表
# 选择Hive默认数据库
use default;
# 查看当前是不是在默认的数据库下
select current_database();
# 删除旧表,确保初始化
DROP TABLE IF EXISTS Customers;
DROP TABLE IF EXISTS Geodata;
DROP TABLE IF EXISTS Payments;
DROP TABLE IF EXISTS ProductName;
DROP TABLE IF EXISTS ProductReviews;
DROP TABLE IF EXISTS ProductInfo;
DROP TABLE IF EXISTS StateCode;
DROP TABLE IF EXISTS Orders;
DROP TABLE IF EXISTS OrderItems;
DROP TABLE IF EXISTS pay_type_analyse;
DROP TABLE IF EXISTS area_sale_analyse;
DROP TABLE IF EXISTS Usertb;
DROP TABLE IF EXISTS Details;
DROP TABLE IF EXISTS ProductList;
创建11个Hive数据表
1.创建客户信息表Customers,该表为外部表,指向目录/opt/datas/data/下的Customers.csv,表字段有CustomerCity、customerid、CustomerState、CustomerUniqueId、CustomerZipCodePrefix,其中CustomerZipCodePrefix为INT类型,其余为STRING类型
-- 1.创建客户信息表`Customers`
CREATE EXTERNAL TABLE IF NOT EXISTS Customers (
CustomerCity STRING COMMENT '客户所在城市',
customerid STRING COMMENT '客户ID',
CustomerState STRING COMMENT '客户所在州',
CustomerUniqueId STRING COMMENT '客户唯一标识符',
CustomerZipCodePrefix INT COMMENT '客户邮政编码前缀'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
客户唯一标识符:如身份证号
客户ID:客户如注册了两个帐户【两部手机】
2.创建地理位置表Geodata,该表为外部表,指向目录/opt/datas/data/下的Geodata.csv,表字段有City、State、Code、Lat、Lng,其中City、State为STRING类型,Code为INT类型,Lat、Lng为DOUBLE类型
-- 2.创建地理位置表`Geodata`
CREATE EXTERNAL TABLE IF NOT EXISTS Geodata (
City STRING COMMENT '城市',
State STRING COMMENT '州',
Code INT COMMENT '地理位置邮政编码前缀',
Lat DOUBLE COMMENT '纬度',
Lng DOUBLE COMMENT '经度'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
3.创建支付信息表Payments,该表为外部表,指向目录/opt/datas/data/下的Payments.csv,表字段有Orderid、Type、Installments、Sequential、Value,其中Orderid、Type为STRING类型,Installments、Sequential为INT类型,Value为DOUBLE类型
-- 3.创建支付信息表`Payments`
CREATE EXTERNAL TABLE IF NOT EXISTS Payments (
Orderid STRING COMMENT '订单编号(付款)',
Type STRING COMMENT '付款类型',
Installments INT COMMENT '支付分期付款',
Sequential INT COMMENT '付款顺序',
Value DOUBLE COMMENT '支付金额'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
4.创建商品分类表ProductName,该表为外部表,指向目录/opt/datas/data/下的“Product Name.csv”,表字段有Name和NameEnglish,且均为STRING类型
-- 4.创建商品分类表`ProductName`
CREATE EXTERNAL TABLE IF NOT EXISTS ProductName (
Name STRING COMMENT '产品类别名称(巴西语)',
NameEnglish STRING COMMENT '产品类别名称(英文)'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
5.创建商品评论表ProductReviews,该表为外部表,指向目录/opt/datas/data/下的“Product Reviews.csv”,表字段有Orderid、CommentTitle、Score,其中Orderid、CommentTitle为STRING类型,Score为INT类型
-- 5.创建商品评论表`ProductReviews`
CREATE EXTERNAL TABLE IF NOT EXISTS ProductReviews (
Orderid STRING COMMENT '订单编号(产品评论)',
CommentTitle STRING COMMENT '评论标题',
Score INT COMMENT '评分分数'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
6.创建商品信息表ProductInfo,该表为外部表,指向目录/opt/datas/data/下的“ProductID.csv”,表字段有CategoryName、ProductID、DescriptionLenght、HeightCm、LengthCm、NameLenght、PhotosQty、WeightG、WidthCm,其中CategoryName、ProductID为STRING类型,其余均为INT类型
-- 6.创建商品信息表`ProductInfo`
CREATE EXTERNAL TABLE IF NOT EXISTS ProductInfo (
CategoryName STRING COMMENT '产品类别名称',
ProductID STRING COMMENT '产品ID',
DescriptionLenght INT COMMENT '产品描述长度',
HeightCm INT COMMENT '产品高度(厘米)',
LengthCm INT COMMENT '产品长度(厘米)',
NameLenght INT COMMENT '产品名称长度',
PhotosQty INT COMMENT '产品图片数量',
WeightG INT COMMENT '产品重量(克)',
WidthCm INT COMMENT '产品宽度(厘米)'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
7.创建地理位置及代码表StateCode,该表为外部表,指向目录/opt/datas/data/下的“Sheet1.csv”,表字段有Code和Name,且均为STRING类型
-- 7.创建地理位置及代码表`StateCode`
CREATE EXTERNAL TABLE IF NOT EXISTS StateCode (
Code STRING COMMENT '州代码',
Name STRING COMMENT '州名称'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
8.创建订单表Orders,该表为外部表,指向HDFS目录中的/hive/orders,表字段有CustomerId、ApprovedAt、CarrierDate、CustomerDate、DeliveryDate、OrderId、PurchaseTimestamp、Status、Deliverydatetime,其中Deliverydatetime为INT类型,其余均为STRING类型
-- 8.创建订单表`Orders`
CREATE EXTERNAL TABLE IF NOT EXISTS Orders (
CustomerId STRING COMMENT '客户ID',
ApprovedAt STRING COMMENT '订单批准时间',
CarrierDate STRING COMMENT '订单交付给物流日期',
CustomerDate STRING COMMENT '订单交付客户日期',
DeliveryDate STRING COMMENT '预计交付日期',
OrderId STRING COMMENT '订单ID',
PurchaseTimestamp STRING COMMENT '订单购买时间戳',
Status STRING COMMENT '订单状态',
Deliverydatetime INT COMMENT '交货天数'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
9.创建订单详情表OrderItems,该表为外部表,指向HDFS目录中的/hive/orderItems,表字段有OrderId、ItemId、ProductId、ShippingLimitDate、FreightValue、Price,其中OrderId、ProductId、ShippingLimitDate为STRING类型,ItemId为INT类型,其余字段为DOUBLE类型
-- 9.创建订单详情表`OrderItems`
CREATE EXTERNAL TABLE IF NOT EXISTS OrderItems (
OrderId STRING COMMENT '订单ID',
ItemId INT COMMENT '订单项目ID',
ProductId STRING COMMENT '产品ID',
ShippingLimitDate STRING COMMENT '配送截止日期',
FreightValue DOUBLE COMMENT '货运费用',
Price DOUBLE COMMENT '商品价格'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
10.创建支付类型分析表pay_type_analyse,该表为内部表,表字段有pay_type和total,前者为STRING类型,后者为INT类型
-- 10.创建支付类型分析表`pay_type_analyse`
CREATE TABLE IF NOT EXISTS pay_type_analyse (
pay_type STRING COMMENT '支付类型',
total INT COMMENT '支付总次数'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
11.创建区域销售分析表area_sale_analyse,该表为内部表,表字段有Statename和total,前者为STRING类型,后者为Double类型
-- 11.创建区域销售分析表`area_sale_analyse`
CREATE TABLE IF NOT EXISTS area_sale_analyse (
Statename STRING COMMENT '州名称',
total DOUBLE COMMENT '销售总额'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
Hive数据加载
(7)向Hive数据表中导入数据
将/opt/datas/data目录下的Customers.csv数据导入客户信息表Customers中
LOAD DATA LOCAL INPATH '/opt/datas/data/Customers.csv' INTO TABLE Customers;
将/opt/datas/data目录下的Geodata.csv数据导入地理位置表Geodata中
LOAD DATA LOCAL INPATH '/opt/datas/data/Geodata.csv' INTO TABLE Geodata;
将/opt/datas/data目录下的Payments.csv数据导入支付信息表Payments中
LOAD DATA LOCAL INPATH '/opt/datas/data/Payments.csv' INTO TABLE Payments;
将/opt/datas/data目录下的“Product Name.csv”数据导入商品分类表ProductName中
LOAD DATA LOCAL INPATH '/opt/datas/data/Product Name.csv' INTO TABLE ProductName;
将/opt/datas/data目录下的“Product Reviews.csv”数据导入商品评论表ProductReviews中
LOAD DATA LOCAL INPATH '/opt/datas/data/Product Reviews.csv' INTO TABLE ProductReviews;
将/opt/datas/data目录下的“ProductID.csv”数据导入商品信息表ProductInfo中
LOAD DATA LOCAL INPATH '/opt/datas/data/ProductID.csv' INTO TABLE ProductInfo;
将/opt/datas/data目录下的“Sheet1.csv”数据导入地理位置及代码表StateCode中
LOAD DATA LOCAL INPATH '/opt/datas/data/Sheet1.csv' INTO TABLE StateCode;
将HDFS中/hive/orders目录下的“Orders.csv”数据导入订单表Orders中
LOAD DATA INPATH '/hive/orders/Orders.csv' INTO TABLE Orders;
将HDFS中/hive/orderItems目录下的“order_items.csv”数据导入订单详情表OrderItems中
LOAD DATA INPATH '/hive/orderItems/order_items.csv' INTO TABLE OrderItems;
创建内部表
(8)按照对业务数据进行分析、统计和查询的需求,对hive中的数据进行轻度聚合,创建以下3个内部表
8.1 创建用户详情表Usertb,该表为内部表,表字段有UniqueId、Cid、City、Statename、CodePrefix,其中CodePrefix为INT类型,其余均为STRING类型
-- 12.创建用户详情表Usertb
CREATE TABLE IF NOT EXISTS Usertb (
UniqueId STRING COMMENT '用户唯一ID',
Cid STRING COMMENT '客户ID',
City STRING COMMENT '客户所在城市',
Statename STRING COMMENT '客户所在州名称',
CodePrefix INT COMMENT '邮政编码前缀'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

使用左连接从Customers表和StateCode表中联合查询客户信息,并将这些信息插入到Usertb表中
-- 将客户信息与州代码进行关联,写入用户表 Usertb
INSERT INTO Usertb
SELECT DISTINCT
c.CustomerUniqueId, -- UniqueId:用户唯一标识
c.customerid, -- Cid:客户ID
c.CustomerCity, -- City:所在城市
s.Name, -- Statename:州名称
c.CustomerZipCodePrefix -- CodePrefix:邮政编码前缀
FROM Customers c
LEFT JOIN StateCode s
ON c.CustomerState = s.Code;
8.2 创建订单明细表Details,该表为内部表,表字段有Oid、Uid、OrderTime、PayTime、PayType、OStatus,且均为STRING类型
-- 13. 创建订单明细表Details
CREATE TABLE IF NOT EXISTS Details (
Oid STRING COMMENT '订单ID',
Uid STRING COMMENT '用户ID',
OrderTime STRING COMMENT '订单创建时间',
PayTime STRING COMMENT '支付时间',
PayType STRING COMMENT '支付类型',
OStatus STRING COMMENT '订单状态'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

使用左连接从Orders表和Payments表中联合查询订单明细信息,并将这些信息插入到Details表中
-- 将订单信息与支付信息进行关联,写入订单明细表 Details
INSERT INTO Details
SELECT DISTINCT
o.OrderId, -- Oid:订单编号
o.CustomerId, -- Uid:客户编号
o.PurchaseTimestamp, -- OrderTime:下单时间
o.ApprovedAt, -- PayTime:支付批准时间
p.Type, -- PayType:支付类型(如借记卡、信用卡)
o.Status -- OStatus:订单状态(如已付款、待发货)
FROM Orders o
LEFT JOIN Payments p
ON o.OrderId = p.Orderid;
8.3 创建商品明细表ProductList,该表为内部表,表字段有Pid、Oid、PName、Price、PType,且均为STRING类型
-- 14.创建商品明细表ProductList
CREATE TABLE IF NOT EXISTS ProductList (
Pid STRING COMMENT '产品ID',
Oid STRING COMMENT '订单ID',
PName STRING COMMENT '产品类别名称',
Price STRING COMMENT '产品价格',
PType STRING COMMENT '产品类型'
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

使用左连接从ProductInfo表、OrderItems表和ProductName表中联合查询商品明细信息,并将这些信息插入到ProductList表中
-- 将商品信息、订单详情、商品英文名称关联后写入商品明细表 ProductList
INSERT INTO ProductList
SELECT DISTINCT
p.ProductID, -- Pid:产品ID
o.OrderId, -- Oid:订单ID
p.CategoryName, -- PName:产品类别名称(来自商品信息表)
o.Price, -- Price:商品价格(来自订单详情表)
n.NameEnglish -- PType:产品类别英文名(来自商品名称表)
FROM ProductInfo p
LEFT JOIN OrderItems o
ON p.ProductID = o.ProductId
LEFT JOIN ProductName n
ON p.CategoryName = n.Name;
2.Hbase存储数据
启动环境
# 在三个节点都启动Zookeeper
zkServer.sh start
# 在master启动HBase服务
start-hbase.sh
# 在master启动Hbase
hbase shell
如启动Hbase报错:slave1: /opt/apps/hbase/bin/hbase-daemon.sh:行195: /opt/apps/hbase/bin/../logs/hbase-root-regionserver-slave1.out: 结构需要清理 slave1: head: 无法打开"/opt/apps/hbase/bin/../logs/hbase-root-regionserver-slave1.out" 读取数据: 没有那个文件或目录
解决方法:删除日志信息
rm -rf /opt/apps/hbase/logs/*
创建数据表
(1)在Hbase中查看命名空间(需提前启动Zookeeper环境及Hbase环境)
# 创建命名空间,在此案例中省略,使用默认的空间
# create_namespace 'dataanalysis'
# 列出所有命名空间【理解为就是数据库名称】
list_namespace
# 如果表存在先禁用并删除,没有则跳过
disable 'OrderItemsHB'
drop 'OrderItemsHB'
disable 'OrdersHB'
drop 'OrdersHB'
disable 'ProductInfoHB'
drop 'ProductInfoHB'
(2)在Hbase中创建对应数据表
创建订单详情表OrderItemsHB,其中包含两个列族:orderinfo和goodsinfo
# 创建表
create 'OrderItemsHB', 'orderinfo', 'goodsinfo'
# 查看表
list
创建订单表OrdersHB,其中包含两个列族:orderinfo和timeinfo
# 创建 'OrdersHB'表,两个列族:orderinfo和timeinfo
create 'OrdersHB', 'orderinfo', 'timeinfo'
创建商品信息表ProductInfoHB,其中包含列族:productinfo
# 创建商品信息表ProductInfoHB
create 'ProductInfoHB', 'productinfo'
查看所有创建的表
list

数据存入Hbase
(4)从Hive中读取OrderItems表的数据,并将其存入Hbase的OrderItemsHB表格中
实验任务1:
基于 Spark 的 Hive 表订单明细数据 OrderItems 写入 HBase 的OrderItemsHB表实践
实验任务描述:
本实验以订单明细表 OrderItems 为基础,实践使用 Spark 从 Hive 中读取结构化数据,完成空值处理、去重、时间格式转换等数据清洗操作后,将结果写入 HBase 表 OrderItemsHB 中。通过该实验,帮助学生理解 Spark 与 Hive、HBase 的集成机制,并掌握使用 Spark 实现 Hive → HBase 的数据处理与落地流程。
实验步骤概览:
创建 SparkSession 并启用 Hive 支持
读取 Hive 中的
OrderItems表数据对订单明细数据进行空值删除、去重与格式标准化
将处理后的数据写入 HBase 表
OrderItemsHB释放资源,关闭 Spark 应用
实验要求:
1. 环境准备
确保以下组件已正确安装并配置:
Apache Spark(推荐 3.x 版本)
Apache Hive(支持读取表数据)
Apache HBase(含 ZooKeeper)
Hive 中已存在
OrderItems表,包含以下字段:OrderId(订单编号)ItemId(订单项编号)ProductId(商品编号)FreightValue(运费)Price(商品价格)ShippingLimitDate(配送最晚时间)HBase 中预建表
OrderItemsHB,包含以下两个列族:orderinfo:订单基本信息(订单 ID、订单项 ID)goodsinfo:商品与费用信息(商品 ID、价格、运费、配送日期)
2. 编程实现要求
数据读取与清洗:
使用 Spark 读取 Hive 表
OrderItems;删除包含空值(null)的行;
对字段
OrderId+ItemId+ProductId组合去重,避免重复写入;将
ShippingLimitDate转换为标准日期格式yyyy-MM-dd,新列命名为date;使用Spark 中用于为每行生成全局唯一且递增编号的函数
monotonically_increasing_id(),每一行生成唯一 rowkey,作为 HBase rowkey 使用;最终只保留字段:
OrderId、ItemId、ProductId、FreightValue、Price、date、id
数据写入 HBase:
每条记录封装为 HBase 的
Put对象;分别写入以下列族:
Hive 字段名 HBase 列族 HBase 列名 字段说明 OrderIdorderinfoorderid订单编号 ItemIdorderinfoitemid订单项编号 ProductIdgoodsinfoproductid商品编号 FreightValuegoodsinfofreightvalue运费金额 Pricegoodsinfoprice商品单价 ShippingLimitDategoodsinfodate发货时间(已格式化) 使用
foreachPartition把数据分区处理方式批量写入,提高效率;每个分区结束后输出写入日志,方便调试与验证。
能力提升目标:
通过本实验,学生将能够:
掌握 Spark 与 Hive 的读取接口与 DataFrame 操作;
理解订单明细数据的清洗要点与字段标准化处理;
熟练使用 Spark 将处理后的数据写入 HBase;
理解分布式数据导入流程的性能优化策略(如分区并发、批处理);
增强对 HBase 数据建模与表结构设计的认识与实践能力。
完整代码:
package org.example
// ==========================
// 引入所需的库
// ==========================
// Spark 相关类
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// HBase 相关类
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
// Scala 与 Java 集合互操作(HBase 使用 Java 写的,因此需要转类型)
import scala.collection.JavaConverters._
object OrderItemsToHBase {
def main(args: Array[String]): Unit = {
// ========== 第 1 步:创建 SparkSession ==========
// SparkSession 是使用 Spark 的入口,同时支持 Hive 查询功能
val spark = SparkSession.builder()
.appName("SimpleOrderItemsToHBase") // 应用名称,用于日志中显示
.master("local[*]") // 使用本地所有 CPU 核心执行,适合开发测试
.enableHiveSupport() // 启用 Hive 支持,可以读取 Hive 表
.getOrCreate()
println("1. SparkSession 创建完成,Hive 功能已启用")
// ========== 第 2 步:从 Hive 表中读取数据 ==========
val hiveTable = "OrderItems" // Hive 中的目标表名
val df = spark.table(hiveTable) // 通过 Spark SQL 接口读取表内容为 DataFrame
println(s"2. 已从 Hive 表 [$hiveTable] 中读取到 ${df.count()} 条数据")
df.show(5, truncate = false) // 显示前 5 条记录(不截断列内容)
// ========== 第 3 步:数据清洗与转换 ==========
// 目的:清理空值、去重、统一时间格式、生成唯一 ID,方便后续写入 HBase
val cleanedDf = df
.na.drop() // 删除包含 null 的行,空数据没法写入 HBase
.dropDuplicates("OrderId", "ItemId", "ProductId") // 去除重复行,避免重复写入 HBase
.withColumn("date", date_format(
to_timestamp(col("ShippingLimitDate"), "dd/MM/yyyy HH:mm:ss"), "yyyy-MM-dd"
)) // 将 ShippingLimitDate 转换为标准格式 "yyyy-MM-dd",并生成新列 date
.withColumn("id", monotonically_increasing_id()) // 添加唯一 ID 作为 rowkey(long 类型自动递增)
.select("OrderId", "ItemId", "ProductId", "FreightValue", "Price", "date", "id") // 只保留需要写入 HBase 的字段
// Spark SQL(包括 DataFrame API)中,字段名默认是不区分大小写的
println(s"3. 数据清洗完成,剩余数据行数为:${cleanedDf.count()}")
cleanedDf.show(5, truncate = false)
// ========== 第 4 步:将清洗后的数据写入 HBase 表 ==========
// 表名为 OrderItemsHB,假设已提前创建好
println("4. 开始写入数据到 HBase 表 [OrderItemsHB]...")
// 将 DataFrame 转换为 RDD,并按分区处理(提高并发效率)
// Spark把数据分区,并按分区批量处理每组数据
// 相比逐条处理,foreachPartition 能显著减少连接次数,提升性能。
cleanedDf.rdd.foreachPartition { partition =>
// (1)创建 HBase 配置对象,设置 ZooKeeper 地址和端口
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2") // ZooKeeper 主机地址(根据你的集群配置修改)
conf.set("hbase.zookeeper.property.clientPort", "2181") // 默认端口号
// (2)建立与 HBase 的连接
val connection = ConnectionFactory.createConnection(conf)
// (3)获取 HBase 中的目标表对象
val table = connection.getTable(TableName.valueOf("OrderItemsHB"))
// (4)遍历每一行数据,封装为 Put 对象,用于写入 HBase
val puts = partition.map { row =>
// 构造 rowkey,例如:"row0", "row1"...
val rowKey = s"row${row.getAs[Long]("id")}" // 使用唯一 id 构造 rowkey,格式:row1
// 创建 HBase 中一行数据的 Put 对象,rowkey 为 "row+唯一ID"
// 作用:把这一行数据准备好,准备存进 HBase,行名叫 "row加上编号"
val put = new Put(Bytes.toBytes(rowKey)) // 创建 Put 对象(HBase 的一行数据)
// === 写入 orderinfo 列族字段 ===
put.addColumn(
Bytes.toBytes("orderinfo"), // 列族名:orderinfo,用于存储订单基本信息
Bytes.toBytes("orderid"), // 列名:orderid,表示订单编号
Bytes.toBytes(row.getAs[String]("OrderId"))// 列值:从当前行获取 OrderId 字段的值(类型是 String)
)
put.addColumn(
Bytes.toBytes("orderinfo"), // 列族名:orderinfo
Bytes.toBytes("itemid"), // 列名:itemid,表示订单项编号
Bytes.toBytes(row.getAs[Int]("ItemId").toString) // 列值:将 ItemId(Int 类型)转换为字符串再写入
)
// === 写入 goodsinfo 列族字段 ===
put.addColumn(
Bytes.toBytes("goodsinfo"), // 列族名:goodsinfo,用于存储商品信息
Bytes.toBytes("productid"), // 列名:productid,表示商品编号
Bytes.toBytes(row.getAs[String]("ProductId")) // 列值:获取当前行的 ProductId 字段值(String 类型)
)
put.addColumn(
Bytes.toBytes("goodsinfo"), // 列族名:goodsinfo
Bytes.toBytes("freightvalue"), // 列名:freightvalue,表示运费金额
Bytes.toBytes(row.getAs[Double]("FreightValue").toString) // 列值:将 FreightValue(Double)转为字符串写入
)
put.addColumn(
Bytes.toBytes("goodsinfo"), // 列族名:goodsinfo
Bytes.toBytes("price"), // 列名:price,表示商品价格
Bytes.toBytes(row.getAs[Double]("Price").toString) // 列值:将 Price(Double)转为字符串写入
)
put.addColumn(
Bytes.toBytes("goodsinfo"), // 列族名:goodsinfo
Bytes.toBytes("date"), // 列名:date,表示发货时间
Bytes.toBytes(row.getAs[String]("date")) // 列值:从当前行获取格式化后的 date 字段值
)
put // 返回 Put 对象
}.toList // 把迭代器转换为 List,方便批量写入
// (5)批量写入当前分区数据到 HBase(更高效)
if (puts.nonEmpty) {
table.put(puts.asJava) // // 把 Scala List 转成 Java List 后 批量写入数据到 HBase的表
println(s"当前分区写入了 ${puts.size} 条数据到 HBase")
}
// (6)写入完成后,关闭 HBase 表和连接,释放资源
table.close()
connection.close()
}
// ========== 第 5 步:关闭 Spark 应用 ==========
println("5. 所有数据成功写入 HBase,准备关闭 Spark...")
spark.stop()
println("6. 程序执行完毕。")
}
}
在Hbase中检查结果
# 从 OrderItemsHB 表中扫描 1 条数据
hbase(main):002:0> scan 'OrderItemsHB', {LIMIT => 1}
# 运行结果
ROW COLUMN+CELL
row0 column=goodsinfo:date, timestamp=1742657716994, value=2017-04-06
row0 column=goodsinfo:date_only, timestamp=1742656783446, value=2017-04-06
row0 column=goodsinfo:freightvalue, timestamp=1742657716994, value=24.05
row0 column=goodsinfo:price, timestamp=1742657716994, value=109.9
row0 column=goodsinfo:productid, timestamp=1742657716994, value=f71973c922ccaab05514a36a8bc741b8
row0 column=orderinfo:itemid, timestamp=1742657716994, value=1
row0 column=orderinfo:orderid, timestamp=1742657716994, value=019b02e139cfcd97f93366f7a38f5695

# 如果重新运行上面的代码,会向表中插入重复数据,
# 所以要重复运行命令,先清除Hbase表中的数据
# 在Hbase Shell中运行下面的命令
# 1.如果只是想清空表里的数据,但保留表结构【建议此操作】
truncate 'OrderItemsHB'
# 2.如果是删除Hbase的表,【不推荐此方法,删除表后还要创建表】
disable 'OrderItemsHB' # 第一步:先禁用表,防止正在使用
drop 'OrderItemsHB' # 第二步:删除表(永久删除)
打包项目,提交任务到 Spark on YARN 集群中运行
方法一:
# 在master节点运行,通过 spark-submit 提交一个任务到 Spark on YARN 集群中运行
# 1.选择spark
cd /opt/apps/spark
./bin/spark-submit \
--master yarn \
--deploy-mode client \
--class org.example.OrderItemsToHBase \
--conf spark.hadoop.hive.metastore.uris=thrift://master:9083 \
spark_task-1.0-jar-with-dependencies.jar
# 说明:
# --master yarn # 将任务提交到 YARN 集群上执行
# --deploy-mode client # 以 client 模式运行,Driver在提交命令的机器上
# --class org.example.OrderItemsToHBase # 指定包含 main 方法的主类
# --conf spark.hadoop.hive.metastore.uris=thrift://master:9083 # 显式指定 Hive Metastore 的远程访问地址
# spark_task-1.0-jar-with-dependencies.jar # 提交打好的 fat-jar(含所有依赖的 jar 包)
# 肥包(fat-jar):把业务代码 + 所有依赖(包括 hbase-client)都打包到了一起,这样运行时就能找到
# org.apache.hadoop.hbase.client.Put 类了。(否则报错)
基于 Spark 的 Hive 表订单数据Orders表写入 HBase的表OrdersHB 实践
实验任务描述:
本实验通过一个完整的订单数据处理案例,学习使用 Spark 从 Hive 表中读取订单数据,进行空值过滤、字段格式化、时间标准化等清洗操作,并将结果写入 HBase 表中。重点理解 Spark 与 Hive、HBase 的数据联通机制,掌握在实际大数据项目中如何实现 Hive → Spark → HBase 的数据流转。
实验步骤概览:
创建 SparkSession 并启用 Hive 支持
读取 Hive 中的
Orders表数据对订单数据进行清洗、去重与格式转换
将清洗后的数据写入 HBase 表
OrdersHB释放资源,关闭 Spark
实验要求:
1. 环境准备
正确安装并配置以下组件:
Apache Spark(推荐 3.x 版本)
Apache Hive(并启动 Metastore 服务)
Apache HBase(含 ZooKeeper)
Hive 中已存在名为
Orders的订单明细表,并预加载测试数据;HBase 中创建目标表
OrdersHB,包含以下两个列族:orderinfo:用于存储订单基本信息(如客户ID、订单ID、状态、配送时间等)timeinfo:用于存储订单各类时间字段(审核时间、发货时间等)
2. 编程实现要求
使用 Spark Scala 编写完整应用程序,实现以下功能:
数据读取与清洗:
从 Hive 表
Orders中读取数据;自动过滤空字符串、全空格或 null 的无效数据;
按
CustomerId去重,避免重复写入;将时间字段(如
ApprovedAt、CarrierDate、CustomerDate、PurchaseTimestamp等)统一格式为yyyy-MM-dd;使用
monotonically_increasing_id()为每行数据生成唯一 rowkey(用于 HBase 主键);
数据写入 HBase:
将每条数据封装为
Put对象;Hive 表 Orders 与 HBase 表 OrdersHB 字段映射关系
Hive 字段名 HBase 列族 HBase 列名 字段说明 CustomerIdorderinfocustomerid客户 ID OrderIdorderinfoorderid订单 ID Statusorderinfostatus订单状态 DeliveryDatetimeorderinfodeliverydatetime发货时效(单位:小时) ApprovedAtdatetimeinfoApprovedAtdate审核通过日期 CarDatetimeinfoCarDate安排发车时间 CusDatetimeinfoCusDate客户要求送达的日期 PurchasetdatetimeinfoPurchasetdate下单时间(格式化) deliverydatedatetimeinfodeliverydatedate计划送达日期(格式化) 使用
foreachPartition分区写入方式 + 批量提交提升写入效率;打印每个分区写入条数日志,便于调试和验证;
能力提升目标:
通过本实验,学生将能:
掌握 Spark + Hive 的数据读取流程与 DataFrame 使用;
学会使用 Spark SQL 进行数据清洗与格式化处理;
理解 Spark 程序中如何对接 HBase,完成高效批量写入;
提高对实际大数据业务数据清洗和落地写库的实战能力;
初步了解企业级数据中转场景中的数据格式标准化与存储结构设计。
完整代码:
package org.example
// ==========================
// 引入所需的库
// ==========================
// Spark 相关类
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// HBase 相关类
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
// Scala 与 Java 集合互操作(HBase 是 Java 写的)
import scala.collection.JavaConverters._
object OrdersToHbase {
def main(args: Array[String]): Unit = {
// ========== 第 1 步:创建 SparkSession ==========
println("1. 创建 SparkSession,用于读取 Hive 表...")
val spark = SparkSession.builder()
.appName("OrdersToHbase") // 应用名称
.master("local[*]") // 本地所有核,适合测试开发
.enableHiveSupport() // 启用 Hive 支持
.getOrCreate()
// ========== 第 2 步:读取 Hive 表 Orders ==========
val hiveTable = "Orders"
println(s"2. 从 Hive 表 [$hiveTable] 读取数据...")
val df = spark.table(hiveTable)
println(s"原始数据总行数: ${df.count()}")
println("原始数据前5行:")
df.show(5, truncate = false)
// ========== 第 3 步:数据清洗与转换 ==========
println("3. 对数据进行清洗和转换...")
// 3.1 删除空值(字符串列去空格 + 空字符串 + null)
// 目的:清洗数据,把字段为空、空字符串、全是空格的行全部过滤掉,留下干净的数据。
val filterCondition = df.schema.fields.map { field =>
val colName = field.name // 获取列名,比如 "name"
val dataType = field.dataType.typeName // 获取列的类型,比如 "string" 或 "int"
if (dataType == "string")
s"TRIM($colName) <> '' AND $colName IS NOT NULL" // 如果是字符串,要去空格再判断
else
s"$colName IS NOT NULL" // 如果是数字或别的,只判断非空
}.mkString(" AND ") // 所有条件拼接成一整串 SQL 条件
val filteredDf = df.filter(filterCondition)
// 3.2 根据 CustomerId 去重(避免重复写入)
val dedupDf = filteredDf.dropDuplicates("CustomerId")
// 3.3 格式转换(时间列转为 yyyy-MM-dd)
val timestampFormat = "dd/MM/yyyy HH:mm:ss" // Hive 原始时间格式
val dateFormat = "yyyy-MM-dd" // 输出格式
val dateFormat2 = "dd/MM/yyyy" // deliverydate 原格式
// 对去重后的订单数据(dedupDf)进行时间格式转换和字段处理,生成新的 DataFrame
val transformedDf = dedupDf
// 1、转换 ApprovedAt 字段为日期格式,命名为 ApprovedAtdate
.withColumn("ApprovedAtdate",
date_format(
to_timestamp(col("ApprovedAt"), timestampFormat), // 将字符串转为时间戳格式
dateFormat // 再格式化为 yyyy-MM-dd 字符串
)
)
// 2、转换 CarrierDate 字段为日期格式,命名为 CarDate(发货日期)
.withColumn("CarDate",
date_format(
to_timestamp(col("CarrierDate"), timestampFormat), // 将字符串转为时间戳格式
dateFormat // 再格式化为 yyyy-MM-dd 字符串
)
)
// 3、转换 CustomerDate 字段为日期格式,命名为 CusDate(客户下单日期)
.withColumn("CusDate",
date_format(
to_timestamp(col("CustomerDate"), timestampFormat), // 将字符串转为时间戳格式
dateFormat // 再格式化为 yyyy-MM-dd 字符串
)
)
// 4、转换 PurchaseTimestamp 字段为日期格式,命名为 Purchasetdate(下单时间)
.withColumn("Purchasetdate",
date_format(
to_timestamp(col("PurchaseTimestamp"), timestampFormat), // 将字符串转为时间戳格式
dateFormat // 再格式化为 yyyy-MM-dd 字符串
)
)
// 5、直接将 DeliveryDate 转换为日期类型(它原本就是日期字符串),命名为 deliverydatedate
.withColumn("deliverydatedate",
to_date(col("DeliveryDate"), dateFormat2) // 将“只包含日期的字符串”转换为日期类型
)
// 6、添加唯一编号列 id,作为主键 rowkey 使用(自动递增的 long 类型)
.withColumn("id", monotonically_increasing_id()) // 添加id列,为自增长数字
// 7、选择最终要保留的字段,生成最终清洗结果
.select(
"CustomerId", // 客户ID
"OrderId", // 订单ID
"Status", // 订单状态
"DeliveryDatetime", // 实际交付时间(精确到时分秒)
"ApprovedAtdate", // 审核日期(yyyy-MM-dd)
"CarDate", // 发货日期
"CusDate", // 客户下单日期
"Purchasetdate", // 下单时间(格式化)
"deliverydatedate", // 配送完成日期
"id" // 唯一主键,用于写入 HBase 的 rowkey
)
println(s"清洗后数据总行数: ${transformedDf.count()}")
println("清洗后数据前5行:")
transformedDf.show(5, truncate = false)
// ========== 第 4 步:将数据写入 HBase 表 OrdersHB ==========
println("4. 开始将数据写入 HBase 表 [OrdersHB] ...")
transformedDf.rdd.foreachPartition { partition =>
// 4.1 创建 HBase 配置对象
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2") // ZooKeeper 地址
conf.set("hbase.zookeeper.property.clientPort", "2181") // 默认端口
// 4.2 创建 HBase 连接
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("OrdersHB")) // 目标表名
// 4.3 遍历每一行,封装成 Put 写入对象
val puts = partition.map { row =>
// 创建 HBase 中一行数据的 Put 对象,rowkey 为 "row+唯一ID"
// 作用:把这一行数据准备好,准备存进 HBase,行名叫 "row加上编号"
val put = new Put(Bytes.toBytes(s"row${row.getAs[Long]("id")}"))
// === 写入 orderinfo 列族 ===
// 把当前这行数据里的 CustomerId 字段写入到 HBase 的 orderinfo 列族中,列名是 customerid
put.addColumn(
Bytes.toBytes("orderinfo"), // 列族名:orderinfo(表示订单相关信息)
Bytes.toBytes("customerid"), // 列名:customerid
Bytes.toBytes(row.getAs[String]("CustomerId")) // 列值:从这一行取出 CustomerId 字段的值(字符串类型)
)
// 把当前这行数据里的 OrderId 字段写入到 HBase 的 orderinfo 列族中,列名是 orderid
put.addColumn(
Bytes.toBytes("orderinfo"), // 列族名:orderinfo(表示订单相关的信息)
Bytes.toBytes("orderid"), // 列名:orderid
Bytes.toBytes(row.getAs[String]("OrderId")) // 列值:从这一行取出 OrderId 字段的值(字符串类型)
)
// 把当前这行数据里的 Status 字段写入到 HBase 的 orderinfo 列族中,列名是 status
put.addColumn(
Bytes.toBytes("orderinfo"), // 列族名:orderinfo
Bytes.toBytes("status"), // 列名:status(订单状态)
Bytes.toBytes(row.getAs[String]("Status")) // 列值:取出 Status 字段的值(字符串类型)
)
// 把当前这行数据里的 DeliveryDatetime 字段写入到 HBase 的 orderinfo 列族中,列名是 deliverydatetime
put.addColumn(
Bytes.toBytes("orderinfo"), // 列族名:orderinfo
Bytes.toBytes("deliverydatetime"), // 列名:deliverydatetime(发货时效,单位可能是小时)
Bytes.toBytes(row.getAs[Int]("DeliveryDatetime").toString) // 列值:取出 DeliveryDatetime(整数类型),转成字符串再写入
)
// === 写入 timeinfo 列族 ===
// 把当前这行数据里的 ApprovedAtdate 字段写入到 HBase 的 timeinfo 列族中,列名是 ApprovedAtdate
put.addColumn(
Bytes.toBytes("timeinfo"), // 列族名:timeinfo(表示与时间相关的信息)
Bytes.toBytes("ApprovedAtdate"), // 列名:ApprovedAtdate(订单审核通过的时间)
Bytes.toBytes(row.getAs[String]("ApprovedAtdate")) // 列值:取出 ApprovedAtdate 字段的值(字符串类型)
)
// 把当前这行数据里的 CarDate 字段写入到 HBase 的 timeinfo 列族中,列名是 CarDate
put.addColumn(
Bytes.toBytes("timeinfo"), // 列族名:timeinfo
Bytes.toBytes("CarDate"), // 列名:CarDate(安排发车时间)
Bytes.toBytes(row.getAs[String]("CarDate")) // 列值:取出 CarDate 字段的值(字符串类型)
)
// 把当前这行数据里的 CusDate 字段写入到 HBase 的 timeinfo 列族中,列名是 CusDate
put.addColumn(
Bytes.toBytes("timeinfo"), // 列族名:timeinfo
Bytes.toBytes("CusDate"), // 列名:CusDate(客户要求送达的日期)
Bytes.toBytes(row.getAs[String]("CusDate")) // 列值:取出 CusDate 字段的值(字符串类型)
)
// 把当前这行数据里的 Purchasetdate 字段写入到 HBase 的 timeinfo 列族中,列名是 Purchasetdate
put.addColumn(
Bytes.toBytes("timeinfo"), // 列族名:timeinfo
Bytes.toBytes("Purchasetdate"), // 列名:Purchasetdate(下单日期)
Bytes.toBytes(row.getAs[String]("Purchasetdate")) // 列值:取出 Purchasetdate 字段的值(字符串类型)
)
// 把当前这行数据里的 deliverydatedate 字段写入到 HBase 的 timeinfo 列族中,列名是 deliverydatedate
put.addColumn(
Bytes.toBytes("timeinfo"), // 列族名:timeinfo
Bytes.toBytes("deliverydatedate"), // 列名:deliverydatedate(计划送达日期)
Bytes.toBytes(row.getAs[java.sql.Date]("deliverydatedate").toString) // 列值:将 deliverydatedate 字段(Date 类型)转换为字符串写入
)
put // 返回 Put 对象
}.toList
// 4.4 批量写入 HBase
if (puts.nonEmpty) {
table.put(puts.asJava) // 把 Scala List 转成 Java List 后 批量写入数据到 HBase的表
println(s"当前分区写入了 ${puts.size} 条数据到 HBase。")
}
// 4.5 关闭 HBase 表和连接
table.close()
connection.close()
}
// ========== 第 5 步:关闭 Spark ==========
println("5. 所有数据已成功写入 HBase,关闭 SparkSession。")
spark.stop()
println("6. 程序执行完毕。")
}
}
在Hbase中检查结果 :

hbase(main):002:0> scan 'OrdersHB', {LIMIT => 1}
ROW COLUMN+CELL
row0 column=orderinfo:customerid, timestamp=1742658987312, value=01d190d14b00073f76e0a5ec46166352
row0 column=orderinfo:deliverydatetime, timestamp=1742658987312, value=12
row0 column=orderinfo:orderid, timestamp=1742658987312, value=975a0290ce9c9359576f69b6e00799cf
row0 column=orderinfo:status, timestamp=1742658987312, value=delivered
row0 column=timeinfo:ApprovedAtdate, timestamp=1742658987312, value=2017-11-29
row0 column=timeinfo:CarDate, timestamp=1742658987312, value=2017-12-05
row0 column=timeinfo:CusDate, timestamp=1742658987312, value=2017-12-11
row0 column=timeinfo:Purchasetdate, timestamp=1742658987312, value=2017-11-29
row0 column=timeinfo:deliverydatedate, timestamp=1742658987312, value=2017-12-19
🧪实验任务3:
基于 Spark 的 Hive 表订单数据ProductInfo表写入 HBase的表ProductInfoHB实践
🎯实验任务描述:
本实验通过一个商品信息数据处理案例,引导学生使用 Spark 从 Hive 表中读取商品数据,进行空值过滤、字段统一、结构整理等清洗操作,并将结果写入 HBase 表中。重点掌握 Spark 与 Hive、HBase 的集成方式,理解如何将结构化数据通过清洗后落地至 HBase 的指定列族,完成标准化存储,适配后续的查询或建模分析任务。
📌实验步骤概览:
创建 SparkSession 并启用 Hive 支持
读取 Hive 中的
ProductInfo表数据对商品数据进行清洗、去重与添加主键
将清洗后的数据写入 HBase 表
ProductInfoHB释放资源,关闭 Spark
✅实验要求:
1. 环境准备
正确安装并配置以下组件:
Apache Spark(建议使用 3.x 版本)
Apache Hive(启用 Hive Metastore)
Apache HBase(包括 ZooKeeper)
Hive 中提前创建好名为
ProductInfo的商品信息表,并导入数据;HBase 中创建目标表
ProductInfoHB,包含如下列族:productinfo:用于存储商品的各项属性(如类别、尺寸、重量等);
2. 编程实现要求
使用 Spark Scala 编写完整程序,满足以下功能:
✅ 数据读取与清洗:
从 Hive 表
ProductInfo中读取商品信息;自动过滤空值,包括空字符串、空格和 null;
按
ProductID去重,避免重复写入;使用
monotonically_increasing_id()为每条记录生成唯一主键,用作 HBase 中的 rowkey;最终保留字段包括:
CategoryName、ProductID、descriptionlength(注:原拼写为descriptionlenght)HeightCm、LengthCm、namelength(注:原拼写为namelenght)PhotosQty、WeightG、WidthCm
✅ 数据写入 HBase:
将每行记录转换为
Put对象,并指定写入目标列族;Hive 表 ProductInfo 与 HBase 表 ProductInfoHB 字段映射关系:
Hive 字段名 HBase 列族 HBase 列名 字段说明 CategoryNameproductinfocategoryname商品分类名 ProductIDproductinfoproductid商品ID descriptionlenghtproductinfodescriptionlenght商品描述文本长度(字符数) HeightCmproductinfoheightcm商品高度(厘米) LengthCmproductinfolengthcm商品长度(厘米) namelenghtproductinfonamelength商品名称长度 PhotosQtyproductinfophotosqty商品图片数量 WeightGproductinfoweightg商品重量(克) WidthCmproductinfowidthcm商品宽度(厘米) 使用
foreachPartition分区遍历方式,提高写入并发效率;使用批量写入(
table.put(puts))提升性能;打印当前分区写入记录数,方便调试和验证;
🧠能力提升目标:
通过本实验,学生将能够:
掌握 Spark 与 Hive 表的集成读取方式;
熟悉 Spark SQL 进行数据清洗、去重、字段统一处理的方法;
掌握 Spark 与 HBase 集成写入的基本模式;
学会使用
foreachPartition和Put实现批量数据写入;提升数据标准化存储能力,为后续数据查询、可视化或建模打下基础。
完整的代码:
package org.example
// ========== 引入所需库 ==========
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._
object ProductInfoToHbase {
def main(args: Array[String]): Unit = {
// ========== 第 1 步:创建 SparkSession ==========
println("1. 创建 SparkSession,用于读取 Hive 表...")
val spark = SparkSession.builder()
.appName("ProductInfoToHbase") // 应用名称
.master("local[*]") // 本地运行模式,适合开发测试
.enableHiveSupport() // 启用 Hive 支持
.getOrCreate()
// ========== 第 2 步:读取 Hive 表 ProductInfo ==========
val hiveTable = "ProductInfo"
println(s"2. 从 Hive 表 [$hiveTable] 读取数据...")
val df = spark.table(hiveTable)
println(s"原始数据总行数: ${df.count()}")
println("原始数据前5行:")
df.show(5, truncate = false)
// ========== 第 3 步:数据清洗与转换 ==========
println("3. 对数据进行清洗和转换...")
// 3.1 构建过滤条件:删除空值(字符串列要去除空格后判断)
val filterCondition = df.schema.fields.map { field =>
val colName = field.name
val dataType = field.dataType.typeName
if (dataType == "string")
s"TRIM($colName) <> '' AND $colName IS NOT NULL" // 字符串列:不能是空、不能是null
else
s"$colName IS NOT NULL" // 非字符串列:不能是null
}.mkString(" AND ") // 所有条件用 AND 拼接
// 3.2 执行过滤 + 去重 + 添加唯一 rowkey(id)
val cleanedDf = df.filter(filterCondition)
.dropDuplicates("ProductID") // 去重,避免重复写入
.withColumn("id", monotonically_increasing_id()) // 添加唯一编号列作为 HBase rowkey
.select(
"CategoryName", "ProductID", "descriptionlenght",
"HeightCm", "LengthCm", "namelenght",
"PhotosQty", "WeightG", "WidthCm", "id"
)
println(s"清洗后数据总行数: ${cleanedDf.count()}")
println("清洗后数据前5行:")
cleanedDf.show(5, truncate = false)
// ========== 第 4 步:写入 HBase 表 ProductInfoHB ==========
println("4. 开始将数据写入 HBase 表 [ProductInfoHB] ...")
// 对每个分区数据批量写入,提高效率
cleanedDf.rdd.foreachPartition { partition =>
// 4.1 创建 HBase 配置对象(指定 ZooKeeper 地址)
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
conf.set("hbase.zookeeper.property.clientPort", "2181")
// 4.2 获取连接与表对象
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("ProductInfoHB"))
// 4.3 遍历每一行,封装 Put 对象(即每条数据)
val puts = partition.map { row =>
// 构造 rowkey,如 row0、row1...
val put = new Put(Bytes.toBytes(s"row${row.getAs[Long]("id")}"))
// ==== 写入 productinfo 列族字段 ====
put.addColumn(
Bytes.toBytes("productinfo"),
Bytes.toBytes("categoryname"),
Bytes.toBytes(row.getAs[String]("CategoryName"))
)
put.addColumn(
Bytes.toBytes("productinfo"),
Bytes.toBytes("productid"),
Bytes.toBytes(row.getAs[String]("ProductID"))
)
put.addColumn(
Bytes.toBytes("productinfo"),
Bytes.toBytes("descriptionlenght"),
Bytes.toBytes(row.getAs[Int]("descriptionlenght").toString)
)
put.addColumn(
Bytes.toBytes("productinfo"),
Bytes.toBytes("heightcm"),
Bytes.toBytes(row.getAs[Int]("HeightCm").toString)
)
put.addColumn(
Bytes.toBytes("productinfo"),
Bytes.toBytes("lengthcm"),
Bytes.toBytes(row.getAs[Int]("LengthCm").toString)
)
put.addColumn(
Bytes.toBytes("productinfo"),
Bytes.toBytes("namelength"),
Bytes.toBytes(row.getAs[Int]("namelenght").toString)
)
put.addColumn(
Bytes.toBytes("productinfo"),
Bytes.toBytes("photosqty"),
Bytes.toBytes(row.getAs[Int]("PhotosQty").toString)
)
put.addColumn(
Bytes.toBytes("productinfo"),
Bytes.toBytes("weightg"),
Bytes.toBytes(row.getAs[Int]("WeightG").toString)
)
put.addColumn(
Bytes.toBytes("productinfo"),
Bytes.toBytes("widthcm"),
Bytes.toBytes(row.getAs[Int]("WidthCm").toString)
)
put // 返回 Put 对象
}.toList
// 4.4 批量写入 HBase 表
if (puts.nonEmpty) {
table.put(puts.asJava)
println(s"当前分区写入了 ${puts.size} 条数据到 HBase。")
}
// 4.5 关闭资源
table.close()
connection.close()
}
// ========== 第 5 步:关闭 Spark ==========
println("5. 所有数据已成功写入 HBase,关闭 SparkSession。")
spark.stop()
println("6. 程序执行完毕。")
}
}
在Hbase中检查结果 :

hbase(main):025:0> scan 'ProductInfoHB', {LIMIT => 1}
ROW COLUMN+CELL
row0 column=productinfo:categoryname, timestamp=1742659721598, value=informatica_acessorios
row0 column=productinfo:descriptionlenght, timestamp=1742659721598, value=978
row0 column=productinfo:heightcm, timestamp=1742659721598, value=15
row0 column=productinfo:lengthcm, timestamp=1742659721598, value=25
row0 column=productinfo:namelength, timestamp=1742659721598, value=25
row0 column=productinfo:photosqty, timestamp=1742659721598, value=3
row0 column=productinfo:productid, timestamp=1742659721598, value=00e4ded51458037ecda216453eb79d3c
row0 column=productinfo:weightg, timestamp=1742659721598, value=1400
row0 column=productinfo:widthcm, timestamp=1742659721598, value=25