Difference between filter and where in Scala Spark SQL
It's also related to Spark optimization. Look at a short example: a big Parquet dataset in HDFS with the following structure and data:
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 27 items
drwxr-xr-x - root root 0 2019-01-16 12:55 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x - root root 0 2019-01-16 13:58 /user/tickers/ticks.parquet/ticker_id=10
drwxr-xr-x - root root 0 2019-01-16 14:04 /user/tickers/ticks.parquet/ticker_id=11
drwxr-xr-x - root root 0 2019-01-16 14:10 /user/tickers/ticks.parquet/ticker_id=12
where each ticker_id partition contains sub-partitions by date (ddate):
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet/ticker_id=1
Found 6 items
drwxr-xr-x - root root 0 2019-01-16 12:55 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-09
drwxr-xr-x - root root 0 2019-01-16 12:50 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-10
drwxr-xr-x - root root 0 2019-01-16 12:53 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-11
scala> spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet").printSchema
|-- ticker_id: integer (nullable = true)
|-- ddate: date (nullable = true)
|-- db_tsunx: long (nullable = true)
|-- ask: double (nullable = true)
|-- bid: double (nullable = true)
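This nested ticker_id=…/ddate=… layout is the kind of directory structure that DataFrameWriter.partitionBy produces. The original answer does not show how ticks.parquet was written, so the following is only a minimal sketch, assuming a local SparkSession and a few hypothetical rows with the same columns (the object and method names are illustrative):

```scala
import java.nio.file.Path
import java.sql.Date
import org.apache.spark.sql.SparkSession

object PartitionedLayoutSketch {
  // Writes hypothetical sample ticks partitioned first by ticker_id, then by ddate
  def writeSample(spark: SparkSession, target: Path): Unit = {
    import spark.implicits._
    val ticks = Seq(
      (1, Date.valueOf("2019-01-09"), 1547020800000L, 1.10, 1.09),
      (1, Date.valueOf("2019-01-10"), 1547107200000L, 1.11, 1.10),
      (10, Date.valueOf("2019-01-09"), 1547020800000L, 2.20, 2.19)
    ).toDF("ticker_id", "ddate", "db_tsunx", "ask", "bid")

    // partitionBy turns each (ticker_id, ddate) pair into nested directories
    // such as ticker_id=1/ddate=2019-01-09; target must not exist yet
    ticks.write.partitionBy("ticker_id", "ddate").parquet(target.toString)
  }
}
```

Reading the target back with spark.read.parquet recovers ticker_id and ddate as columns from the directory names, which is what lets Spark later skip whole directories.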
For example, suppose you have a DataFrame like this:
import org.apache.spark.sql.functions.{col, max}
val maxTsunx = spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet").select(col("ticker_id"), col("db_tsunx")).groupBy("ticker_id").agg(max("db_tsunx"))
which contains max(db_tsunx) for each ticker_id.
Suppose you want to get the data for only one ticker from this DataFrame.
There are two ways to do it:
1) maxTsunx.filter(r => r.get(0) == 1)
2) maxTsunx.where(col("ticker_id")===1)
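To try the comparison without the HDFS dataset, here is a minimal sketch assuming a local SparkSession and a small hypothetical in-memory DataFrame with the same ticker_id/db_tsunx columns. It also adds a third variant not in the original answer, filter with a Column expression, and uses getInt instead of get for a type-safe comparison:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, max}

object FilterVsWhereSketch {
  // Small in-memory stand-in for ticks.parquet (hypothetical values)
  def maxTsunx(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((1, 100L), (1, 300L), (10, 200L), (11, 50L))
      .toDF("ticker_id", "db_tsunx")
      .groupBy("ticker_id")
      .agg(max("db_tsunx"))
  }

  // The same selection written three ways
  def threeWays(agg: DataFrame): Seq[DataFrame] = Seq(
    // 1) Scala lambda: the optimizer cannot look inside it
    agg.filter((r: Row) => r.getInt(0) == 1),
    // 2) Column expression passed to where
    agg.where(col("ticker_id") === 1),
    // 3) The same Column expression passed to filter (where is an alias)
    agg.filter(col("ticker_id") === 1)
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("filter-vs-where").getOrCreate()
    // Print each physical plan. An in-memory source has no partitions to
    // prune, but 1) should keep the lambda filter above the aggregate,
    // while 2) and 3) should print the same plan
    threeWays(maxTsunx(spark)).foreach(_.explain())
    spark.stop()
  }
}
```

All three return the same rows; only the plans differ.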
These two ways produce very different physical plans. Look at 1):
== Physical Plan ==
*(2) Filter <function1>.apply
+- *(2) HashAggregate(keys=[ticker_id#37], functions=[max(db_tsunx#39L)], output=[ticker_id#37, max(db_tsunx)#52L])
+- Exchange hashpartitioning(ticker_id#37, 200)
+- *(1) HashAggregate(keys=[ticker_id#37], functions=[partial_max(db_tsunx#39L)], output=[ticker_id#37, max#61L])
+- *(1) Project [ticker_id#37, db_tsunx#39L]
+- *(1) FileScan parquet [db_tsunx#39L,ticker_id#37,ddate#38] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdpnn:9000/user/tickers/ticks.parquet],
PartitionCount: 162,
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<db_tsunx:bigint>
Now look at 2):
== Physical Plan ==
*(2) HashAggregate(keys=[ticker_id#84], functions=[max(db_tsunx#86L)], output=[ticker_id#84, max(db_tsunx)#99L])
+- Exchange hashpartitioning(ticker_id#84, 200)
+- *(1) HashAggregate(keys=[ticker_id#84], functions=[partial_max(db_tsunx#86L)], output=[ticker_id#84, max#109L])
+- *(1) Project [ticker_id#84, db_tsunx#86L]
+- *(1) FileScan parquet [db_tsunx#86L,ticker_id#84,ddate#85] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdpnn:9000/user/tickers/ticks.parquet],
PartitionCount: 6,
PartitionFilters: [isnotnull(ticker_id#84), (ticker_id#84 = 1)],
PushedFilters: [],
ReadSchema: struct<db_tsunx:bigint>
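Compare PartitionCount: 162 with PartitionCount: 6, and PartitionFilters: [] with PartitionFilters: [isnotnull(ticker_id#84), (ticker_id#84 = 1)].
In 1) the Scala lambda is opaque to Spark, so it is applied row by row only after all 162 partitions have been scanned and aggregated. In 2) the Column expression goes inside Spark's optimizer, which pushes it below the aggregation and into the scan as a partition filter, so only the 6 partitions under ticker_id=1 are read. Because where is an alias for filter, maxTsunx.filter(col("ticker_id") === 1) is optimized the same way as 2); what matters is the lambda versus the Column expression, not the method name.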
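The plans differ because a planner can only prune what it can inspect. The following is a toy model in plain Scala (hypothetical names, not Spark's actual classes or algorithm) of that idea: a structured EqualTo predicate lets the "scan" skip partitions, while an Opaque function forces it to read all of them and filter afterwards:

```scala
// Toy model only: illustrates partition pruning, not Spark's implementation
object PruningToyModel {
  type Row = Map[String, Any]

  // One directory per ticker_id value, like ticker_id=1, ticker_id=10, ...
  final case class Partition(tickerId: Int, rows: Seq[Row])

  sealed trait Predicate
  // Structured predicate: column name and literal are visible to the planner
  final case class EqualTo(column: String, value: Any) extends Predicate
  // Opaque predicate: just a compiled function, nothing to inspect
  final case class Opaque(f: Row => Boolean) extends Predicate

  // Chooses which partitions the scan must read
  def partitionsToScan(parts: Seq[Partition], p: Predicate): Seq[Partition] = p match {
    case EqualTo("ticker_id", v) => parts.filter(_.tickerId == v) // prune by directory
    case _                       => parts                         // cannot prune: read all
  }

  // Returns (number of partitions scanned, rows that satisfy the predicate)
  def run(parts: Seq[Partition], p: Predicate): (Int, Seq[Row]) = {
    val scanned = partitionsToScan(parts, p)
    val rows = scanned.flatMap(_.rows)
    val kept = p match {
      case EqualTo(c, v) => rows.filter(_(c) == v)
      case Opaque(f)     => rows.filter(f)
    }
    (scanned.size, kept)
  }
}
```

With 27 single-row partitions, run(parts, EqualTo("ticker_id", 1)) scans 1 partition while run(parts, Opaque(r => r("ticker_id") == 1)) scans all 27, yet both keep the same row.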
The Scaladoc for Dataset.where says:
Filters rows using the given condition. This is an alias for filter.
filter is simply the standard Scala (and FP in general) name for such a function, and where is for people who prefer SQL.
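Since the two names are aliases, both also accept the condition either as a Column or as a SQL expression string. A minimal sketch, assuming a DataFrame with a ticker_id column (AliasSketch is a hypothetical name):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object AliasSketch {
  // All four select ticker_id = 1 through the same Catalyst path;
  // the String overloads parse the SQL condition into a Column first
  def variants(df: DataFrame): Seq[DataFrame] = Seq(
    df.filter(col("ticker_id") === 1), // FP-style name, Column condition
    df.where(col("ticker_id") === 1),  // SQL-style name, Column condition
    df.filter("ticker_id = 1"),        // FP-style name, SQL string
    df.where("ticker_id = 1")          // SQL-style name, SQL string
  )
}
```

Only a Scala lambda, as in 1) above, takes a different path.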