scala> val inputDF = sc.parallelize(Seq((1,"oclay",400,"2015-01-01 00:00:00"),(1,"oclay",800,"2018-01-01 00:00:00"))).toDF("pid","pname","price","last_mod")
scala> inputDF.show
+---+-----+-----+-------------------+
|pid|pname|price| last_mod|
+---+-----+-----+-------------------+
| 1|oclay| 400|2015-01-01 00:00:00|
| 1|oclay| 800|2018-01-01 00:00:00|
+---+-----+-----+-------------------+
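The idea is to rank the rows in each (pid, pname) group by last_mod in descending order with row_number over a window, then keep only the top-ranked row. The helper below takes the partition and sort columns as lists so it can be reused on any DataFrame: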
scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
def getLatestRecs(df: DataFrame, partitionCols: List[String], sortCols: List[String]): DataFrame = {
  // Rank the rows within each partition, newest first; the latest record gets rn = 1.
  // head/tail splits each list to fit the (String, String*) signatures; passing the
  // full list after head would duplicate the first column.
  val part = Window.partitionBy(partitionCols.head, partitionCols.tail: _*)
                   .orderBy(array(sortCols.head, sortCols.tail: _*).desc)
  val rowDF = df.withColumn("rn", row_number().over(part))
  rowDF.filter(col("rn") === 1).drop("rn")
}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
getLatestRecs: (df: org.apache.spark.sql.DataFrame, partitionCols: List[String], sortCols: List[String])org.apache.spark.sql.DataFrame
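One design note: array(...) requires all sort columns to share a single element type, so if your sort keys have mixed types, struct(sortCols.head, sortCols.tail: _*) is an orderable alternative in Spark SQL.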
scala> val result = getLatestRecs(inputDF,List("pid","pname"),List("last_mod"))
result: org.apache.spark.sql.DataFrame = [pid: int, pname: string, price: int, last_mod: string]
scala> result.show
+---+-----+-----+-------------------+
|pid|pname|price| last_mod|
+---+-----+-----+-------------------+
| 1|oclay| 800|2018-01-01 00:00:00|
+---+-----+-----+-------------------+
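Since last_mod is stored as a string here, the descending sort is lexicographic; that happens to match chronological order for the zero-padded yyyy-MM-dd HH:mm:ss format, but casting to a timestamp is safer in general. A minimal sketch, assuming Spark 2.2+ for to_timestamp (the withTs and last_mod_ts names are just for illustration):

// Cast the string column to a proper timestamp, then rank on it instead,
// so ordering is chronological regardless of string formatting.
val withTs = inputDF.withColumn("last_mod_ts", to_timestamp(col("last_mod")))
val latest = getLatestRecs(withTs, List("pid","pname"), List("last_mod_ts")).drop("last_mod_ts")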