scala - Filter based on another RDD in Spark

Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me) on StackOverflow. Original question: http://stackoverflow.com/questions/26214112/


Filter based on another RDD in Spark

python scala apache-spark

Asked by poiuytrez

I would like to keep only the employees that have a department ID referenced in the second table.


Employee table
LastName    DepartmentID
Rafferty    31
Jones       33
Heisenberg  33
Robinson    34
Smith       34

Department table
DepartmentID
31  
33  

I have tried the following code which does not work:


employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()

Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

Any ideas? I am using Spark 1.1.0 with Python. However, I would accept a Scala or Python answer.


Answered by maasg

In this case, what you would like to achieve is to filter at each partition with the data contained in the department table. This would be the basic solution:


val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)}

If your department data is large, a broadcast variable will improve performance by delivering the data once to all the nodes instead of having to serialize it with each task:


val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)}

Although using join would work, it's a very expensive solution as it will require a distributed shuffle of the data (byKey) to achieve the join. Given that the requirement is a simple filter, sending the data to each partition (as shown above) will provide much better performance.

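The same approach in PySpark, as a minimal sketch using the variable names from the question (the original Py4JError occurs because an RDD cannot be referenced inside another RDD's closure, so the department data has to be collected and broadcast first):

# collect the small department RDD to the driver and broadcast it as a set
deptBC = sc.broadcast(set(department.collect()))
# filter employees against the broadcast set instead of the RDD itself
employee.filter(lambda e: e[1] in deptBC.value).collect()
# expected: [['Raffery', 31], ['Jones', 33], ['Heisenberg', 33]]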

Answered by poiuytrez

I finally implemented a solution using a join. I had to add a 0 value to the department to avoid an exception from Spark:


employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
# invert id and name to get id as the key
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0]))
# add a 0 value to avoid an exception
department = sc.parallelize(department).map(lambda d: (d,0))

employee.join(department).map(lambda e: (e[1][0], e[0])).collect()

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)]

Answered by WaveRider

Filtering multiple values in multiple columns:


In the case where you're pulling data from a database (Hive or a SQL-type db in this example) and need to filter on multiple columns, it might just be easier to load the table with the first filter applied, then run your remaining filters as successive transformations on the RDD (multiple small iterations are the encouraged way of Spark programming):


{
    import org.apache.spark.sql.hive.HiveContext
    val hc = new HiveContext(sc)

    // the first filter is pushed down into the SQL query itself
    val first_data_filter = hc.sql("SELECT col1, col2, col3 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3')")
    // the remaining filters are applied as successive transformations over the rows
    val second_data_filter = first_data_filter.filter(row => row(1) == "50" || row(1) == "20")
    val final_filtered_data = second_data_filter.filter(row => row(0) == "1500")

}

Of course you have to know your data a little bit to filter on the right values, but that's part of the analysis process.


Answered by yechiel

For the same example above, I would like to keep only the employees whose department ID contains, or is contained in, a department ID referenced in the second table. But it has to be done without a join operation; I want to express it as a "contains" or "in" check, i.e. 33 is "in" 334 and 335.


employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
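
One possible sketch (not from the original thread) of this "in" check without a join, assuming it means that a department ID from the second table appears as a substring of the employee's department ID, and that the small department list can be collected and broadcast:

# broadcast the small department list so it can be used inside the closure
deptBC = sc.broadcast(department.collect())
# keep an employee if any department ID is "in" (a substring of) its department ID,
# e.g. 33 is "in" 334 and 335
employee.filter(lambda e: any(str(d) in str(e[1]) for d in deptBC.value)).collect()
# expected under these assumptions: [['Raffery', 311], ['Jones', 334], ['Heisenberg', 335]]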