运行一个简单的 pyspark 代码,该代码在 1 个驱动程序(16 个核心)和 2 个工作节点(总共 32 个核心)上运行。我输入的单日数据约为 1 小时数据~33GB 数据。输入数据还有一个国家列,数据中不同国家的数量为 968 个。
我正在按日期和国家/地区分区写入数据。
results.write.partitionBy("date","country").format("delta").save("<path>")
写入目标位置的阶段共有 607 个任务,其中 32 个任务并行运行 [384/607(32 个运行)]。
据我了解,
- Spark 每个分区写入 1 个文件
- 任务数 = 分区数
- 所以任务数 = 分区数 = 文件数
问题- 在此阶段,正在写入目标位置(160/607(32 个正在运行)),我总共有 607 个任务,因此不应触发仅写入 607 个文件。相反,在每个日期+国家/地区文件夹下,它生成了随机数量的文件。
刚刚注意到您正在使用
delta
格式。这与“火花”完全不同。delta.targetFileSize
,delta.tuneFileSizesForRewrites
Delta 是一种更高级别/托管格式。例如,它保留历史记录,允许数据跳过,提供
OPTIMIZE
将底层较小的镶木地板文件组合成更大的文件等等。为了实现这一切,除了镶木地板文件之外,它还在幕后处理大量元数据文件。delta
格式与 csv/parquet/等普通 Spark 格式非常不同。并且毫无可比性。默认:是。
一般来说:不会。正确的说法是 Spark每个分区至少写入1 个文件。
spark.sql.files.maxRecordsPerFile
(写入单个文件的最大记录数。如果该值为零或负数,则没有限制)的非零值可能会导致文件计数大于分区计数。另请注意,如果您正在从某些源读取(与以编程方式创建数据帧相反,例如使用
.repartition(N)
),则某些配置(例如spark.sql.files.maxPartitionBytes
)可能会影响读取器创建的分区数量,这可能不等于分区数量。因此
spark.read.csv('path-to-csvs-with-10-partitions').write.csv('output')
可能会在output
.一种简单的方法来理解分区数量和文件数量之间的关系,您可以使用
paritionBy()
.产生:
看:
也许每个文件夹下的文件数量之和是607?