Original: http://stackoverflow.com/questions/26785672/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
fetch data from hive table into spark and perform join on RDDs
Asked by user1189851
I have two tables in hive/impala. I want to fetch the data from the table into spark as rdds and perform say a join operation.
I do not want to pass the join query directly to my hive context. This is just an example; I have more use cases that are not possible with standard HiveQL. How do I fetch all rows, access the columns, and perform transformations?
Suppose I have two RDDs:
val table1 = hiveContext.hql("select * from tem1")
val table2 = hiveContext.hql("select * from tem2")
I want to perform a join on these RDDs on a column called "account_id".
Ideally, I want to do something like this with the RDDs, using the spark shell:
select * from tem1 join tem2 on tem1.account_id=tem2.account_id;
Answered by Daniel de Paula
I'm not sure I understood the question, but as an alternative you can use the DataFrame API to do the join, which lets you decide many things programmatically (e.g. the join function can be passed as a parameter to a method that applies a custom transformation).
For your example, it would be like this:
val table1 = hiveContext.sql("select * from tem1")
val table2 = hiveContext.sql("select * from tem2")
val common_attributes = Seq("account_id")
val joined = table1.join(table2, common_attributes)
There are many common transformations available in the DataFrame API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
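As a sketch of the "passed as parameter" idea (the applyTransformation helper below is hypothetical, not part of the Spark API):

import org.apache.spark.sql.DataFrame

// Hypothetical helper: the caller decides which transformation to apply.
def applyTransformation(df: DataFrame)(transform: DataFrame => DataFrame): DataFrame =
  transform(df)

// The join itself is passed in as a function parameter.
val joinedViaParam = applyTransformation(table1)(_.join(table2, Seq("account_id")))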
Cheers
Answered by Holden
We can register table1 and table2 as temporary tables and then do the join on these temporary tables:
table1.registerTempTable("t1")
table2.registerTempTable("t2")
val table3 = hiveContext.hql("select * from t1 join t2 on t1.account_id = t2.account_id")
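Side note: in Spark 2.x and later, the same pattern uses createOrReplaceTempView and SparkSession.sql instead. A minimal sketch, assuming a SparkSession named spark:

// Spark 2.x equivalent of the above
table1.createOrReplaceTempView("t1")
table2.createOrReplaceTempView("t2")
val table3 = spark.sql("select * from t1 join t2 on t1.account_id = t2.account_id")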
Answered by Blaubaer
table1 and table2 are of type DataFrame. It is possible to transform them into RDDs using:
lazy val table1_rdd = table1.rdd
lazy val table2_rdd = table2.rdd
This should do the trick. On these RDDs you can use whatever RDD operations you like.
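For example, here is a minimal sketch of the join the question asks for, done at the RDD level (this assumes account_id is a string column; adjust the type in getAs accordingly):

import org.apache.spark.sql.Row

// Key each RDD[Row] by the account_id column, then use the pair-RDD join.
val keyed1 = table1_rdd.keyBy((row: Row) => row.getAs[String]("account_id"))
val keyed2 = table2_rdd.keyBy((row: Row) => row.getAs[String]("account_id"))

// joinedRdd has type RDD[(String, (Row, Row))]: the key plus the matching rows from each table.
val joinedRdd = keyed1.join(keyed2)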
See also: https://issues.apache.org/jira/browse/SPARK-6608 and https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
Answered by BadBoy777
You can select just the columns you want directly, as in the following code:
val table1 = hiveContext.hql("select account_id from tem1")
val table2 = hiveContext.hql("select account_id from tem2")
val joinedTable = table1.join(table2, "account_id")  // join on the shared column; with no join key, join() would produce a cross join

