Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must likewise follow the CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/33885979/

Difference between filter and where in scala spark sql

scala, apache-spark, apache-spark-sql

Asked by Ishan

I've tried both, but they work the same.

Example:

val items =  List(1, 2, 3)

Using filter:

employees.filter($"emp_id".isin(items:_*)).show

Using where:

employees.where($"emp_id".isin(items:_*)).show

The result is the same for both:

+------+------+------+-------+------+-------+
|EMP_ID|F_NAME|SALARY|DEPT_ID|L_NAME|MANAGER|
+------+------+------+-------+------+-------+
|     6|    E6|  2000|      4|    L6|      2|
|     7|    E7|  3000|      4|    L7|      1|
|     8|    E8|  4000|      2|    L8|      2|
|     9|    E9|  1500|      2|    L9|      1|
|    10|   E10|  1000|      2|   L10|      1|
|     4|    E4|   400|      3|    L4|      1|
|     2|    E2|   200|      1|    L2|      1|
|     3|    E3|   700|      2|    L3|      2|
|     5|    E5|   300|      2|    L5|      2|
+------+------+------+-------+------+-------+

Answered by Alexey Romanov

From the where documentation:

Filters rows using the given condition. This is an alias for filter.

filter is simply the standard Scala (and FP in general) name for such a function, and where is for people who prefer SQL.
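
As a quick illustration on the question's employees DataFrame (a sketch; employees and items come from the question, and col needs import org.apache.spark.sql.functions.col), the following calls are interchangeable:

import org.apache.spark.sql.functions.col

val items = List(1, 2, 3)

// Column predicate via filter, via its alias where, and via the SQL-string overload of where
employees.filter(col("emp_id").isin(items: _*)).show()
employees.where(col("emp_id").isin(items: _*)).show()
employees.where("emp_id IN (1, 2, 3)").show()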

Answered by Aleksey Yakushev

It's also related to Spark optimization. Look at a short example: a big Parquet file in HDFS, with the following structure and data:

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 27 items
drwxr-xr-x   - root root          0 2019-01-16 12:55 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x   - root root          0 2019-01-16 13:58 /user/tickers/ticks.parquet/ticker_id=10
drwxr-xr-x   - root root          0 2019-01-16 14:04 /user/tickers/ticks.parquet/ticker_id=11
drwxr-xr-x   - root root          0 2019-01-16 14:10 /user/tickers/ticks.parquet/ticker_id=12
...

where each partition has sub-partitions inside (by date):

[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet/ticker_id=1
Found 6 items
drwxr-xr-x   - root root          0 2019-01-16 12:55 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-09
drwxr-xr-x   - root root          0 2019-01-16 12:50 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-10
drwxr-xr-x   - root root          0 2019-01-16 12:53 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-11
...
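
For reference, a layout like this is typically produced by writing with partitionBy. A minimal sketch, assuming a hypothetical ticksDf DataFrame with the columns shown in the schema below; the path and column names follow the answer:

// ticksDf is a hypothetical DataFrame with columns ticker_id, ddate, db_tsunx, ask, bid
ticksDf.write
  .partitionBy("ticker_id", "ddate")   // creates the ticker_id=*/ddate=* directory layout
  .parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")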

Structure:

scala> spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet").printSchema
root
 |-- ticker_id: integer (nullable = true)
 |-- ddate: date (nullable = true)
 |-- db_tsunx: long (nullable = true)
 |-- ask: double (nullable = true)
 |-- bid: double (nullable = true)

For example, suppose you have a Dataset (DS) like this:

val maxTsunx = spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")
  .select(col("ticker_id"), col("db_tsunx"))
  .groupBy("ticker_id")
  .agg(max("db_tsunx"))

It contains max(db_tsunx) for each ticker_id.

For example, say you want to get data for only one ticker from this DS.

You have two ways:

1) maxTsunx.filter(r => r.get(0) == 1)
2) maxTsunx.where(col("ticker_id")===1)

and they produce very different physical plans.

Look at 1):

== Physical Plan ==
*(2) Filter <function1>.apply
+- *(2) HashAggregate(keys=[ticker_id#37], functions=[max(db_tsunx#39L)], output=[ticker_id#37, max(db_tsunx)#52L])
   +- Exchange hashpartitioning(ticker_id#37, 200)
      +- *(1) HashAggregate(keys=[ticker_id#37], functions=[partial_max(db_tsunx#39L)], output=[ticker_id#37, max#61L])
         +- *(1) Project [ticker_id#37, db_tsunx#39L]
            +- *(1) FileScan parquet [db_tsunx#39L,ticker_id#37,ddate#38] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdpnn:9000/user/tickers/ticks.parquet],
PartitionCount: 162,
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<db_tsunx:bigint>

And at 2):

== Physical Plan ==
*(2) HashAggregate(keys=[ticker_id#84], functions=[max(db_tsunx#86L)], output=[ticker_id#84, max(db_tsunx)#99L])
+- Exchange hashpartitioning(ticker_id#84, 200)
   +- *(1) HashAggregate(keys=[ticker_id#84], functions=[partial_max(db_tsunx#86L)], output=[ticker_id#84, max#109L])
      +- *(1) Project [ticker_id#84, db_tsunx#86L]
         +- *(1) FileScan parquet [db_tsunx#86L,ticker_id#84,ddate#85] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[hdfs://hdpnn:9000/user/tickers/ticks.parquet], 
PartitionCount: 6, 
PartitionFilters: [isnotnull(ticker_id#84), (ticker_id#84 = 1)], 
PushedFilters: [], 
ReadSchema: struct<db_tsunx:bigint>

Compare PartitionCount: 162 with PartitionCount: 6, and PartitionFilters: [] with PartitionFilters: [isnotnull(ticker_id#84), (ticker_id#84 = 1)].

It means that the filter with a Scala function acts on the rows after they have been read from the DS, while the where with a column expression goes inside Spark and is used for optimization (here, partition pruning).
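
To see the difference yourself, both plans above can be printed with explain() (a sketch, reusing the maxTsunx Dataset defined earlier):

import org.apache.spark.sql.functions.col

// 1) Lambda predicate: opaque to Catalyst, so no partition pruning (PartitionFilters: [])
maxTsunx.filter(r => r.get(0) == 1).explain()

// 2) Column predicate: pushed below the aggregation and into the scan as a partition filter
maxTsunx.where(col("ticker_id") === 1).explain()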

Answered by Parthiv Gogree

Just FYI:

maxTsunx.filter(r => r.get(0) == 1)

maxTsunx.where(col("ticker_id")===1)

or

maxTsunx.filter(col("ticker_id")===1)

In the first case, a Scala function is passed to the filter function.

In the second case, a condition expression (either a string or a Column) is passed to the filter or where function. Physical plan 2 can also be obtained by replacing where with filter.
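
In other words, it is the argument type, not the method name, that decides whether the optimizer can see the predicate. A minimal sketch (df is a hypothetical DataFrame with a ticker_id column):

import org.apache.spark.sql.functions.col

// Column / string predicates: Catalyst can analyze and push these down
df.where(col("ticker_id") === 1)     // where(condition: Column)
df.filter(col("ticker_id") === 1)    // filter(condition: Column), same plan as where
df.filter("ticker_id = 1")           // filter(conditionExpr: String)

// Typed predicate: an arbitrary Scala function the optimizer cannot inspect
df.filter(r => r.getAs[Int]("ticker_id") == 1)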