我正在尝试将 AutoLoader 与 Databricks 一起使用,但我遇到了以下建议和错误:
org.apache.spark.sql.catalyst.util.UnknownFieldException:
[UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_FILE]
Encountered unknown fields during parsing: [col_name_here],
which can be fixed by an automatic retry: true
我无法使用 Google 找到捕获任何此错误语言的单个文档,因此我不确定如何以及在何处可以使用此假定的修复。
我的代码如下:
xd = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("cloudFiles.schemaLocation", "dbfs:/mnt/temp/checkpoints/schema") \
.option("cloudFiles.schemaEvolutionMode","addNewColumns") \
.option("pathGlobfilter", "20230808_*") \
.load("/mnt/test_loc") \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "dbfs:/mnt/temp/checkpoints") \
.option("mergeSchema", "true") \
.option("overwriteSchema", "true") \
.toTable("dBronze.Table1")
您只需要再次重新运行代码,它就会拾取架构中的更改。如果它是自动化作业,您可以在作业级别配置重试(请参阅文档)。
PS 您可以尝试查看Delta Live Tables - 它会自动重试此类任务。