下面是我正在使用的命令,但是当我创建 kafka 管道命令时,出现以下错误:错误 1933 ER_EXTRACTOR_EXTRACTOR_GET_LATEST_OFFSETS:无法获取管道的源元数据。无法创建消费者:ssl.ca.location 失败:没有更多可用的错误信息
使用的命令:
CREATE PIPELINE ticketmaster_pipeline AS
LOAD DATA KAFKA ‘b-3.etainmentnonprod.z2xjta.c25.kafka.us-east-1.amazonaws.com:9094,b-1.etainmentnonprod.z2xjta.c25.kafka.us-east-1.amazonaws.com:9094,b-2.etainmentnonprod.z2xjta.c25.kafka.us-east-1.amazonaws.com:9094/ticketmaster’
CONFIG '{“sasl.username”: “AWS_ACCESS_KEY”,
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"ssl.ca.location": "/etc/pki/ca-trust/extracted/java/cacerts"}'
CREDENTIALS ‘{“sasl.password”: “AWS_PSWD/Z”}’
INTO TABLE ticketmaster_kafka
FORMAT JSON (event_url ← event_url,event_id ← event_id,timestamp ← timestamp,event_name ← event_name,venue ← venue,event_datetime ← event_datetime,city ← city,state ← state,section ← section,row ← row,qty ← qty,seat_ids ← seat_ids,seat_numbers ← seat_numbers,inventory_type ← inventory_type,price ← price);
我在 AWS MSK insatnce 上设置了 kafka,并且我在主题中也有数据
这是一个身份验证问题,我们可以通过删除除 CONFIG '{“security.protocol”: “ssl”}' 之外的所有内容来解决它
请试试这个:
创建管道ticketmaster_pipeline 作为加载数据 KAFKA 'b-3.etainmentnonprod.z2xjta.c25.kafka.us-east-1.amazonaws.com:9094,b-1.etainmentnonprod.z2xjta.c25.kafka.us-east-1。 amazonaws.com:9094,b-2.etainmentnonprod.z2xjta.c25.kafka.us-east-1.amazonaws.com:9094/ticketmaster' CONFIG '{"security.protocol": "SSL"}'
INTO TABLE ticketmaster_kafka FORMAT JSON (event_url ← event_url, event_id ← event_id, 时间戳← 时间戳, event_name ← event_name, 地点← 地点, event_datetime ← event_datetime, 城市← 城市, 州← 州, 部分← 部分, 行← 行, 数量← 数量, seat_ids ← seat_ids,seat_numbers ← seat_numbers,inventory_type ← inventory_type,price ← price);