Eu tenho uma tabela com séries temporais. Ele contém snapshot
e increment
valor
snapshot
significa valor concretoincrement
significa pegar o instantâneo anterior e adicionar valor de incremento para obter o valor do instantâneo
sensor_id - id of sensor
ts - timestamp
temp - temperature at a given ts
value_type - 0 means snapshot, 1 means increment
sensor_id | ts | temp | value_type
sensor_X | 1 | 100 | 0
sensor_X | 2 | 5 | 1
sensor_X | 3 | -2 | 1
sensor_X | 4 | 95 | 0
sensor_Y | 4 | 90 | 0
sensor_Y | 5 | -5 | 1
Meu objetivo é criar uma exibição de série temporal a partir da tabela
1| 2| 3| 4| 5|
sensor_X 100| 105| 103| 95| |
sensor_Y | | | 90| 85|
o usuário consulta dados por intervalo ts e sensor_id:
show me sensor temperature where sensor_id starts with sensor_0* and ts in range (2023-01-20, 2023-01-23)
Eu tento implementar a função agregada
-- thisValue, thisValueType, nextValue, nextValueType
CREATE OR REPLACE FUNCTION calculate_snapshot_internal(bigint, int, bigint, int)
returns bigint
language sql
as
$$
select case
when $3 is null then null -- metric value explicitly set to null, means it removed
when $4 = 1 or $4 = 2 then $3 -- next value is SNAPSHOT or FULL_SNAPSHOT, just set it
when $4 = 0 and $1 is not null then $1 + $3 -- next value is INCREMENTAL, add it to current SNAPSHOT
end
$$;
CREATE aggregate calculate_snapshot (bigint, int) (
sfunc = calculate_snapshot_internal,
stype = bigint
-- initcond = null We can do this simply by omitting the initcond phrase, so that the initial state condition is null
);
e consulta
select sensor_id,
calculate_snapshot(value, valueType) OVER (PARTITION BY sensor_id ORDER BY ts) as snapshot_value
from metric_table;
Não consigo executar CREATE aggregate calculate_snapshot
o erro:
[42883] ERROR: function calculate_snapshot_internal(bigint, bigint, integer) does not exist
Não entendo como é suposto funcionar.
UPD: a explicação do erro é marcada como resposta correta. Aqui está a solução final que funcionou para mim
REATE OR REPLACE FUNCTION calculate_snapshot_internal(_this_value bigint,
_next_value bigint,
_next_value_type int,
OUT _code bigint)
LANGUAGE plpgsql AS
$func$
BEGIN
if _next_value is null then
_code = _this_value; -- just pass by whatever it has, null means missing value
elseif _next_value_type = 1 or _next_value_type = 2 then -- SNAPSHOT value
_code = _next_value;
elseif _next_value_type = 0 and _this_value is not null then -- Add INCREMENT value to accumulated value
_code = _this_value + _next_value;
end if;
END
$func$;
drop aggregate if exists calculate_snapshot(bigint, int);
CREATE aggregate calculate_snapshot(bigint, int) (
sfunc = calculate_snapshot_internal,
stype = bigint
);
Como dizem os documentos, a assinatura do sfunc deve levar o tipo de estado agregado (stype) mais o tipo ou tipos de argumento da função agregada. Como o stype é bigint e os tipos agregados são
(bigint, int)
, isso faz com que o sfunc precise(bigint, bigint, int)
, como também indica a mensagem de erro.Como seu sfunc nunca faz referência a $ 2, por que ele declara aceitá-lo? Não consigo entender o que você quer que isso faça (em detalhes), mas a assinatura do sfunc pelo menos é clara, considerando os outros tipos em uso.