Estou apenas tentando usar o Rate com o Structured Streaming, para gravar em vários nomes de tabelas por MicroBatch. Ou seja, apenas atualizando a lógica de vários sinks em preparação para alguma certificação, no pyspark.
Nenhum erro, mas não há persistência ocorrendo. Faz um tempo que não olho; deve ser algo básico.
Codificação como segue no Databricks Community Edition, sem Hive Catalog. Coisas básicas.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit
spark = SparkSession.builder \
.appName("SimulateKAFKAandMultipleSinks") \
.getOrCreate()
rate_stream = spark.readStream \
.format("rate") \
.option("rowsPerSecond", 1) \
.load()
message_stream = rate_stream.select(
(rate_stream["value"] + 1000).alias("message_id"),
rate_stream["timestamp"].alias("event_time"),
(concat(lit("T"), rate_stream["value"] % 5)).alias("table_id")
)
def append_to_parquet(df, table_id):
table_path = f"/mnt/parquet/{table_id}"
df.write \
.format("parquet") \
.mode("append") \
.option("path", table_path) \
.save()
def process_batch(df, batch_id):
partitioned_df = df.repartition("table_id")
def process_partition(iterator):
for partition in iterator:
first_row_value = df.first()
table_id_value = first_row_value['table_id']
print(f"Writing partition for table_id: {table_id_value}")
partition_df = partition.filter(col("table_id") == table_id_value)
append_to_parquet(partition_df, table_id_value)
partitioned_df.rdd.mapPartitions(process_partition)
query = message_stream.writeStream \
.foreachBatch(process_batch) \
.outputMode("append") \
.option("checkpointLocation", "/mnt/parquet/checkpoints/") \
.start()
query.awaitTermination()
Atualização. Supervisão é minha parte, mas educacional o suficiente. Pode fazer isso com KAFKA em lote, não via streaming estruturado.
O problema com seu código não persistindo dados provavelmente decorre de alguns problemas fundamentais na forma como você está usando o Structured Streaming no Spark.
tente este