Difference between filter and where in Scala Spark SQL
It is also related to Spark optimization. Look at a short example: a big Parquet file in HDFS with the following structure and data:
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 27 items
drwxr-xr-x - root root 0 2019-01-16 12:55 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x - root root 0 2019-01-16 13:58 /user/tickers/ticks.parquet/ticker_id=10
drwxr-xr-x - root root 0 2019-01-16 14:04 /user/tickers/ticks.parquet/ticker_id=11
drwxr-xr-x - root root 0 2019-01-16 14:10 /user/tickers/ticks.parquet/ticker_id=12
...
where each partition contains sub-partitions (by date):
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet/ticker_id=1
Found 6 items
drwxr-xr-x - root root 0 2019-01-16 12:55 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-09
drwxr-xr-x - root root 0 2019-01-16 12:50 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-10
drwxr-xr-x - root root 0 2019-01-16 12:53 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-11
...
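Such a layout is typically produced by partitioning on write. A minimal sketch, assuming a hypothetical ticksDf DataFrame holding the columns listed below:

// hypothetical DataFrame; partitionBy turns ticker_id and ddate
// into the directory levels shown above
ticksDf.write
  .partitionBy("ticker_id", "ddate")
  .parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")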
Structure:
scala> spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet").printSchema
root
|-- ticker_id: integer (nullable = true)
|-- ddate: date (nullable = true)
|-- db_tsunx: long (nullable = true)
|-- ask: double (nullable = true)
|-- bid: double (nullable = true)
For example, suppose you have a Dataset like this:
import org.apache.spark.sql.functions.{col, max}

val maxTsunx = spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")
  .select(col("ticker_id"), col("db_tsunx"))
  .groupBy("ticker_id")
  .agg(max("db_tsunx"))
which contains max(db_tsunx) for each ticker_id.
For example: you want to get data for just one ticker from this Dataset.
You have two ways:
1) maxTsunx.filter(r => r.get(0) == 1)
2) maxTsunx.where(col("ticker_id")===1)
and they produce very different physical plans.
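The plans below can be reproduced with explain(), using the same maxTsunx as above:

maxTsunx.filter(r => r.get(0) == 1).explain()   // plan 1)
maxTsunx.where(col("ticker_id") === 1).explain() // plan 2)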
Look at 1):
== Physical Plan ==
*(2) Filter <function1>.apply
+- *(2) HashAggregate(keys=[ticker_id#37], functions=[max(db_tsunx#39L)], output=[ticker_id#37, max(db_tsunx)#52L])
+- Exchange hashpartitioning(ticker_id#37, 200)
+- *(1) HashAggregate(keys=[ticker_id#37], functions=[partial_max(db_tsunx#39L)], output=[ticker_id#37, max#61L])
+- *(1) Project [ticker_id#37, db_tsunx#39L]
+- *(1) FileScan parquet [db_tsunx#39L,ticker_id#37,ddate#38] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdpnn:9000/user/tickers/ticks.parquet],
PartitionCount: 162,
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<db_tsunx:bigint>
And at 2):
== Physical Plan ==
*(2) HashAggregate(keys=[ticker_id#84], functions=[max(db_tsunx#86L)], output=[ticker_id#84, max(db_tsunx)#99L])
+- Exchange hashpartitioning(ticker_id#84, 200)
+- *(1) HashAggregate(keys=[ticker_id#84], functions=[partial_max(db_tsunx#86L)], output=[ticker_id#84, max#109L])
+- *(1) Project [ticker_id#84, db_tsunx#86L]
+- *(1) FileScan parquet [db_tsunx#86L,ticker_id#84,ddate#85] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdpnn:9000/user/tickers/ticks.parquet],
PartitionCount: 6,
PartitionFilters: [isnotnull(ticker_id#84), (ticker_id#84 = 1)],
PushedFilters: [],
ReadSchema: struct<db_tsunx:bigint>
Compare PartitionCount: 162 with PartitionCount: 6, and PartitionFilters: [] with PartitionFilters: [isnotnull(ticker_id#84), (ticker_id#84 = 1)].
It means that the filter with a Scala lambda is applied to the Dataset's rows only after they have been read, while the where condition goes into the Spark plan and is used by the optimizer for partition pruning.
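Note that the difference comes from the lambda, not from the method name: filter also has an overload that takes a Column, and in that form it is optimized exactly like where. A minimal sketch:

// Column-based predicate: Catalyst can analyze it, partitions are pruned,
// and the plan matches the where version above
maxTsunx.filter(col("ticker_id") === 1).explain()

// Lambda-based predicate: an opaque Scala function, so Spark reads all
// 162 partitions and applies <function1> to every aggregated row
maxTsunx.filter(r => r.get(0) == 1).explain()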
The documentation for where says:

Filters rows using the given condition. This is an alias for filter.

filter is simply the standard Scala (and FP in general) name for such a function, and where is for people who prefer SQL.
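To sum up, a sketch of the equivalent forms in the Dataset API (the four below produce the same optimizable plan; only the typed, lambda-taking overload bypasses Catalyst):

// equivalent, all visible to the optimizer:
maxTsunx.where(col("ticker_id") === 1)
maxTsunx.where("ticker_id = 1")
maxTsunx.filter(col("ticker_id") === 1)
maxTsunx.filter("ticker_id = 1")

// typed overload with a Scala function: opaque to the optimizer
maxTsunx.filter(r => r.get(0) == 1)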