AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / coding / Perguntas / 77120311
Accepted
Naveen Balachandran
Naveen Balachandran
Asked: 2023-09-17 10:48:24 +0800 CST2023-09-17 10:48:24 +0800 CST 2023-09-17 10:48:24 +0800 CST

Aplicar operação lógica em um dataframe no pyspark

  • 772

Tenho um dataframe com muitas colunas e em uma das colunas tenho a operação lógica que preciso realizar no dataframe. Como exemplo, veja o dataframe abaixo insira a descrição da imagem aqui

Preciso realizar a operação lógica definida na coluna operação lógica nas linhas relevantes

Em um cenário normal, consigo usar expr(). Mas, neste caso, quando quero lê-lo de uma coluna e depois aplicar, ocorre um erro dizendo que a coluna não é iterável.

Alguma sugestão?

pyspark
  • 2 2 respostas
  • 31 Views

2 respostas

  • Voted
  1. Best Answer
    user238607
    2023-09-17T20:56:45+08:002023-09-17T20:56:45+08:00

    Aqui está uma solução usando scala UDF no pyspark, pois eles são mais rápidos que os UDFs python. Você pode encontrar o código para o UDF e o jar de lançamento usado no script pyspark no repositório a seguir.

    Se você quiser fazer modificações na função UDF para atender às suas necessidades futuras, tudo o que você precisa fazer é executar sbt assemblypara compilar o jar.

    Em seguida, chame a com.help.stackoverflow.CheckUDFsclasse do jar para verificar a implementação correta.

    https://github.com/dineshdharme/pyspark-native-udfs

    Código fonte da EvaluateBooleanExpressionclasse:

    package com.help.udf
    
    import org.apache.spark.sql.api.java.UDF3
    import org.apache.spark.sql.api.java.UDF4
    
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox
    
    
    class EvaluateBooleanExpression extends UDF4[Int, Int, Int, String, Boolean] {
    
      override def call(a_value:Int, b_value:Int, c_value:Int,  given_expression: String): Boolean = {
    
        var new_expression = given_expression.replaceAll("A", a_value.toString)
        new_expression = new_expression.replaceAll("B", b_value.toString)
        new_expression = new_expression.replaceAll("C", c_value.toString)
        new_expression = new_expression.replaceAll("0", false.toString)
        new_expression = new_expression.replaceAll("1", true.toString)
        //println("Here's the new expression ", new_expression)
    
        val toolbox = currentMirror.mkToolBox()
        val calc = toolbox.eval(toolbox.parse(new_expression))
    
        val convertedCalc = calc.toString.toBoolean
        //println("Here's the new expression ", new_expression)
    
        convertedCalc
      }
    }
    

    Script python Pyspark:

    import sys
    
    from pyspark import SparkContext, SQLContext
    import pyspark.sql.functions as F
    import pyspark.sql.functions as F
    from pyspark import SparkContext, SQLContext
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    
    spark = SparkSession.builder \
        .appName("MyApp") \
        .config("spark.jars", "file:/path/to/pyspark-native-udfs/releases/pyspark-native-udfs-assembly-0.1.3.jar") \
        .getOrCreate()
    
    sc = spark.sparkContext
    sqlContext = SQLContext(sc)
    
    
    data1 = [
    [0, 1, 1, "(A&B)"],
    [1, 1, 1, "(A)"],
    [0, 0, 1, "(A|C)"],
          ]
    
    df1Columns = ["A", "B", "C", "exp"]
    df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)
    df1 = df1.withColumn("A", F.col("A").cast("int"))
    df1 = df1.withColumn("B", F.col("B").cast("int"))
    df1 = df1.withColumn("C", F.col("C").cast("int"))
    
    print("Schema of the dataframe")
    df1.printSchema()
    
    print("Given dataframe")
    df1.show(n=100, truncate=False)
    
    
    
    spark.udf.registerJavaFunction("evaluate_boolean_exp_udf", "com.help.udf.EvaluateBooleanExpression", BooleanType())
    
    df1.createOrReplaceTempView("given_table")
    
    df1_array = sqlContext.sql("select *, evaluate_boolean_exp_udf(A, B, C, exp) as bool_exp_evaluated from given_table")
    print("Dataframe after applying SCALA NATIVE UDF")
    df1_array.show(n=100, truncate=False)
    

    Saída :

    Schema of the dataframe
    root
     |-- A: integer (nullable = true)
     |-- B: integer (nullable = true)
     |-- C: integer (nullable = true)
     |-- exp: string (nullable = true)
    
    Given dataframe
    +---+---+---+-----+
    |A  |B  |C  |exp  |
    +---+---+---+-----+
    |0  |1  |1  |(A&B)|
    |1  |1  |1  |(A)  |
    |0  |0  |1  |(A|C)|
    +---+---+---+-----+
    
    Dataframe after applying SCALA NATIVE UDF
    +---+---+---+-----+------------------+
    |A  |B  |C  |exp  |bool_exp_evaluated|
    +---+---+---+-----+------------------+
    |0  |1  |1  |(A&B)|false             |
    |1  |1  |1  |(A)  |true              |
    |0  |0  |1  |(A|C)|true              |
    +---+---+---+-----+------------------+
    
    • 1
  2. werner
    2023-09-17T21:10:10+08:002023-09-17T21:10:10+08:00

    Você pode usar a função eval padrão do Python dentro de uma UDF .

    A função espera que os dados estejam em um dict, então primeiro evaltransformamos as colunas de dados em uma estrutura :

    from pyspark.sql import functions as F
    
    eval_udf = F.udf(lambda op, data: eval(op, {}, data.asDict()))
    
    df.withColumn('data', F.struct([df[x] for x in df.columns if x != 'logical_operation'])) \
        .withColumn('result', eval_udf(F.col('logical_operation'), F.col('data'))) \
        .show()
    

    Saída:

    +---+---+---+-----------------+---------+------+
    |  A|  B|  C|logical_operation|     data|result|
    +---+---+---+-----------------+---------+------+
    |  0|  1|  1|            (A&B)|{0, 1, 1}|     0|
    |  1|  1|  1|              (A)|{1, 1, 1}|     1|
    |  0|  0|  1|            (A|C)|{0, 0, 1}|     1|
    +---+---+---+-----------------+---------+------+
    

    evalvem com algumas questões de segurança , então verifique se isso pode ser um problema para você!

    • 1

relate perguntas

  • pyspark divide uma coluna do tipo Array de comprimento variável em duas matrizes menores

  • Por que o Spark SQL pula milissegundos quando fazemos a transmissão

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    destaque o código em HTML usando <font color="#xxx">

    • 2 respostas
  • Marko Smith

    Por que a resolução de sobrecarga prefere std::nullptr_t a uma classe ao passar {}?

    • 1 respostas
  • Marko Smith

    Você pode usar uma lista de inicialização com chaves como argumento de modelo (padrão)?

    • 2 respostas
  • Marko Smith

    Por que as compreensões de lista criam uma função internamente?

    • 1 respostas
  • Marko Smith

    Estou tentando fazer o jogo pacman usando apenas o módulo Turtle Random e Math

    • 1 respostas
  • Marko Smith

    java.lang.NoSuchMethodError: 'void org.openqa.selenium.remote.http.ClientConfig.<init>(java.net.URI, java.time.Duration, java.time.Duratio

    • 3 respostas
  • Marko Smith

    Por que 'char -> int' é promoção, mas 'char -> short' é conversão (mas não promoção)?

    • 4 respostas
  • Marko Smith

    Por que o construtor de uma variável global não é chamado em uma biblioteca?

    • 1 respostas
  • Marko Smith

    Comportamento inconsistente de std::common_reference_with em tuplas. Qual é correto?

    • 1 respostas
  • Marko Smith

    Somente operações bit a bit para std::byte em C++ 17?

    • 1 respostas
  • Martin Hope
    fbrereto Por que a resolução de sobrecarga prefere std::nullptr_t a uma classe ao passar {}? 2023-12-21 00:31:04 +0800 CST
  • Martin Hope
    比尔盖子 Você pode usar uma lista de inicialização com chaves como argumento de modelo (padrão)? 2023-12-17 10:02:06 +0800 CST
  • Martin Hope
    Amir reza Riahi Por que as compreensões de lista criam uma função internamente? 2023-11-16 20:53:19 +0800 CST
  • Martin Hope
    Michael A formato fmt %H:%M:%S sem decimais 2023-11-11 01:13:05 +0800 CST
  • Martin Hope
    God I Hate Python std::views::filter do C++20 não filtrando a visualização corretamente 2023-08-27 18:40:35 +0800 CST
  • Martin Hope
    LiDa Cute Por que 'char -> int' é promoção, mas 'char -> short' é conversão (mas não promoção)? 2023-08-24 20:46:59 +0800 CST
  • Martin Hope
    jabaa Por que o construtor de uma variável global não é chamado em uma biblioteca? 2023-08-18 07:15:20 +0800 CST
  • Martin Hope
    Panagiotis Syskakis Comportamento inconsistente de std::common_reference_with em tuplas. Qual é correto? 2023-08-17 21:24:06 +0800 CST
  • Martin Hope
    Alex Guteniev Por que os compiladores perdem a vetorização aqui? 2023-08-17 18:58:07 +0800 CST
  • Martin Hope
    wimalopaan Somente operações bit a bit para std::byte em C++ 17? 2023-08-17 17:13:58 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve