我正在研究PySpark 的聚合函数这个示例Window
。
这是数据框:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
simpleData = (("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000),\
("Saif", "Sales", 4100) \
)
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James |Sales |3000 |
|Michael |Sales |4600 |
|Robert |Sales |4100 |
|Maria |Finance |3000 |
|James |Sales |3000 |
|Scott |Finance |3300 |
|Jen |Finance |3900 |
|Jeff |Marketing |3000 |
|Kumar |Marketing |2000 |
|Saif |Sales |4100 |
+-------------+----------+------+
教程中的一个 WindowSpec 按“部门”对行进行分区,并按每个部门内的“薪水”排序:
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("department").orderBy("salary")
为了熟悉Window
操作,我尝试添加一个列“MaxRowNum”,其中包含每个分区内的最大行数。为了方便起见,中间列“RowNum”包含每个分区内的行数:
from pyspark.sql.functions import row_number
df \
.withColumn('RowNum',row_number().over(windowSpec)) \
.withColumn('MaxRowNum',max(col('RowNum')).over(windowSpec)) \
.show()
+-------------+----------+------+------+---------+
|employee_name|department|salary|RowNum|MaxRowNum|
+-------------+----------+------+------+---------+
| Maria| Finance| 3000| 1| 1|
| Scott| Finance| 3300| 2| 2|
| Jen| Finance| 3900| 3| 3|
| Kumar| Marketing| 2000| 1| 1|
| Jeff| Marketing| 3000| 2| 2|
| James| Sales| 3000| 1| 2|
| James| Sales| 3000| 2| 2|
| Robert| Sales| 4100| 3| 4|
| Saif| Sales| 4100| 4| 4|
| Michael| Sales| 4600| 5| 5|
+-------------+----------+------+------+---------+
如上所示,“RowNum”值是正确的,但“MaxRowNum”不包含每个分区内的最大行数。它们仅包含行数,但绑定行除外,绑定行包含两个行数中较大的一个。
从本教程的后面部分开始,我发现了一个没有排序的 WindowSpec,它给了我正确的结果(参见“MaxRowCORRECT”列):
windowSpecAgg = Window.partitionBy("department") # No sorting
df.withColumn("row",row_number().over(windowSpec)) \
.withColumn('MaxRowNum',max(col('row')).over(windowSpec)) \
.withColumn("MaxRowCORRECT",max(col("row")).over(windowSpecAgg)) \
.show()
+-------------+----------+------+---+---------+-------------+
|employee_name|department|salary|row|MaxRowNum|MaxRowCORRECT|
+-------------+----------+------+---+---------+-------------+
| Maria| Finance| 3000| 1| 1| 3|
| Scott| Finance| 3300| 2| 2| 3|
| Jen| Finance| 3900| 3| 3| 3|
| Kumar| Marketing| 2000| 1| 1| 2|
| Jeff| Marketing| 3000| 2| 2| 2|
| James| Sales| 3000| 1| 2| 5|
| James| Sales| 3000| 2| 2| 5|
| Robert| Sales| 4100| 3| 4| 5|
| Saif| Sales| 4100| 4| 4| 5|
| Michael| Sales| 4600| 5| 5| 5|
+-------------+----------+------+---+---------+-------------+
我的理解是,窗口聚合函数对每个分区的整体进行操作。上面的代码表明情况不一定如此。我浏览了 Windows 文档,但找不到这种条件行为的明确描述。
是否有一致且完整记录的 Windows 函数操作方案?我在文档中的哪个地方遗漏了它?
背景
根据mazaneicha 的回答,我意识到我需要了解窗口函数分类的背景知识。PySpark 链接到相关术语会产生空白页(currentRow
、unboundedPreceding
、
unboundedFollowing
)。这些东西似乎来自 SQL。虽然我没有在rowFrame
和上找到任何内容,但以下页面提供了上述其他术语的背景知识( rowsBetweenrangeFrame
的文档也是如此):
这是在对无序窗口进行聚合时使用不同默认值作为窗口框架的效果
orderBy
。根据Spark 在线文档:因此,为了使其按照您的期望工作,您需要明确设置界限: