AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题

问题[pyspark](coding)

Martin Hope
Rakesh Kushwaha
Asked: 2025-03-07 05:30:07 +0800 CST

在 pyspark 中将 json 列更改为 maptype

  • 6

我无法理解为什么在使用 StructType() 和 MapType() 更改以下代码中的 json 列时获取 null

data = [(1, '''{"a": '1'}''')]
df = spark.createDataFrame(data, ("key", "value"))
df.show()
print()
#below lien changes the data of value column to maptype
df1=df.withColumn('data',from_json(df.value,MapType(StringType(),StringType())))
df1.show()
#below line does not changes the data of value column into maptpe
schema = StructType([StructField("a", MapType(StringType(),StringType()))])
df2=df.withColumn('data',from_json(df.value, schema))
df2.show()
# df.withColumn('data',from_json(df.value,MapType(StringType(),StringType()))) produces output
+---+----------+--------+
|key|     value|    data|
+---+----------+--------+
|  1|{"a": '1'}|{a -> 1}|
+---+----------+--------+
#df2=df.withColumn('data',from_json(df.value, schema)) produces output
+---+----------+------+
|key|     value|  data|
+---+----------+------+
|  1|{"a": '1'}|{null}|
+---+----------+------+
pyspark
  • 1 个回答
  • 27 Views
Martin Hope
mabanalyst
Asked: 2025-01-19 04:45:15 +0800 CST

PySpark 过滤

  • 5

目前,我正在对包含借款人如何偿还贷款的信息的数据库进行计算。

从技术角度来看,我正在使用 PySpark,刚刚面临如何使用高级过滤操作的问题。

我的数据框如下所示:

ID     ContractDate LoanSum  ClosingDate Status Bank
ID3    2024-06-10   20                   Active A
ID3    2024-06-11   30                   Active A
ID3    2024-06-12   50                   Active A
ID3    2024-06-12   15                   Active B
ID3    2024-06-12   5        2024-06-18  Closed A
ID3    2024-06-13   40       2024-06-20  Closed A
ID3    2024-06-22   50                   Active A
ID4    2024-07-11   20                   Active A
ID4    2024-07-12   30                   Active B
ID4    2024-07-13   50                   Active B
ID4    2024-07-11   5        2024-08-20  Closed A

我的目标是通过“贷款金额”字段计算借款人的总和,这些借款人拥有同一家银行在首笔贷款发放之日起 3 天内发放的 3 笔或更多有效贷款。

在我的例子中,ID3 的总和为 20 + 30 + 50 = 100

我目前所做的:

from pyspark.sql import functions as f
from pyspark.sql import Window

df = spark.createDataFrame(data).toDF('ID','ContractDate','LoanSum','ClosingDate', 'Status', 'Bank')
df.show()

cols = df.columns
w = Window.partitionBy('ID').orderBy('ContractDate')

df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) \
  .withColumn('Target', f.expr('datediff(ContractDate, PreviousContractDate) >= 0 & datediff(ContractDate, PreviousContractDate) <= 3')) \
  .withColumn('Target', f.col('Target') | f.lead('Target').over(w)) \
  .filter('Target == True')

此代码仅有助于根据 ContractDate 获取向一名借款人发放的贷款。

我如何添加更多条件?

pyspark
  • 1 个回答
  • 38 Views
Martin Hope
Danylo Kuznetsov
Asked: 2025-01-07 18:37:53 +0800 CST

如何在 PySpark Databricks 中将 DataFrame 转换为整数?

  • 5

我是 PySpark 的新手,目前在 Databricks 中工作,比较两个具有相同列结构的 DataFrame。我将它们相互比较(本质上是将已加载到数据库中的文件与新文件进行比较)。在此过程中,我使用以下代码计算对每个变量所做的更改次数:

Comparison_DF = DF1_Data_To_Compare.withColumn("Value1_Change", when(col("b.Value1") == col("a.Value1"), 0).otherwise(1))
Comparison_DF = Comparison_DF.withColumn("Value2_Change", when(col("b.Value2") == col("a.Value2"), 0).otherwise(1))  

# Summarizing the number of changes
Change_To_Value1 = Comparison_DF.select(sum("Value1_Change"))
Change_To_Value2 = Comparison_DF.select(sum("Value2_Change"))

# Forming the change report DataFrame
# columns=["Type of Change", "Number of Occurrences"]
data = [("Change to Value1", Change_To_Value1), ("Change to Value2", Change_To_Value2)]

rdd = spark.sparkContext.parallelize(data)
print(data) 

该行rdd = spark.sparkContext.parallelize(data)返回错误。检查错误回溯后,我意识到Change_To_Value1和Change_To_Value2不是变量而是 DataFrames。该print(data)语句给出以下结果:[('Change to Value1', DataFrame[sum(Value1_Change): bigint]), ('Change to Value2', DataFrame[sum(Value2_Change): bigint])]。

我需要形成这种 DataFrame 以将其用作更改报告,以便与 SSIS 包返回的结果进行比较。

我在 StackOverflow 或任何其他开源上都没有找到类似的东西。我尝试构建一个循环语句来收集这些 DataFrame 并直接将它们输入到新的 DataFrame 中,但我也失败了。

有没有办法将这些 DataFrame 转换为 int 变量?或者有没有更好的方法来形成这个 DataFrame?

pyspark
  • 1 个回答
  • 20 Views
Martin Hope
telecomshy
Asked: 2024-12-28 19:51:47 +0800 CST

PySpark 的 Py4J 错误:为什么一个脚本有效而另一个失败?

  • 6

我已经在笔记本电脑上安装了 PySpark。当我运行以下程序时,一切正常:

spark = SparkSession.builder.appName('pyspark').getOrCreate()
book_local = spark.read.text("data.txt")
book_local.show()

但是,当我运行以下程序时,出现错误:

spark = SparkSession.builder.appName('pyspark').getOrCreate()

my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]
df_grocery_list = spark.createDataFrame(my_grocery_list)
df_grocery_list.show()   # This is where the error is thrown

错误信息是:

Py4JJavaError: java.io.IOException: Cannot run program "python3"

设置环境变量后,一切恢复正常。

import os
import sys
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

我的问题是,为什么第一个程序运行没有问题,但第二个程序却抛出了 Py4J 错误?第一个程序根本没有使用 Py4J 包吗?

此外,当我尝试用以下代码替换环境变量配置时:

spark = SparkSession.builder.appName('pyspark').config("spark.pyspark.python", sys.executable).getOrCreate()

我仍然遇到错误。

pyspark
  • 1 个回答
  • 24 Views
Martin Hope
LearneR
Asked: 2024-12-19 15:26:24 +0800 CST

在 Pyspark TempView 中,BooleanType 列中的 NULL 值的比较无法按预期进行

  • 5

我的 PySpark 笔记本中有一个TempView。当我在视图上运行 SQL 查询时,WHERE以下条件不会产生预期的结果:

is_valid != false

对于传入的记录,is_valid列(BooleanType字段)包含一个null值。我期望条件is_valid != false计算为,true因为null显然不等于false,对吗?

然而,事实并非如此。相反,它的计算结果为false。

我不确定我的情况出了什么问题。有什么想法吗?

pyspark
  • 2 个回答
  • 29 Views
Martin Hope
MetallicPriest
Asked: 2024-11-08 21:44:29 +0800 CST

如何对 Delta 表中的结构类型数组使用 LIKE 条件

  • 5

我有一个从 Databricks 访问的增量表。我有一个结构体数组类型的列。如果我想查看数组中任何元素的字段是否包含某个元素,我可以使用以下命令。

array_contains(transform(ArrayCol, x -> x.f), 'something')

但是现在我不想进行完全匹配,而是进行类似于的操作LIKE 'some%'。我该如何实现?基本上,我想返回true数组中的任何元素是否具有f包含此处模式的字段'some%'。

pyspark
  • 1 个回答
  • 28 Views
Martin Hope
user1783504
Asked: 2024-11-01 01:16:02 +0800 CST

Pyspark-立方体聚合

  • 6

我正在尝试使用 pyspark 中的多维数据集函数,但不包含多维数据集中的所有列。

我想要实现的 SQL 等效目标:

从表中按 col1、col2、col3、sum(col4) 分组,选择 col1、col2、col3

这将按 col1 以及 col2 和 col3 的所有组合对组进行分组

在 pyspark 中,运行以下命令时,我收到消息 GroupedData 对象没有属性“cube”

spark.table("table").groupBy(col1).cube(col2,col3).agg(sum(col4))

我可以使用 cube,但是我需要包含我不想要的 col1

spark.table("table").cube(col1,col2,col3).agg(sum(col4))

pyspark
  • 1 个回答
  • 25 Views
Martin Hope
Pachu Martinez
Asked: 2024-09-04 22:32:47 +0800 CST

如何设置使用 pyspark 读取 JDBC 的 upperBound 和 lowerBound 格式

  • 5

尝试使用 pyspark 从 JDBC读取。在 JDBC 中,有一列 FEC_PART 作为日期类型,格式为 yyyymmdd。对于读取,参数upperBound或lowerBound与所需格式yyyymmdd不匹配:

  • 使用所需的格式yyyymmdd,出现无法识别日期格式的错误: pyspark.sql.utils.IllegalArgumentException: Cannot parse the bound value 20200112 as date
  • 格式为yyyy-mm-dd,与 JDBC 中 FEC_PART 的格式不匹配。它显示此错误 java.sql.SQLDataException: ORA-01861: literal does not match format string: WHERE "ARQPIB_FEC_PART" < '2020-01-13' or "ARQPIB_FEC_PART" is null , Error Msg = ORA-01861: literal does not match format string

使用spark进行read.load()时出现错误。

input_data = spark.read \
    .format(constants.FORMAT_JDBC) \
    .options(**properties) \
    .option("partitionColumn", "FEC_PART")  # Keep partition column as it is
    .option("lowerBound", "20200112")  # Use the yyyymmdd format for bounds to match partitionColumn
    .option("upperBound", "20200114")  # Use the yyyymmdd format for bounds to match partitionColumn
    .option("numPartitions", "2") \
    .load()

第一种方法。尝试添加此选项:

.option("oracle.jdbc.mapDateToTimestamp", "false")
.option("sessionInitStatement", "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYYMMDD'")

采用另一种选择的第二种方法:

.option("dateFormat", "yyyyMMdd")

又尝试了一些方法,但都没有任何结果。

pyspark
  • 1 个回答
  • 22 Views
Martin Hope
elokema
Asked: 2024-08-30 23:08:48 +0800 CST

Pyspark:编写一个通用函数

  • 5

我想在函数pyspark中写入这部分

df = (df.withColumn("January", F.lit(None).cast('double'))
        .withColumn("February", F.lit(None).cast('double'))
        .withColumn("March", F.lit(None).cast('double'))
        .withColumn("April", F.lit(None).cast('double'))
        .withColumn("May", F.lit(None).cast('double'))
        .withColumn("June", F.lit(None).cast('double'))
        .withColumn("July", F.lit(None).cast('double'))
        .withColumn("August", F.lit(None).cast('double'))
        .withColumn("September", F.lit(None).cast('double'))
        .withColumn("November", F.lit(None).cast('double'))
        .withColumn("December", F.lit(None).cast('double'))

pyspark
  • 1 个回答
  • 20 Views
Martin Hope
1zftqITss0
Asked: 2024-08-27 18:28:06 +0800 CST

PySpark 将复杂函数应用于数据框的每一行以构造新列

  • 5

我在 Microsoft Fabric 中使用 Spark Notebook。我想从 Lakehouse 中的元数据构建列映射。该映射应写入带有表列表的数据框中的“映射”列中。

我目前的尝试如下:

# Create initial list of table data
dataframe_tablelist = spark.createDataFrame(
    [
        ("abcd", "AB", "t1"),
        ("efgh", "CD", "t2"),
        ("efgh", "CD", "t3"),
    ],
    ["database", "entity", "table_name"]
)

def construct_mapping(database, entity, table_name):
    meta_name = "Metadata_" + database + "_" + entity + "_" + table_name
    metadata = spark.sql(f"""select * from {meta_name}""")
    # Here I would construct the mapping from the metadata
    return meta_name

udf_constructor = udf(construct_mapping, StringType())

mapping_df = dataframe_tablelist.withColumn("test_column", udf_constructor(dataframe_tablelist.database, dataframe_tablelist.entity, dataframe_tablelist.table_name))

display(mapping_df)

我收到了这个我完全不明白的错误:

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

我可能可以让它与 collect() 一起工作并逐行附加,但我想以“正确”的方式进行。

pyspark
  • 1 个回答
  • 25 Views

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    重新格式化数字,在固定位置插入分隔符

    • 6 个回答
  • Marko Smith

    为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会?

    • 2 个回答
  • Marko Smith

    VScode 自动卸载扩展的问题(Material 主题)

    • 2 个回答
  • Marko Smith

    Vue 3:创建时出错“预期标识符但发现‘导入’”[重复]

    • 1 个回答
  • Marko Smith

    具有指定基础类型但没有枚举器的“枚举类”的用途是什么?

    • 1 个回答
  • Marko Smith

    如何修复未手动导入的模块的 MODULE_NOT_FOUND 错误?

    • 6 个回答
  • Marko Smith

    `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它?

    • 3 个回答
  • Marko Smith

    在 C++ 中,一个不执行任何操作的空程序需要 204KB 的堆,但在 C 中则不需要

    • 1 个回答
  • Marko Smith

    PowerBI 目前与 BigQuery 不兼容:Simba 驱动程序与 Windows 更新有关

    • 2 个回答
  • Marko Smith

    AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String”

    • 1 个回答
  • Martin Hope
    Fantastic Mr Fox msvc std::vector 实现中仅不接受可复制类型 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant 使用 chrono 查找下一个工作日 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor 构造函数的成员初始化程序可以包含另一个成员的初始化吗? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský 为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul C++20 是否进行了更改,允许从已知绑定数组“type(&)[N]”转换为未知绑定数组“type(&)[]”? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann 为什么 {2,3,10} 和 {x,3,10} (x=2) 的顺序不同? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller 在 5.2 版中,bash 条件语句中的 [[ .. ]] 中的分号现在是可选的吗? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench 为什么双破折号 (--) 会导致此 MariaDB 子句评估为 true? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng 为什么 `dict(id=1, **{'id': 2})` 有时会引发 `KeyError: 'id'` 而不是 TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String” 2024-03-20 03:12:31 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve