Create a Spark Dataframe from a SQL Query on MySQL
Disclaimer: this page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use it, you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
原文地址: http://stackoverflow.com/questions/38376307/
Create Spark Dataframe from SQL Query
Asked by opus111
I'm sure this is a simple SQLContext question, but I can't find any answer in the Spark docs or on Stack Overflow.
I want to create a Spark Dataframe from a SQL query on MySQL.
For example, I have a complicated MySQL query like:
SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...
and I want a Dataframe with columns X, Y, and Z.
I figured out how to load entire tables into Spark: I could load them all and then do the joining and selection there. However, that is very inefficient. I just want to load the table generated by my SQL query.
Here is my current approximation of the code, which doesn't work. The MySQL connector has a "dbtable" option that can be used to load a whole table. I am hoping there is some way to specify a query instead:
val df = sqlContext.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  sql(
    """
    select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
    join DialogLine as dl on dl.DialogID=d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID=wi.WordRootID
    where d.InSite=1 and dl.Active=1
    limit 100
    """
  ).load()
Answered by opus111
I found this here: Bulk data migration through Spark SQL
The dbtable parameter can be any query wrapped in parentheses with an alias. So in my case, I need to do this:
val query = """
(select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100) foo
"""
val df = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable", query).
  load()
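The wrapping step is easy to get wrong: both the parentheses and the trailing alias are required by the JDBC source. It can help to factor it into a small helper. This is just a sketch; the helper name asSubquery is hypothetical, plain string manipulation rather than any Spark API:

```scala
// Wrap an arbitrary SQL query in the "(query) alias" form expected by the
// JDBC data source's "dbtable" option. A trailing semicolon, if present,
// is stripped because it is not valid inside a subquery.
def asSubquery(query: String, alias: String = "subq"): String =
  s"(${query.trim.stripSuffix(";")}) $alias"

// Example:
val dbtable = asSubquery("select X, Y from FOO limit 100")
println(dbtable)  // (select X, Y from FOO limit 100) subq
```

The resulting string can then be passed straight to option("dbtable", ...), as in the answer above.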
As expected, loading each table as its own Dataframe and joining them in Spark was very inefficient.
Answered by Alberto Bonsanto
If you have your table already registered in your SQLContext, you could simply use the sql method.
val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
Answered by Kristian
To save the output of a query to a new dataframe, simply set the result equal to a variable:
val newDataFrame = spark.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
and now newDataFrame is a dataframe with all the dataframe functionality available to it.
Answered by WillemM
TL;DR: just create a view in your database.
Detail: I have a table t_city in my postgres database, on which I create a view:
create view v_city_3500 as
select asciiname, country, population, elevation
from t_city
where elevation>3500
and population>100000;

select * from v_city_3500;
asciiname | country | population | elevation
-----------+---------+------------+-----------
Potosi | BO | 141251 | 3967
Oruro | BO | 208684 | 3936
La Paz | BO | 812799 | 3782
Lhasa | CN | 118721 | 3651
Puno | PE | 116552 | 3825
Juliaca | PE | 245675 | 3834
In the spark-shell:
val sx = new org.apache.spark.sql.SQLContext(sc)
val props = new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver")
val url = "jdbc:postgresql://buya/dmn?user=dmn&password=dmn"
val city_df = sx.read.jdbc(url, "t_city", props)
val city_3500_df = sx.read.jdbc(url, "v_city_3500", props)
Result:
city_df.count()
Long = 145725
city_3500_df.count()
Long = 6
Answered by Santhosh Hirekerur
To read/load data from MySQL, do something like the following:
val conf = new SparkConf().setAppName("SparkMe Application").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password",
      "dbtable" -> "TABLE_NAME")).load()
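The options passed to the reader are just a plain Scala Map, so the URL, credentials, and table name can be assembled in one place. A minimal sketch; the helper name jdbcOptions and its parameters are illustrative, not part of any Spark API:

```scala
// Build the options Map for the JDBC reader from its parts. The result is
// the ordinary Map[String, String] passed to .options(...) above.
def jdbcOptions(host: String, db: String, table: String,
                user: String, password: String): Map[String, String] =
  Map(
    "url"     -> s"jdbc:mysql://$host:3306/$db?user=$user&password=$password",
    "dbtable" -> table
  )

val opts = jdbcOptions("localhost", "corbonJDBC", "TABLE_NAME", "user", "password")
println(opts("url"))  // jdbc:mysql://localhost:3306/corbonJDBC?user=user&password=password
```

Embedding credentials in the URL works for a quick test, but the "user"/"password" options (or a Properties object, as in the write example below) keep them out of logs that print the URL.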
To write data to a table, do something like the following:
import java.util.Properties
val prop = new Properties()
prop.put("user", "<>")
prop.put("password", "simple3")
val dfWriter = jdbcDF.write.mode("append")
dfWriter.jdbc("jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password", "tableName", prop)
To create a dataframe from a query, do something like the following:
val finalModelDataDF = {
  val query = "select * from table_name"
  sqlContext.sql(query)
}
finalModelDataDF.show()