Python 如何将数组(即列表)列转换为 Vector
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/42138482/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How do I convert an array (i.e. list) column to Vector
提问by Arthur Tacca
Short version of the question!
问题的简短版本!
Consider the following snippet (assuming spark
is already set to some SparkSession
):
考虑以下代码段(假设spark
已经设置为 some SparkSession
):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
Notice that the temperatures field is a list of floats. I would like to convert these lists of floats to the MLlib type Vector
, and I'd like this conversion to be expressed using the basic DataFrame
API rather than going via RDDs (which is inefficient because it sends all data from the JVM to Python, the processing is done in Python, we don't get the benefits of Spark's Catalyst optimizer, yada yada). How do I do this? Specifically:
请注意,温度字段是一个浮点数列表。我想将这些浮点数列表转换为 MLlib 类型Vector
,并且我希望使用基本DataFrame
API 而不是通过 RDD来表达这种转换(这是低效的,因为它将所有数据从 JVM 发送到 Python,处理是用 Python 完成的,我们没有得到 Spark 的 Catalyst 优化器 yada yada 的好处)。我该怎么做呢?具体来说:
- Is there a way to get a straight cast working? See below for details (and a failed attempt at a workaround)? Or, is there any other operation that has the effect I was after?
- Which is more efficient out of the two alternative solutions I suggest below (UDF vs exploding/reassembling the items in the list)? Or are there any other almost-but-not-quite-right alternatives that are better than either of them?
- 有没有办法让直接演员工作?请参阅下面的详细信息(以及尝试解决方法失败)?或者,是否还有任何其他操作具有我所追求的效果?
- 我在下面建议的两种替代解决方案中哪个更有效(UDF 与爆炸/重新组装列表中的项目)?或者还有其他几乎但不完全正确的替代方案比它们中的任何一个都更好吗?
A straight cast doesn't work
直接演员不起作用
This is what I would expect to be the "proper" solution. I want to convert the type of a column from one type to another, so I should use a cast. As a bit of context, let me remind you of the normal way to cast it to another type:
这就是我所期望的“正确”解决方案。我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换。作为上下文,让我提醒您将其转换为另一种类型的正常方法:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Now e.g. df_with_strings.collect()[0]["temperatures"][1]
is '-7.0'
. But if I cast to an ml Vector then things do not go so well:
现在例如df_with_strings.collect()[0]["temperatures"][1]
是'-7.0'
。但是,如果我转换为 ml Vector,那么事情就不会那么顺利:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
This gives an error:
这给出了一个错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
Yikes! Any ideas how to fix this?
哎呀!任何想法如何解决这一问题?
Possible alternatives
可能的选择
Alternative 1: Using VectorAssembler
备选方案 1:使用 VectorAssembler
There is a Transformer
that seems almost ideal for this job: the VectorAssembler
. It takes one or more columns and concatenates them into a single vector. Unfortunately it only takes Vector
and Float
columns, not Array
columns, so the follow doesn't work:
有一个Transformer
看起来几乎是这项工作的理想选择:VectorAssembler
. 它需要一列或多列并将它们连接成一个向量。不幸的是,它只需要Vector
和Float
列,而不是Array
列,所以以下不起作用:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
It gives this error:
它给出了这个错误:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
The best work around I can think of is to explode the list into multiple columns and then use the VectorAssembler
to collect them all back up again:
我能想到的最好的解决方法是将列表分解为多列,然后使用 将VectorAssembler
它们全部收集起来:
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
This seems like it would be ideal, except that TEMPERATURE_COUNT
be more than 100, and sometimes more than 1000. (Another problem is that the code would be more complicated if you don't know the size of the array in advance, although that is not the case for my data.) Does Spark actually generate an intermediate data set with that many columns, or does it just consider this an intermediate step that individual items pass through transiently (or indeed does it optimise this away step entirely when it sees that the only use of these columns is to be assembled into a vector)?
这似乎是理想的,除了TEMPERATURE_COUNT
超过 100,有时超过 1000。(另一个问题是,如果您事先不知道数组的大小,代码会更复杂,尽管这不是我的数据的情况。)Spark 是否真的生成了一个包含那么多列的中间数据集,或者它只是认为这是单个项目暂时通过的中间步骤(或者当它看到只有使用这些列才能组装成一个向量)?
Alternative 2: use a UDF
备选方案 2:使用 UDF
A rather simpler alternative is to use a UDF to do the conversion. This lets me express quite directly what I want to do in one line of code, and doesn't require making a data set with a crazy number of columns. But all that data has to be exchanged between Python and the JVM, and every individual number has to be handled by Python (which is notoriously slow for iterating over individual data items). Here is how that looks:
一个相当简单的替代方法是使用 UDF 进行转换。这让我可以在一行代码中非常直接地表达我想要做的事情,并且不需要制作包含大量列的数据集。但是所有这些数据都必须在 Python 和 JVM 之间交换,并且每个单独的数字都必须由 Python 处理(众所周知,Python 迭代单个数据项的速度很慢)。这是它的外观:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
Ignorable remarks
可忽略的言论
The remaining sections of this rambling question are some extra things I came up with while trying to find an answer. They can probably be skipped by most people reading this.
这个漫无边际的问题的其余部分是我在试图找到答案时想到的一些额外的东西。大多数阅读本文的人可能会跳过它们。
Not a solution: use Vector
to begin with
不是解决方案:使用Vector
开始
In this trivial example it's possible to create the data using the vector type to begin with, but of course my data isn't really a Python list that I'm parallelizing, but instead is being read from a data source. But for the record, here is how that would look:
在这个简单的示例中,可以使用向量类型开始创建数据,但当然,我的数据实际上并不是我正在并行化的 Python 列表,而是从数据源中读取的。但为了记录,这里是这样的:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
Inefficient solution: use map()
低效解决方案:使用 map()
One possibility is to use the RDD map()
method to transform the list to a Vector
. This is similar to the UDF idea, except that its even worse because the cost of serialisation etc. is incurred for all the fields in each row, not just the one being operated on. For the record, here's what that solution would look like:
一种可能性是使用 RDDmap()
方法将列表转换为Vector
. 这类似于 UDF 的想法,除了它更糟糕,因为序列化等的成本是针对每一行中的所有字段产生的,而不仅仅是被操作的字段。作为记录,以下是该解决方案的样子:
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
Failed attempt at a workaround for cast
尝试演员的变通方法失败
In desperation, I noticed that Vector
is represented internally by a struct with four fields, but using a traditional cast from that type of struct doesn't work either. Here is an illustration (where I built the struct using a udf but the udf isn't the important part):
无奈之下,我注意到它Vector
在内部由具有四个字段的结构表示,但使用该类型结构的传统强制转换也不起作用。这是一个插图(我使用 udf 构建了结构,但 udf 不是重要的部分):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
This gives the error:
这给出了错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
采纳答案by zero323
Personally I would go with Python UDF and wouldn't bother with anything else:
就我个人而言,我会使用 Python UDF 而不会打扰其他任何事情:
Vectors
are not native SQL types so there will be performance overhead one way or another. In particular this process requires two steps where data is first converted from external type to row, and then from row to internal representation using genericRowEncoder
.- Any downstream ML
Pipeline
will be much more expensive than a simple conversion. Moreover it requires a process which opposite to the one described above
Vectors
不是本机 SQL 类型,因此会以一种或另一种方式存在性能开销。特别是这个过程需要两个步骤,首先将数据从外部类型转换为行,然后使用泛型从行转换为内部表示RowEncoder
。- 任何下游 ML
Pipeline
都将比简单的转换昂贵得多。此外,它需要一个与上述过程相反的过程
But if you really want other options here you are:
但是,如果您真的想要其他选择,您可以:
Scala UDF with Python wrapper:
Install sbtfollowing the instructions on the project site.
Create Scala package with following structure:
. ├── build.sbt └── udfs.scala
Edit
build.sbt
(adjust to reflect Scala and Spark version):scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.4.4", "org.apache.spark" %% "spark-mllib" % "2.4.4" )
Edit
udfs.scala
:package com.example.spark.udfs import org.apache.spark.sql.functions.udf import org.apache.spark.ml.linalg.DenseVector object udfs { val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray)) }
Package:
sbt package
and include (or equivalent depending on Scala version):
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
as an argument for
--driver-class-path
when starting shell / submitting application.In PySpark define a wrapper:
from pyspark.sql.column import _to_java_column, _to_seq, Column from pyspark import SparkContext def as_vector(col): sc = SparkContext.getOrCreate() f = sc._jvm.com.example.spark.udfs.udfs.as_vector() return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
Test:
with_vec = df.withColumn("vector", as_vector("temperatures")) with_vec.show()
+--------+------------------+----------------+ | city| temperatures| vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ with_vec.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- vector: vector (nullable = true)
Dump data to a JSON format reflecting
DenseVector
schema and read it back:from pyspark.sql.functions import to_json, from_json, col, struct, lit from pyspark.sql.types import StructType, StructField from pyspark.ml.linalg import VectorUDT json_vec = to_json(struct(struct( lit(1).alias("type"), # type 1 is dense, type 0 is sparse col("temperatures").alias("values") ).alias("v"))) schema = StructType([StructField("v", VectorUDT())]) with_parsed_vector = df.withColumn( "parsed_vector", from_json(json_vec, schema).getItem("v") ) with_parsed_vector.show()
+--------+------------------+----------------+ | city| temperatures| parsed_vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+
with_parsed_vector.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- parsed_vector: vector (nullable = true)
带有 Python 包装器的 Scala UDF:
按照项目站点上的说明安装sbt。
创建具有以下结构的 Scala 包:
. ├── build.sbt └── udfs.scala
编辑
build.sbt
(调整以反映 Scala 和 Spark 版本):scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.4.4", "org.apache.spark" %% "spark-mllib" % "2.4.4" )
编辑
udfs.scala
:package com.example.spark.udfs import org.apache.spark.sql.functions.udf import org.apache.spark.ml.linalg.DenseVector object udfs { val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray)) }
包裹:
sbt package
并包括(或等效的,取决于 Scala 版本):
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
作为
--driver-class-path
启动外壳程序/提交应用程序时的参数。在 PySpark 中定义一个包装器:
from pyspark.sql.column import _to_java_column, _to_seq, Column from pyspark import SparkContext def as_vector(col): sc = SparkContext.getOrCreate() f = sc._jvm.com.example.spark.udfs.udfs.as_vector() return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
测试:
with_vec = df.withColumn("vector", as_vector("temperatures")) with_vec.show()
+--------+------------------+----------------+ | city| temperatures| vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ with_vec.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- vector: vector (nullable = true)
将数据转储为反映
DenseVector
架构的 JSON 格式并将其读回:from pyspark.sql.functions import to_json, from_json, col, struct, lit from pyspark.sql.types import StructType, StructField from pyspark.ml.linalg import VectorUDT json_vec = to_json(struct(struct( lit(1).alias("type"), # type 1 is dense, type 0 is sparse col("temperatures").alias("values") ).alias("v"))) schema = StructType([StructField("v", VectorUDT())]) with_parsed_vector = df.withColumn( "parsed_vector", from_json(json_vec, schema).getItem("v") ) with_parsed_vector.show()
+--------+------------------+----------------+ | city| temperatures| parsed_vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+
with_parsed_vector.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- parsed_vector: vector (nullable = true)
回答by GGDammy
I had a same problem like you and I did this way. This way includes RDD transformation, so is not performance critical, but it works.
我和你有同样的问题,我就是这样做的。这种方式包括 RDD 转换,因此性能不是关键,但它有效。
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])
new_df
the result is,
结果是,
DataFrame[city: string, temperatures: vector]