database: Primary keys with Apache Spark
Disclaimer: this page reproduces a popular StackOverflow question under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/33102727/
Primary keys with Apache Spark
Asked by Nhor
I have a JDBC connection between Apache Spark and PostgreSQL and I want to insert some data into my database. When I use append mode I need to specify an id for each DataFrame.Row. Is there any way for Spark to create primary keys?
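For context, a rough sketch of the kind of JDBC append write being described, where df is some DataFrame and the connection URL, table name and credentials are placeholders:

# Hypothetical append write to PostgreSQL; url, table and credentials are placeholders.
df.write.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="my_table",
    mode="append",
    properties={"user": "spark", "password": "secret", "driver": "org.postgresql.Driver"})

With plain append mode, each row has to carry its own id value, hence the question.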
Answered by zero323
Scala:
If all you need is unique numbers you can use zipWithUniqueId and recreate the DataFrame. First, some imports and dummy data:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Extract the schema for further use:
val schema = df.schema
Add an id field:
val rows = df.rdd.zipWithUniqueId.map {
  case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)
}
Create the DataFrame:
val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))
The same thing in Python:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

# Row class with the id column prepended to the original columns
row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
If you prefer consecutive numbers, you can replace zipWithUniqueId with zipWithIndex, but it is a little more expensive.
Directly with the DataFrame API:
(universal for Scala, Python, Java and R, with pretty much the same syntax)
Previously I had missed the monotonicallyIncreasingId function, which should work just fine as long as you don't require consecutive numbers:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
While useful, monotonicallyIncreasingId is non-deterministic. Not only may the ids differ from execution to execution, but without additional tricks they cannot be used to identify rows when subsequent operations contain filters.
Note:
It is also possible to use the rowNumber window function:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Unfortunately:
WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
So unless you have a natural way to partition your data and ensure uniqueness, it is not particularly useful at the moment.
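If such a partitioning column does exist, a minimal sketch of a partitioned window, reusing the Window and rowNumber imports from the snippet above (row_number in later Spark versions) and treating foo as a hypothetical partition key; note that the numbers restart within each foo value, so on their own they are only unique per group:

# Partitioned window: avoids shuffling everything into a single partition,
# but the numbering restarts for each value of the hypothetical key "foo".
w = Window.partitionBy("foo").orderBy("bar")
df.withColumn("row_in_group", rowNumber().over(w)).show()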
Answered by Allyn
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("id", monotonically_increasing_id()).show()
Note that the second argument of df.withColumn is monotonically_increasing_id(), not monotonically_increasing_id.
Answered by rocconnick
I found the following solution to be relatively straightforward for the case where zipWithIndex() is the desired behavior, i.e. for those desiring consecutive integers.
In this case, we're using pyspark and relying on a dictionary comprehension to map the original row object to a new dictionary that fits a new schema including the unique index.
from pyspark.sql.types import StructType, StructField, IntegerType

# read the initial dataframe without an index
dfNoIndex = sqlContext.read.parquet(dataframePath)

# Need to zip together with a unique integer.
# First create a new schema with the uuid field added.
newSchema = StructType([StructField("uuid", IntegerType(), False)]
                       + dfNoIndex.schema.fields)

# zip with the index, then map each (row, index) pair to a dictionary
# which includes the new field
df = (dfNoIndex.rdd
      .zipWithIndex()
      .map(lambda pair: {k: v
                         for k, v
                         in list(pair[0].asDict().items()) + [("uuid", pair[1])]})
      .toDF(newSchema))