在此博客文章中,我们将深入探讨Apache Spark窗口函数。 您可能也对我之前有关Apache Spark的帖子感兴趣。
使用Apache Spark开始您的旅程-第1部分
使用Apache Spark开始您的旅程-第2部分
Apache Spark开始您的旅程-第3部分
深入研究Apache Spark DateTime函数
在Apache Spark中使用JSON
首先,让我们看看什么是窗口函数以及何时使用它们。 我们在Apache Spark中使用了各种功能,例如月份(从日期返回月份),四舍五入(舍入值)和地板(为给定的输入提供底值)等,这些功能将在每条记录上执行并返回一个值 每条记录。 然后,我们将对一组数据执行各种聚合函数,并为每个组返回一个值,例如sum,avg,min,max和count。 但是,如果我们想对一组数据执行该操作,并且希望对每个记录有一个单一的值/结果怎么办? 在这种情况下,我们可以使用窗口函数。 他们可以定义记录的排名,累积分布,移动平均值,或标识当前记录之前或之后的记录。
让我们使用一些Scala API示例来了解以下窗口函数:
汇总:min, max, avg, count, 和 sum.
排名:rank,dense_rank,percent_rank,row_num和ntile
分析性:cume_dist,lag和lead
自定义边界:rangeBetween和rowsBetween
为便于参考,GitHub上提供了一个以JSON文件格式导出的Zeppelin笔记本和一个Scala文件。
创建Spark DataFrame
现在,我们创建一个示例Spark DataFrame,我们将在整个博客中使用它。 首先,让我们加载所需的库。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
现在,我们将使用一些虚拟数据创建DataFrame,这些虚拟数据将用于讨论各种窗口函数。
case class Salary(depName: String, empNo: Long, salary: Long)
val empsalary = Seq(
Salary("sales", 1, 5000),
Salary("personnel", 2, 3900),
Salary("sales", 3, 4800),
Salary("sales", 4, 4800),
Salary("personnel", 5, 3500),
Salary("develop", 7, 4200),
Salary("develop", 8, 6000),
Salary("develop", 9, 4500),
Salary("develop", 10, 5200),
Salary("develop", 11, 5200)).toDF()
这是我们的DataFrame的样子:
窗口集合函数
让我们看一些聚合的窗口函数,看看它们如何工作。
首先,我们需要定义窗口的规范。 假设我们要根据部门获取汇总数据。 因此,在此示例中,我们将基于部门名称(列:depname)定义窗口。
为聚合函数创建窗口规范
val byDepName = Window.partitionBy("depName")
在窗口上应用聚合函数
现在,在部门内(列:depname),我们可以应用各种聚合函数。 因此,让我们尝试查找每个部门的最高和最低工资。 在这里,我们仅选择了所需的列(depName,max_salary和min_salary),并删除了重复的记录。
val agg_sal = empsalary
.withColumn("max_salary", max("salary").over(byDepName))
.withColumn("min_salary", min("salary").over(byDepName))
agg_sal.select("depname", "max_salary", "min_salary")
.dropDuplicates()
.show()
输出:
+———+———-+———-+
| depname|max_salary|min_salary|
+———+———-+———-+
| develop| 6000| 4200|
| sales| 5000| 4800|
|personnel| 3900| 3500|
+———+———-+———-+