Estou trabalhando neste exemplo de funções de agregação para PySpark Window
.
Aqui está o dataframe:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
simpleData = (("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000),\
("Saif", "Sales", 4100) \
)
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James |Sales |3000 |
|Michael |Sales |4600 |
|Robert |Sales |4100 |
|Maria |Finance |3000 |
|James |Sales |3000 |
|Scott |Finance |3300 |
|Jen |Finance |3900 |
|Jeff |Marketing |3000 |
|Kumar |Marketing |2000 |
|Saif |Sales |4100 |
+-------------+----------+------+
Um dos WindowSpec's no tutorial particiona as linhas por "departamento" e classifica por "salário" dentro de cada departamento:
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("department").orderBy("salary")
Para me familiarizar com Window
as operações, tentei adicionar uma coluna "MaxRowNum" contendo o número máximo de linhas dentro de cada partição. Para facilitar isso, uma coluna intermediária "RowNum" contém o número de linhas dentro de cada partição:
from pyspark.sql.functions import row_number
df \
.withColumn('RowNum',row_number().over(windowSpec)) \
.withColumn('MaxRowNum',max(col('RowNum')).over(windowSpec)) \
.show()
+-------------+----------+------+------+---------+
|employee_name|department|salary|RowNum|MaxRowNum|
+-------------+----------+------+------+---------+
| Maria| Finance| 3000| 1| 1|
| Scott| Finance| 3300| 2| 2|
| Jen| Finance| 3900| 3| 3|
| Kumar| Marketing| 2000| 1| 1|
| Jeff| Marketing| 3000| 2| 2|
| James| Sales| 3000| 1| 2|
| James| Sales| 3000| 2| 2|
| Robert| Sales| 4100| 3| 4|
| Saif| Sales| 4100| 4| 4|
| Michael| Sales| 4600| 5| 5|
+-------------+----------+------+------+---------+
Conforme mostrado acima, os valores "RowNum" estão corretos, mas "MaxRowNum" não contêm o número máximo de linhas dentro de cada partição. Eles contêm apenas o número da linha, exceto para linhas empatadas, onde contêm o maior dos dois números de linha.
Mais adiante no tutorial, encontrei um WindowSpec que não tinha classificação e ele me deu o resultado correto (veja a coluna "MaxRowCORRECT"):
windowSpecAgg = Window.partitionBy("department") # No sorting
df.withColumn("row",row_number().over(windowSpec)) \
.withColumn('MaxRowNum',max(col('row')).over(windowSpec)) \
.withColumn("MaxRowCORRECT",max(col("row")).over(windowSpecAgg)) \
.show()
+-------------+----------+------+---+---------+-------------+
|employee_name|department|salary|row|MaxRowNum|MaxRowCORRECT|
+-------------+----------+------+---+---------+-------------+
| Maria| Finance| 3000| 1| 1| 3|
| Scott| Finance| 3300| 2| 2| 3|
| Jen| Finance| 3900| 3| 3| 3|
| Kumar| Marketing| 2000| 1| 1| 2|
| Jeff| Marketing| 3000| 2| 2| 2|
| James| Sales| 3000| 1| 2| 5|
| James| Sales| 3000| 2| 2| 5|
| Robert| Sales| 4100| 3| 4| 5|
| Saif| Sales| 4100| 4| 4| 5|
| Michael| Sales| 4600| 5| 5| 5|
+-------------+----------+------+---+---------+-------------+
Meu entendimento é que as funções de agregação do Windows operam sobre a totalidade de cada partição. O código acima mostra que esse não é necessariamente o caso. Eu escaneei a documentação do Windows, mas não consegui encontrar a descrição inequívoca desse comportamento condicional.
Existe realmente um esquema consistente e totalmente documentado para as operações das funções do Windows? Onde eu perdi isso na documentação?
Fundo
Após a resposta de mazaneicha , percebi que precisava de informações básicas sobre taxonomia de funções Window. Os links do PySpark para os termos relevantes produzem páginas vazias ( currentRow
, unboundedPreceding
,
unboundedFollowing
). Essas coisas parecem vir do SQL. Embora eu não tenha encontrado nada sobre rowFrame
e rangeFrame
, as páginas a seguir fornecem informações básicas sobre os outros termos acima (assim como a documentação para rowsBetween ):