看来我无法使用 spark 作业中的 delta 格式进行写入,但我不确定我遗漏了什么。我正在使用 spark 3.5.3 和 deltalake 3.2.0。
我的错误:
Exception in thread "main" org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
我的build.sbt:
name := "test"
version := "0.1"
scalaVersion := "2.12.18"
logLevel := Level.Warn
assembly / logLevel := Level.Warn
clean / logLevel := Level.Warn
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.3" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.3" % "provided"
libraryDependencies += "io.delta" %% "delta-spark" % "3.2.0"
assembly / test := {}
assemblyJarName := s"${name.value}-${version.value}.jar"
assemblyMergeStrategy in assembly := {
case PathList("META-INF", _*) => MergeStrategy.discard
case _ => MergeStrategy.first
}
我的工作是:
val spark = SparkSession
.builder()
.appName("test")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog"
)
.getOrCreate()
val df = getData(spark)
val path = "/home/user/testtable"
df.write.format("delta").mode("overwrite").save(path)
spark.stop()
有什么想法吗?我正在查看 delta lake quickstart,但我没有发现任何遗漏的内容。不过我觉得有些东西很明显。
您如何运行您的代码?
以防万一,如果您准备好了 fat jar
sbt assembly
然后通过提交,spark-submit
请不要忘记--packages io.delta:delta-spark_2.12:3.2.0
。.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
不起作用。libraryDependencies += "io.delta" %% "delta-spark" % "3.2.0"
也build.sbt
不够。尝试