scala 选择数组中的一系列元素spark sql
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/40134975/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
selecting a range of elements in an array spark sql
提问by thinkinbee
I use spark-shellto do the below operations.
我spark-shell用来做以下操作。
Recently loaded a table with an array column in spark-sql .
最近在 spark-sql 中加载了一个包含数组列的表。
Here is the DDL for the same:
这是相同的 DDL:
create table test_emp_arr{
dept_id string,
dept_nm string,
emp_details Array<string>
}
the data looks something like this
数据看起来像这样
+-------+-------+-------------------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+-------------------------------+
| 10|Finance|[Jon, Snow, Castle, Black, Ned]|
| 20| IT| [Ned, is, no, more]|
+-------+-------+-------------------------------+
I can query the emp_details column something like this :
我可以像这样查询 emp_details 列:
sqlContext.sql("select emp_details[0] from emp_details").show
Problem
问题
I want to query a range of elements in the collection :
我想查询集合中的一系列元素:
Expected query to work
预期的查询工作
sqlContext.sql("select emp_details[0-2] from emp_details").show
or
或者
sqlContext.sql("select emp_details[0:2] from emp_details").show
Expected output
预期输出
+-------------------+
| emp_details|
+-------------------+
|[Jon, Snow, Castle]|
| [Ned, is, no]|
+-------------------+
In pure Scala, if i have an array something as :
在纯 Scala 中,如果我有一个数组:
val emp_details = Array("Jon","Snow","Castle","Black")
I can get the elements from 0 to 2 range using
我可以使用从 0 到 2 范围内的元素
emp_details.slice(0,3)
returns me
还我
Array(Jon, Snow,Castle)
I am not able to apply the above operation of the array in spark-sql.
我无法在 spark-sql 中应用数组的上述操作。
Thanks
谢谢
采纳答案by cheseaux
Here is a solution using a User Defined Functionwhich has the advantage of working for any slice size you want. It simply builds a UDF function around the scala builtin slicemethod :
这是一个使用用户定义函数的解决方案,它具有适用于您想要的任何切片大小的优势。它只是围绕 scala 内置slice方法构建一个 UDF 函数:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
val slice = udf((array : Seq[String], from : Int, to : Int) => array.slice(from,to))
Example with a sample of your data :
示例数据示例:
val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details")
df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).show
Produces the expected output
产生预期的输出
+--------------------+-------------------+
| emp_details| slice|
+--------------------+-------------------+
|[Jon, Snow, Castl...|[Jon, Snow, Castle]|
+--------------------+-------------------+
You can also register the UDF in your sqlContextand use it like this
您也可以在您的 UDF 中注册sqlContext并像这样使用它
sqlContext.udf.register("slice", (array : Seq[String], from : Int, to : Int) => array.slice(from,to))
sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'),slice(array('Jon??','Snow','Castle','Black','Ned'),0,3)")
You won't need litanymore with this solution
您将不再需要lit此解决方案
回答by zero323
Since Spark 2.4 you can use slicefunction. In Python):
从 Spark 2.4 开始,您可以使用slice函数。在Python 中):
pyspark.sql.functions.slice(x, start, length)Collection function: returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.
...
New in version 2.4.
pyspark.sql.functions.slice(x, start, length)集合函数:返回一个数组,其中包含指定长度的 x 中从索引开始(如果 start 为负,则从末尾开始)的所有元素。
...
2.4 版中的新功能。
from pyspark.sql.functions import slice
df = spark.createDataFrame([
(10, "Finance", ["Jon", "Snow", "Castle", "Black", "Ned"]),
(20, "IT", ["Ned", "is", "no", "more"])
], ("dept_id", "dept_nm", "emp_details"))
df.select(slice("emp_details", 1, 3).alias("empt_details")).show()
+-------------------+
| empt_details|
+-------------------+
|[Jon, Snow, Castle]|
| [Ned, is, no]|
+-------------------+
def slice(x: Column, start: Int, length: Int): ColumnReturns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.
def slice(x: Column, start: Int, length: Int): Column返回一个数组,该数组包含具有指定长度的 x 中从索引开始(如果 start 为负,则从末尾开始)的所有元素。
import org.apache.spark.sql.functions.slice
val df = Seq(
(10, "Finance", Seq("Jon", "Snow", "Castle", "Black", "Ned")),
(20, "IT", Seq("Ned", "is", "no", "more"))
).toDF("dept_id", "dept_nm", "emp_details")
df.select(slice($"emp_details", 1, 3) as "empt_details").show
+-------------------+
| empt_details|
+-------------------+
|[Jon, Snow, Castle]|
| [Ned, is, no]|
+-------------------+
The same thing can be of course done in SQL
SELECT slice(emp_details, 1, 3) AS emp_details FROM df
Important:
重要:
Please note, that unlike Seq.slice, values are indexed from zero and the second argument is length, not end position.
请注意,与 不同的是Seq.slice,值从零开始索引,第二个参数是长度,而不是结束位置。
回答by Wilmerton
Edit2: For who wants to avoid udf at the expense of readability ;-)
Edit2:对于谁想以牺牲可读性为代价来避免 udf ;-)
If you really want to do it in one step, you will have to use Scala to create a lambda function returning an sequence of Columnand wrap it in an array. This is a bit involved, but it's one step:
如果您真的想一步完成,则必须使用 Scala 创建一个 lambda 函数,该函数返回一个 的序列Column并将其包装在一个数组中。这有点复杂,但这是一个步骤:
val df = List(List("Jon", "Snow", "Castle", "Black", "Ned")).toDF("emp_details")
df.withColumn("slice", array((0 until 3).map(i => $"emp_details"(i)):_*)).show(false)
+-------------------------------+-------------------+
|emp_details |slice |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+
The _:*works a bit of magic to pass an list to a so-called variadic function (arrayin this case, which construct the sql array). But I would advice against using this solution as is. put the lambda function in a named function
将_:*列表传递给所谓的可变参数函数(array在这种情况下,它构造 sql 数组)的工作有点神奇。但我建议不要按原样使用此解决方案。将 lambda 函数放入命名函数中
def slice(from: Int, to: Int) = array((from until to).map(i => $"emp_details"(i)):_*))
for code readability. Note that in general, sticking to Columnexpressions (without using `udf) has better performances.
为了代码可读性。请注意,一般而言,坚持使用Column表达式(不使用 `udf)具有更好的性能。
Edit: In order to do it in a sql statement (as you ask in your question...), following the same logic you would generate the sql query using scala logic (not saying it's the most readable)
编辑:为了在 sql 语句中执行此操作(正如您在问题中所问...),遵循相同的逻辑,您将使用 Scala 逻辑生成 sql 查询(并不是说它是最易读的)
def sliceSql(emp_details: String, from: Int, to: Int): String = "Array(" + (from until to).map(i => "emp_details["+i.toString+"]").mkString(",") + ")"
val sqlQuery = "select emp_details,"+ sliceSql("emp_details",0,3) + "as slice from emp_details"
sqlContext.sql(sqlQuery).show
+-------------------------------+-------------------+
|emp_details |slice |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+
note that you can replace untilby toin order to provide the last element taken rather than the element at which the iteration stops.
请注意,您可以替换until通过to以提供取,而不是在其迭代停止元素的最后一个元素。
回答by Tzach Zohar
You can use the function arrayto build a new Array out of the three values:
您可以使用该函数array从三个值中构建一个新数组:
import org.apache.spark.sql.functions._
val input = sqlContext.sql("select emp_details from emp_details")
val arr: Column = col("emp_details")
val result = input.select(array(arr(0), arr(1), arr(2)) as "emp_details")
val result.show()
// +-------------------+
// | emp_details|
// +-------------------+
// |[Jon, Snow, Castle]|
// | [Ned, is, no]|
// +-------------------+
回答by Bewang
Here is my generic slice UDF, support array with any type. A little bit ugly because you need to know the element type in advance.
这是我的通用切片 UDF,支持任何类型的数组。有点难看,因为您需要提前知道元素类型。
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
def arraySlice(arr: Seq[AnyRef], from: Int, until: Int): Seq[AnyRef] =
if (arr == null) null else arr.slice(from, until)
def slice(elemType: DataType): UserDefinedFunction =
udf(arraySlice _, ArrayType(elemType)
fs.select(slice(StringType)($"emp_details", 1, 2))
回答by Clay
For those of you stuck using Spark < 2.4 and don't have the slicefunction, here is a solution in pySpark (Scala would be very similar) that does not use udfs. Instead it uses the spark sql functions concat_ws, substring_index, and split.
对于那些坚持使用 Spark < 2.4 并且没有该slice功能的人,这里有一个不使用 udfs 的 pySpark 解决方案(Scala 将非常相似)。相反,它使用的火花SQL函数concat_ws,substring_index和split。
This will only work with string arrays. To make it work with arrays of other types, you will have to cast them into strings first, then cast back to the original type after you have 'sliced' the array.
这仅适用于字符串数组。要使其适用于其他类型的数组,您必须先将它们转换为字符串,然后在“切片”数组后转换回原始类型。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.master('yarn')
.appName("array_slice")
.getOrCreate()
)
emp_details = [
["Jon", "Snow", "Castle", "Black", "Ned"],
["Ned", "is", "no", "more"]
]
df1 = spark.createDataFrame(
[tuple([emp]) for emp in emp_details],
["emp_details"]
)
df1.show(truncate=False)
+-------------------------------+
|emp_details |
+-------------------------------+
|[Jon, Snow, Castle, Black, Ned]|
|[Ned, is, no, more] |
+-------------------------------+
last_string = 2
df2 = (
df1
.withColumn('last_string', (F.lit(last_string)))
.withColumn('concat', F.concat_ws(" ", F.col('emp_details')))
.withColumn('slice', F.expr("substring_index(concat, ' ', last_string + 1)" ))
.withColumn('slice', F.split(F.col('slice'), ' '))
.select('emp_details', 'slice')
)
df2.show(truncate=False)
+-------------------------------+-------------------+
|emp_details |slice |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
|[Ned, is, no, more] |[Ned, is, no] |
+-------------------------------+-------------------+
回答by Kamal Pradhan
use selecrExpr()and split()function in apache spark.
在 apache spark 中使用selecrExpr()和split()函数。
for example :
例如 :
fs.selectExpr("((split(emp_details, ','))[0]) as e1,((split(emp_details, ','))[1]) as e2,((split(emp_details, ','))[2]) as e3);
回答by MarcelG
Use nested split:
使用嵌套拆分:
split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',')
split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',')
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d637673
scala> val df = spark.read.json("file:///Users/gengmei/Desktop/test/test.json")
18/12/11 10:09:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
scala> df.createOrReplaceTempView("raw_data")
scala> df.show()
+-------+-------+--------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+--------------------+
| 10|Finance|[Jon, Snow, Castl...|
| 20| IT| [Ned, is, no, more]|
+-------+-------+--------------------+
scala> val df2 = spark.sql(
| s"""
| |select dept_id,dept_nm,split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') as emp_details from raw_data
| """)
df2: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
scala> df2.show()
+-------+-------+-------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+-------------------+
| 10|Finance|[Jon, Snow, Castle]|
| 20| IT| [Ned, is, no]|
+-------+-------+-------------------+

![scala 将 Array[(String,String)] 类型转换为 Spark 中的 RDD[(String,String)] 类型](/res/img/loading.gif)