在 Pyspark 中,尝试根据长度数组列“Col1”查找偏移量。不想使用 UDF,因此尝试使用转换来获得解决方案。但面临错误。请建议任何解决方法
Col1 Offset
[3,4,6,2,1] [[0,3],[4,8],[9,15],[16,18],[19,20]]
[10,5,4,3,2] [[0,10],[11,16],[17,21],[22,25],[26,28]]
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, expr
spark = SparkSession.builder \
.appName("Calculate Offset Column") \
.getOrCreate()
data = [([3.0, 4.0, 6.0, 2.0, 1.0],),
([10.0, 5.0, 4.0, 3.0, 2.0],)]
df = spark.createDataFrame(data, ["Col1"])
df = df.withColumn("Offsets",
f.expr("""transform(Col1, (x, i) -> struct(coalesce(sum(Col1) over (order by i rows between unbounded preceding and current row) - x, 0) as start,
sum(Col1) over (order by i rows between unbounded preceding and current row) as end))"""))
错误:运算符 !Window 中的 Col1#454 中缺少解析的属性 i#462 [Col1#454,transform(Col1#454,lambdafunction(struct(start,coalesce((sum(cast(Col1 as double))) windowspecdefinition( lambda i#462 ASC NULLS FIRST、specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) - lambda x#461),cast(0 as double)), end, sum(cast(Col1 as double)) windowspecdefinition (lambda i#462 ASC NULLS FIRST,指定窗口帧(RowFrame,unboundedpreceding$(),currentrow$()))),lambda x#461,lambda i#462,false)) AS Offsets#458],[lambda i#462 ASC 首先为空]。;
已在 Spark 3.5.0 上测试并运行。
注意:它可能不是最漂亮的选项,您可以用一些漂亮的 PySpark 代码替换大部分
selectExpr
(例如,pyspark.sql.Window
)。不过,就窗口函数而言,我个人更喜欢 SQL,因为在我看来它们不太冗长。
编辑:添加
array_sort
. 不确定这对您是否重要,但collect_list
在洗牌后可能会以不同的顺序返回元素。由于该Offset
列预计是单调上升的,因此array_sort
可以用来获得确定性的输出。