apache-spark - Scala: explode followed by a UDF on a DataFrame fails

I have a Scala DataFrame with the following schema:

root
 |-- time: string (nullable = true)
 |-- itemId: string (nullable = true)
 |-- itemFeatures: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

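I want to explode the itemFeatures column and then pass my DataFrame to a UDF. But as soon as I include the explode, calling the UDF fails with this error: org.apache.spark.SparkException: Task not serializable

I can't figure out why.

Environment: Scala 2.11.12, Spark 2.4.4

Full example: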

val dataList = List(
    ("time1", "id1", "map1"),
    ("time2", "id2", "map2"))
val df = dataList.toDF("time", "itemId", "itemFeatures")
val dfExploded = df.select(col("time"), col("itemId"), explode("itemFeatures"))

val doNextThingUDF: UserDefinedFunction = udf(doNextThing _)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time"))

My UDF looks like this:

val doNextThing(time: String): String = {
  time+"blah"
}

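If I remove the explode, everything works, and if I don't call the UDF after the explode, everything works too. I could imagine Spark being unable to send each row to the UDF if it performs the explode lazily and doesn't know how many rows there will be, but even when I add, for example, dfExploded.cache() and dfExploded.count(), I still get the error. Is this a known issue? What am I missing?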

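Answer 1

I think the problem comes from how you define the doNextThing function. There are also several typos in your "full example".

In particular, the itemFeatures column is a string in your example, whereas I understand it should be a Map. Here is a working example: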

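// Assumes a SparkSession named `spark` is in scope, as in spark-shell.
import org.apache.spark.sql.functions.{col, explode, udf}
import spark.implicits._  // enables toDF and the $"..." column syntax

val dataList = List(
    ("time1", "id1", Map("map1" -> 1)),
    ("time2", "id2", Map("map2" -> 2)))

val df = dataList.toDF("time", "itemId", "itemFeatures")
// Exploding a map column produces one row per entry, with "key" and "value" columns.
val dfExploded = df.select(col("time"), col("itemId"), explode($"itemFeatures"))

// Define the logic as a function value rather than a method, then wrap it in a UDF.
val doNextThing = (time: String) => {time+"blah"}
val doNextThingUDF = udf(doNextThing)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time")))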
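
One plausible explanation for why the function value works where `udf(doNextThing _)` did not (this is not confirmed anywhere in the thread) is that turning an instance method into a function keeps a reference to the enclosing instance. If that instance is not serializable, Spark cannot ship the closure to the executors. The sketch below shows the effect with plain Java serialization and no Spark at all; `SerializationDemo`, `isSerializable`, `Job` and `Udfs` are hypothetical names introduced only for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDemo {
  // Hypothetical helper: true if plain Java serialization of `value` succeeds.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // Stand-in for a non-serializable enclosing scope (assumption: e.g. a driver class).
  class Job {
    def doNextThing(time: String): String = time + "blah"
    // Eta-expanding an instance method yields a function that references `this`.
    val fromMethod: String => String = doNextThing _
  }

  // A function value defined in an object captures no enclosing instance.
  object Udfs {
    val doNextThing: String => String = time => time + "blah"
  }

  def main(args: Array[String]): Unit = {
    println(isSerializable((new Job).fromMethod)) // false: the captured Job is not Serializable
    println(isSerializable(Udfs.doNextThing))     // true: nothing non-serializable is captured
  }
}
```

If this is indeed the cause, keeping UDF logic in function values or in a standalone object, as the working example does, avoids dragging a non-serializable outer instance into the task.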
