I have the JSON below, which I'm reading from Kafka, and I'm trying to convert it to a StructType with the `from_json` function.
```python
from pyspark.sql.types import LongType, StringType, StructField, StructType

schema_session_start = StructType([
    StructField("ID", StringType()),
    StructField("SID", StringType()),
    StructField("EP", LongType()),
    StructField("IP", StringType()),
    StructField("LN", StringType()),
    StructField("VN", StringType()),
    # Nested device object
    StructField("DV", StructType([
        StructField("MK", StringType()),
        StructField("MDL", StringType()),
        StructField("OS", StringType()),
        StructField("OSVN", StringType()),
        StructField("AR", StringType())
    ])),
    StructField("MC", StringType()),
    StructField("FN", StringType()),
    # Nested network object
    StructField("NW", StructType([
        StructField("TP", StringType())
    ])),
    StructField("AL", StringType()),
    StructField("EN", StringType())
])
```
value | EN |
---|---|
{"ID":"651551912131b2.07017577","SID":"169156360280217644","EP":1695895952305,"IP":"10.10.10.10","LN":"","VN":"2.4.0.0","DV":{"MK":"Jio","MDL":"JHSD200","OS":"JioOS 2","OSVN":"9","AR":"armeabi-v7a"},"MC":"02:00:00:00:00:00","FN":true,"NW":{"TP":"wifi_5"},"AL":"GRIPdemo","EN":"Session_Start"} | Session_Start |
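Separately from the `from_json` call, it can help to confirm that a sample `value` is well-formed JSON and contains the nested objects the schema declares (`DV` and `NW` as objects). The sketch below uses only Python's standard `json` module. The helper `missing_fields` and the hand-copied `EXPECTED` map are hypothetical: they mirror `schema_session_start` by hand and do not read it from Spark.

```python
import json

# Hypothetical field map copied by hand from schema_session_start:
# None marks a leaf field, a nested dict marks a StructType field (DV, NW).
EXPECTED = {
    "ID": None, "SID": None, "EP": None, "IP": None, "LN": None, "VN": None,
    "DV": {"MK": None, "MDL": None, "OS": None, "OSVN": None, "AR": None},
    "MC": None, "FN": None,
    "NW": {"TP": None},
    "AL": None, "EN": None,
}


def missing_fields(record, expected=EXPECTED, prefix=""):
    """Return dotted paths of expected fields that are absent, or that are
    not JSON objects where the schema expects a struct."""
    problems = []
    for key, sub in expected.items():
        path = prefix + key
        if key not in record:
            problems.append(path)
        elif sub is not None:
            if isinstance(record[key], dict):
                # Recurse into the nested object, e.g. DV -> DV.MK
                problems.extend(missing_fields(record[key], sub, path + "."))
            else:
                problems.append(path)
    return problems


sample = ('{"ID":"651551912131b2.07017577","SID":"169156360280217644",'
          '"EP":1695895952305,"IP":"10.10.10.10","LN":"","VN":"2.4.0.0",'
          '"DV":{"MK":"Jio","MDL":"JHSD200","OS":"JioOS 2","OSVN":"9","AR":"armeabi-v7a"},'
          '"MC":"02:00:00:00:00:00","FN":true,"NW":{"TP":"wifi_5"},'
          '"AL":"GRIPdemo","EN":"Session_Start"}')

print(missing_fields(json.loads(sample)))  # → []
```

A payload with a missing brace, such as `{"NW":"TP":"wifi_5"}`, already fails in `json.loads` with `json.JSONDecodeError`, so this kind of check can catch it before it reaches Spark.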
```python
from pyspark.sql.functions import col, from_json, when

array_df = condition_df.withColumn("value_json", from_json(col("value"), when(condition_df.EN == "Session_Start", schema_session_start)))
```
When I try to convert it, I get the following error:

```
ERROR:root:Error occurred: 'StructField' object has no attribute '_get_object_id'
```
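The second argument of `from_json` should be either a string containing the schema or a `StructType` (see the documentation), but in your case it's a `Column`. If you only want to apply a specific schema to a given event type, you need to take a different approach: move `when` outside of `from_json`, like this: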