scala 在火花数据框中映射

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/47253834/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 09:29:59  来源:igfitidea点击:

Map in a spark dataframe

scalaapache-sparkapache-spark-sql

提问by Magellan88

Using Spark 2.x I'm making use of the dataframes.

使用 Spark 2.x 我正在使用数据帧。

val proposals = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .option("delimiter", ";")
  .csv("/proposals.txt.gz")

proposals.printSchema()

which works fine and gives:

它工作正常并给出:

root
 |-- MARKETCODE: string (nullable = true)
 |-- REFDATE: string (nullable = true)
 |-- UPDTIME: string (nullable = true)
 |-- UPDTIMEMSEC: integer (nullable = true)
 |-- ENDTIME: string (nullable = true)
 |-- ENDTIMEMSEC: integer (nullable = true)
 |-- BONDCODE: string (nullable = true)

Now I'd like to calculate a time in milliseconds and thus have written a function:

现在我想以毫秒为单位计算时间,因此编写了一个函数:

def time2usecs( time:String, msec:Int )={
    val Array(hour,minute,seconds) = time.split(":").map( _.toInt )
    msec + seconds.toInt*1000 + minute.toInt*60*1000 + hour.toInt*60*60*1000
}
time2usecs( "08:13:44", 111 )


time2usecs: (time: String, msec: Int)Int
res90: Int = 29624111

The last peace of the puzzle that would be something like:

拼图的最后和平将是这样的:

proposals.withColumn( "utime",
  proposals.select("UPDTIME","UPDTIMEMSEC")
    .map( (t,tms) => time2usecs(t,tms) ))

But I can't figure out how to do the df.select(column1, column2).map(...)part.

但我无法弄清楚如何做这df.select(column1, column2).map(...)部分。

采纳答案by user8929556

Why not use SQL all the way?

为什么不一直使用SQL?

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def time2usecs(time: Column, msec: Column) = {
  val bits  = split(time, ":")
  msec + bits(2).cast("int") * 1000 + bits(1).cast("int") * 60 * 1000 + 
  bits(0).cast("int") *60*60*1000
}

df.withColumn("ts", time2usecs(col(""UPDTIME"), col("UPDTIMEMSEC"))

With your code you'd have to:

使用您的代码,您必须:

proposals
  .select("UPDTIME","UPDTIMEMSEC")
  .as[(String, Int)]
  .map { case (t, s) => time2usecs(t, s) }

回答by Shaido - Reinstate Monica

The common approach to using a method on dataframe columns in Spark is to define an UDF(User-Defined Function, see herefor more information). For your case:

在 Spark 中对数据框列使用方法的常见方法是定义一个UDF(用户定义的函数,请参阅此处了解更多信息)。对于您的情况:

import org.apache.spark.sql.functions.udf
import spark.implicits._

val time2usecs = udf((time: String, msec: Int) => {
  val Array(hour,minute,seconds) = time.split(":").map( _.toInt )
  msec + seconds.toInt*1000 + minute.toInt*60*1000 + hour.toInt*60*60*1000
})

val df2 = df.withColumn("utime", time2usecs($"UPDTIME", $"UPDTIMEMSEC"))

spark.implicits._is imported here to allow the use of the $shorthand for the col()function.

spark.implicits._在这里导入是为了允许使用函数的$简写col()