I have a Scala dataframe with the following schema:
root
|-- time: string (nullable = true)
|-- itemId: string (nullable = true)
|-- itemFeatures: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
I want to explode the itemFeatures column and then send my dataframe to a UDF. But as soon as I include the explode, calling the UDF results in this error:
org.apache.spark.SparkException: Task not serializable
I can't figure out why.
Environment: Scala 2.11.12, Spark 2.4.4
Full example:
val dataList = List(
("time1", "id1", "map1"),
("time2", "id2", "map2"))
val df = dataList.toDF("time", "itemId", "itemFeatures")
val dfExploded = df.select(col("time"), col("itemId"), explode("itemFeatures"))
val doNextThingUDF: UserDefinedFunction = udf(doNextThing _)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time"))
My UDF looks like this:
val doNextThing(time: String): String = {
time+"blah"
}
If I remove the explode, everything works fine, and it also works fine if I don't call the UDF after the explode. I could imagine Spark somehow being unable to send each row to the UDF if it performs the explode lazily and doesn't know how many rows there will be, but even when I add e.g. dfExploded.cache() and dfExploded.count(), I still get the error. Is this a known issue? What am I missing?
Answer 1
I think the problem comes from how you define the doNextThing function. Most likely, building the UDF from a method via udf(doNextThing _) eta-expands the method into a closure that captures the enclosing object, and if that object is not serializable you get exactly this Task not serializable error; a plain function value avoids that capture. There are also a few typos in your full example. In particular, the itemFeatures column is a string in your example, whereas I understand it should be a Map. But here is a working example:
import org.apache.spark.sql.functions._ // col, explode, udf
import spark.implicits._ // toDF and $"..." syntax (spark = the SparkSession, already in scope in spark-shell)

val dataList = List(
  ("time1", "id1", Map("map1" -> 1)),
  ("time2", "id2", Map("map2" -> 2)))
val df = dataList.toDF("time", "itemId", "itemFeatures")
// explode on a MapType column yields two columns named "key" and "value"
val dfExploded = df.select(col("time"), col("itemId"), explode($"itemFeatures"))
// define the UDF from an anonymous function (a serializable value), not a method
val doNextThing = (time: String) => { time + "blah" }
val doNextThingUDF = udf(doNextThing)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time")))
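With these corrections the pipeline runs end to end. As a quick sanity check, a minimal sketch of what the output should roughly look like (the key and value columns come from explode on the map column):

dfNextThing.show()
// +-----+------+----+-----+---------+
// | time|itemId| key|value|nextThing|
// +-----+------+----+-----+---------+
// |time1|   id1|map1|    1|time1blah|
// |time2|   id2|map2|    2|time2blah|
// +-----+------+----+-----+---------+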