我只是想将 Rate 与 Structured Streaming 结合使用,以便将每个 MicroBatch 写入多个表名。即在 pyspark 中刷新多个接收器逻辑以准备某些认证。
没有错误,但没有发生持久性。我看的时候已经有一段时间了;一定是一些基本的东西。
在 Databricks 社区版上进行如下编码,没有 Hive 目录。基本内容。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit
spark = SparkSession.builder \
.appName("SimulateKAFKAandMultipleSinks") \
.getOrCreate()
rate_stream = spark.readStream \
.format("rate") \
.option("rowsPerSecond", 1) \
.load()
message_stream = rate_stream.select(
(rate_stream["value"] + 1000).alias("message_id"),
rate_stream["timestamp"].alias("event_time"),
(concat(lit("T"), rate_stream["value"] % 5)).alias("table_id")
)
def append_to_parquet(df, table_id):
table_path = f"/mnt/parquet/{table_id}"
df.write \
.format("parquet") \
.mode("append") \
.option("path", table_path) \
.save()
def process_batch(df, batch_id):
partitioned_df = df.repartition("table_id")
def process_partition(iterator):
for partition in iterator:
first_row_value = df.first()
table_id_value = first_row_value['table_id']
print(f"Writing partition for table_id: {table_id_value}")
partition_df = partition.filter(col("table_id") == table_id_value)
append_to_parquet(partition_df, table_id_value)
partitioned_df.rdd.mapPartitions(process_partition)
query = message_stream.writeStream \
.foreachBatch(process_batch) \
.outputMode("append") \
.option("checkpointLocation", "/mnt/parquet/checkpoints/") \
.start()
query.awaitTermination()
更新。我这边有疏忽,但足够有教育意义。可以使用批处理 KAFKA 来实现,而不是通过结构化流。