我想使用 pyspark 根据输入创建新的数据框,它会打印出每个不同 value 列的第一次出现。 rownumber() 工作还是 window()。不确定最好的方法是解决这个问题还是 sparksql 是最好的。基本上第二个 table 是我想要的输出,它只打印出输入中第一次出现的 value 列。我只对“value”列的第一次出现感兴趣。如果一个 value 被重复,只显示第一个看到的。
+--------+--------+--------+
| VALUE| DAY | Color
+--------+--------+--------+
|20 |MON | BLUE|
|20 |TUES | BLUE|
|30 |WED | BLUE|
+--------+--------+--------+
+--------+--------+--------+
| VALUE| DAY | Color
+--------+--------+--------+
|20 |MON | BLUE|
|30 |WED | BLUE|
+--------+--------+--------+
回答1
这是我在不使用窗口的情况下执行此操作的方法。它可能会在大型数据集上表现更好,因为它可以使用更多的集群来完成工作。在您的情况下,您需要使用“VALUE”作为部门,使用“工资”作为“DATE”。
from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("James","Sales",3000),("Michael","Sales",4600),
("Robert","Sales",4100),("Maria","Finance",3000),
("Raman","Finance",3000),("Scott","Finance",3300),
("Jen","Finance",3900),("Jeff","Marketing",3000),
("Kumar","Marketing",2000)]
df = spark.createDataFrame(data,["Name","Department","Salary"])
unGroupedDf = df.select( \
df["Department"], \
f.struct(*[\ # Make a struct with all the record elements.
df["Salary"].alias("Salary"),\ #will be sorted on Salary first
df["Department"].alias("Dept"),\
df["Name"].alias("Name")] )\
.alias("record") )
unGroupedDf.groupBy("Department")\ #group
.agg(f.collect_list("record")\ #Gather all the element in a group
.alias("record"))\
.select(\
f.reverse(\ #Make the sort Descending
f.array_sort(\ #Sort the array ascending
f.col("record")\ #the struct
)\
)[0].alias("record"))\ #grab the "Max element in the array
).select( f.col("record.*") ).show() # use struct as Columns
.show()
+---------+------+-------+
| Dept|Salary| Name|
+---------+------+-------+
| Sales| 4600|Michael|
| Finance| 3900| Jen|
|Marketing| 3000| Jeff|
+---------+------+-------+
回答2
在我看来,您想通过 VALUE
删除重复的项目。如果是这样,请使用 dropDuplicates
df.dropDuplicates(['VALUE']).show()
+-----+---+-----+
|VALUE|DAY|Color|
+-----+---+-----+
| 20|MON| BLUE|
| 30|WED| BLUE|
+-----+---+-----+
回答3
以下是https://sparkbyexamples.com/pyspark/pyspark-select-first-row-of-each-group/。在这个例子中,他们以薪水为例。在你的情况下,我认为你会使用'DAY'作为orderBy,'Value'作为partitionBy。
from pyspark.sql import SparkSession,Row spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate() data = [("James","Sales",3000),("Michael","Sales",4600), ("Robert","Sales",4100),("Maria","Finance",3000), ("Raman","Finance",3000),("Scott","Finance",3300), ("Jen","Finance",3900),("Jeff","Marketing",3000), ("Kumar","Marketing",2000)] df = spark.createDataFrame(data,["Name","Department","Salary"]) df.show() from pyspark.sql.window import Window from pyspark.sql.functions import col, row_number w2 = Window.partitionBy("department").orderBy(col("salary")) df.withColumn("row",row_number().over(w2)) \ .filter(col("row") == 1).drop("row") \ .show() +-------------+----------+------+ |employee_name|department|salary| +-------------+----------+------+ | James| Sales| 3000| | Maria| Finance| 3000| | Kumar| Marketing| 2000| +-------------+----------+------+
是的,您需要开发一种排序天数的方法,但我认为您明白这是可能的,并且您选择了正确的工具。我总是喜欢警告人们,这使用一个窗口,他们将所有数据吸入 1 个执行者来完成工作。这不是特别有效。在小型数据集上,这可能是高性能的。在较大的数据集上,可能需要很长时间才能完成。