Executando um código pyspark simples que está sendo executado em 1 driver (16 núcleos) e 2 nós de trabalho (total de 32 núcleos). Tenho dados de entrada no valor de 1 hora para dados de aproximadamente 33 GB de um único dia. Os dados de entrada também possuem uma coluna país e o número de países distintos nos dados é 968.
Estou escrevendo os dados particionados por data e país.
results.write.partitionBy("date","country").format("delta").save("<path>")
O estágio que está gravando no local de destino tem um total de 607 tarefas, com 32 tarefas em execução em paralelo [384/607 (32 em execução)].
Pelo meu entendimento,
- spark grava 1 arquivo por partição
- número de tarefas = número de partições
- então número de tarefas = número de partições = número de arquivos
Pergunta - Aqui neste estágio que está gravando no local de destino (160/607 (32 em execução)) eu tenho um total de 607 tarefas, então não devo gravar apenas 607 arquivos. Em vez disso, em cada pasta data + país, foi gerado um número aleatório de arquivos.
Acabei de notar que você está usando
delta
format. Esse é um jogo totalmente diferente de "faísca".delta.targetFileSize
,delta.tuneFileSizesForRewrites
Delta é um formato gerenciado/de nível superior. Por exemplo, ele mantém o histórico, permite pular dados, permite
OPTIMIZE
combinar arquivos parquet menores subjacentes em arquivos maiores e muito mais. Para conseguir tudo isso, ele manipula muitos arquivos de metadados, além de arquivos parquet nos bastidores.delta
O formato é MUITO diferente dos formatos Vanilla Spark, como csv/parquet/etc. e de forma alguma comparável.Por padrão: Sim.
Em geral: Não. A afirmação correta é que o spark grava pelo menos 1 arquivo por partição.
Um valor diferente de zero para
spark.sql.files.maxRecordsPerFile
(Número máximo de registros a serem gravados em um único arquivo. Se esse valor for zero ou negativo, não há limite) pode levar a uma contagem de arquivos maior que a contagem de partições.Observe também que se você estiver lendo de alguma fonte (em vez de criar o dataframe programaticamente, digamos, usando
.repartition(N)
), certas configurações (por exemplospark.sql.files.maxPartitionBytes
) podem afetar quantas partições são criadas pelo leitor, o que pode não ser igual ao número de partições.Portanto,
spark.read.csv('path-to-csvs-with-10-partitions').write.csv('output')
pode produzir mais de 10 partições no formatooutput
.Uma maneira fácil de entender a relação entre o número de partições e o número de arquivos com os quais você pode escrever o dataframe
paritionBy()
.produz:
Ver:
Talvez a soma do número de arquivos em cada pasta seja 607?