AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题 / 79367947
Accepted
mabanalyst
mabanalyst
Asked: 2025-01-19 04:45:15 +0800 CST2025-01-19 04:45:15 +0800 CST 2025-01-19 04:45:15 +0800 CST

PySpark 过滤

  • 772

目前,我正在对包含借款人如何偿还贷款的信息的数据库进行计算。

从技术角度来看,我正在使用 PySpark,刚刚面临如何使用高级过滤操作的问题。

我的数据框如下所示:

ID     ContractDate LoanSum  ClosingDate Status Bank
ID3    2024-06-10   20                   Active A
ID3    2024-06-11   30                   Active A
ID3    2024-06-12   50                   Active A
ID3    2024-06-12   15                   Active B
ID3    2024-06-12   5        2024-06-18  Closed A
ID3    2024-06-13   40       2024-06-20  Closed A
ID3    2024-06-22   50                   Active A
ID4    2024-07-11   20                   Active A
ID4    2024-07-12   30                   Active B
ID4    2024-07-13   50                   Active B
ID4    2024-07-11   5        2024-08-20  Closed A

我的目标是通过“贷款金额”字段计算借款人的总和,这些借款人拥有同一家银行在首笔贷款发放之日起 3 天内发放的 3 笔或更多有效贷款。

在我的例子中,ID3 的总和为 20 + 30 + 50 = 100

我目前所做的:

from pyspark.sql import functions as f
from pyspark.sql import Window

df = spark.createDataFrame(data).toDF('ID','ContractDate','LoanSum','ClosingDate', 'Status', 'Bank')
df.show()

cols = df.columns
w = Window.partitionBy('ID').orderBy('ContractDate')

df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) \
  .withColumn('Target', f.expr('datediff(ContractDate, PreviousContractDate) >= 0 & datediff(ContractDate, PreviousContractDate) <= 3')) \
  .withColumn('Target', f.col('Target') | f.lead('Target').over(w)) \
  .filter('Target == True')

此代码仅有助于根据 ContractDate 获取向一名借款人发放的贷款。

我如何添加更多条件?

pyspark
  • 1 1 个回答
  • 38 Views

1 个回答

  • Voted
  1. Best Answer
    Vamsi Bitra
    2025-01-19T08:47:36+08:002025-01-19T08:47:36+08:00

    要解决您的问题,请按照以下代码操作。作为示例,我使用上述数据框。

    代码:

    from pyspark.sql import functions as f
    from pyspark.sql import Window
    
    data1 = [
        ('ID3', '2024-06-10', 20, None, 'Active', 'A'),
        ('ID3', '2024-06-11', 30, None, 'Active', 'A'),
        ('ID3', '2024-06-12', 50, None, 'Active', 'A'),
        ('ID3', '2024-06-12', 15, None, 'Active', 'B'),
        ('ID3', '2024-06-12', 5, '2024-06-18', 'Closed', 'A'),
        ('ID3', '2024-06-13', 40, '2024-06-20', 'Closed', 'A'),
        ('ID3', '2024-06-22', 50, None, 'Active', 'A'),
        ('ID4', '2024-07-11', 20, None, 'Active', 'A'),
        ('ID4', '2024-07-12', 30, None, 'Active', 'B'),
        ('ID4', '2024-07-13', 50, None, 'Active', 'B'),
        ('ID4', '2024-07-11', 5, '2024-08-20', 'Closed', 'A'),
    ]
    
    
    df12 = spark.createDataFrame(data1, ['ID', 'ContractDate', 'LoanSum', 'ClosingDate', 'Status', 'Bank']) \
        .withColumn('ContractDate', f.to_date('ContractDate')) \
        .filter(f.col('Status') == 'Active')
    
    # Use  window function and calculate cumulative count
    w = Window.partitionBy('ID', 'Bank').orderBy('ContractDate')
    df = df12.withColumn('CumulativeCount', f.sum(
        f.when(f.datediff(f.col('ContractDate'), f.lag('ContractDate').over(w)).isNull(), 1)
        .when(f.datediff(f.col('ContractDate'), f.lag('ContractDate').over(w)) <= 3, 1)
        .otherwise(0)
    ).over(w))
    
    df1 = df.filter(f.col('CumulativeCount') >= 3).groupBy('ID', 'Bank').agg(f.sum('LoanSum').alias('TotalLoanSum'))
    
    
    display(df1)
    

    输出:

    在此处输入图片描述

    • 1

相关问题

  • 如何在spark sql中左移列值?

  • Databricks Autoloader / writeStream:如何重试?

  • 在 pyspark 中对数据帧应用逻辑运算

  • pyspark 将可变长度数组类型的列拆分为两个较小的数组

  • 为什么当我们进行转换时 Spark sql 会跳过毫秒

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    重新格式化数字,在固定位置插入分隔符

    • 6 个回答
  • Marko Smith

    为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会?

    • 2 个回答
  • Marko Smith

    VScode 自动卸载扩展的问题(Material 主题)

    • 2 个回答
  • Marko Smith

    Vue 3:创建时出错“预期标识符但发现‘导入’”[重复]

    • 1 个回答
  • Marko Smith

    具有指定基础类型但没有枚举器的“枚举类”的用途是什么?

    • 1 个回答
  • Marko Smith

    如何修复未手动导入的模块的 MODULE_NOT_FOUND 错误?

    • 6 个回答
  • Marko Smith

    `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它?

    • 3 个回答
  • Marko Smith

    在 C++ 中,一个不执行任何操作的空程序需要 204KB 的堆,但在 C 中则不需要

    • 1 个回答
  • Marko Smith

    PowerBI 目前与 BigQuery 不兼容:Simba 驱动程序与 Windows 更新有关

    • 2 个回答
  • Marko Smith

    AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String”

    • 1 个回答
  • Martin Hope
    Fantastic Mr Fox msvc std::vector 实现中仅不接受可复制类型 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant 使用 chrono 查找下一个工作日 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor 构造函数的成员初始化程序可以包含另一个成员的初始化吗? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský 为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul C++20 是否进行了更改,允许从已知绑定数组“type(&)[N]”转换为未知绑定数组“type(&)[]”? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann 为什么 {2,3,10} 和 {x,3,10} (x=2) 的顺序不同? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller 在 5.2 版中,bash 条件语句中的 [[ .. ]] 中的分号现在是可选的吗? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench 为什么双破折号 (--) 会导致此 MariaDB 子句评估为 true? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng 为什么 `dict(id=1, **{'id': 2})` 有时会引发 `KeyError: 'id'` 而不是 TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String” 2024-03-20 03:12:31 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve