Writing to Oracle Database using Apache Spark 1.4.0
Note: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license, link to the original, and attribute it to the original authors (not me) on StackOverflow.
Original: http://stackoverflow.com/questions/31287182/
Asked by Tolun Tosun
I am trying to write some data to our Oracle database using the Spark 1.4.0 DataFrame.write.jdbc() function.
The symmetric read.jdbc() function, which reads data from the Oracle database into DataFrame objects, works well. However, writing the DataFrame back (I also tried writing exactly the same object I had read from the database, with overwrite set to true) throws the following exception:
Exception in thread "main" java.sql.SQLSyntaxErrorException: ORA-00902: Ungültiger Datentyp
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:943)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1075)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3897)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1361)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:252)
at main3$.main(main3.scala:72)
at main3.main(main3.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
The table has two basic string columns. When they are Integer columns, the write succeeds.
Actually, when I dig deeper, I realize that it maps StringType to "TEXT", which Oracle does not recognize (it should be "VARCHAR" instead), so the generated CREATE TABLE statement fails with ORA-00902. The code below is from jdbc.scala, which can be found on GitHub:
def schemaString(df: DataFrame, url: String): String = {
  val sb = new StringBuilder()
  val dialect = JdbcDialects.get(url)
  df.schema.fields foreach { field => {
    val name = field.name
    val typ: String =
      dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
        field.dataType match {
          case IntegerType => "INTEGER"
          case LongType => "BIGINT"
          case DoubleType => "DOUBLE PRECISION"
          case FloatType => "REAL"
          case ShortType => "INTEGER"
          case ByteType => "BYTE"
          case BooleanType => "BIT(1)"
          case StringType => "TEXT"
          case BinaryType => "BLOB"
          case TimestampType => "TIMESTAMP"
          case DateType => "DATE"
          case DecimalType.Unlimited => "DECIMAL(40,20)"
          case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
        })
    val nullable = if (field.nullable) "" else "NOT NULL"
    sb.append(s", $name $typ $nullable")
  }}
  if (sb.length < 2) "" else sb.substring(2)
}
So the question is: am I mistaken somewhere, or does Spark SQL simply not support Oracle, and do I need to install a plug-in to use Spark SQL with Oracle?
My simple main is:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// url and connectionprop (the JDBC URL and connection properties) are defined elsewhere.
val conf = new SparkConf().setAppName("Parser").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Reading the STUDENTS table works fine.
val reader = sqlContext.read
val frame = reader.jdbc(url, "STUDENTS", connectionprop)
frame.printSchema()
frame.show()

// Build a one-row DataFrame with the same two string columns and try to write it back.
val row = Row("3", "4")
val struct =
  StructType(
    StructField("ONE", StringType, true) ::
    StructField("TWO", StringType, true) :: Nil)
val arr = Array(row)
val rddRow = sc.parallelize(arr)
val dframe = sqlContext.createDataFrame(rddRow, struct)
dframe.printSchema()
dframe.show()
dframe.write.jdbc(url, "STUDENTS", connectionprop)   // fails with ORA-00902
Answered by MxR
Actual answer: it is not possible to write back to Oracle using the existing DataFrame.write.jdbc() implementation in 1.4.0. But if you don't mind upgrading to Spark 1.5, there is a slightly hackish way to do it. As described here, there are two problems:
The easy one: Spark's way of checking for table existence is not compatible with Oracle:
SELECT 1 FROM $table LIMIT 1
This can easily be avoided by calling the save-table utility method directly:
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(df, url, table, props)
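For reference, here is a sketch of an Oracle-compatible existence check, just to illustrate the incompatibility; it is not part of Spark, and the helper name and plain-JDBC approach are assumptions. Oracle accepts a ROWNUM-limited probe where it rejects the LIMIT syntax Spark 1.4 issues:

import java.sql.{DriverManager, SQLException}

// Hypothetical helper: probe for a table with "WHERE ROWNUM = 1" instead of
// the "SELECT 1 FROM $table LIMIT 1" query that Spark 1.4 generates.
def oracleTableExists(url: String, props: java.util.Properties, table: String): Boolean = {
  val conn = DriverManager.getConnection(url, props)
  try {
    val stmt = conn.prepareStatement(s"SELECT 1 FROM $table WHERE ROWNUM = 1")
    try { stmt.executeQuery(); true } finally { stmt.close() }
  } catch {
    case _: SQLException => false   // table missing (or not accessible)
  } finally {
    conn.close()
  }
}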
The hard one (as you've correctly guessed): there is no Oracle-specific data type dialect available out of the box. Adapted from the solution in the same article:
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.types._

val OracleDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
    case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
    case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
    case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
    case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
    case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
    case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
    case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
    case _ => None
  }
}

JdbcDialects.registerDialect(OracleDialect)
So, finally, a working example should look similar to:
val url: String = "jdbc:oracle:thin:@your_domain:1521/dbname"
val driver: String = "oracle.jdbc.OracleDriver"
val props = new java.util.Properties()
props.setProperty("user", "username")
props.setProperty("password", "userpassword")
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(dataFrame, url, "table_name", props)
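Putting the pieces together, a minimal end-to-end sketch might look like the following. The URL, credentials, application name, and the STUDENTS table with two string columns are assumptions carried over from the question, and it assumes the table already exists (it does in the question, since read.jdbc worked); only a trimmed-down dialect is shown inline, the fuller OracleDialect above covers more types:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils

object OracleWriteSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OracleWriteSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Register the Oracle dialect *before* writing, so StringType maps to VARCHAR2.
    JdbcDialects.registerDialect(new JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
      override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
        case _ => None
      }
    })

    // One row with the same two string columns as the STUDENTS table in the question.
    val schema = StructType(
      StructField("ONE", StringType, nullable = true) ::
      StructField("TWO", StringType, nullable = true) :: Nil)
    val dataFrame = sqlContext.createDataFrame(sc.parallelize(Seq(Row("3", "4"))), schema)

    val url = "jdbc:oracle:thin:@your_domain:1521/dbname"
    val props = new java.util.Properties()
    props.setProperty("user", "username")
    props.setProperty("password", "userpassword")

    // Insert the rows directly, bypassing the LIMIT-based existence check.
    JdbcUtils.saveTable(dataFrame, url, "STUDENTS", props)

    sc.stop()
  }
}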
Answered by user3886907
Update: starting from Spark 2.x
There is an issue wherein Spark double-quotes every column name while creating the JDBC table, and hence all the Oracle table column names become case sensitive when you try to query them via SQL*Plus.
select colA from myTable;   => no longer works
select "colA" from myTable; => works
[Solution] Dataframe to Oracle creates table with case sensitive column
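One possible workaround, offered only as a sketch and assuming Spark 2.x (where JdbcDialect exposes quoteIdentifier), is to register a dialect that leaves identifiers unquoted so Oracle keeps the column names case-insensitive; the dialect name here is an arbitrary choice:

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Sketch: return column names without surrounding double quotes so the
// generated CREATE TABLE / INSERT statements use unquoted, case-insensitive
// identifiers. Register this before calling df.write.jdbc(...).
val UnquotedOracleDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
  override def quoteIdentifier(colName: String): String = colName
}
JdbcDialects.registerDialect(UnquotedOracleDialect)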
Answered by ledong
You can use org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable, just like Aerondir says.