我经常使用map
spark 数据集行上的函数在 Scala 中对类型化对象进行转换。我通常的模式是转换数据框转换(withColumn
、groupBy
等)创建的中间结果,并创建中间结果的类型化数据集,以便我可以使用map
。
这种方法效果很好,但会导致中间结果或难以处理的元组类型产生很多“临时”的案例类。
另一种方法是map
在数据框上运行并从行中检索类型字段,但如果是案例类,getAs[T]
这似乎不起作用。spark.implicits
T
例如这给出了错误ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Person
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window
import spark.implicits._
final case class Person(name: String, age: Integer)
val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS
val df = people.alias("p")
.select($"p.name", struct($"p.*").alias("person"))
val ds = df.map(row => {
val name = row.getAs[String]("name")
val person = row.getAs[Person]("person")
(name, person)
})
display(ds)
而这个工作正常:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window
import spark.implicits._
final case class Person(name: String, age: Integer)
val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS
val df = people.alias("p")
.select($"p.name", struct($"p.*").alias("person"))
.as[Tuple2[String, Person]]
val ds = df.map(row => {
val name = row._1
val person = row._2
(name, person)
})
display(ds)
因此,在第二个示例中,spark 很乐意将数据框 person 结构转换为Person
case 类,但在第一个示例中却不会这样做。有人知道解决这个问题的简单方法吗?
谢谢,
大卫
“简单”,可能 :),但大量使用可能会更改的内部 API(并且已经更改)。此代码在 Spark 4 上也无法按原样运行(在 3.5.1 上测试)。
作为一种方法,它也可能比您提供的使用元组的第二个示例更慢,因为 Spark 代码在输入您的地图代码之前会从 InternalRow 转换为用户空间 Row。然后,下面的代码在调用解码器之前转换回 InternalRow。
resolveAndBind 在这种例子中通常是没问题的,但它也不能保证在所有情况下都能起作用,因为字段名称等的解析通常需要作为查询计划的完整分析的一部分来进行。
总之,最好尽可能使用元组包装器和内置编码,它速度更快,并且经过设计和测试。