我有一个时间序列数据集。我希望创建一个新列来表示最后报告的(非空)值。我想我已经弄清楚了这部分,使用lag
和的组合last
我还想知道最后报告的(非空)值的时间戳。我从不希望timestamp_ms
它是空的,尽管它val
可以是空的。
示例数据
df = spark.createDataFrame([
Row(timestamp_ms=1672531200000, val='19'),
Row(timestamp_ms=1672532100000, val='20'),
Row(timestamp_ms=1672533000000, val=None),
Row(timestamp_ms=1672533900000, val='22'),
Row(timestamp_ms=1672534800000, val=None),
Row(timestamp_ms=1672535700000, val=None),
Row(timestamp_ms=1672536600000, val='25'),
Row(timestamp_ms=1672537500000, val='20'),
Row(timestamp_ms=1672538400000, val='27')
])
df.show()
示例代码
返回最后的滞后值并尝试返回报告该值的时间戳。
df_lag = df.withColumn("lag_prev_val", F.lag("val")\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)\
.withColumn("last_lag_prev_val", F.last("lag_prev_val", True)\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)\
.withColumn("last_lag_prev_time", F.lag("timestamp_ms")\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)
df_lag.show()
电流输出
last_lag_prev_time
表示先前报告的时间戳,而不是与last_lag_prev_val
时间戳毫秒 | 瓦尔 | lag_prev_val | last_lag_prev_val | last_lag_prev_time |
---|---|---|---|---|
1672531200000 | 19 | 无效的 | 无效的 | 无效的 |
1672532100000 | 20 | 19 | 19 | 1672531200000 |
1672533000000 | 无效的 | 20 | 20 | 1672532100000 |
1672533900000 | 22 | 无效的 | 20 | 1672533000000 |
1672534800000 | 无效的 | 22 | 22 | 1672533900000 |
1672535700000 | 无效的 | 无效的 | 22 | 1672534800000 |
1672536600000 | 二十五 | 无效的 | 22 | 1672535700000 |
1672537500000 | 20 | 二十五 | 二十五 | 1672536600000 |
1672538400000 | 二十七 | 20 | 20 | 1672537500000 |
理想输出
我想要的输出(加粗差异)是用于last_lag_prev_time
表示与用于填充“last_lag_prev_val”的timestamp_ms
原始值来自同一行的列val
时间戳毫秒 | 瓦尔 | lag_prev_val | last_lag_prev_val | last_lag_prev_time |
---|---|---|---|---|
1672531200000 | 19 | 无效的 | 无效的 | 无效的 |
1672532100000 | 20 | 19 | 19 | 1672531200000 |
1672533000000 | 无效的 | 20 | 20 | 1672532100000 |
1672533900000 | 22 | 无效的 | 20 | 1672532100000 |
1672534800000 | 无效的 | 22 | 22 | 1672533900000 |
1672535700000 | 无效的 | 无效的 | 22 | 1672533900000 |
1672536600000 | 二十五 | 无效的 | 22 | 1672533900000 |
1672537500000 | 20 | 二十五 | 二十五 | 1672536600000 |
1672538400000 | 二十七 | 20 | 20 | 1672537500000 |
一种解决方案是仅考虑没有 的行中的时间戳,
val
即NULL
,我们可以通过创建一个名为 的列来实现val_timestamp_ms
。然后我们可以从这个新列中获取最后一个时间戳并应用滞后。例如: