Python: Filtering a Pyspark DataFrame with a SQL-like IN clause
Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/35870760/
Filtering a Pyspark DataFrame with SQL-like IN clause
Asked by mar tin
I want to filter a Pyspark DataFrame with a SQL-like IN clause, as in
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
where a is the tuple (1, 2, 3). I am getting this error:
java.lang.RuntimeException: [1.67] failure: ``('' expected but identifier a found
which is basically saying it was expecting something like '(1, 2, 3)' instead of a. The problem is I can't manually write the values into a, as it's extracted from another job.
How would I filter in this case?
Answered by zero323
The string you pass to SQLContext is evaluated in the scope of the SQL environment. It doesn't capture the closure. If you want to pass a variable you'll have to do it explicitly using string formatting:
df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()
## 2
Obviously this is not something you would use in a "real" SQL environment due to security considerations, but it shouldn't matter here.
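For completeness, a minimal sketch of the same string-formatting trick applied to the question's own variable (assuming a is the tuple of integers produced by the other job and that my_df is already registered as a temporary table; sqlc is the question's SQLContext):

a = (1, 2, 3)  # e.g. produced by another job
# str() of a tuple renders as (1, 2, 3), which is valid SQL syntax for an IN list;
# note that a single-element tuple would render as (1,), which SQL rejects
query = "SELECT * FROM my_df WHERE field1 IN {0}".format(tuple(a))
df_filtered = sqlc.sql(query)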
In practice the DataFrame DSL is a much better choice when you want to create dynamic queries:
from pyspark.sql.functions import col
df.where(col("v").isin({"foo", "bar"})).count()
## 2
It is easy to build and compose, and it handles all the details of HiveQL / Spark SQL for you.
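As a hedged illustration of that composability, here is a sketch that reuses the df with columns k and v from above; the criteria dict is hypothetical (e.g. assembled at runtime by another job), and each entry is turned into an isin condition and combined:

from functools import reduce
from operator import and_
from pyspark.sql.functions import col

criteria = {"v": ["foo", "bar"], "k": [1, 3]}
# build one isin() condition per column and combine them with AND
condition = reduce(and_, [col(c).isin(vals) for c, vals in criteria.items()])
df.where(condition).count()
## 2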
Answered by braj
Reiterating what @zero323 has mentioned above: we can do the same thing using a list as well (not only a set), like below
from pyspark.sql.functions import col
df.where(col("v").isin(["foo", "bar"])).count()
Answered by shwetabharti
Just a little addition/update:
choice_list = ["foo", "bar", "Hyman", "joan"]
If you want to filter your dataframe "df" so that you keep only the rows where column "v" takes a value from choice_list, then
df_filtered = df.where(col("v").isin(choice_list))
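As a usage note, a small sketch against the toy df with column v from the first answer: the same condition can be negated with ~ to keep only the rows whose value is not in the list.

from pyspark.sql.functions import col

choice_list = ["foo", "bar", "Hyman", "joan"]
# rows whose "v" is in choice_list: (1, "foo") and (3, "bar")
df.where(col("v").isin(choice_list)).count()
## 2
# rows whose "v" is NOT in choice_list: (2, "x")
df.where(~col("v").isin(choice_list)).count()
## 1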
Answered by BICube
You can also do this for integer columns:
df_filtered = df.filter("field1 in (1,2,3)")
or this for string columns:
df_filtered = df.filter("field1 in ('a','b','c')")
Answered by Alex_Gidiotis
A slightly different approach that worked for me is to filter with a custom filter function.
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

def filter_func(a):
    """Wrapper function that closes over the broadcast variable a."""
    def filter_func_(col):
        """Return True if the column value is in the broadcast collection."""
        if col in a.value:
            return True
        return False
    return udf(filter_func_, BooleanType())

# Broadcasting allows passing large variables efficiently
a = sc.broadcast((1, 2, 3))
df = my_df.filter(filter_func(a)(col('field1'))) \
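One trade-off worth noting: broadcasting avoids shipping a large value collection with every task, but the membership test itself runs as a Python UDF, which is generally slower than the built-in isin / SQL IN shown above because each value has to cross the JVM-Python boundary. This approach mainly pays off when the test is more complex than a plain IN.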