We have a generic transaction table: historical data is loaded daily through a standard pipeline, and streams, tasks, and stored procedures process the data (in batches) into this final table. The company has now changed how data is loaded and uses Kafka Connect instead, which creates a raw table by reading the Avro messages published to a topic.
This raw table created by the Kafka connector automatically has VARIANT columns; in particular, one of them has the following shape:
[
{
"transactionDetailId": 112233445566,
"brandCode": "31137209",
"transactionOrigin": "Origin 1",
"installmentCreditTransactionId": 1,
...
},
{
"transactionDetailId": 112233445566,
"brandCode": "31137209",
"transactionOrigin": "Origin 1",
"installmentCreditTransactionId": 2,
},
...
]
This raw table created by Kafka Connect only contains new data since 2024-10-01 (yyyy-mm-dd); the historical data has already been loaded into the final table mentioned above (the processed data).
The challenge I'm facing is transforming the historical transaction table data into this VARIANT column. For other tables I was able to convert the transaction data into a VARIANT column using Snowflake's OBJECT_CONSTRUCT function, but for this table, and this column in particular, the situation is that
{
"transactionDetailId": 112233445566,
"brandCode": "31137209",
"transactionOrigin": "Origin 1",
"installmentCreditTransactionId": 1,
...
}
is one row in the final table, and
{
"transactionDetailId": 112233445566,
"brandCode": "31137209",
"transactionOrigin": "Origin 1",
"installmentCreditTransactionId": 2,
},
is another row.
The attribute that splits this VARIANT column is "installmentCreditTransactionId", so in this case the two objects map to two rows. If I look at the transaction table, I see the following:
transactionDetailId brandCode transactionOrigin installmentCreditTransactionId
998877665544 55555555 Origin 2 1
998877665544 55555555 Origin 2 2
998877665544 55555555 Origin 2 3
That should be transformed into:
[
{
"transactionDetailId": 998877665544,
"brandCode": "55555555",
"transactionOrigin": "Origin 2",
"installmentCreditTransactionId": 1,
...
},
{
"transactionDetailId": 998877665544,
"brandCode": "55555555",
"transactionOrigin": "Origin 2",
"installmentCreditTransactionId": 2,
},
{
"transactionDetailId": 998877665544,
"brandCode": "55555555",
"transactionOrigin": "Origin 2",
"installmentCreditTransactionId": 3,
},
...
]
I don't see how OBJECT_CONSTRUCT alone helps here, because the number of installmentCreditTransactionId values per message is arbitrary.
Do you have any ideas on how to solve this? Using OBJECT_AGG?
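I also wondered whether ARRAY_AGG over OBJECT_CONSTRUCT could be the right direction. An untested sketch, assuming a final table named `transactions` with the columns from the sample above:

```sql
-- Untested sketch: build one array of objects per transactionDetailId,
-- ordered by the splitting attribute. Table name is an assumption.
SELECT
    transactionDetailId,
    ARRAY_AGG(
        OBJECT_CONSTRUCT(
            'transactionDetailId',            transactionDetailId,
            'brandCode',                      brandCode,
            'transactionOrigin',              transactionOrigin,
            'installmentCreditTransactionId', installmentCreditTransactionId
        )
    ) WITHIN GROUP (ORDER BY installmentCreditTransactionId) AS installments
FROM transactions
GROUP BY transactionDetailId, brandCode, transactionOrigin;
```

The idea is that grouping by the columns that are constant across installments, and aggregating only the per-installment objects, would produce one array per transaction. I haven't verified this against the real table.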
Edit 1: here is what I have so far:
SELECT
    OBJECT_AGG(
        tab1.id_1::VARCHAR,
        OBJECT_CONSTRUCT(
            'col1', NULL,
            'col2', tab1.col2::VARCHAR,
            'id_1', tab1.id_1::VARCHAR, -- one of the ids
            'id_2', tab1.id_2::VARCHAR, -- one of the ids
            'col3', COALESCE(tab1.col3::VARCHAR, '0'),
            'id_3', tab1.id_3::VARCHAR, -- one of the ids
            'col4', tab1.col4::VARCHAR,
            'id_4', tab1.id_4::VARCHAR, -- one of the ids
            'col5', CURRENT_TIMESTAMP()::VARCHAR,
            'col6', NULL
        )
    ) OVER (PARTITION BY tab1.id_1::VARCHAR, tab1.id_2::VARCHAR,
            tab1.id_3::VARCHAR, tab1.id_4::VARCHAR) AS result
FROM db.schema.table1 tab1
LEFT JOIN db.schema.table2 tab2 ON
    ...
LEFT JOIN db.schema.table3 tab3 ON
    ...;
But it does not return what I need; instead it returns:
First row:
{
"1": {
"col1": "28031866",
"col2": "524",
"id_1": "1",
"id_2": "46554737145",
"col3": 0,
"id_3": "39866",
"col_4": "100",
"id_4": "969424002",
"col_5": "2025-03-13",
"col_6": "2024-10-11"
}
}
Second row:
{
"2": {
"col1": "28031866",
"col2": "522",
"id_1": "2",
"id_2": "46554737145",
"col3": 0,
"id_3": "39866",
"col_4": "99",
"id_4": "969424002",
"col_5": "2024-11-13",
"col_6": "2024-10-11"
}
}
Third row:
{
"3": {
"col1": "28031866",
"col2": "522",
"id_1": "3",
"id_2": "46554737145",
"col3": 0,
"id_3": "39866",
"col_4": "99",
"id_4": "969424002",
"col_5": "2025-01-15",
"col_6": "2024-10-11"
}
}
What I need is this:
[
{
"col1": "28031866",
"col2": "522",
"id_1": "1",
"id_2": "46554737145",
"col3": 0,
"id_3": "39866",
"col_4": "99",
"id_4": "969424002",
"col_5": "2025-01-15",
"col_6": "2024-10-11"
},
{
"col1": "28031866",
"col2": "522",
"id_1": "2",
"id_2": "46554737145",
"col3": 0,
"id_3": "39866",
"col_4": "99",
"id_4": "969424002",
"col_5": "2025-01-15",
"col_6": "2024-10-11"
},
{
"col1": "28031866",
"col2": "522",
"id_1": "3",
"id_2": "46554737145",
"col3": 0,
"id_3": "39866",
"col_4": "99",
"id_4": "969424002",
"col_5": "2025-01-15",
"col_6": "2024-10-11"
}
]
in a single row. Notice that only id_1 varies; everything should be aggregated into one row. But I can't figure out how to achieve that.
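My current thinking is that the query above keys OBJECT_AGG by id_1 and also partitions by id_1, so every row ends up in its own one-entry object. Replacing OBJECT_AGG with ARRAY_AGG and dropping id_1 from the grouping might give the desired shape. An untested sketch, based on the query above with the joins omitted for brevity:

```sql
-- Untested sketch: aggregate all objects that share id_2/id_3/id_4
-- into a single array, so that only id_1 varies inside it.
SELECT
    ARRAY_AGG(
        OBJECT_CONSTRUCT(
            'col1', NULL,
            'col2', tab1.col2::VARCHAR,
            'id_1', tab1.id_1::VARCHAR,
            'id_2', tab1.id_2::VARCHAR,
            'col3', COALESCE(tab1.col3::VARCHAR, '0'),
            'id_3', tab1.id_3::VARCHAR,
            'col4', tab1.col4::VARCHAR,
            'id_4', tab1.id_4::VARCHAR,
            'col5', CURRENT_TIMESTAMP()::VARCHAR,
            'col6', NULL
        )
    ) WITHIN GROUP (ORDER BY tab1.id_1::NUMBER) AS result
FROM db.schema.table1 tab1
GROUP BY tab1.id_2, tab1.id_3, tab1.id_4;
```

This is a GROUP BY aggregate rather than a window function, so it should return one row (one array) per combination of the ids that stay constant. I haven't confirmed this works with the full joins in place.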