python - 区分 sql null 和 json null pyspark 数据框

我有一个模式不一致的 json 文件,其中某些字段可能会或可能不会出现在连续的行中

示例 JSON 文件

{"table":"TABLEA","ID":1,"COLUMN1":283,"COLUMN2":0,"COLUMN3":0}
{"table":"TABLEA","ID":1,"COLUMN1":null,"COLUMN2":null,"COLUMN3":null}
{"table":"TABLEA","ID":1,"COLUMN4":"CLOSE","COLUMN5":"user"}
{"table":"TABLEA","ID":1,"COLUMN5":"user","COLUMN6":355}
{"table":"TABLEA","ID":1,"COLUMN5":"user","COLUMN4":"NOTE"}
{"table":"TABLEA","ID":1,"COLUMN5":"user","COLUMN4":"NOTE"}

上面的 json 表示发生在特定 table 上的各种更新,在上面的 JSON 中,

  • 第一个事件只有这 3 个 cols 的更新
  • 对于这 3 个列,第二个事件的更新为 null
  • 第三个事件有其他 2 列的更新

基本上,每个事件只包含其中只有更新的列。如果没有更新,则该列将不可用。

问题

我想区分作为事件更新的一部分的空值与将此数据加载到数据帧时生成的空值由于这里的架构是动态的,因此当我尝试在数据帧中加载 json 并尝试显示它时.这就是它的存储方式

+-------+-------+-------+-------+------------+-------+---+------+
|COLUMN1|COLUMN2|COLUMN3|COLUMN4|     COLUMN5|COLUMN6| ID| table|
+-------+-------+-------+-------+------------+-------+---+------+
|    283|      0|      0|   null|        null|   null|  1|TABLEA|
|   null|   null|   null|   null|        null|   null|  1|TABLEA|
|   null|   null|   null|  CLOSE|        user|   null|  1|TABLEA|
|   null|   null|   null|   null|        user|    355|  1|TABLEA|
|   null|   null|   null|   NOTE|        user|   null|  1|TABLEA|
|   null|   null|   null|   NOTE|        user|   null|  1|TABLEA|
+-------+-------+-------+-------+------------+-------+---+------+

其中第二行实际上是从前 3 列发生的空更新,而对于其他行,因为这些列不是它默认加载 null values 的事件的一部分。

我想区分 json 文件中的空值与由于模式不一致而默认加载的空值。

我试过的

尝试了几种方法,但没有任何效果

spark = SparkSession \
        .builder \
        .appName("Test") \
        .getOrCreate()
    applicationId = spark.sparkContext.applicationId
    sc = spark.sparkContext

    print(sc.getConf().getAll())
    input_file_path = "above json file"
    print(str(input_file_path))
    json_df = spark.read.json(input_file_path)

    json_df.show()

    #method1
    json_df.withColumn("testcol", F.when(F.isnull('COLUMN1'), F.lit('NaN')).otherwise(F.col('COLUMN1'))).show()

    #method2
def has_column(df,col):
    try:
       df[col]
       return F.lit(True)
    except Exception:
       return F.lit(False)
    json_df.withColumn("testcol", F.when(has_column(json_df, 'COLUMN1'), F.col('COLUMN1')).otherwise(F.lit('NaN'))).show()

任何帮助,将不胜感激。谢谢!

回答1

由于读取数据后空值变得相同,因此我会尝试先替换文件中的空值,然后再读取它。

with open(input_file_path) as f:
    newText=f.read().replace(":null",":NaN")

with open(input_file_path, "w") as f:
    f.write(newText)

然后 json_df.show() 应该在 table 下面给出

+-------+-------+-------+-------+-------+-------+---+------+
|COLUMN1|COLUMN2|COLUMN3|COLUMN4|COLUMN5|COLUMN6| ID| table|
+-------+-------+-------+-------+-------+-------+---+------+
|  283.0|    0.0|    0.0|   null|   null|   null|  1|TABLEA|
|    NaN|    NaN|    NaN|   null|   null|   null|  1|TABLEA|
|   null|   null|   null|  CLOSE|   user|   null|  1|TABLEA|
|   null|   null|   null|   null|   user|    355|  1|TABLEA|
|   null|   null|   null|   NOTE|   user|   null|  1|TABLEA|
|   null|   null|   null|   NOTE|   user|   null|  1|TABLEA|
+-------+-------+-------+-------+-------+-------+---+------+