AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题

问题[apache-spark](coding)

Martin Hope
Dhruv
Asked: 2025-04-29 16:15:24 +0800 CST

Spark中shuffle的定义

  • 5

我知道,与同分区 DataFrame 的连接不被视为宽变换。以下是原文中关于宽变换和窄变换的定义。

窄依赖,其中父 RDD 的每个分区最多被子 RDD 的一个分区使用,宽依赖,其中多个子分区可能依赖于它。

即使 DataFrame 是共分区的,也不一定意味着它们对应的分区位于同一节点上。例如,的分区 P1df1和 的 P1df2可能位于不同的节点上。因此,在连接期间仍然需要进行数据传输(例如,将 的 P1 移动df1到 的 P1 节点df2)。然而,这并不被视为 Shuffle。

我有两个问题?

  1. 那么,shuffle 到底是什么?我知道并非所有网络数据传输都被视为 shuffle。
  2. 哪些类型的数据传输被视为 shuffle?只有涉及宽转换的数据传输才被视为 shuffle 吗?
apache-spark
  • 1 个回答
  • 27 Views
Martin Hope
Pirvu Georgian
Asked: 2025-04-15 17:45:25 +0800 CST

使用 Coalesce 在 Databricks 上实现自适应查询执行 Spark

  • 5

AQE - Adaptive Query Execution作为工程师,当我们谈论Spark/Databricks时,我们可能会忽略一些事情:

如果您使用coalesce()AQE 来减少分区,AQE 将不会对其进行任何操作。它不会检测倾斜,不会重新分区,也不会进行优化。因为它coalesce()不会执行完全重排(例如repartition()),而是合并现有分区而不进行重新分配。这就是数据倾斜会在之后悄然出现coalesce()并中断或减慢作业速度的原因。我发现文档有点不清楚。AQE 会在您执行触发repartition()完全重排的操作后进行干预。这是正确的理解吗?文档似乎对这种情况不太清楚。

在此处输入图片描述

apache-spark
  • 1 个回答
  • 30 Views
Martin Hope
Kashyap
Asked: 2025-03-07 05:07:05 +0800 CST

根据日期或默认值选择一行

  • 5

我有两个数据框:

        df_rates                        df_trades
(rate from currency -> USD)   
+--------+----------+----+    +---+--------+------+----------+    
|currency| rate_date|rate|    | id|currency|amount|trade_date|    
+--------+----------+----+    +---+--------+------+----------+    
|     EUR|2025-01-09|1.19|    |  1|     EUR|  1000|2025-01-09| # exact rate available
|     EUR|2025-01-08|1.18|    |  2|     CAD|  1000|2025-01-09| # 1 day prior rate available
|     CAD|2025-01-08|0.78|    |  3|     AUD|  1000|2025-01-09| # no applicable rate available
|     CAD|2025-01-07|0.77|    |  4|     HKD|  1000|2025-01-09| # no rate available at all
|     AUD|2025-02-09|1.39|    +---+--------+------+----------+    
|     AUD|2025-02-08|1.38|                                              
+--------+----------+----+                                              

对于每笔交易,我都需要应用适当的汇率来计算 usd_amount。选择汇率的方法是:

  • 查找rate​trade_date
  • 如果不可用trade_date则返回最多 7 天

如果以这种方式找不到利率,则usd_amount = null

我有以下有效的代码。但我不确定它是否可以扩展。特别是对于这种情况trade_id = 3(当有可用利率但没有正确的日期范围时),因为实际上利率表有 1000 个利率(可追溯到 5-7 年前)。部分代码标记This PART如下。

是否存在其他逻辑可以更有效地实现这一目标?

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number, date_diff, when

def log_dataframe(df, msg):
    print(msg)
    df.show()

def calc_usd_amount(df_trades, df_rates):
    df = df_trades.join(df_rates, how='left_outer', on='currency').withColumn('date_diff', date_diff('trade_date', 'rate_date'))
    date_diff_no_good = (col('date_diff') < 0) | (col('date_diff') > 7)
    # This PART
    df = (
        df.withColumns({
            'rate_date': when(date_diff_no_good, None).otherwise(col('rate_date')),
            'rate': when(date_diff_no_good, None).otherwise(col('rate')),
        })
        .drop_duplicates(['id', 'rate_date', 'rate'])
    )
    w_spec = row_number().over(Window.partitionBy(col('id'), col('currency')).orderBy(col('rate_date').desc()))
    df = (
        df.filter('rate_date IS NULL OR (rate_date <= trade_date AND rate_date > (trade_date - 7))')
        .withColumn('rate_row_num', w_spec).filter('rate_row_num == 1')
        .withColumn('usd_amount', col('rate') * col('amount'))
    )
    return df.drop('date_diff', 'rate_row_num')

from pyspark.sql import SparkSession
from datetime import date

spark = SparkSession.builder.getOrCreate()
dt = date.fromisoformat
df_trades = spark.createDataFrame(
    data = [
        (1, 'EUR', 1000, dt('2025-01-09')),  # trade date rate available
        (2, 'CAD', 1000, dt('2025-01-09')),  # trade date -1d, rate available
        (3, 'AUD', 1000, dt('2025-01-09')),  # no applicable rate available
        (4, 'HKD', 1000, dt('2025-01-09')),  # no rate available at all
    ],
    schema=['id', 'currency', 'amount', 'trade_date'],
)
df_rates = spark.createDataFrame(
    data = [
        ('EUR', dt('2025-01-09'), 1.19),  # trade date rate available
        ('EUR', dt('2025-01-08'), 1.18),
        ('CAD', dt('2025-01-08'), 0.78),  # trade date -1d, rate available
        ('CAD', dt('2025-01-07'), 0.77),
        ('AUD', dt('2025-02-09'), 1.39),  # no applicable rate available
        ('AUD', dt('2025-02-08'), 1.38),
    ],
    schema=['currency', 'rate_date', 'rate']
)

df_out = calc_usd_amount(df_trades, df_rates)
log_dataframe(df_out, 'df_out')

印刷:

df_out
+--------+---+------+----------+----------+----+----------+
|currency| id|amount|trade_date| rate_date|rate|usd_amount|
+--------+---+------+----------+----------+----+----------+
|     EUR|  1|  1000|2025-01-09|2025-01-09|1.19|    1190.0|
|     CAD|  2|  1000|2025-01-09|2025-01-08|0.78|     780.0|
|     AUD|  3|  1000|2025-01-09|      NULL|NULL|      NULL|
|     HKD|  4|  1000|2025-01-09|      NULL|NULL|      NULL|
+--------+---+------+----------+----------+----+----------+
apache-spark
  • 1 个回答
  • 38 Views
Martin Hope
detcle
Asked: 2025-02-25 20:44:24 +0800 CST

repartition() 是否在触发操作之前始终进行随机排序

  • 6

我读到 repartition() 将被延迟评估,因为它是一种转换,并且转换仅在操作上触发。

但是,我认为在基于列值进行任何重新分区之前,Spark 必须先加载所有数据。换句话说,我的理解是,所有数据仍将按原样加载,不进行任何重新分区或优化,只有这样 Spark 才会进行重新分区。并且无论如何,repartition() 都会始终对数据进行混洗,即使在触发任何操作之前调用它也是如此。我的理解正确吗?

df = spark.createDataFrame(data, ["id", "name", "age"])
repartitioned_df = df.repartition("age")
... # action triggered later
apache-spark
  • 1 个回答
  • 32 Views
Martin Hope
user3579222
Asked: 2025-01-20 00:30:59 +0800 CST

阅读以前的 Spark API

  • 6

在使用以前的 Spark 版本时,我总是在指定列名时感到困惑:我应该使用String还是col object。

3.1.2 版中的 regexp_replace示例:

pyspark.sql.functions.regexp_replace(str,pattern,replacement)[来源]

我正在运行版本 3.1.2 的集群,并且两者都可以正常工作:

df1.withColumn("modality",F.regexp_replace(F.col("name"),"i","")).display()
df1.withColumn("modality",F.regexp_replace("name","i","")).display()

从文档中我本来以为只允许使用字符串,但实际上两者都可以。我如何在 API 文档中看到是否还允许使用 col 对象(在最新的 API 中这一点非常清楚,但在之前的 API 中却不是这样)。

apache-spark
  • 1 个回答
  • 32 Views
Martin Hope
smurphy
Asked: 2025-01-09 11:25:08 +0800 CST

根据另一列中提供的列名数组创建具有值的列

  • 6

我想创建一个新列,其中包含列中列出的列名的值数组lookup。

示例输入

input_df = spark.createDataFrame([
    Row(id=123, alert=1, operation=1, lookup=[]),
    Row(id=234, alert=0, operation=0, lookup=['alert']),
    Row(id=345, alert=1, operation=0, lookup=['operation']),
    Row(id=456, alert=0, operation=1, lookup=['alert', 'operation']),
])

预期输出

ID 警报 手术 抬头 查找值
123 1 1 [] []
234 0 0 [alert] [0]
345 1 0 [operation] [0]
456 0 1 [alert, operation] [0, 1]

我尝试过

input_df.withColumn("lookup_values", F.transform(F.col("lookup"), lambda x: input_df[f'{x}'])).show()

失败并出现以下错误:

AnalysisException:[UNRESOLVED_COLUMN.WITH_SUGGESTION] 无法解析名称为 的列或函数参数Column<'x_1'>。您是指下列之一吗?[ id, alert, operation, lookup]。

这个错误令人惊讶,因为下面的代码虽然没有产生预期的结果,但却没有产生错误:

input_df.withColumn("lookup_values", F.transform(F.col("lookup"), lambda x: input_df['alert'])).show()
ID 警报 手术 抬头 查找值
123 1 1 [] []
234 0 0 [alert] [0]
345 1 0 [operation] [1]
456 0 1 [alert, operation] [0, 0]
apache-spark
  • 2 个回答
  • 51 Views
Martin Hope
JFlo
Asked: 2025-01-03 00:07:17 +0800 CST

在 PySpark 笔记本中读取多个 Parquet 文件

  • 5

当将多个镶木地板文件读入数据框时,它似乎会随后对每个镶木地板文件进行评估以进行后续转换,而它应该对数据框进行评估。

我正在使用 pyspark 在 fabric notebook 中工作。我试图将多个 parquet 文件读入一个数据框。每个 parquet 文件的列数相同,但列架构可能不同,例如,一个名为“adjustment”的列可能是 int 类型,但如果留空,则类型为 string。我目前正在将文件读入我的数据框,如下所示

df = spark.read.schema(schema).parquet(*files).withColumn(
    "file_name", split(input_file_name(), "/").getItem(8)
)

我在其中指定了架构,文件是我想要从我的湖中加载的文件的文件路径列表。File_name 只是包含日期的文件的名称。

当我跑步时

display(df.where(col("file_name").contains("2024-10-01")))

它似乎可以很好地显示数据框,类似于 display(df),但是当我运行

display(df.where(col("file_name").contains("2024-12-01")))

它给了我这个错误

org.apache.spark.SparkException: Parquet column cannot be converted in file abfss://[email protected]/lakehouse/path/to/my/data/Data_2024-12-01. Column: [data.adjustment], Expected: string, Found: INT64.

我尝试过指定架构,尝试过 .cache() 或 .persist() 数据框,但每次都出现此错误。我认为这与惰性求值有关,但除了分别读取每个 parquet 文件,然后在对每列强制执行架构更改后将它们合并之外,我实在想不出还能做什么。在此先感谢您的帮助

apache-spark
  • 1 个回答
  • 34 Views
Martin Hope
Tang Chen
Asked: 2024-12-31 16:21:33 +0800 CST

HIVE中grouping_id计算逻辑差异

  • 8

我最近在使用 Hive 2.3.9 版本时遇到了一个问题。

当我的分组依据设置为“a,b,c”并且分组集设置为(a,b,(a,b))时。

在上一个 Hive 版本中,grouping__id 的结果是当 "a" 时为 1 当 "b" 时为 2 当 "a,b" 时为 3。但是在 2.3.9 版本中,我发现结果是当 "a" 时为 1 当 "b" 时为 2 当 "a, b" 时为 0。新的正则与 Spark grouping__id Regular 相同。

我记得Hive Grouping__id的规则是从低到高,默认都是0,当出现维度时就重置为1,最后进行小数化。

但现在看上去好像已经改变了。

那么 Hive 做出了一些改变吗?

apache-spark
  • 1 个回答
  • 50 Views
Martin Hope
Ged
Asked: 2024-12-26 21:38:52 +0800 CST

Databricks 社区版中多个接收器处理无法持久化

  • 5

我只是想将 Rate 与 Structured Streaming 结合使用,以便将每个 MicroBatch 写入多个表名。即在 pyspark 中刷新多个接收器逻辑以准备某些认证。

没有错误,但没有发生持久性。我看的时候已经有一段时间了;一定是一些基本的东西。

在 Databricks 社区版上进行如下编码,没有 Hive 目录。基本内容。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit

spark = SparkSession.builder \
    .appName("SimulateKAFKAandMultipleSinks") \
    .getOrCreate()

rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load()

message_stream = rate_stream.select(
    (rate_stream["value"] + 1000).alias("message_id"),              
    rate_stream["timestamp"].alias("event_time"),                    
    (concat(lit("T"), rate_stream["value"] % 5)).alias("table_id")   
)

def append_to_parquet(df, table_id):
    table_path = f"/mnt/parquet/{table_id}"   

    df.write \
        .format("parquet") \
        .mode("append") \
        .option("path", table_path) \
        .save()

def process_batch(df, batch_id):
    partitioned_df = df.repartition("table_id")
    def process_partition(iterator):
        for partition in iterator:
            first_row_value = df.first()
            table_id_value = first_row_value['table_id']
            print(f"Writing partition for table_id: {table_id_value}") 
            partition_df = partition.filter(col("table_id") == table_id_value)
            append_to_parquet(partition_df, table_id_value)

    partitioned_df.rdd.mapPartitions(process_partition)

query = message_stream.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/parquet/checkpoints/") \
    .start()

query.awaitTermination()

更新。我这边有疏忽,但足够有教育意义。可以使用批处理 KAFKA 来实现,而不是通过结构化流。

apache-spark
  • 1 个回答
  • 44 Views
Martin Hope
xsa xsa
Asked: 2024-12-10 12:38:32 +0800 CST

pyspark 数据框转换

  • 5

我有一个如下的数据框:

f1     |f2
=========
test   | [{"f3": 1, "f4": "f4_1" }, {"f3": 2, "f4": "f4_2" }] 

f2是对象列表

我想要获得如下所示的数据框:

f3|f4    | temp_col
=========================
1 |"f4_1"| {"f1": "test"}
2 |"f4_2"| {"f1": "test"}

temp_col是我提供的名字。

我如何使用 pyspark 来实现这一点?

我曾尝试json_normalize通过转换为 pandas df 来使用,但没有效果。

apache-spark
  • 2 个回答
  • 46 Views

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    重新格式化数字,在固定位置插入分隔符

    • 6 个回答
  • Marko Smith

    为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会?

    • 2 个回答
  • Marko Smith

    VScode 自动卸载扩展的问题(Material 主题)

    • 2 个回答
  • Marko Smith

    Vue 3:创建时出错“预期标识符但发现‘导入’”[重复]

    • 1 个回答
  • Marko Smith

    具有指定基础类型但没有枚举器的“枚举类”的用途是什么?

    • 1 个回答
  • Marko Smith

    如何修复未手动导入的模块的 MODULE_NOT_FOUND 错误?

    • 6 个回答
  • Marko Smith

    `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它?

    • 3 个回答
  • Marko Smith

    在 C++ 中,一个不执行任何操作的空程序需要 204KB 的堆,但在 C 中则不需要

    • 1 个回答
  • Marko Smith

    PowerBI 目前与 BigQuery 不兼容:Simba 驱动程序与 Windows 更新有关

    • 2 个回答
  • Marko Smith

    AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String”

    • 1 个回答
  • Martin Hope
    Fantastic Mr Fox msvc std::vector 实现中仅不接受可复制类型 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant 使用 chrono 查找下一个工作日 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor 构造函数的成员初始化程序可以包含另一个成员的初始化吗? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský 为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul C++20 是否进行了更改,允许从已知绑定数组“type(&)[N]”转换为未知绑定数组“type(&)[]”? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann 为什么 {2,3,10} 和 {x,3,10} (x=2) 的顺序不同? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller 在 5.2 版中,bash 条件语句中的 [[ .. ]] 中的分号现在是可选的吗? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench 为什么双破折号 (--) 会导致此 MariaDB 子句评估为 true? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng 为什么 `dict(id=1, **{'id': 2})` 有时会引发 `KeyError: 'id'` 而不是 TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String” 2024-03-20 03:12:31 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve