我是 PySpark 的新手,目前在 Databricks 中工作,比较两个具有相同列结构的 DataFrame。我将它们相互比较(本质上是将已加载到数据库中的文件与新文件进行比较)。在此过程中,我使用以下代码计算对每个变量所做的更改次数:
Comparison_DF = DF1_Data_To_Compare.withColumn("Value1_Change", when(col("b.Value1") == col("a.Value1"), 0).otherwise(1))
Comparison_DF = Comparison_DF.withColumn("Value2_Change", when(col("b.Value2") == col("a.Value2"), 0).otherwise(1))
# Summarizing the number of changes
Change_To_Value1 = Comparison_DF.select(sum("Value1_Change"))
Change_To_Value2 = Comparison_DF.select(sum("Value2_Change"))
# Forming the change report DataFrame
# columns=["Type of Change", "Number of Occurrences"]
data = [("Change to Value1", Change_To_Value1), ("Change to Value2", Change_To_Value2)]
rdd = spark.sparkContext.parallelize(data)
print(data)
该行rdd = spark.sparkContext.parallelize(data)
返回错误。检查错误回溯后,我意识到Change_To_Value1
和Change_To_Value2
不是变量而是 DataFrames。该print(data)
语句给出以下结果:[('Change to Value1', DataFrame[sum(Value1_Change): bigint]), ('Change to Value2', DataFrame[sum(Value2_Change): bigint])]
。
我需要形成这种 DataFrame 以将其用作更改报告,以便与 SSIS 包返回的结果进行比较。
我在 StackOverflow 或任何其他开源上都没有找到类似的东西。我尝试构建一个循环语句来收集这些 DataFrame 并直接将它们输入到新的 DataFrame 中,但我也失败了。
有没有办法将这些 DataFrame 转换为 int 变量?或者有没有更好的方法来形成这个 DataFrame?