How to create a Row from a List or Array in Spark using Scala

Disclaimer: this page is based on a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/28108107/

scala, apache-spark, apache-spark-sql

Asked by Anju

I'm trying to create a Row (org.apache.spark.sql.catalyst.expressions.Row) based on the user input. I'm not able to create a Row randomly.

Is there any functionality to create a Row from a List or an Array?

For example, if I have a .csv file with the following format,

"91xxxxxxxxxx,21.31,15,0,0"

If the user inputs [1, 2], then I need to take only the 2nd and 3rd columns along with the customer_id, which is the first column.

I try to parse it with the following code:

val l3 = sc.textFile("/SparkTest/abc.csv").map(_.split(" ")).map(r => (foo(input, r(0))))

where foo is defined as

def foo(input: List[Int], s: String) : Row = {
    val n = input.length
    val out = new Array[Any](n + 1)
    val r = s.split(",")
    out(0) = r(0)
    for (i <- 1 to n)
        out(i) = r(input(i - 1)).toDouble
    Row(out)  // note: this passes the whole array as one field, which explains the output below
}

and input is a List, say

val input = List(1,2)

Executing this code, I get l3 as:

Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])

But what I want is:

Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([9xxxxxxxxxx,21.31,15])

This has to be passed to create a schema in Spark SQL.

Answered by gruggie

Something like the following should work:

import org.apache.spark.sql._

def f(n: List[Int], s: String) : Row =
  Row.fromSeq(s.split(",").zipWithIndex.collect{case (a,b) if n.contains(b) => a}.toSeq)
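
For example (my usage sketch, not part of the original answer), applying f to the sample line from the question keeps only the columns whose indices appear in the list; include 0 if the first column, customer_id, should be kept as well. Note that the kept values stay Strings:

val line = "91xxxxxxxxxx,21.31,15,0,0"

f(List(1, 2), line)      // Row(21.31, 15), both fields as Strings
f(List(0, 1, 2), line)   // Row(91xxxxxxxxxx, 21.31, 15), again all Strings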

Answered by 0x0FFF

You are missing the creation of the StructField and StructType. Refer to the official guide, http://spark.apache.org/docs/latest/sql-programming-guide.html, section "Programmatically Specifying the Schema".

I'm not a Scala specialist, but in Python it would look like this:

from pyspark.sql import *
sqlContext = SQLContext(sc)

input = [1,2]

def parse(line):
    global input
    l = line.split(',')
    res = [l[0]]
    for ind in input:
        res.append(l[ind])
    return res

csv  = sc.textFile("file:///tmp/inputfile.csv")
rows = csv.map(lambda x: parse(x))

fieldnum = len(input) + 1
fields = [StructField("col"+str(i), StringType(), True) for i in range(fieldnum)]
schema = StructType(fields)

csvWithSchema = sqlContext.applySchema(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()

In short, you should not directly convert them to Row objects; just leave them as an RDD and apply a schema to it with applySchema.
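
For reference, here is a rough Scala sketch of the same approach (my addition, not part of the original answer). It assumes a spark-shell style sc and sqlContext, and uses createDataFrame, which replaced applySchema in later Spark versions:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val input = List(1, 2)

// keep the first column plus the columns listed in input, all as Strings
val rows = sc.textFile("/SparkTest/abc.csv")
  .map(_.split(","))
  .map(cols => Row.fromSeq(cols(0) +: input.map(i => cols(i))))

// one field per kept column, named col0, col1, ...
val fields = (0 to input.length).map(i => StructField("col" + i, StringType, nullable = true))
val schema = StructType(fields)

val csvWithSchema = sqlContext.createDataFrame(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()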

Answered by Joel Mata

You can also try:

    Row.fromSeq(Seq(line(0).toString) ++ Seq(line(1).toDouble, line(2).toDouble) ++ line.slice(2, line.size).map(value => value.toString))
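
Here line is presumably the already split array of column values; for instance (my assumption, the original answer does not show how line is built):

    val line = "91xxxxxxxxxx,21.31,15,0,0".split(",")
    // the expression above then yields Row(91xxxxxxxxxx, 21.31, 15.0, 15, 0, 0):
    // the first field stays a String, fields 1 and 2 become Doubles, and
    // slice(2, line.size) appends the remaining values (including column 2 again) as Strings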