Python 使用类似 SQL 的 IN 子句过滤 Pyspark DataFrame

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/35870760/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 17:06:03  来源:igfitidea点击:

Filtering a Pyspark DataFrame with SQL-like IN clause

pythonsqlapache-sparkdataframepyspark

提问by mar tin

I want to filter a Pyspark DataFrame with a SQL-like INclause, as in

我想用类似 SQL 的IN子句过滤 Pyspark DataFrame ,如

sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')

where ais the tuple (1, 2, 3). I am getting this error:

a元组在哪里(1, 2, 3)。我收到此错误:

java.lang.RuntimeException: [1.67] failure: ``('' expected but identifier a found

java.lang.RuntimeException: [1.67] 失败: ``('' 预期但标识符 a 找到

which is basically saying it was expecting something like '(1, 2, 3)'instead of a. The problem is I can't manually write the values in a as it's extracted from another job.

这基本上是说它期待像'(1, 2, 3)'而不是a。问题是我无法在 a 中手动写入值,因为它是从另一个作业中提取的。

How would I filter in this case?

在这种情况下我将如何过滤?

回答by zero323

String you pass to SQLContextit evaluated in the scope of the SQL environment. It doesn't capture the closure. If you want to pass a variable you'll have to do it explicitly using string formatting:

您传递给SQLContext它的字符串在 SQL 环境的范围内进行评估。它不捕获闭包。如果要传递变量,则必须使用字符串格式显式执行此操作:

df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()
##  2 

Obviously this is not something you would use in a "real" SQL environment due to security considerations but it shouldn't matter here.

显然,出于安全考虑,这不是您将在“真实”SQL 环境中使用的东西,但在这里应该无关紧要。

In practice DataFrameDSL is a much choice when you want to create dynamic queries:

实际上,DataFrame当您要创建动态查询时,DSL 是一个不错的选择:

from pyspark.sql.functions import col

df.where(col("v").isin({"foo", "bar"})).count()
## 2

It is easy to build and compose and handles all details of HiveQL / Spark SQL for you.

很容易为您构建和组合并处理 HiveQL/Spark SQL 的所有细节。

回答by braj

reiterating what @zero323 has mentioned above : we can do the same thing using a list as well (not only set)like below

重申@zero323 上面提到的内容:我们也可以使用列表(不仅如此set来做同样的事情,如下所示

from pyspark.sql.functions import col

df.where(col("v").isin(["foo", "bar"])).count()

回答by shwetabharti

Just a little addition/update:

只是一点点添加/更新:

choice_list = ["foo", "bar", "Hyman", "joan"]

If you want to filter your dataframe "df", such that you want to keep rows based upon a column "v" taking only the values from choice_list, then

如果你想过滤你的数据框“df”,这样你就想根据列“v”保留行,只取choice_list中的值,然后

df_filtered = df.where( ( col("v").isin (choice_list) ) )

回答by BICube

You can also do this for integer columns:

您也可以对整数列执行此操作:

df_filtered = df.filter("field1 in (1,2,3)")

or this for string columns:

或者这对于字符串列:

df_filtered = df.filter("field1 in ('a','b','c')")

回答by Alex_Gidiotis

A slightly different approach that worked for me is to filter with a custom filter function.

对我有用的略有不同的方法是使用自定义过滤器功能进行过滤。

def filter_func(a):
"""wrapper function to pass a in udf"""
    def filter_func_(col):
    """filtering function"""
        if col in a.value:
            return True

    return False

return udf(filter_func_, BooleanType())

# Broadcasting allows to pass large variables efficiently
a = sc.broadcast((1, 2, 3))
df = my_df.filter(filter_func(a)(col('field1'))) \