Python: How do I convert an array (i.e. list) column to Vector

Disclaimer: this page mirrors a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me), linking the original: http://stackoverflow.com/questions/42138482/

How do I convert an array (i.e. list) column to Vector

python, apache-spark, pyspark, apache-spark-sql, apache-spark-ml

Asked by Arthur Tacca

Short version of the question!

Consider the following snippet (assuming spark is already set to some SparkSession):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)
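
For reference, this is the schema that createDataFrame infers here (the same schema also appears in the printSchema output in the accepted answer below), so the task is to turn the array<double> column into an ml vector column:

df.printSchema()
# root
#  |-- city: string (nullable = true)
#  |-- temperatures: array (nullable = true)
#  |    |-- element: double (containsNull = true)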

Notice that the temperatures field is a list of floats. I would like to convert these lists of floats to the MLlib type Vector, and I'd like this conversion to be expressed using the basic DataFrame API rather than going via RDDs (which is inefficient because it sends all data from the JVM to Python, the processing is done in Python, we don't get the benefits of Spark's Catalyst optimizer, yada yada). How do I do this? Specifically:

  1. Is there a way to get a straight cast working? See below for details (and a failed attempt at a workaround). Or is there any other operation that has the effect I was after?
  2. Which is more efficient out of the two alternative solutions I suggest below (UDF vs exploding/reassembling the items in the list)? Or are there any other almost-but-not-quite-right alternatives that are better than either of them?

A straight cast doesn't work

This is what I would expect to be the "proper" solution. I want to convert the type of a column from one type to another, so I should use a cast. As a bit of context, let me remind you of the normal way to cast it to another type:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

Now e.g. df_with_strings.collect()[1]["temperatures"][1] is '-7.0' (a string rather than a float). But if I cast to an ml Vector then things do not go so well:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

This gives an error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

Yikes! Any ideas how to fix this?

Possible alternatives

Alternative 1: Using VectorAssembler

There is a Transformer that seems almost ideal for this job: the VectorAssembler. It takes one or more columns and concatenates them into a single vector. Unfortunately it only takes Vector and Float columns, not Array columns, so the following doesn't work:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

It gives this error:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

The best workaround I can think of is to explode the list into multiple columns and then use the VectorAssembler to collect them all back up again:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

This seems like it would be ideal, except that TEMPERATURE_COUNT can be more than 100, and sometimes more than 1000. (Another problem is that the code would be more complicated if you don't know the size of the array in advance, although that is not the case for my data.) Does Spark actually generate an intermediate data set with that many columns, or does it just consider this an intermediate step that individual items pass through transiently (or indeed does it optimise this step away entirely when it sees that the only use of these columns is to be assembled into a vector)?

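One way to answer that is to inspect the plan Catalyst actually produces, and to probe the array length up front when it is not known. A purely diagnostic sketch, assuming df and final_df from the snippets above:

# Print the parsed/analysed/optimised/physical plans to see whether the
# exploded columns survive as a wide intermediate projection.
final_df.explain(True)

# If the array size is not known in advance, it could be probed first
# (at the cost of an extra Spark job) using the SQL size() function:
TEMPERATURE_COUNT = df.selectExpr("max(size(temperatures)) AS n").first()["n"]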

Alternative 2: use a UDF

A rather simpler alternative is to use a UDF to do the conversion. This lets me express quite directly what I want to do in one line of code, and doesn't require making a data set with a crazy number of columns. But all that data has to be exchanged between Python and the JVM, and every individual number has to be handled by Python (which is notoriously slow for iterating over individual data items). Here is how that looks:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

Ignorable remarks

The remaining sections of this rambling question are some extra things I came up with while trying to find an answer. They can probably be skipped by most people reading this.

Not a solution: use Vector to begin with

In this trivial example it's possible to create the data using the vector type to begin with, but of course my data isn't really a Python list that I'm parallelizing, but instead is being read from a data source. But for the record, here is how that would look:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

Inefficient solution: use map()

One possibility is to use the RDD map() method to transform the list to a Vector. This is similar to the UDF idea, except that it's even worse because the cost of serialisation etc. is incurred for all the fields in each row, not just the one being operated on. For the record, here's what that solution would look like:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Failed attempt at a workaround for cast

In desperation, I noticed that Vector is represented internally by a struct with four fields, but using a traditional cast from that type of struct doesn't work either. Here is an illustration (where I built the struct using a udf but the udf isn't the important part):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

This gives the error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"

Accepted answer by zero323

Personally I would go with Python UDF and wouldn't bother with anything else:
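
That is essentially the same UDF already shown in the question; a minimal sketch, assuming spark and df as defined there:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# Wrap each Python list in a DenseVector; VectorUDT() declares the return type.
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"],
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)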

But if you really want other options, here you are:

  • Scala UDF with Python wrapper:

    Install sbt following the instructions on the project site.

    Create a Scala package with the following structure:

    .
    ├── build.sbt
    └── udfs.scala
    

    Edit build.sbt (adjust to reflect Scala and Spark version):

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.4.4",
      "org.apache.spark" %% "spark-mllib" % "2.4.4"
    )
    

    Edit udfs.scala:

    package com.example.spark.udfs
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector
    
    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }
    

    Package:

    sbt package
    

    and include (or equivalent depending on Scala version):

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
    

    as an argument for --driver-class-path when starting the shell / submitting the application.

    In PySpark define a wrapper:

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext
    
    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
    

    Test:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_vec.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
    
  • Dump data to a JSON format reflecting the DenseVector schema and read it back:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))
    
    schema = StructType([StructField("v", VectorUDT())])
    
    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    with_parsed_vector.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_parsed_vector.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
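
    A sparse vector could presumably be encoded the same way, by setting type to 0 and supplying size and indices alongside values (field names per the VectorUDT struct shown in the error messages above). This is an untested sketch with constant literals, purely to illustrate the field layout; it reuses the schema defined above:

    from pyspark.sql.functions import array, lit, struct, to_json, from_json

    json_sparse_vec = to_json(struct(struct(
        lit(0).alias("type"),                      # 0 marks a sparse vector
        lit(3).alias("size"),                      # total length of the vector
        array(lit(0), lit(2)).alias("indices"),    # positions of the non-zero entries
        array(lit(1.0), lit(3.0)).alias("values")  # the non-zero values themselves
    ).alias("v")))

    with_sparse_vector = df.withColumn(
        "sparse_vector", from_json(json_sparse_vec, schema).getItem("v")
    )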
    

Answered by GGDammy

I had the same problem as you and solved it this way. This approach involves an RDD transformation, so it is not performance-critical, but it works.

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

city_rdd = df.rdd.map(lambda row: row[0])
temp_rdd = df.rdd.map(lambda row: row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x: Vectors.dense(x))).toDF(schema=['city', 'temperatures'])

new_df

the result is,

DataFrame[city: string, temperatures: vector]