李翔-大数据技术

Big data technology!

第4章 Spark的开发基础


Spark SQL综合练习一:了解Spark SQL与数据读取

实验目标

  • 熟悉Spark SQL基本概念。

  • 学会使用Spark SQL从MySQL读取数据。

实验步骤

  1. 环境准备:配置Spark与MySQL连接。

  2. 数据准备:在MySQL中创建ods数据库和score表,并插入数据。

  3. 数据读取:使用Spark SQL读取MySQL中score表的数据,并展示。

代码示例

  • 数据准备:在MySQL创建数据库:ods;数据表:score,并向表中插入一些记录。

# 删除数据库
drop DATABASE if EXISTS ods cascade;

# 创建数据库
create database if not EXISTS ods;

# 选择数据库
use ods;

# 创建数据表
create table score(
   student_id int not null primary key,
   name varchar(20),
   score int
);

# 插入数据
INSERT INTO score(student_id, name, score) VALUES
(1, '陈晨', 92),
(2, '刘翔', 102),
(3, '杨洋', 70),
(4, '吴越', 50),
(5, '周宇', 60),
(6, '徐飞', -62),
(7, '郭靖', 56),
(8, '黄蓉', 74),
(9, '林冲', 85),
(10, '宋江', 70);

select * from score;
  • Scala代码展示如何使用Spark SQL读取MySQL数据。

// 定义包名
package org.example

// 导入 SparkSession 类,为Spark功能的入口点,可以创建DataFrame,执行SQL查询
import org.apache.spark.sql.SparkSession

// 定义一个名为 SparkJob01 的对象
object sparkJob01 {
 // 程序的入口点
 def main(args: Array[String]): Unit = {
   // 创建 SparkSession 对象
   val spark = SparkSession.builder()
     .appName("example") // 设置应用程序名称
     .master("local[*]") // 本地运行模式,允许在单机上用多线程模拟分布式计算,其中 local[*] 使用与机器逻辑核心数相等的线程。
     //.config("spark.driver.bindAddress", "localhost")  //配置了 Spark Driver 绑定的地址为本地主机【Ubuntu下idea下运行程序必须,否则运行报错,但是打成jar包在集群运行时需要注释此句】
     .getOrCreate()      // 创建或获取一个 SparkSession

   // 定义数据库的连接 URL
   val DB_URL = "jdbc:mysql://192.168.36.100:3306/ods?useSSL=false"

   // 使用 Spark 读取 MySQL 数据
   val peoplesDF = spark.read
     .format("jdbc")                            // 指定格式为 jdbc
     .option("driver", "com.mysql.jdbc.Driver") // 指定 JDBC 驱动程序
     .option("url", DB_URL)                     // 指定数据库 URL
     .option("dbtable", "score")                // 指定要读取的表
     .option("user", "root")                    // 指定数据库用户名
     .option("password", "123456")              // 指定数据库密码
     .load() // 加载数据

   // 打印 DataFrame 的 schema
   peoplesDF.printSchema()
   // 显示 DataFrame 的内容
   peoplesDF.show()

   // 关闭SparkSession,释放资源
   spark.stop()  
 }
}

运行结果:

root
|-- student_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- score: integer (nullable = true)

+----------+----+-----+
|student_id|name|score|
+----------+----+-----+
|         1|陈晨|   92|
|         2|刘翔|  102|
|         3|杨洋|   70|
|         4|吴越|   50|
|         5|周宇|   88|
|         6|徐飞|  -62|
|         7|郭靖|   56|
|         8|黄蓉|   74|
|         9|林冲|   85|
|        10|宋江|   70|
+----------+----+-----+



Spark SQL综合练习二:基础数据清洗

实验目标

  • 学会基础的数据转换与处理。

实验步骤

  1. 数据读取:从MySQL读取scores数据表。

  2. 数据探索与处理:执行数据清洗,检查并处理数据质量问题,如缺失值、异常值处理。

  3. 数据转存:把上述清洗过后的数据转存到Hive的ods库下的数据表score

  4. 任务提交:把spark项目打为Jar包,提交到集群中去运行

代码示例

  • 在Hive中创建数据库:ods,在ods数据库下创建数据表:score

-- 删除数据库ods
DROP DATABASE IF EXISTS ods cascade;

-- 创建数据库ods
CREATE DATABASE IF NOT EXISTS ods;

-- 选择数据库
use ods;

-- 创建一个名为 score 的表,如果这个表已经存在,则不会创建新表
CREATE TABLE IF NOT EXISTS ods.score (
   student_id INT,    -- 定义一个名为 student_id 的列,数据类型为 INT(整数)
   name STRING,       -- 定义一个名为 name 的列,数据类型为 STRING(字符串)
   score INT          -- 定义一个名为 score 的列,数据类型为 INT(整数)
)
ROW FORMAT DELIMITED      -- 指定表的行格式为定界格式,即字段由特定字符分隔
FIELDS TERMINATED BY ','  -- 定义字段之间由逗号分隔
STORED AS TEXTFILE;       -- 指定表数据存储为文本文件

-- 查看数据表
select * from ods.score
  • Scala代码展示检查并处理数据质量问题,如缺失值、异常值处理。

// 定义包名
package org.example

// 导入 SparkSession 类
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 定义一个名为 SparkJob02 的对象
object SparkJob02 {
 // 程序的入口点
 def main(args: Array[String]): Unit = {
   // 设置Hadoop用户为"root",这通常是为了在使用Hadoop环境时具有相应的权限。
   System.setProperty("HADOOP_USER_NAME", "root")

   // 创建 SparkSession 对象
   val spark = SparkSession.builder()
     .appName("example") // 设置应用程序名称
     .master("local[*]") // 设置运行模式为本地模式,使用所有可用的核心
     .enableHiveSupport() // 启用对Hive的支持
     .config("hive.metastore.uris", "thrift://192.168.36.100:9083")
     .getOrCreate() // 创建或获取一个 SparkSession

   // 定义数据库的连接 URL
   val DB_URL = "jdbc:mysql://192.168.36.100:3306/ods?useSSL=false"

   // 使用 Spark 读取 MySQL 数据
   val peoplesDF = spark.read
     .format("jdbc")                            // 指定格式为 jdbc
     .option("driver", "com.mysql.jdbc.Driver") // 指定 JDBC 驱动程序
     .option("url", DB_URL)                     // 指定数据库 URL
     .option("dbtable", "score")                // 指定要读取的表
     .option("user", "root")                    // 指定数据库用户名
     .option("password", "123456")              // 指定数据库密码
     .load() // 加载数据

   // 打印 DataFrame 的 schema
   // println("表结构")
   // peoplesDF.printSchema()
   // 显示 DataFrame 的内容
   println("清洗前的表数据")
   peoplesDF.show()

   // 数据清洗:替换缺失值为50,替换小于0和大于100的成绩为60
   val cleanedDF = peoplesDF
     .na.fill(50) // 缺失值替换为50
     .withColumn("score", when(col("score") < 0 || col("score") > 100, 60).otherwise(col("score")))
  // 将score列中小于0或大于100的值替换为60,其他值保持不变。

   println("清洗后的表数据")
   cleanedDF.show()

   // 将清洗后的数据覆盖存储到Hive表ods.score
   cleanedDF.write.mode("overwrite").saveAsTable("ods.score")
   //cleanedDF.write.mode("overwrite").insertInto("ods.score")

   println("数据成功写到Hive")

   // 关闭SparkSession,释放资源
   spark.stop()  
 }
}

项目打包:

方法一:在 IDEA 中通过Maven 工具窗口执行Maven 生命周期中:Lifecycle-Clean-Package来打包

方法二:在 IDEA 的在Terminal 视图中输入命令:mvn clean package

任务提交:把IDEA的target目录中打好的项目Jar包,提交到集群中去运行

准备工作:在Spark集群连接MySQL需要MySQL驱动程序,故需把mysql-connector-java-5.1.40-bin.jar 复制到Spark下的jars目录下。

# 提交任务
[root@master ~]# spark-submit --master yarn --deploy-mode client \
--class org.example.sparkJob02 Spark2024-1.0.jar
  1. spark-submit: 提交 Spark 应用程序到集群的脚本。

  2. --master yarn: 指定 Spark 应用程序将在 YARN 管理下的集群上执行,使用 YARN 作为资源管理器。

  3. --deploy-mode client: 设置应用程序在本地机器上运行,而非集群节点。

  4. --class org.example.sparkJob03: 定义应用的主类。

  5. Spark2024-1.0.jar: 指定包含应用程序代码的 JAR 文件。


任务优化:实现在spark中创建Hive库与表

  1. 数据读取:从MySQL读取score数据表。

  2. 数据探索与处理:执行数据清洗,检查并处理数据质量问题,如缺失值、异常值处理。

  3. 创建Hive数据库与表:在IDEA的代码中创建Hive的ods库和数据表score

  4. 数据转存:把上述清洗过后的数据转存到Hive的ods库下的数据表score

环境准备:

1.把hive-site.xmlcore-site.xmlhdfs-site.xmlLog4日志复制到IDEA中的resources目录

2.把mysql-connector-java-5.1.40-bin.jar 复制到Spark下的jars目录下。

// 定义包名
package org.example

// 导入 SparkSession 类
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 定义一个名为 SparkJob021 的对象
object sparkJob021 {
 // 程序的入口点
 def main(args: Array[String]): Unit = {
 // 在windows下开发时设置,
 // 设置程序使用的用户对象,需要添加在sparkSession对象创建之前
 System.setProperty("HADOOP_USER_NAME", "root")

 // 创建 SparkSession 对象
 val spark = SparkSession.builder()
 .appName("example") // 设置应用程序名称
 .master("local[*]") // 设置运行模式为本地模式,使用所有可用的核心
 .enableHiveSupport() // 启用对Hive的支持
 .config("hive.metastore.uris", "thrift://192.168.36.100:9083") // Hive Metastore服务地址
 .getOrCreate() // 创建或获取一个 SparkSession 实例

 // 定义数据库的连接 URL
 val DB_URL = "jdbc:mysql://192.168.36.100:3306/ods?useSSL=false"

 // 使用 Spark 读取 MySQL 数据,定义一个DataFrame
 val peoplesDF = spark.read
 .format("jdbc") // 指定格式为 jdbc
 .option("driver", "com.mysql.jdbc.Driver") // 指定 JDBC 驱动程序
 .option("url", DB_URL) // 指定数据库 URL
 .option("dbtable", "score") // 指定要读取的表
 .option("user", "root") // 指定数据库用户名
 .option("password", "123456") // 指定数据库密码
 .load() // 加载数据

 // 打印 DataFrame 的 schema
 // 显示 DataFrame 的内容
 println("清洗前的表数据")
 peoplesDF.show()

 // 数据清洗:替换缺失值为50,替换小于0和大于100的成绩为60
 val cleanedDF = peoplesDF
 .na.fill(50) // 缺失值替换为50
 .withColumn("score", when(col("score") < 0 || col("score") > 100, 60).otherwise(col("score")))
 // 将score列中小于0或大于100的值替换为60,其他值保持不变。

 println("清洗后的表数据")
 cleanedDF.show()

 // 创建 ods 数据库
 spark.sql("DROP DATABASE IF EXISTS ods cascade")

 // 创建 ods 数据库,如果不存在的话
 spark.sql("CREATE DATABASE IF NOT EXISTS ods")

 // 使用新创建的数据库
 //spark.sql("USE ods")

 // 创建表,如果不存在的话
 spark.sql("""
   CREATE TABLE IF NOT EXISTS ods.score (
     student_id INT,
     name STRING,
     score INT
   )
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ','
   STORED AS TEXTFILE
""")

 // 将清洗后的数据覆盖存储到Hive表ods.score
 cleanedDF.write.mode("overwrite").saveAsTable("ods.score")
 //cleanedDF.write.mode("overwrite").insertInto("ods.score")

 println("数据成功写到Hive")

 // 关闭SparkSession,释放资源
 // spark.stop()
}
}

项目打包:

方法一:在 IDEA 中通过Maven 工具窗口执行Maven 生命周期中:Lifecycle-Clean-Package来打包

方法二:在 IDEA 的在Terminal 视图中输入命令:mvn clean package

任务提交:把IDEA的target目录中打好的项目Jar包,提交到集群中去运行

准备工作:在Spark集群连接MySQL需要MySQL驱动程序,故需把mysql-connector-java-5.1.40-bin.jar 复制到Spark下的jars目录下。

# 提交任务
[root@master ~]# spark-submit --master yarn --deploy-mode client \
--class org.example.sparkJob031 Spark2024-1.0.jar


Windows下配置映射

报错:运行程序,报master异常

原因:因为我们是在windows下的idea环境下运行spark,同时我们把core-site.xml\hdfs-site.xml\hive-site.xml复制到了idea下的resoures目录下,但是因为这些配置文件中我们使用的master做为主机的地址,而在windows下不知道master映射的什么地址,故报错

解决:

# 打开 "C:\Windows\System32\drivers\etc\hosts"
# 编辑这个hosts文件

# 使用管理员权限打开记事本:
# 1.首先使用管理员权限打开记事本。可以通过搜索“记事本”来做,然后在搜索结果上右键选择“以管理员身份运行”。
# 2.通过记事本打开 hosts 文件:
# 添加以下信息
192.168.36.100 master
192.168.36.101 slave1
192.168.36.102 slave2


Spark SQL综合练习三:读取Hive的数据

实验目标

  • 使用Spark SQL进行读取Hive的数据操作。

实验步骤

  1. 数据读取:从Hive中读取ods.score数据表。

代码示例

  • Scala代码读取Hive的数据表

package org.example
import org.apache.spark
import org.apache.spark.sql.SparkSession

object SparkJob03 {
 def main(args: Array[String]): Unit= {
   // 设置Hadoop用户为"root",这通常是为了在使用Hadoop环境时具有相应的权限。
   System.setProperty("HADOOP_USER_NAME","root")

   //创建SparkSession
   val spark = SparkSession.builder()
     .master("local[*]")
     .appName("bigdata play")
     .enableHiveSupport()   // 启用Hive支持
     .config("hive.metastore.uris", "thrift://192.168.36.100:9083")  // Hive Metastore服务地址,与客户端Spark之间通过Thrift定义的方式进行有效的数据交换
     .getOrCreate()

   // 读取数据.................................................//
   // 方法一:使用spark的API
   // 读取Hive中的ods.score,生成spark的虚拟二维表Dataframe[简称DF]
   val df=spark.read.table("ods.score")
   println("使用spark的API方法查看Dataframe")
   df.show

   // 方法二:使用spark.sql查看Hive 中的数据库和数据表
   // spark.sql("show databases") 返回一个DataFrame
   println("使用spark.sql查看Hive 中的数据库和数据表")
   spark.sql("show databases").show()
   spark.sql("select * from ods.score").show()

   // 关闭SparkSession,释放资源
   spark.stop()
 }
}


Spark SQL综合练习四:数据指标简单计算

实验目标

  • 学会基础的数据转换与处理。

  • 使用Spark SQL进行数据筛选、聚合等操作。

实验步骤

  1. 数据读取:从Hive中读取ods.score数据表。

  2. 数据探索与处理:执行数据筛选、聚合操作,如计算平均分、最高低分、筛选不及格的学生等。

代码示例

  • Scala代码展示数据筛选、聚合操作。

package org.example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkJob04 {
 def main(args: Array[String]): Unit = {
   // 设置Hadoop用户名称,这通常在Windows开发环境中需要设置
   System.setProperty("HADOOP_USER_NAME", "root")

   // 创建SparkSession实例,启用Hive支持
   val spark = SparkSession.builder()
     .master("local[*]")
     .appName("bigdata play")
     .enableHiveSupport() // 启用Hive支持
     .config("hive.metastore.uris", "thrift://192.168.36.100:9083") // 配置Hive Metastore服务的地址
     .getOrCreate()

   // 读取ods.score表中的数据
   val df = spark.read.table("ods.score")
   println("所有学生的成绩" )
   df.show()

   // 计算平均分
     // .agg() 函数用于对DataFrame进行聚合操作,可以计算如总和、平均值、最大值、最小值等统计量。
     // .first():聚合完成后,调用 first() 来获取结果DataFrame的第一行
     // .getDouble(0):这个函数从行的第一列中检索值,并将其作为双精度浮点数(double)返回。
   val avgScore = df.agg(avg("score")).first().getDouble(0)
   println("计算出的平均分是一个Spark的Dataframe二维表")
   df.agg(avg("score")).show()  
   println("平均分:" + avgScore)

   // 计算最高分
   val maxScore = df.agg(max("score")).first().getInt(0)
   println("最高分:" + maxScore)

   // 计算最低分
   val minScore = df.agg(min("score")).first().getInt(0)
   println("最低分:" + minScore)

   // 筛选不及格的学生(这里假设及格分数线为60分)
   val failingStudents = df.filter(col("score") < 60)
   println("不及格学生" )
   failingStudents.show()

   // 关闭SparkSession,释放资源
   // spark.stop()
 }
}

代码解释:

val avgScore = Df.agg(avg("score")).first().getDouble(0)
  1. Df: 这是一个 DataFrame,代表了一个结构化的数据集。在这个上下文中,Df包含了一些数据,其中包括一个名为 score 的列。

  2. .agg(avg("score")): 这部分代码使用了 agg 函数来聚合数据。agg 函数是用来执行聚合操作的,如求和、平均值、最大值等。在这里,avg("score") 被传递给 agg 函数,表示对 score 列求平均值。avg 是一个聚合函数,用于计算其参数所指定列的平均值。

  3. .first(): agg 函数执行完毕后会返回一个新的 DataFrame,这个 DataFrame 只包含一个行(因为是平均值)。first() 函数用于获取这个 DataFrame 的第一行。

  4. .getDouble(0): 在获取了这行数据之后,getDouble(0) 用于从这行数据中提取第一个字段(索引为 0),并将其作为一个双精度浮点数(Double)返回。这里,这个字段就是之前计算出的平均分。

  5. 可以更改为下面的语句

    df.agg(avg("score")) 会默认生成Df,列名为avg("score")

    val avgScore = df.agg(avg("score")).first().getAs[Double]("avg(score)")



代码补充,加载数据

如果之前的hive任务没有完成,所以Hive中没有数据,运行下面代码,直接在hive中创建数据库与表,加载数据

package org.example
// 导入SparkSession类,创建和配置 Spark 应用程序的入口点
import org.apache.spark.sql.SparkSession
// 导入Spark SQL数据保存操作模式的类
import org.apache.spark.sql.SaveMode

object SparkJob04LoadData {
 def main(args: Array[String]): Unit = {
   // 设置Hadoop用户为"root",这通常是为了在使用Hadoop环境时具有相应的权限。
   System.setProperty("HADOOP_USER_NAME", "root")

   // 创建SparkSession对象,是Spark SQL的入口点,并启用Hive支持
   val spark = SparkSession.builder()
     .appName("Hive Data Load")  // 设置应用程序名称
     .master("local[*]")         // 本地模式运行,使用所有可用的核心
     .enableHiveSupport()        // 启用Hive支持
     .config("hive.metastore.uris", "thrift://192.168.36.100:9083") // 设置Hive元数据存储的Thrift服务地址
     .getOrCreate()

   // 创建或更新数据库和表结构
   // 删除数据库(如果存在)并级联删除相关的所有表
   spark.sql("DROP DATABASE IF EXISTS ods CASCADE")
   // 创建新的数据库(如果不存在)
   spark.sql("CREATE DATABASE IF NOT EXISTS ods")
   // 创建或确认表结构存在,设置表的存储格式为TEXTFILE
   spark.sql("""
     CREATE TABLE IF NOT EXISTS ods.score (
       student_id INT,
       name STRING,
       score INT
     )
     ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ','
     STORED AS TEXTFILE
   """)

   // 插入数据到Hive表
   // 启用对 DataFrame操作的隐式转换和隐式类功能,并允许使用列名作为标识符直接访问 DataFrame 的列。
   import spark.implicits._

   // 定义一个序列,每个元素都是一个包含三个字段的元组:student_id(学生ID),name(学生姓名),score(分数)。
   val data = Seq(
     (1, "陈晨", 92),
     (2, "刘翔", 89),
     (3, "杨洋", 70),
     (4, "吴越", 50),
     (5, "周宇", 60),
     (6, "徐飞", 63),
     (7, "郭靖", 56),
     (8, "黄蓉", 74),
     (9, "林冲", 85),
     (10, "宋江", 71)
   )
   // 使用 toDF 方法将序列转换为 DataFrame 并命名列。
   // toDF 函数是隐式转换提供的一个方法,它将 Seq 或其他集合转换为 DataFrame。
   val df = data.toDF("student_id", "name", "score")

   // 将 DataFrame 的内容写入数据库表。  
   df.write.mode(SaveMode.Append).insertInto("ods.score")

   // 从Hive读取数据并展示
   println("从Hive读取数据并展示:")
   spark.sql("SELECT * FROM ods.score").show()

   // 关闭SparkSession
   // spark.stop()
 }
}

项目打包:

方法一:在 IDEA 中通过Maven 工具窗口执行Maven 生命周期中:Lifecycle-Clean-Package来打包

方法二:在 IDEA 的在Terminal 视图中输入命令:mvn clean package

任务提交:把IDEA的target目录中打好的项目Jar包,提交到集群中去运行

# 提交任务
[root@master ~]# spark-submit --master yarn --deploy-mode client --class org.example.SparkJob04LoadData Spark2024-1.0.jar

解释:

  1. spark-submit: 这是提交 Spark 作业的命令工具。

  2. --master yarn: 这个选项指定了 Spark 作业的主节点(master)类型。使用的是 YARN来管理资源。

  3. --deploy-mode client: 指定部署模式为客户端模式。Spark 应用程序【驱动程序】运行在提交作业的机器上(即客户端),而不是在集群的某个工作节点上。

  4. --class org.example.SparkJob04LoadData: 指定要运行的主类,即 Spark 应用程序的入口点。

  5. Spark2024-1.0.jar: 指定包含 Spark 应用程序的 JAR 文件名。




Spark SQL综合练习五:数据指标高级处理

实验目标

  1. 任务1:高级数据筛选

    • 计算所有学生分数的平均值,并筛选出高于平均分的学生数据。

    • 筛选出分数在60到90之间的学生数据。

  2. 任务2:数据分组与排序

    • 将学生根据分数分组(低于60为低,60到80为中,高于80为高),并统计每组的人数。

    • 将学生数据按照分数降序排序。

  3. 任务3:数据聚合与窗口函数

    • 使用窗口函数对学生的分数进行排名。

实验步骤

  1. 配置和初始化Spark环境,包括设置Hadoop用户、创建SparkSession实例,并启用对Hive的支持以读取数据。

  2. 执行高级数据筛选,计算所有学生分数的平均值,并筛选出超过平均值和分数在特定区间内的学生数据。

  3. 对学生数据进行分组和排序,根据分数将学生分入不同的组别,并按照分数降序排序学生数据。

  4. 使用窗口函数对数据进行聚合和排名,展示每个学生的分数排名,并在完成所有数据处理后结束Spark会话。

实验的重难点

  1. 环境配置:

    • 正确配置Spark和Hive,包括权限设置和元存储连接,以确保访问Hive表。

  2. 数据处理技巧:

    • 熟悉使用聚合函数计算平均值,以及进行高级筛选和排序。

  3. 分组与聚合:

    • 实现对数据的有效分组并进行统计,需要对Spark SQL的分组和聚合功能有深入理解。

  4. 窗口函数应用:

    • 使用窗口函数进行数据排名和计算分数差距,这涉及到对窗口规范和函数的准确应用。

  5. 代码效率和调试:

    • 确保代码正确性和优化执行效率,特别是在处理大规模数据时的性能考虑。

代码示例

// 定义包名称,用于组织和管理类
package org.example

// 引入必要的Spark SQL库
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// 定义一个名为SparkJob的对象,包含主要的执行逻辑
object SparkJob05 {
 def main(args: Array[String]): Unit = {
   // 设置Hadoop用户为"root",这通常是为了在使用Hadoop环境时具有相应的权限。
   System.setProperty("HADOOP_USER_NAME", "root")

   // 创建一个SparkSession实例,用于执行Spark操作
   val spark = SparkSession.builder()
     .master("local[*]")            // 设置本地运行模式,使用所有可用的核心
     .appName("bigdata play")       // 应用程序名称
     .enableHiveSupport()           // 启用对Hive的支持,允许读取Hive表
     .config("hive.metastore.uris", "thrift://192.168.36.100:9083") // 配置Hive元存储的地址
     .getOrCreate()                 // 创建或获取一个SparkSession

   // 读取名为ods.score的Hive表数据
   val df = spark.read.table("ods.score")
   println("所有学生的成绩:")
   df.show() // 显示读取的数据

   // 任务1:执行高级数据筛选
   // 计算分数的平均值【agg方法:是用来进行聚合函数】
   val averageScore = df.agg(avg("score")).first().getDouble(0)
   // 筛选出高于平均分的数据
   val aboveAverage = df.filter(col("score") > averageScore)
   println("筛选出高于平均分的数据:")  
   aboveAverage.show()

   // 筛选出分数在60到90之间的数据,包括指定的边界值60和90
   val scoreRange = df.filter(col("score").between(60, 90))
   println("筛选出分数在60到90之间的数据:")  
   scoreRange.show()

   // 任务2:按照分数降序排序数据
   val sortedScores = df.orderBy(col("score").desc)
   println("按照分数降序排序数据:")  
   sortedScores.show()

   // 任务3:数据分组  
   println("成绩分组并统计数量:")
   // 新增"score_group"列,然后根据分数判断值,然后分组并计数
   val scoreGroups = df.withColumn("score_group",
                                   when(col("score") < 60, "低")
                                   .when(col("score").between(60, 80), "中")
                                   .otherwise("高"))
                       .groupBy("score_group")
                       .count()
   scoreGroups.show()


   // 任务4:使用窗口函数进行数据聚合与排名
   println("数据聚合与窗口函数:")
   // 定义一个窗口规范,按分数降序
   val windowSpec = Window.orderBy(col("score").desc)
   // 应用rank窗口函数,根据指定的窗口规范windowSpec计算每个学生的排名
     // .over(windowSpec) 表示 rank() 函数应用于按 score 降序排列的整个数据集【窗口】。
   val rankedData = df.withColumn("rank", rank().over(windowSpec))
   rankedData.show()  

   // 任务5:使用窗口函数计算与最高分的差距
     // 定义一个窗口规范,其中数据按分数降序排列,窗口从分区起始到当前行。窗口的大小和内容会随着每一行的处理而动态变化。
   val maxScoreWindow = Window.orderBy(col("score").desc).rangeBetween(Window.unboundedPreceding, Window.currentRow)
     // 计算每个学生分数与当前窗口内最高分的差距,并添加为新列
   val resultData = rankedData.withColumn("score_diff", max(col("score")).over(maxScoreWindow) - col("score"))  
   println("数据降序排名:")  
   resultData.show()

   // 完成所有操作后,停止Spark会话
   spark.stop()
 }
}


代码解释:

val scoreGroups = df.withColumn("score_group", 
                               when(col("score") < 60, "低")
                               .when(col("score").between(60, 80), "中")
                               .otherwise("高"))
                   .groupBy("score_group")
                   .count()

新增列:使用 withColumn 方法添加了一个名为 "score_group" 的新列到 DataFrame df。这个新列的值是根据 "score" 列的值来确定的,使用了 whenotherwise 函数来进行条件判断:

  • 如果 "score" 的值小于 60,"score_group" 列的值为 "低"。

  • 如果 "score" 的值在 60 到 80 之间(包括 60 和 80),"score_group" 列的值为 "中"。

  • 如果 "score" 的值大于 80,"score_group" 列的值为 "高"。


窗口规范定义

val maxScoreWindow = Window.orderBy(col("score").desc).rangeBetween(Window.unboundedPreceding, Window.currentRow)

这行代码定义了一个名为 maxScoreWindow 的窗口规范,具体含义如下:

  • 排序 (orderBy(col("score").desc)): 窗口中的数据按照 score 列的值进行降序排序。这意味着最高的分数将位于窗口的最前面。

  • 范围 (rangeBetween(Window.unboundedPreceding, Window.currentRow)): 这定义了窗口的范围从当前分区的第一行到当前行(包括当前行)。这个设置允许窗口包括从分区开始到当前处理行的所有行。

窗口应用和新列的计算

val resultData = rankedData.withColumn("score_diff", max(col("score")).over(maxScoreWindow) - col("score"))

这行代码向 rankedData DataFrame 添加了一个新列 score_diff,这个新列的值由以下计算得出:

  • 窗口函数计算 (max(col("score")).over(maxScoreWindow)): 这部分计算当前窗口(根据上面定义的 maxScoreWindow 规范)中 score 列的最大值。因为窗口是从每个分区的开始直到当前行并且数据是按分数降序排序的,所以窗口的第一行总是当前窗口的最大分数。

  • 计算分数差 (- col("score")): 从当前窗口的最大分数中减去当前行的分数,得到当前行的分数与窗口中最高分的差距。

窗口的创建和逻辑

在这段代码中,不是为每一行创建一个单独的窗口,而是窗口随着DataFrame中行的处理动态扩展。对于每一行,窗口包含从开始到这一行的所有行(因为是按分数降序,所以最高分总在窗口的开始)。随着处理到DataFrame的下一行,窗口包含的行范围相应扩展,包括新的当前行。

总结

所以,虽然不是为每一行创建一个独立的窗口,但是窗口的大小和内容会随着每一行的处理而动态变化。这种窗口的定义允许计算每一行相对于它之前所有行(包括自身)的最大分数的差距。这是一种强大的数据分析方法,尤其适用于需要考虑累积或连续数据范围的场景。


Spark SQL综合练习六:数据存储动态分区

实验目标

  • 实践使用Spark SQL进行动态分区。

实验步骤

动态分区写入:利用Spark将清洗后的数据根据成绩进行分段(小于60为"不及格",60-80为"良好",80-100为"优秀"),并动态分区写入到Hive中。

代码示例

  • 1.预先在Hive中创建grade_score表,并启用动态分区。

-- 设置Hive以启用动态分区
SET hive.exec.dynamic.partition = true;

-- 设置动态分区模式为非严格模式,在非严格模式下,不需要指定所有的分区列
SET hive.exec.dynamic.partition.mode = nonstrict;

-- 创建数据库dws
CREATE DATABASE IF NOT EXISTS dws;

-- 创建一个新的表grade_score,如果这个表不存在的话
CREATE TABLE IF NOT EXISTS dws.grade_score(
  student_id INT,
  name STRING,
  score INT
)
PARTITIONED BY (grade STRING)
STORED AS TEXTFILE;
  • 在Spark应用中,对DataFrame添加一列grade,根据成绩范围将成绩分为"不及格"、"良好"、"优秀"。

  • 使用Spark DataFrame的write.partitionBy("grade")方法,将数据动态分区写入Hive表。

  • 注意分区值不能为“中文”,会报错。


  • 2.完整的Spark应用示例

package org.example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkJob06 {
  def main(args: Array[String]): Unit = {
    // 创建SparkSession实例,启用Hive支持
    val spark = SparkSession.builder()
      .master("local[*]") // 设置本地模式,使用所有可用的CPU核心
      .appName("ETL Job") // 设置应用程序的名称
      // 兼容Hive Parquet存储格式
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      // 打开Hive动态分区的标志
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      // 启用动态分区覆盖模式
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("hive.metastore.uris", "thrift://192.168.36.100:9083") // 配置Hive Metastore服务的地址
      .enableHiveSupport() // 启用Hive支持
      .getOrCreate() // 获取或创建SparkSession实例

    // 读取ods.score表中的数据
    val scoresDf = spark.read.table("ods.score")
    println("所有学生的成绩:")
    scoresDf.show()

    // 使用Spark的udf方法定义一个成绩分段的用户自定义函数,此函数根据传入的整数成绩返回相应的字母等级。
    // 输入参数是整数类型的成绩(score: Int),返回值是字符串类型的等级("A", "B", "C")。
    val scoreToGradeUdf = udf(
      // 定义一个匿名函数,它将整数类型的成绩(score)作为输入。
      (score: Int) =>
        // 使用模式匹配来根据成绩判断返回的等级。
        // 模式匹配类似于多个if-else条件语句,但更为直观和强大。
        score match {
          case score if score < 60 => "C"  // 如果成绩小于60,返回"C"
          case score if score >= 60 && score <= 80 => "B"  // 如果成绩在60到80之间,返回"B"
          case score if score > 80 => "A"  // 如果成绩大于80,返回"A"
        }
    )

    // 使用用户自定义函数应用于Spark DataFrame中的列,添加成绩分段列,用于动态分区
    val ScoresGradeDf = scoresDf.withColumn("grade", scoreToGradeUdf(col("score")))
    println("学生的成绩等级:")
    ScoresGradeDf.show()

    // 动态分区写入Hive
    ScoresGradeDf.write
      .mode("overwrite")
      .partitionBy("grade")
      .format("hive")               // 设置写入格式为Hive,表示数据将存储在Hive表中
      .saveAsTable("dws.grade_score")
      
    println("成绩成功写入分区表")
      
    // 关闭SparkSession,释放资源
    spark.stop()
  }
}


解释下面的语法的格式:

val scoreToGradeUdf = udf((score: Int) => score match {
      case score if score < 60 => "C"
      case score if score >= 60 && score <= 80 => "B"
      case score if score > 80 => "A"
    })

这行Scala代码定义了一个用户自定义函数(UDF),用于将数值分数转换为字母等级。详细解释:

  1. val scoreToGradeUdf =: 这部分声明了一个名为scoreToGradeUdf的不可变变量。val关键字用于定义不可变变量,这意味着一旦scoreToGradeUdf被赋值,它的值就不能被更改。

  2. udf: 这是Spark SQL中的一个方法,用于将普通的Scala函数包装成用户自定义函数(UDF),使其能够在Spark的DataFrame的列上应用。此UDF可以被应用到DataFrame的列上,将数值成绩转换为等级标签。

  3. ((score: Int) => ...): 这部分定义了一个匿名函数,接受一个名为scoreInt类型参数。这个匿名函数使用了Scala的箭头语法,即(参数列表) => 函数体,用于定义函数体。

  4. score match {...}: 这部分使用了Scala的模式匹配语法,类似于其他编程语言中的switch-case语句。它根据score的值匹配不同的情况(case)。

  5. case语句:

    • case score if score < 60 => "C": 如果score小于60,这个case匹配,函数返回字符串"C"。

    • case score if score >= 60 && score <= 80 => "B": 如果score在60到80之间(包含60和80),这个case匹配,函数返回字符串"B"。

    • case score if score > 80 => "A": 如果score大于80,这个case匹配,函数返回字符串"A"。

总结来说,这段代码定义了一个名为scoreToGradeUdf的UDF,它根据传入的分数(score)返回相应的等级("A"、"B"、"C")。这个UDF可以应用于Spark DataFrame中的列,以对每个分数值进行等级转换。


运行结果

image-20240317160054431

image-20240317160152582


任务优化:实现在spark中创建Hive库与表

优化任务:在IDEA的代码中创建Hive的dws库和数据表grade_score

package org.example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkJob06 {
  def main(args: Array[String]): Unit = {
    // 创建SparkSession实例,启用Hive支持
    val spark = SparkSession.builder()
      .master("local[*]") // 设置本地模式,使用所有可用的CPU核心
      .appName("ETL Job") // 设置应用程序的名称
      // 打开Hive动态分区的标志
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      // 启用动态分区覆盖模式
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("hive.metastore.uris", "thrift://192.168.36.100:9083") // 配置Hive Metastore服务的地址
      .enableHiveSupport()   // 启用Hive支持
      .getOrCreate()         // 获取或创建SparkSession实例

    // 删除dws数据库
    spark.sql("DROP DATABASE IF EXISTS dws cascade")

    // 创建dws数据库
    spark.sql("CREATE DATABASE IF NOT EXISTS dws")

    // 创建分区表grade_score
    spark.sql("""
      CREATE TABLE IF NOT EXISTS dws.grade_score(
        student_id INT,
        name STRING,
        score INT
      )
      PARTITIONED BY (grade STRING)
      STORED AS TEXTFILE
    """)

    // 读取ods.score表中的数据
    val scoresDf = spark.read.table("ods.score")
    println("所有学生的成绩:")
    scoresDf.show()

    // 使用Spark的udf方法定义一个成绩分段的用户自定义函数,允许在DataFrame的列上进行SQL操作中。
    // 输入参数是整数类型的成绩(score: Int),返回值是字符串类型的等级("A", "B", "C")。
    // 定义一个匿名函数,它将整数类型的成绩(score)作为输入。
    // 使用模式匹配来根据成绩判断返回的等级。
    // 模式匹配类似于多个if-else条件语句,但更为直观和强大。  
    val scoreToGradeUdf = udf(  
      (score: Int) => score match {
          case score if score < 60 => "C"                  // 如果成绩小于60,返回"C"
          case score if score >= 60 && score <= 80 => "B"  // 如果成绩在60到80之间,返回"B"
          case score if score > 80 => "A"                  // 如果成绩大于80,返回"A"
        }
    )

    // 应用用户自定义函数,添加成绩分段列,用于动态分区
    val ScoresGradeDf = scoresDf.withColumn("grade", scoreToGradeUdf(col("score")))
    println("学生的成绩等级:")
    ScoresGradeDf.show()

    // 动态分区写入Hive
    ScoresGradeDf.write
      .mode("overwrite")
      .partitionBy("grade")
      .format("hive")               // 设置写入格式为Hive,表示数据将存储在Hive表中
      .saveAsTable("dws.grade_score")

    println("成绩成功写入分区表")

    // 关闭SparkSession,释放资源
    spark.stop()
  }
}



Spark SQL综合练习七:电商平台信息查询

实验目标

本练习旨在通过一系列步骤,加深对Spark SQL功能的理解和应用。我们将通过加载数据、执行查询、聚合分析以及联表查询等操作,探索Spark SQL处理大数据的能力。练习基于一个假想的电商平台数据,包括用户信息、订单信息和产品信息。

数据集简介

  • 用户信息(users): 包含用户ID、姓名、年龄和性别。

  • 订单信息(orders): 包含订单ID、用户ID、订单金额和订单日期。

  • 产品信息(products): 包含产品ID、产品名称和产品类别。

users.csv

id,name,age,gender
1,张伟,28,男
1,张伟,28,男
2,李娜,16,女
3,王强,35,男
4,赵敏,26,女
5,李明,40,男
6,周丽,24,女
7,吴刚,30,男
8,陈静,29,女
9,林峰,42,男
10,王芳,38,女

orders.csv

id,user_id,product_id,amount,order_date
1,1,101,150,"2020-01-01"
2,2,,210,"2020-01-03"
3,3,102,0,"2020-01-05"
4,4,104,120,"2020-01-07"
5,5,105,160,"2020-01-09"
6,6,101,200,"2020-01-11"
7,7,103,260,"2020-01-13"
8,8,102,300,"2020-01-15"
9,9,104,110,"2020-01-17"
10,10,105,190,"2020-01-19"
11,11,106,220,"2020-01-21"
12,12,107,180,"2020-01-23"
13,13,108,500,"2020-01-25"
14,14,109,130,"2020-01-27"
15,15,110,170,"2020-01-29"
16,16,101,210,"2020-01-31"
17,17,102,250,"2020-02-02"
18,18,103,320,"2020-02-04"
19,19,104,150,"2020-02-06"
20,20,105,200,"2020-02-08"
21,21,106,230,"2020-02-10"
22,22,107,190,"2020-02-12"
23,23,108,510,"2020-02-14"
24,24,109,140,"2020-02-16"
25,25,110,175,"2020-02-18"
26,26,101,215,"2020-02-20"
27,27,102,255,"2020-02-22"
28,28,103,330,"2020-02-24"
29,29,104,155,"2020-02-26"
30,30,105,205,"2020-02-28"
31,1,106,220,"2020-03-02"
32,2,108,310,"2020-03-04"
33,3,110,470,"2020-03-06"
34,4,101,130,"2020-03-08"
35,5,103,165,"2020-03-10"
36,6,105,210,"2020-03-12"
37,7,107,270,"2020-03-14"
38,8,109,310,"2020-03-16"
39,9,106,115,"2020-03-18"
40,10,108,195,"2020-03-20"
41,11,110,225,"2020-03-22"
42,12,101,185,"2020-03-24"
43,13,103,490,"2020-03-26"
44,14,105,135,"2020-03-28"
45,15,107,175,"2020-03-30"
46,16,109,220,"2020-04-01"
47,17,106,260,"2020-04-03"
48,18,108,320,"2020-04-05"
49,19,110,155,"2020-04-07"
50,20,101,205,"2020-04-09"
51,21,103,240,"2020-04-11"
52,22,105,190,"2020-04-13"
53,23,107,520,"2020-04-15"
54,24,109,145,"2020-04-17"
55,25,106,180,"2020-04-19"
56,26,108,220,"2020-04-21"
57,27,110,265,"2020-04-23"
58,28,101,340,"2020-04-25"
59,29,103,160,"2020-04-27"
60,30,105,210,"2020-04-29"
61,1,107,235,"2020-05-01"
62,2,109,320,"2020-05-03"
63,3,106,460,"2020-05-05"
64,4,108,140,"2020-05-07"
65,5,110,170,"2020-05-09"
66,6,101,215,"2020-05-11"
67,7,103,275,"2020-05-13"
68,8,105,305,"2020-05-15"
69,9,107,120,"2020-05-17"
70,10,109,190,"2020-05-19"
71,11,106,230,"2020-05-21"
72,12,108,180,"2020-05-23"
73,13,110,500,"2020-05-25"
74,14,101,140,"2020-05-27"
75,15,103,170,"2020-05-29"
76,16,105,220,"2020-05-31"
77,17,107,265,"2020-06-02"
78,18,109,330,"2020-06-04"
79,19,106,160,"2020-06-06"
80,20,108,210,"2020-06-08"
81,21,110,245,"2020-06-10"
82,22,101,195,"2020-06-12"
83,23,103,510,"2020-06-14"
84,24,105,150,"2020-06-16"
85,25,107,185,"2020-06-18"
86,26,109,225,"2020-06-20"
87,27,106,265,"2020-06-22"
88,28,108,350,"2020-06-24"
89,29,110,165,"2020-06-26"
90,30,101,215,"2020-06-28"
91,3,107,280,"2020-07-01"
92,7,104,320,"2020-07-04"
93,10,102,190,"2020-07-06"
94,14,108,210,"2020-07-08"
95,20,105,260,"2020-07-10"
96,22,103,220,"2020-07-12"
97,25,110,330,"2020-07-14"
98,29,106,180,"2020-07-16"
99,1,109,300,"2020-07-18"
100,6,101,220,"2020-07-20"
101,11,107,340,"2020-07-22"
102,16,104,210,"2020-07-24"
103,18,102,190,"2020-07-26"
104,21,108,220,"2020-07-28"
105,26,105,260,"2020-07-30"
106,30,103,240,"2020-08-01"
107,2,108,310,"2020-08-03"
108,5,110,150,"2020-08-05"
109,8,103,210,"2020-08-07"
110,13,101,330,"2020-08-09"
111,17,104,280,"2020-08-11"
112,19,107,220,"2020-08-13"
113,23,105,310,"2020-08-15"
114,24,102,160,"2020-08-17"
115,27,109,290,"2020-08-19"
116,28,106,230,"2020-08-21"
117,4,108,320,"2020-08-23"
118,9,110,190,"2020-08-25"
119,12,103,220,"2020-08-27"
120,15,101,340,"2020-08-29"
121,20,104,270,"2020-08-31"
122,26,107,240,"2020-09-02"
123,29,105,320,"2020-09-04"
124,30,102,170,"2020-09-06"
125,7,109,300,"2020-09-08"
126,14,106,240,"2020-09-10"
127,3,104,280,"2020-09-12"
128,6,102,190,"2020-09-14"
129,10,109,310,"2020-09-16"
130,12,107,150,"2020-09-18"
131,18,105,220,"2020-09-20"
132,21,103,330,"2020-09-22"
133,23,101,290,"2020-09-24"
134,25,108,260,"2020-09-26"
135,27,106,210,"2020-09-28"
136,29,104,310,"2020-09-30"
137,2,102,200,"2020-10-02"
138,5,109,300,"2020-10-04"
139,8,107,190,"2020-10-06"
140,11,105,310,"2020-10-08"
141,13,103,230,"2020-10-10"
142,16,101,260,"2020-10-12"
143,19,108,290,"2020-10-14"
144,22,106,220,"2020-10-16"
145,24,104,310,"2020-10-18"
146,26,102,210,"2020-10-20"
147,28,109,320,"2020-10-22"
148,30,107,190,"2020-10-24"
149,1,105,330,"2020-10-26"
150,4,103,240,"2020-10-28"
151,7,101,270,"2020-10-30"
152,9,108,300,"2020-11-01"
153,14,106,190,"2020-11-03"
154,17,104,320,"2020-11-05"
155,20,102,230,"2020-11-07"
156,23,109,260,"2020-11-09"
157,2,105,250,"2020-11-11"
158,4,107,260,"2020-11-13"
159,6,109,220,"2020-11-15"
160,8,101,240,"2020-11-17"
161,10,103,270,"2020-11-19"
162,12,105,300,"2020-11-21"
163,14,107,230,"2020-11-23"
164,16,109,210,"2020-11-25"
165,18,101,220,"2020-11-27"
166,20,103,290,"2020-11-29"
167,22,105,310,"2020-12-01"
168,24,107,260,"2020-12-03"
169,26,109,240,"2020-12-05"
170,28,101,250,"2020-12-07"
171,30,103,280,"2020-12-09"
172,1,105,260,"2020-12-11"
173,3,107,220,"2020-12-13"
174,5,109,210,"2020-12-15"
175,7,101,230,"2020-12-17"
176,9,103,240,"2020-12-19"
177,11,105,270,"2020-12-21"
178,13,107,260,"2020-12-23"
179,15,109,250,"2020-12-25"
180,17,101,240,"2020-12-27"
181,19,103,290,"2020-12-29"
182,21,105,310,"2020-12-31"
183,23,107,260,"2021-01-02"
184,25,109,240,"2021-01-04"
185,27,101,250,"2021-01-06"
186,29,103,280,"2021-01-08"
187,1,106,280,"2021-01-10"
188,3,108,230,"2021-01-12"
189,5,110,190,"2021-01-14"
190,7,102,310,"2021-01-16"
191,9,104,220,"2021-01-18"
192,11,106,330,"2021-01-20"
193,13,108,240,"2021-01-22"
194,15,110,260,"2021-01-24"
195,17,102,280,"2021-01-26"
196,19,104,230,"2021-01-28"
197,21,106,250,"2021-01-30"
198,23,108,270,"2021-02-01"
199,25,110,290,"2021-02-03"
200,27,102,220,"2021-02-05"
201,29,104,210,"2021-02-07"
202,2,106,320,"2021-02-09"
203,4,108,240,"2021-02-11"
204,6,110,260,"2021-02-13"
205,8,102,280,"2021-02-15"
206,10,104,230,"2021-02-17"
207,12,106,250,"2021-02-19"
208,14,108,270,"2021-02-21"
209,16,110,220,"2021-02-23"
210,18,102,240,"2021-02-25"
211,20,104,260,"2021-02-27"
212,22,106,280,"2021-03-01"
213,24,108,230,"2021-03-03"
214,26,110,210,"2021-03-05"
215,28,102,220,"2021-03-07"
216,30,104,240,"2021-03-09"
217,2,107,210,"2021-03-11"
218,4,109,260,"2021-03-13"
219,6,101,220,"2021-03-15"
220,8,103,310,"2021-03-17"
221,10,105,180,"2021-03-19"
222,12,107,290,"2021-03-21"
223,14,109,230,"2021-03-23"
224,16,101,240,"2021-03-25"
225,18,103,260,"2021-03-27"
226,20,105,280,"2021-03-29"
227,22,107,220,"2021-03-31"
228,24,109,240,"2021-04-02"
229,26,101,260,"2021-04-04"
230,28,103,210,"2021-04-06"
231,30,105,220,"2021-04-08"
232,1,107,230,"2021-04-10"
233,3,109,250,"2021-04-12"
234,5,101,270,"2021-04-14"
235,7,103,210,"2021-04-16"
236,9,105,230,"2021-04-18"
237,11,107,250,"2021-04-20"
238,13,109,220,"2021-04-22"
239,15,101,240,"2021-04-24"
240,17,103,260,"2021-04-26"
241,19,105,280,"2021-04-28"
242,21,107,220,"2021-04-30"
243,23,109,240,"2021-05-02"
244,25,101,260,"2021-05-04"
245,27,103,280,"2021-05-06"
246,29,105,230,"2021-05-08"
247,2,107,250,"2021-05-10"
248,4,109,270,"2021-05-12"
249,6,101,210,"2021-05-14"
250,8,103,230,"2021-05-16"
251,10,105,250,"2021-05-18"
252,12,107,270,"2021-05-20"
253,14,109,210,"2021-05-22"
254,16,101,230,"2021-05-24"
255,18,103,250,"2021-05-26"
256,20,105,270,"2021-05-28"
257,22,107,210,"2021-05-30"
258,24,109,230,"2021-06-01"
259,26,101,250,"2021-06-03"
260,28,103,270,"2021-06-05"
261,30,105,210,"2021-06-07"
262,1,107,230,"2021-06-09"
263,3,109,250,"2021-06-11"
264,5,101,270,"2021-06-13"
265,7,103,210,"2021-06-15"
266,9,105,230,"2021-06-17"
267,11,107,250,"2021-06-19"
268,13,109,270,"2021-06-21"
269,15,101,210,"2021-06-23"
270,17,103,230,"2021-06-25"
271,19,105,250,"2021-06-27"
272,21,107,270,"2021-06-29"
273,23,109,210,"2021-07-01"
274,25,101,230,"2021-07-03"
275,27,103,250,"2021-07-05"
276,29,105,270,"2021-07-07"
277,2,107,210,"2021-07-09"
278,4,109,230,"2021-07-11"
279,6,101,250,"2021-07-13"
280,8,103,270,"2021-07-15"
281,10,105,210,"2021-07-17"
282,12,107,230,"2021-07-19"
283,14,109,250,"2021-07-21"
284,16,101,270,"2021-07-23"
285,18,103,210,"2021-07-25"
286,20,105,230,"2021-07-27"
287,22,107,250,"2021-07-29"
288,24,109,270,"2021-07-31"
289,26,101,210,"2021-08-02"
290,28,103,230,"2021-08-04"
291,30,105,250,"2021-08-06"
292,1,107,270,"2021-08-08"
293,3,109,210,"2021-08-10"
294,5,101,230,"2021-08-12"
295,7,103,250,"2021-08-14"
296,9,105,270,"2021-08-16"
297,11,107,210,"2021-08-18"
298,13,109,230,"2021-08-20"
299,15,101,250,"2021-08-22"
300,17,103,270,"2021-08-24"

products.csv

id,name,category
101,烤面包机,电子产品
102,炊具套装,家居与厨房
103,小说,书籍
104,洗发水,美容与个人护理
105,瑜伽垫,运动与户外
106,T恤,服装
107,棋盘游戏,玩具与游戏
108,笔记本电脑,计算机
109,草坪椅,花园与户外
110,电钻,工具与家居改善
111,,电子产品


模块 1:数据加载到Hive的ODS库(包括创建数据库和表)

实验任务

  1. 环境准备与配置

    • 设置使用的用户为root

    • 初始化Spark会话,配置为本地模式,并启用对Hive的支持。

    • 配置Spark以兼容Hive Parquet格式,启用动态分区,并设置动态分区模式和分区覆盖模式。

  2. 数据库与表的设置

    • ods_users 表存储用户信息。

    • ods_orders 表存储订单信息,并根据order_month进行分区。

    • ods_products 表存储产品信息。

    • 删除并重新创建ods数据库,以确保实验的初始状态。

    • ods数据库中创建三个表:ods_usersods_ordersods_products

  3. 数据读取与处理

    • 从CSV文件中读取用户、订单和产品数据。

    • 对订单数据进行类型转换,确保数据类型与Hive表定义一致。

    • order_date列提取年-月作为order_month列,用于后续的分区。

  4. 数据验证与清洗

    • 检查并处理包含空值的记录。

    • 去除重复的数据记录。

  5. 数据写入Hive表

    • 将清洗后的用户数据写入ods_users表。

    • 将处理后的订单数据根据order_month分区,并写入ods_orders表。

    • 将清洗后的产品数据写入ods_products表。

  6. 结果展示

    • 在数据处理的关键步骤后使用show()方法展示DataFrame的内容,用于观察和验证各阶段的数据处理结果。

通过这些步骤,实验旨在展示如何使用Spark进行数据处理并与Hive集成,处理流程包括从数据读取到转换、验证、清洗,最后将数据加载到Hive表中,整个过程覆盖了数据处理的多个关键方面。

package org.example

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, date_format}
import org.apache.spark.sql.types.{DateType, DoubleType, IntegerType}

object SparkJob071 {
  def main(args: Array[String]): Unit = {
    // 设定Hadoop用户为root,确保Spark作业有足够的权限访问HDFS和Hive
    System.setProperty("HADOOP_USER_NAME", "root")

    // 初始化一个Spark会话,配置为本地运行模式,同时启用Hive支持和设置相关参数
    val spark = SparkSession.builder()
      .appName("ETL Job") // 为Spark应用程序设置一个名称,便于识别
      .master("local[*]") // 设置为本地运行模式,使用所有可用核心
      .config("spark.sql.parquet.writeLegacyFormat", "true") // 为了兼容Hive的Parquet文件格式
      .config("hive.exec.dynamic.partition", "true") // 启用Hive的动态分区功能
      .config("hive.exec.dynamic.partition.mode", "nonstrict") // 设置动态分区模式为非严格模式
      .config("hive.exec.max.dynamic.partitions.per.node", "1000") // 设置每个节点的最大动态分区数
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // 启用动态分区数据的覆盖模式
      .enableHiveSupport() // 启用对Hive的支持
      .getOrCreate()

    // 执行数据加载到Hive的主要函数
    loadDataToHive(spark)

    // 执行完毕后,停止Spark会话
    spark.stop()
  }

  def loadDataToHive(spark: SparkSession): Unit = {
    // 删除已存在的ods数据库,级联删除包含的表和数据
    spark.sql("drop DATABASE IF EXISTS ods cascade")
    // 创建一个新的ods数据库,如果不存在
    spark.sql("CREATE DATABASE IF NOT EXISTS ods")

    // 在ods数据库中创建ods_users表,用于存储用户信息
    spark.sql("""
      CREATE TABLE IF NOT EXISTS ods.ods_users (
        id INT,
        name STRING,
        age INT,
        gender STRING
      )
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      STORED AS TEXTFILE
    """)

    // 创建ods_orders表,用于存储订单信息,该表按order_month分区
    spark.sql("""
      CREATE TABLE IF NOT EXISTS ods.ods_orders (
        id INT,
        user_id INT,
        product_id INT,
        amount DOUBLE,
        order_date DATE
      )
      PARTITIONED BY (order_month STRING)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      STORED AS TEXTFILE
    """)

    // 创建ods_products表,用于存储产品信息
    spark.sql("""
      CREATE TABLE IF NOT EXISTS ods.ods_products (
        id INT,
        name STRING,
        category STRING
      )
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      STORED AS TEXTFILE
    """)
      
    // 查看Hive中ods数据库下的所有表,isTemporary 显示是持久表(不是临时表)
    println("Hive中ods数据库下的表:")  
    spark.sql("SHOW TABLES IN ods").show()
      
    // 读取用户数据,展示表内容,并进行数据清洗
    // 在Spark中读取CSV文件时,默认所有列都是字符串类型(StringType),所以name和gender无需转换。
    val usersDF = spark.read.option("header", "true").csv("file:///D:/data/users.csv")
      .withColumn("id", col("id").cast(IntegerType)) // 转换id为整型
      .withColumn("age", col("age").cast(IntegerType)) // 转换age为整型
      .na.drop() // 删除包含null的行
      .dropDuplicates() // 删除重复行

    // 显示用户数据帧内容
    println("usersDF表内容:")
    usersDF.show()

    // 将处理后的用户数据写入ods_users表
    usersDF.write
      .mode("overwrite")
      .insertInto("ods.ods_users")

    // 读取订单数据,进行类型转换,展示表内容,添加年-月分区列,进行数据清洗,并写入Hive表
    val ordersDF = spark.read.option("header", "true").csv("file:///D:/data/orders.csv")
      .withColumn("id", col("id").cast(IntegerType)) // 转换id为整型
      .withColumn("user_id", col("user_id").cast(IntegerType)) // 转换user_id为整型
      .withColumn("product_id", col("product_id").cast(IntegerType)) // 转换product_id为整型
      .withColumn("amount", col("amount").cast(DoubleType)) // 转换amount为双精度浮点型
      .withColumn("order_date", col("order_date").cast(DateType)) // 转换order_date为日期类型
      .na.drop() // 删除包含null的行
      .dropDuplicates() // 删除重复行
      
    // 显示订单数据帧内容  
    println("ordersDF表内容")
    ordersDF.show()
      
    // 添加order_month列用于分区,并展示处理后的DataFrame内容
    val processedOrdersDF = ordersDF
      .withColumn("order_month", date_format(col("order_date"), "yyyy-MM"))
    println("ordersDF表添加“年-月”列")
    processedOrdersDF.show()
    // 将处理后的订单数据写入ods_orders表
    processedOrdersDF.na.drop().dropDuplicates().write
      .mode("overwrite")
      .insertInto("ods.ods_orders")

    // 读取产品数据,并进行数据的清洗
    val productsDF = spark.read.option("header", "true").csv("file:///D:/data/products.csv")
      .withColumn("id", col("id").cast(IntegerType)) // 转换id为整型
      .na.drop() // 删除包含null的行
      .dropDuplicates() // 删除重复行

    // 显示产品数据帧内容
    println("productsDF表内容:")
    productsDF.show()

    // 将处理后的产品数据写入ods_products表
    productsDF.write
      .mode("overwrite")
      .insertInto("ods.ods_products")

  }
}

在这个示例中,我们首先检查ods数据库是否存在,如果不存在则创建。


模块 2:从ODS层清洗数据后存储到DWD层(包括创建表)

实验任务

  1. 初始化Spark会话

    • 创建一个SparkSession实例,命名为"DWD Data Processor"。

    • 设置运行模式为本地模式,使用所有可用的核心。

    • 启用对Hive的支持,以便能从Hive表中读取数据。

  2. 定义并执行数据清洗与存储函数

    • 调用cleanDataAndStoreToDWD函数,传入Spark会话,用于处理数据并存储到DWD层。

  3. 数据处理与存储逻辑

    • 删除已存在的DWD数据库,包括其所有表和数据。

    • 创建一个新的DWD数据库,如果它还不存在。

    • 在DWD数据库中创建用户表和订单表,如果它们不存在。特别注意,订单表按order_month进行分区。

    • 从ODS层读取用户和订单数据。

  4. 数据清洗与存储

    • 对用户数据进行清洗,筛选出年龄大于等于18岁的用户。

    • 对订单数据进行清洗,筛选出金额大于0的订单。

    • 将清洗后的用户数据存储到DWD层的用户表。

    • 将清洗后的订单数据存储到DWD层的订单表,并按order_month进行分区。

  5. 结束Spark会话

    • 在数据处理和存储完成后,关闭Spark会话。

// 导入必要的Spark SQL库
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 定义一个名为sparkJob072的对象,它是这个作业的主体
object sparkJob072 {
  // main函数是程序的入口点
  def main(args: Array[String]): Unit = {
    // 初始化Spark会话
    val spark = SparkSession.builder()
      .appName("DWD Data Processor") // 设置应用程序名称
      .master("local[*]") // 设置运行模式为本地模式,使用所有可用的核心
      .enableHiveSupport() // 启用对Hive的支持,允许与Hive交互
      .getOrCreate() // 创建Spark会话,如果存在则获取现有的

    // 调用自定义的cleanDataAndStoreToDWD函数,处理并存储数据到DWD层
    cleanDataAndStoreToDWD(spark)

    // 执行完毕后关闭Spark会话
    spark.stop()
  }

  // 定义一个处理数据并将其存储到DWD层的函数
  def cleanDataAndStoreToDWD(spark: SparkSession): Unit = {
    // 删除已存在的DWD数据库,包括其所有表和数据
    spark.sql("DROP DATABASE IF EXISTS dwd cascade")
    // 创建一个新的DWD数据库,如果它还不存在
    spark.sql("CREATE DATABASE IF NOT EXISTS dwd")

    // 在DWD数据库中创建用户表,如果它不存在
    spark.sql("""
      CREATE TABLE IF NOT EXISTS dwd.dwd_users (
        id INT,
        name STRING,
        age INT,
        gender STRING
      )
      STORED AS TEXTFILE
    """)

    // 在DWD数据库中创建订单表,这个表是按照order_month进行分区的,如果它不存在
    spark.sql("""
      CREATE TABLE IF NOT EXISTS dwd.dwd_orders (
        id INT,
        user_id INT,
        product_id INT,
        amount DOUBLE,
        order_date STRING
      )
      PARTITIONED BY (order_month STRING)
      STORED AS TEXTFILE
    """)

    // 从ODS层读取用户和订单数据
    val odsUsersDF = spark.table("ods.ods_users")
    val odsOrdersDF = spark.table("ods.ods_orders")

    // 导入Spark SQL的隐式转换支持,支持列引用、和类型的转换
    import spark.implicits._

    // 对用户数据进行清洗,筛选出年龄大于等于18岁的用户
    val cleanedUsersDF = odsUsersDF.filter($"age" >= 18)
    cleanedUsersDF.show()

    // 对订单数据进行清洗,筛选出金额大于0的订单
    val cleanedOrdersDF = odsOrdersDF.filter($"amount" > 0)
    cleanedOrdersDF.show()

    // 将清洗后的用户数据存储到DWD层的用户表
    cleanedUsersDF.write
      .mode("overwrite")
      .saveAsTable("dwd.dwd_users")

    // 将清洗后的订单数据存储到DWD层的订单表,并按order_month分区
    cleanedOrdersDF.write
      .mode("overwrite")
      .partitionBy("order_month")
      .saveAsTable("dwd.dwd_orders")
  }
}


模块 3:指标计算

实验任务:

  1. 初始化Spark会话

    • 创建一个SparkSession实例,命名为"Metrics Calculator"。

    • 设置运行模式为本地模式,使用所有可用的核心。

    • 启用对Hive的支持,以便能从Hive表中读取数据。

  2. 定义和执行数据指标计算

    • 从DWD层读取用户数据(dwd.dwd_users)和订单数据(dwd.dwd_orders)。

  3. 计算并展示数据指标

    • 对订单数据按用户ID分组。

    • 聚合计算每个用户的订单数量。

    • 将聚合结果与用户数据连接,以获取用户名。

    • 选择用户ID、用户名和订单数量的列,展示结果。

    • 对所有订单数据聚合计算平均金额。

    • 展示平均金额的结果。

    • 对订单数据按用户ID分组。

    • 聚合计算每个用户的订单总金额。

    • 将聚合结果与用户数据连接,以获取用户名。

    • 选择用户ID、用户名和总金额的列,展示结果。

    • 指标1:用户的总订单金额

    • 指标2:所有订单的平均金额

    • 指标3:每个用户的订单数量

// 导入必要的Spark SQL库
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 定义一个对象sparkJob073,作为Spark作业的入口
object sparkJob073 {
  // main函数定义了程序的入口点
  def main(args: Array[String]): Unit = {
    // 初始化一个Spark会话
    val spark = SparkSession.builder()
      .appName("Metrics Calculator") // 设置应用名称为"Metrics Calculator"
      .master("local[*]") // 设置运行模式为本地模式,使用所有可用核心
      .enableHiveSupport() // 启用对Hive的支持,允许从Hive表读取数据
      .getOrCreate() // 创建Spark会话,如果存在则获取现有的会话

    // 调用calculateMetrics函数,传入Spark会话,用于计算和展示数据指标
    calculateMetrics(spark)

    // 计算完成后,关闭Spark会话
    spark.stop()
  }

  // 定义一个函数calculateMetrics,用于计算和展示数据指标
  def calculateMetrics(spark: SparkSession): Unit = {
    // 从DWD层读取用户数据和订单数据
    val dwdUsersDF = spark.table("dwd.dwd_users")
    val dwdOrdersDF = spark.table("dwd.dwd_orders")

    // 指标1:计算每个用户的总订单金额
    val userTotalAmount = dwdOrdersDF
      .groupBy("user_id") // 按用户ID分组
      .agg(sum("amount").alias("total_amount")) // 聚合计算每个用户的订单总金额
      .join(dwdUsersDF, dwdOrdersDF("user_id") === dwdUsersDF("id")) // 将订单数据与用户数据进行连接
      .select("user_id", "name", "total_amount") // 选择展示的列

    // 显示计算结果
    userTotalAmount.show()

    // 指标2:计算所有订单的平均金额
    val averageOrderAmount = dwdOrdersDF.agg(avg("amount").alias("average_amount"))
    // 显示计算结果
    averageOrderAmount.show()

    // 指标3:计算每个用户的订单数量
    val userOrderCount = dwdOrdersDF
      .groupBy("user_id") // 按用户ID分组
      .agg(count("id").alias("order_count")) // 聚合计算每个用户的订单数量
      .join(dwdUsersDF, dwdOrdersDF("user_id") === dwdUsersDF("id")) // 将订单数据与用户数据进行连接
      .select("user_id", "name", "order_count") // 选择展示的列

    // 显示计算结果
    userOrderCount.show()
  }
}


3. 查询练习

3.1 查询所有用户信息

SELECT * FROM users

3.2 查询订单总数

SELECT COUNT(*) AS total_orders FROM orders

3.3 查询每个用户的订单总金额

SELECT u.name, SUM(o.amount) AS total_amount
FROM orders o
JOIN users u ON o.user_id = u.id
GROUP BY u.name

4. 聚合分析

4.1 查询每个产品类别的平均订单金额

SELECT p.category, AVG(o.amount) AS avg_amount
FROM orders o
JOIN products p ON o.product_id = p.id
GROUP BY p.category

4.2 查询下单次数最多的用户

SELECT u.name, COUNT(*) AS order_count
FROM orders o
JOIN users u ON o.user_id = u.id
GROUP BY u.name
ORDER BY order_count DESC
LIMIT 1

5. 高级分析

5.1 查询每个性别的平均年龄

SELECT gender, AVG(age) AS avg_age FROM users GROUP BY gender

5.2 查询过去一个月每天的总销售额

SELECT order_date, SUM(amount) AS total_sales
FROM orders
WHERE order_date >= date_sub(current_date(), 30)
GROUP BY order_date
ORDER BY order_date


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1