scala - Better way to convert a string field into timestamp in Spark

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same CC BY-SA terms, cite the original URL, and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/29844144/


Better way to convert a string field into timestamp in Spark

scala, apache-spark, apache-spark-sql

Asked by user568109

I have a CSV in which a field is a datetime in a specific format. I cannot import it directly into my DataFrame because it needs to be a timestamp. So I import it as a string and convert it into a Timestamp like this:


import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x: Any): Timestamp = {
  val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
  if (x.toString() == "") {
    null
  } else {
    val d = format.parse(x.toString())
    val t = new Timestamp(d.getTime())
    t
  }
}

def convert(row : Row) : Row = {
    val d1 = getTimestamp(row(3))
    return Row(row(0),row(1),row(2),d1)
}

Is there a better, more concise way to do this with the DataFrame API or Spark SQL? The above method requires creating an RDD and specifying the schema for the DataFrame again.


Answered by zero323

Spark >= 2.2


Since 2.2 you can provide the format string directly:


import org.apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#$@#@#             |null               |
// +---+-------------------+-------------------+

Spark >= 1.6, < 2.2


You can use the date processing functions introduced in Spark 1.5. Assuming you have the following data:


val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")

You can use unix_timestamp to parse the strings and cast the result to timestamp:


import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#$@#@#             |null                 |
// +---+-------------------+---------------------+

As you can see, it covers both parsing and error handling. The format string should be compatible with Java's SimpleDateFormat.

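For example, if the raw strings used a different layout, only the pattern would change. A minimal sketch, assuming a hypothetical column dts2 holding values like 2016.05.26 01:01:

val ts2 = unix_timestamp($"dts2", "yyyy.MM.dd HH:mm").cast("timestamp")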

Spark >= 1.5, < 1.6


You'll have to use something like this:


unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")

or


(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")

due to SPARK-11724.


Spark < 1.5


You should be able to use these with expr and a HiveContext.

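A rough sketch of what that could look like, untested on versions that old; it assumes a HiveContext named sqlContext (so the Hive unix_timestamp UDF resolves) and goes through a registered temp table rather than expr, since the DataFrame date helpers above did not exist yet:

// illustrative table name; df is the same DataFrame as above
df.registerTempTable("events")

val withTs = sqlContext.sql(
  """SELECT id, dts,
    |       cast(unix_timestamp(dts, 'MM/dd/yyyy HH:mm:ss') as timestamp) AS ts
    |  FROM events""".stripMargin
)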

Answered by jarandaf

I haven't played with Spark SQL yet, but I think this would be more idiomatic Scala (using null is not considered good practice):


import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.util.{Failure, Success, Try}

def getTimestamp(s: String): Option[Timestamp] = s match {
  case "" => None
  case _ =>
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }
}

Please note that I assume you know the Row element types beforehand (if you read them from a CSV file, they are all String); that's why I use a proper type like String rather than Any (everything is a subtype of Any).


It also depends on how you want to handle parsing exceptions. In this case, if a parsing exception occurs, a None is simply returned.


You could then use it like this:


// .orNull unwraps the Option into a nullable Timestamp for the Row
rows.map(row => Row(row(0), row(1), row(2), getTimestamp(row.getString(3)).orNull))

Answered by zengr

I have ISO 8601 timestamps in my dataset and I needed to convert them to the "yyyy-MM-dd" format. This is what I did:


import org.joda.time.{DateTime, DateTimeZone}
object DateUtils extends Serializable {
  def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
  def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
}

sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))

And you can just use the UDF in your Spark SQL query.

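A minimal usage sketch, assuming the DataFrame has been registered as a temp table; the table and column names here are illustrative:

df.registerTempTable("my_table")

val dates = sqlContext.sql(
  "SELECT formatTimeStamp(event_time) AS event_date FROM my_table"
)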

Answered by Aravind Krishnakumar

Spark Version: 2.4.4


scala> import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.types.TimestampType

scala> val df = Seq("2019-04-01 08:28:00").toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: string]

scala> val df_mod = df.select($"ts".cast(TimestampType))
df_mod: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df_mod.printSchema()
root
 |-- ts: timestamp (nullable = true)
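
Note that a bare cast like this only works when the string is already in a layout the cast understands (ISO-like, e.g. yyyy-MM-dd HH:mm:ss); for other layouts, use to_timestamp or unix_timestamp with an explicit format as shown above.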

Answered by yjshen

I would move the getTimestamp method you wrote into the RDD's mapPartitions and reuse a GenericMutableRow across the rows of an iterator:


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
// GenericMutableRow is an internal Catalyst API (Spark 1.x)
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

val strRdd = sc.textFile("hdfs://path/to/csv-file")
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter =>
  new Iterator[Row] {
    val row = new GenericMutableRow(4)
    var current: Array[String] = _

    def hasNext = iter.hasNext
    def next() = {
      current = iter.next()
      row(0) = current(0)
      row(1) = current(1)
      row(2) = current(2)

      val ts = getTimestamp(current(3))
      if(ts != null) {
        row.update(3, ts)
      } else {
        row.setNullAt(3)
      }
      row
    }
  }
}

And you should still use a schema to generate a DataFrame:


val df = sqlContext.createDataFrame(rowRdd, tableSchema)
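
In case it helps, a sketch of what tableSchema could look like for this four-column row; the first three field names are assumptions:

import org.apache.spark.sql.types._

// three string columns plus the parsed timestamp in the last slot
val tableSchema = StructType(Seq(
  StructField("col1", StringType, nullable = true),
  StructField("col2", StringType, nullable = true),
  StructField("col3", StringType, nullable = true),
  StructField("ts", TimestampType, nullable = true)
))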

The usage of GenericMutableRow inside an iterator implementation can be found in the Aggregate operator, InMemoryColumnarTableScan, ParquetTableOperations, etc.


Answered by mark

I would use https://github.com/databricks/spark-csv


This will infer timestamps for you.


import com.databricks.spark.csv._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

val rdd: RDD[String] = sc.textFile("csvfile.csv")

val df : DataFrame = new CsvParser().withDelimiter('|')
      .withInferSchema(true)
      .withParseMode("DROPMALFORMED")
      .csvRdd(sqlContext, rdd)
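
For what it's worth, on Spark 2.x the same idea is available through the built-in CSV source; a rough equivalent, assuming a SparkSession named spark, would be:

val df = spark.read
  .option("delimiter", "|")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .csv("csvfile.csv")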

Answered by ashwin319

I had some issues with to_timestamp returning an empty string. After a lot of trial and error, I was able to get around it by casting to a timestamp and then casting back to a string. I hope this helps anyone else with the same issue:


import org.apache.spark.sql.functions.to_timestamp

// `cols` is assumed to be the caller's list of column names to convert
df.columns.intersect(cols).foldLeft(df)((newDf, col) => {
  val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
  newDf.withColumn(col, conversionFunc)
})
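
The foldLeft here simply applies withColumn once for every column name in cols that actually exists in df, so the same round-trip conversion is applied to each matching column.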