我在 Microsoft Fabric 中使用 Spark Notebook。我想从 Lakehouse 中的元数据构建列映射。该映射应写入带有表列表的数据框中的“映射”列中。
我目前的尝试如下:
# Create initial list of table data
dataframe_tablelist = spark.createDataFrame(
[
("abcd", "AB", "t1"),
("efgh", "CD", "t2"),
("efgh", "CD", "t3"),
],
["database", "entity", "table_name"]
)
def construct_mapping(database, entity, table_name):
meta_name = "Metadata_" + database + "_" + entity + "_" + table_name
metadata = spark.sql(f"""select * from {meta_name}""")
# Here I would construct the mapping from the metadata
return meta_name
udf_constructor = udf(construct_mapping, StringType())
mapping_df = dataframe_tablelist.withColumn("test_column", udf_constructor(dataframe_tablelist.database, dataframe_tablelist.entity, dataframe_tablelist.table_name))
display(mapping_df)
我收到了这个我完全不明白的错误:
PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
我可能可以让它与 collect() 一起工作并逐行附加,但我想以“正确”的方式进行。
您正在 udf 中使用该
spark.sql
函数。这意味着该函数被传递给您的工作节点,而您的 spark 会话并不存在于您的工作节点上。我建议的解决方案是将其保留为普通的 Python 函数而不是 udf。这意味着
collect()
在我看来,您的想法是一个很好的解决方案。从那里开始,您可以调用列表中的函数,并且由于它由驱动程序节点运行,因此您可以引用您的 spark 会话。还要注意,udf 通常比普通 Python 函数效率低,因此在 udf 中执行这些类型的操作通常性能也会较低。