How to work with MySQL and Apache Spark?
Disclaimer: this page is a translation of a popular StackOverflow question. It is provided under the CC BY-SA 4.0 license; you are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/27718382/
Asked by pangkaj paul
I want to run my existing application with Apache Spark and MySQL.
Answered by cherah30
From PySpark, this works for me:
dataframe_mysql = mySqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/my_bd_name",
    driver="com.mysql.jdbc.Driver",
    dbtable="my_tablename",
    user="root",
    password="root").load()
Answered by Liam
With Spark 2.0.x, you can use DataFrameReader and DataFrameWriter. Use SparkSession.read to access a DataFrameReader and Dataset.write to access a DataFrameWriter.
The examples below assume you are using spark-shell.
read example
val prop = new java.util.Properties()
prop.put("user", "username")
prop.put("password", "yourpassword")
val url = "jdbc:mysql://host:port/db_name"
val df = spark.read.jdbc(url, "table_name", prop)
df.show()
read example 2
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()
(from the Spark docs)
read example 3
If you want to read data from a query result rather than a table, pass the query as a parenthesized subquery with an alias:
val sql = """select * from db.your_table where id > 1"""
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql:dbserver")
  .option("dbtable", s"( $sql ) t") // the subquery needs an alias, here "t"
  .option("user", "username")
  .option("password", "password")
  .load()
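A related sketch, not from the original answer: for large tables, Spark's JDBC source can read in parallel if you give it a numeric partition column. The column name and bounds below are assumptions; adjust them to your schema:
val partitionedDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://host:port/db_name")
  .option("dbtable", "table_name")
  .option("user", "username")
  .option("password", "password")
  .option("partitionColumn", "id") // assumed numeric column; required together with the three options below
  .option("lowerBound", "1")
  .option("upperBound", "100000")
  .option("numPartitions", "4")
  .load()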
write example
import org.apache.spark.sql.SaveMode

val prop = new java.util.Properties()
prop.put("user", "username")
prop.put("password", "yourpassword")
val url = "jdbc:mysql://host:port/db_name"
// df is a DataFrame containing the data you want to write
df.write.mode(SaveMode.Append).jdbc(url, "table_name", prop)
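For completeness, a minimal sketch (not part of the original answer) that builds a throwaway DataFrame for the write above; the column names and rows are made up, and it assumes the same spark-shell session:
// A tiny made-up DataFrame so the write call has something to save;
// assumes table_name has matching columns (id, name)
import spark.implicits._
val df = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
df.write.mode(SaveMode.Append).jdbc(url, "table_name", prop)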
Answered by Lokesh
Using Scala, this worked for me. Use the commands below:
sudo -u root spark-shell --jars /mnt/resource/lokeshtest/guava-12.0.1.jar,/mnt/resource/lokeshtest/hadoop-aws-2.6.0.jar,/mnt/resource/lokeshtest/aws-java-sdk-1.7.3.jar,/mnt/resource/lokeshtest/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar --packages com.databricks:spark-csv_2.10:1.2.0
import org.apache.spark.sql.SQLContext
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
val dataframe_mysql = sqlcontext.read.format("jdbc")
  .option("url", "jdbc:mysql://Public_IP:3306/DB_NAME")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "tblage")
  .option("user", "sqluser")
  .option("password", "sqluser")
  .load()
dataframe_mysql.show()
Answered by jstuartmill
For Scala, if you use sbt, this will also work.
In your build.sbt file:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.2",
  "org.apache.spark" %% "spark-sql" % "1.6.2",
  "org.apache.spark" %% "spark-mllib" % "1.6.2",
  "mysql" % "mysql-connector-java" % "5.1.12"
)
Then you just need to declare your usage of the driver.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

Class.forName("com.mysql.jdbc.Driver").newInstance

val conf = new SparkConf().setAppName("MY_APP_NAME").setMaster("MASTER")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val data = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://<HOST>:3306/<database>")
  .option("user", <USERNAME>)     // placeholder: pass a quoted string
  .option("password", <PASSWORD>) // placeholder: pass a quoted string
  .option("dbtable", "MYSQL_QUERY") // a table name, or a parenthesized subquery with an alias
  .load()
Answered by Jatin
public static void main(String[] args) {
    Map<String, String> options = new HashMap<String, String>();
    // note: this answer uses a PostgreSQL JDBC URL; for MySQL use jdbc:mysql://... instead
    options.put("url", "jdbc:postgresql://<DBURL>:<PORT>/<Database>?user=<UserName>&password=<Password>");
    options.put("dbtable", "<TableName>");

    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("DBConnection").setMaster("local[*]"));
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    // DataFrame jdbcDF = sqlContext.load("jdbc", options).cache();
    DataFrame jdbcDF = sqlContext.jdbc(options.get("url"), options.get("dbtable"));
    System.out.println("Data------------------->" + jdbcDF.toJSON().first());

    Row[] rows = jdbcDF.collect();
    System.out.println("Without Filter \n ------------------------------------------------- ");
    for (Row row2 : rows) {
        System.out.println(row2.toString());
    }

    System.out.println("Filter Data\n ------------------------------------------------- ");
    jdbcDF = jdbcDF.select("agency_id", "route_id").where(jdbcDF.col("route_id").$less$eq(3));
    rows = jdbcDF.collect();
    for (Row row2 : rows) {
        System.out.println(row2.toString());
    }
}
Answered by D Qiang
For Java (using Maven), add the Spark dependencies and the MySQL driver dependency to your pom.xml file:
<properties>
    <java.version>1.8</java.version>
    <spark.version>1.6.3</spark.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Sample code. Suppose your MySQL runs locally, the database name is test, the user name is root, the password is password, and the two tables in the test database are table1 and table2:
SparkConf sparkConf = new SparkConf();
SparkContext sc = new SparkContext("local", "spark-mysql-test", sparkConf);
SQLContext sqlContext = new SQLContext(sc);

// here you can run a sql query
String sql = "(select * from table1 join table2 on table1.id=table2.table1_id) as test_table";
// or use an existing table directly
// String sql = "table1";

DataFrame dataFrame = sqlContext
    .read()
    .format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true")
    .option("user", "root")
    .option("password", "password")
    .option("dbtable", sql)
    .load();

// continue your logic from here
// ...
Answered by EpicPandaForce
For Java, this worked for me:
@Bean
public SparkConf sparkConf() {
    SparkConf sparkConf = new SparkConf()
        .setAppName(appName)
        .setSparkHome(sparkHome)
        .setMaster(masterUri);
    return sparkConf;
}

@Bean
public JavaSparkContext javaSparkContext() {
    return new JavaSparkContext(sparkConf());
}

@Bean
public SparkSession sparkSession() {
    return SparkSession
        .builder()
        .sparkContext(javaSparkContext().sc())
        .appName("Java Spark SQL basic example")
        .getOrCreate();
}
Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "root");
properties.put("driver", "com.mysql.cj.jdbc.Driver");

sparkSession.read()
    .jdbc("jdbc:mysql://localhost:3306/books?useSSL=false",
          "(SELECT books.BOOK_ID as BOOK_ID, books.BOOK_TITLE as BOOK_TITLE, books.BOOK_AUTHOR as BOOK_AUTHOR, borrowers.BORR_NAME as BORR_NAME FROM books LEFT OUTER JOIN borrowers ON books.BOOK_ID = borrowers.BOOK_ID) as t", // join example
          properties)
    .show();
Of course, for MySQL, I needed the connector:
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>6.0.6</version>
</dependency>
And I get:
+-------+------------------+--------------+---------------+
|BOOK_ID|        BOOK_TITLE|   BOOK_AUTHOR|      BORR_NAME|
+-------+------------------+--------------+---------------+
|      1|        Gyűrű kúra|J.R.K. Tolkien|   Sára Sarolta|
|      2|     Kecske-eledel|     Mekk Elek|Maláta Melchior|
|      3|      Répás tészta| Vegán Eleazár|           null|
|      4|Krumpli és pityóka| Farmer Emília|           null|
+-------+------------------+--------------+---------------+
Answered by Gábor Bakos
Based on this infoobjects article, try the following (assuming Java or Scala; not sure how this would work with Python):
- add mysql-connector-java to the classpath of your Spark cluster
- initialize the driver:
Class.forName("com.mysql.jdbc.Driver")
- create a JdbcRDD data source:
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val myRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, username, password),
  "select first_name,last_name,gender from person limit ?, ?",
  1, // lower bound, bound to the first ? placeholder
  5, // upper bound, bound to the second ? placeholder
  2, // number of partitions
  r => r.getString("last_name") + ", " + r.getString("first_name"))
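A minimal usage sketch (assuming url, username and password are already defined as strings and the connector jar is on the classpath):
// Materialize the mapped rows on the driver and print them
myRDD.collect().foreach(println)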
Answered by Liran Brimer
val query: String =
  "select col1, col2 from schema.table_name where condition"
val url = "jdbc:mysql://<ip>:3306/<schema>"
val username = ""
val password = ""

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// note: sqlContext.load was deprecated in Spark 1.4; prefer sqlContext.read.format("jdbc").options(...).load()
val df = sqlContext.load("jdbc", Map(
  "url" -> (url + "/?user=" + username + "&password=" + password),
  "dbtable" -> s"($query) as tbl",
  "driver" -> "com.mysql.jdbc.Driver"))
df.show()
Answered by Martin
For Spark 2.1.0 and Scala (on Windows 7), the code below works fine for me:
import org.apache.spark.sql.SparkSession

object MySQL {
  def main(args: Array[String]) {
    // First, create a SparkSession as the entry point of your app
    val spark: SparkSession = SparkSession
      .builder()
      .appName("JDBC")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "C:/Exp/")
      .getOrCreate()

    val dataframe_mysql = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost/feedback")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "person")    // replace with your own
      .option("user", "root")         // replace with your own
      .option("password", "vertrigo") // replace with your own
      .load()

    dataframe_mysql.show()
  }
}