from pyspark.sql import functions as F
eval_udf = F.udf(lambda op, data: eval(op, {}, data.asDict()))
df.withColumn('data', F.struct([df[x] for x in df.columns if x != 'logical_operation'])) \
.withColumn('result', eval_udf(F.col('logical_operation'), F.col('data'))) \
.show()
这是在 pyspark 中使用 scala UDF 的解决方案,因为它们比 python UDF 更快。您可以在以下存储库中找到 pyspark 脚本中使用的 UDF 和发布 jar 的代码。
如果您想修改 UDF 函数以满足您未来的需求,您只需运行
sbt assembly
编译 jar 即可。com.help.stackoverflow.CheckUDFs
然后从 jar 中调用该类来验证正确的实现。https://github.com/dineshdharme/pyspark-native-udfs
该类的源代码
EvaluateBooleanExpression
:Pyspark python 脚本:
输出 :
您可以在UDF内使用标准 Python eval 函数。
该
eval
函数期望数据位于字典中,因此我们首先将数据列转换为结构体:输出:
eval
存在一些安全问题,因此请检查这对您来说是否有问题!