tenho abaixo o JSON que estou lendo do Kafka e estou tentando converter para StructType usando a função from_json.
schema_session_start = StructType([
StructField("ID", StringType()),
StructField("SID", StringType()),
StructField("EP", LongType()),
StructField("IP", StringType()),
StructField("LN", StringType()),
StructField("VN", StringType()),
StructField("DV", StructType([
StructField("MK", StringType()),
StructField("MDL", StringType()),
StructField("OS", StringType()),
StructField("OSVN", StringType()),
StructField("AR", StringType())
])),
StructField("MC", StringType()),
StructField("FN", StringType()),
StructField("NW", StructType([
StructField("TP", StringType())
])),
StructField("AL", StringType()),
StructField("EN", StringType())
])
valor | PT |
---|---|
{"ID":"651551912131b2.07017577","SID":"169156360280217644","EP":1695895952305,"IP":"10.10.10.10","LN":"","VN":"2.4.0.0 ","DV":{"MK":"Jio","MDL":"JHSD200","OS":"JioOS 2","OSVN":"9","AR":"armeabi-v7a"} ,"MC":"02:00:00:00:00:00","FN":true,"NW":"TP":"wifi_5"},"AL":"GRIPdemo","EN": "Sessão_Início"} | Sessão_Início |
array_df = condition_df.withColumn("value_json",from_json(col("value"),when(condition_df.EN == "Session_Start", schema_session_start)))
Estou recebendo o erro abaixo quando tento converter:
ERRO:root:Ocorreu um erro: o objeto 'StructField' não possui atributo '_get_object_id'
O segundo argumento do
from_json
deve ser string com esquema ouStructType
(consulte a documentação ), mas no seu caso é umColumn
. Se você quiser aplicar um esquema específico apenas a um determinado tipo de evento, precisará fazer isso de maneira diferente - sairwhen
defrom_json
, algo assim: