How to create a Row from a List or Array in Spark using Scala

Disclaimer: This page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me): StackOverflow

Original URL: http://stackoverflow.com/questions/28108107/
Asked by Anju
I'm trying to create a Row (org.apache.spark.sql.catalyst.expressions.Row) based on the user input. I'm not able to create a Row randomly.
Is there any functionality to create a Row from a List or Array?
For example, if I have a .csv file with the following format,
"91xxxxxxxxxx,21.31,15,0,0"
If the user input is [1, 2], then I need to take only the 2nd and 3rd columns, along with the customer_id, which is the first column.
I try to parse it with the code:
val l3 = sc.textFile("/SparkTest/abc.csv").map(_.split(" ")).map(r => foo(input, r(0)))
where foo is defined as
def foo(input: List[Int], s: String): Row = {
  val n = input.length
  val out = new Array[Any](n + 1)
  val r = s.split(",")
  out(0) = r(0)                        // customer_id, the first column
  for (i <- 1 to n)
    out(i) = r(input(i - 1)).toDouble  // the user-selected columns
  Row(out)
}
and input is a List, say
val input = List(1,2)
Executing this code I get l3 as:
Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])
But what I want is:
Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([91xxxxxxxxxx,21.31,15])
This has to be passed to create a schema in Spark SQL
Answered by gruggie
Something like the following should work:
import org.apache.spark.sql._

// Keep only the columns whose index appears in n.
def f(n: List[Int], s: String): Row =
  Row.fromSeq(s.split(",").zipWithIndex.collect { case (a, b) if n.contains(b) => a }.toSeq)
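For example, wiring this into the question's pipeline might look like the sketch below. The file path comes from the question; passing List(0, 1, 2) is an assumption so that the customer_id column at index 0 is kept as well, since this helper only keeps the indices you list.

// Sketch: select columns 0 (customer_id), 1 and 2 from each CSV line using f above.
val l3 = sc.textFile("/SparkTest/abc.csv").map(line => f(List(0, 1, 2), line))
// For "91xxxxxxxxxx,21.31,15,0,0" this yields [91xxxxxxxxxx,21.31,15]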
Answered by 0x0FFF
You are missing the creation of the StructField and StructType. Refer to the official guide http://spark.apache.org/docs/latest/sql-programming-guide.html, section "Programmatically Specifying the Schema".
I'm not a Scala specialist, but in Python it would look like this:
from pyspark.sql import *

sqlContext = SQLContext(sc)

input = [1, 2]

# Keep the first column plus the columns whose indices are listed in input.
def parse(line):
    global input
    l = line.split(',')
    res = [l[0]]
    for ind in input:
        res.append(l[ind])
    return res

csv = sc.textFile("file:///tmp/inputfile.csv")
rows = csv.map(lambda x: parse(x))

# Build the schema programmatically: one StringType field per kept column.
fieldnum = len(input) + 1
fields = [StructField("col" + str(i), StringType(), True) for i in range(fieldnum)]
schema = StructType(fields)

csvWithSchema = sqlContext.applySchema(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()
In short, you should not directly convert them to Row objects; just leave them as an RDD and apply the schema to it with applySchema.
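A rough Scala equivalent of the same idea might look like the sketch below; this is an illustration, not the answerer's code. It assumes a spark-shell style sc and sqlContext, the file path and column indices from the question, and hypothetical column names col0, col1, col2. On Spark 1.3+ the call is createDataFrame; on 1.2 the equivalent was applySchema.

import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val input = List(1, 2)

// Keep the data as an RDD[Row]: first column plus the user-selected columns.
val rows = sc.textFile("/SparkTest/abc.csv").map { line =>
  val cols = line.split(",")
  Row.fromSeq(cols(0) +: input.map(i => cols(i)))
}

// Build the schema programmatically, one StringType field per kept column.
val fields = (0 to input.length).map(i => StructField("col" + i, StringType, nullable = true))
val schema = StructType(fields)

val csvWithSchema = sqlContext.createDataFrame(rows, schema)  // applySchema on Spark 1.2
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()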
Answered by Joel Mata
You can also try:
// Assuming `line` is the already-split array of column values:
Row.fromSeq(Seq(line(0).toString, line(1).toDouble, line(2).toDouble) ++ line.slice(2, line.size).map(value => value.toString))
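A minimal usage sketch of the expression above, with the file path and comma delimiter assumed from the question:

import org.apache.spark.sql._

// Sketch: first column as a string, 2nd and 3rd as doubles,
// then the remaining fields from index 2 onward as strings (as in the answer above).
val rows = sc.textFile("/SparkTest/abc.csv")
  .map(_.split(","))
  .map(line => Row.fromSeq(
    Seq(line(0).toString, line(1).toDouble, line(2).toDouble) ++
      line.slice(2, line.size).map(_.toString)))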

