我有一张时间序列表。它包含snapshot
和increment
值
snapshot
意味着具体价值increment
意味着拍摄以前的快照并添加增量值以获得快照值
sensor_id - id of sensor
ts - timestamp
temp - temperature at a given ts
value_type - 0 means snapshot, 1 means increment
sensor_id | ts | temp | value_type
sensor_X | 1 | 100 | 0
sensor_X | 2 | 5 | 1
sensor_X | 3 | -2 | 1
sensor_X | 4 | 95 | 0
sensor_Y | 4 | 90 | 0
sensor_Y | 5 | -5 | 1
我的目标是从表构建时间序列视图
1| 2| 3| 4| 5|
sensor_X 100| 105| 103| 95| |
sensor_Y | | | 90| 85|
用户通过 ts range 和 sensor_id 查询数据:
show me sensor temperature where sensor_id starts with sensor_0* and ts in range (2023-01-20, 2023-01-23)
我尝试实现聚合函数
-- thisValue, thisValueType, nextValue, nextValueType
CREATE OR REPLACE FUNCTION calculate_snapshot_internal(bigint, int, bigint, int)
returns bigint
language sql
as
$$
select case
when $3 is null then null -- metric value explicitly set to null, means it removed
when $4 = 1 or $4 = 2 then $3 -- next value is SNAPSHOT or FULL_SNAPSHOT, just set it
when $4 = 0 and $1 is not null then $1 + $3 -- next value is INCREMENTAL, add it to current SNAPSHOT
end
$$;
CREATE aggregate calculate_snapshot (bigint, int) (
sfunc = calculate_snapshot_internal,
stype = bigint
-- initcond = null We can do this simply by omitting the initcond phrase, so that the initial state condition is null
);
并查询
select sensor_id,
calculate_snapshot(value, valueType) OVER (PARTITION BY sensor_id ORDER BY ts) as snapshot_value
from metric_table;
我无法执行CREATE aggregate calculate_snapshot
错误:
[42883] ERROR: function calculate_snapshot_internal(bigint, bigint, integer) does not exist
不明白它应该如何工作。
UPD:错误解释被标记为正确答案。这是对我有用的最终解决方案
REATE OR REPLACE FUNCTION calculate_snapshot_internal(_this_value bigint,
_next_value bigint,
_next_value_type int,
OUT _code bigint)
LANGUAGE plpgsql AS
$func$
BEGIN
if _next_value is null then
_code = _this_value; -- just pass by whatever it has, null means missing value
elseif _next_value_type = 1 or _next_value_type = 2 then -- SNAPSHOT value
_code = _next_value;
elseif _next_value_type = 0 and _this_value is not null then -- Add INCREMENT value to accumulated value
_code = _this_value + _next_value;
end if;
END
$func$;
drop aggregate if exists calculate_snapshot(bigint, int);
CREATE aggregate calculate_snapshot(bigint, int) (
sfunc = calculate_snapshot_internal,
stype = bigint
);
正如文档所说,sfunc 的签名必须采用聚合状态类型 (stype) 加上聚合函数参数类型或类型。由于 stype 是 bigint 并且聚合类型是
(bigint, int)
,这使得 sfunc 需要(bigint, bigint, int)
,正如错误消息所指示的那样。由于您的 sfunc 从未引用 $2,为什么它声明接受它?我不明白你想要它做什么(详细),但考虑到正在使用的其他类型,至少 sfunc 的签名是清楚的。