Better way to convert a string field into timestamp in Spark (Scala)
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/29844144/
Asked by user568109
I have a CSV in which a field is a datetime in a specific format. I cannot import it directly into my DataFrame because it needs to be a timestamp. So I import it as a string and convert it into a Timestamp like this:
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x: Any): Timestamp = {
  val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
  if (x.toString() == "")
    return null
  else {
    val d = format.parse(x.toString())
    val t = new Timestamp(d.getTime())
    return t
  }
}

def convert(row: Row): Row = {
  val d1 = getTimestamp(row(3))
  return Row(row(0), row(1), row(2), d1)
}
Is there a better, more concise way to do this with the DataFrame API or Spark SQL? The method above requires creating an RDD and specifying the schema for the DataFrame all over again.
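For reference, a minimal sketch of that round trip, reusing the convert helper above (the column position and the names df and sqlContext are assumptions, not part of the original post):

import org.apache.spark.sql.types.{StructType, TimestampType}

// Rebuild the string-typed schema, swapping the 4th column (index 3) to TimestampType,
// then map every row through convert() and hand the schema back to createDataFrame.
val newSchema = StructType(df.schema.zipWithIndex.map {
  case (field, 3) => field.copy(dataType = TimestampType)
  case (field, _) => field
})
val withTs = sqlContext.createDataFrame(df.rdd.map(convert), newSchema)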
Answered by zero323
Spark >= 2.2
Since 2.2 you can provide the format string directly:
import org.apache.spark.sql.functions.to_timestamp
val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
df.withColumn("ts", ts).show(2, false)
// +---+-------------------+-------------------+
// |id |dts |ts |
// +---+-------------------+-------------------+
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2 |#$@#@# |null |
// +---+-------------------+-------------------+
Spark >= 1.6, < 2.2
You can use the date processing functions introduced in Spark 1.5. Assuming you have the following data:
val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")
You can use unix_timestamp to parse strings and cast the result to timestamp:
import org.apache.spark.sql.functions.unix_timestamp
val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")
df.withColumn("ts", ts).show(2, false)
// +---+-------------------+---------------------+
// |id |dts |ts |
// +---+-------------------+---------------------+
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2 |#$@#@# |null |
// +---+-------------------+---------------------+
As you can see it covers both parsing and error handling. The format string should be compatible with Java SimpleDateFormat.
Spark >= 1.5, < 1.6
You'll have to use something like this:
unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")
or
(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")
due to SPARK-11724.
Spark < 1.5
You should be able to use these with expr and HiveContext.
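For example, a minimal sketch under the assumption that df has the same id and dts columns as above and was created from a HiveContext (the exact expressions are not part of the original answer):

// On Spark < 1.5 unix_timestamp is not in the DataFrame DSL, but it still resolves
// as a Hive UDF when the DataFrame is backed by a HiveContext. The cast to double
// mirrors the SPARK-11724 workaround shown above.
val withTs = df.selectExpr(
  "id",
  "dts",
  "CAST(CAST(unix_timestamp(dts, 'MM/dd/yyyy HH:mm:ss') AS double) AS timestamp) AS ts")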
Answered by jarandaf
I haven't played with Spark SQL yet, but I think this would be more idiomatic Scala (using null is not considered good practice):
import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.util.{Failure, Success, Try}

def getTimestamp(s: String): Option[Timestamp] = s match {
  case "" => None
  case _ => {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }
  }
}
Please notice I assume you know the Row element types beforehand (if you read it from a CSV file, they are all String), which is why I use a proper type like String and not Any (everything is a subtype of Any).
It also depends on how you want to handle parsing exceptions. In this case, if a parsing exception occurs, a None is simply returned.
You could use it further on with:
rows.map(row => Row(row(0), row(1), row(2), getTimestamp(row(3))))
Answered by zengr
I have ISO8601 timestamps in my dataset and I needed to convert them to the "yyyy-MM-dd" format. This is what I did:
import org.joda.time.{DateTime, DateTimeZone}

object DateUtils extends Serializable {
  def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
  def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
}

sqlContext.udf.register("formatTimeStamp", (isoTimestamp: String) =>
  DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))
And you can just use the UDF in your Spark SQL query.
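For instance, once the UDF is registered it can be referenced by name in SQL. A sketch with made-up table and column names:

// Hypothetical: eventsDf has an ISO8601 string column named "created_at"
eventsDf.registerTempTable("events")

val formatted = sqlContext.sql(
  "SELECT formatTimeStamp(created_at) AS created_date FROM events")
formatted.show()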
Answered by Aravind Krishnakumar
Spark Version: 2.4.4
scala> import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.types.TimestampType
scala> val df = Seq("2019-04-01 08:28:00").toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: string]
scala> val df_mod = df.select($"ts".cast(TimestampType))
df_mod: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala> df_mod.printSchema()
root
|-- ts: timestamp (nullable = true)
Answered by yjshen
I would like to move the getTimestamp method you wrote into the RDD's mapPartitions and reuse a GenericMutableRow across the rows of each iterator:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
// GenericMutableRow is an internal Catalyst class (Spark 1.x)
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

val strRdd = sc.textFile("hdfs://path/to/csv-file")
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter =>
  new Iterator[Row] {
    // one mutable row reused for every record in the partition
    val row = new GenericMutableRow(4)
    var current: Array[String] = _

    def hasNext = iter.hasNext

    def next() = {
      current = iter.next()
      row(0) = current(0)
      row(1) = current(1)
      row(2) = current(2)

      val ts = getTimestamp(current(3))
      if (ts != null) {
        row.update(3, ts)
      } else {
        row.setNullAt(3)
      }
      row
    }
  }
}
And you should still use a schema to generate the DataFrame:
val df = sqlContext.createDataFrame(rowRdd, tableSchema)
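tableSchema is not shown in the answer; a sketch of what it might look like for the four-column layout from the question (field names are made up):

import org.apache.spark.sql.types.{StructField, StructType, StringType, TimestampType}

// Hypothetical field names; the 4th column carries the parsed timestamp
val tableSchema = StructType(Seq(
  StructField("col1", StringType, nullable = true),
  StructField("col2", StringType, nullable = true),
  StructField("col3", StringType, nullable = true),
  StructField("eventTime", TimestampType, nullable = true)
))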
The usage of GenericMutableRow inside an iterator implementation can be found in Aggregate Operator, InMemoryColumnarTableScan, ParquetTableOperations, etc.
Answered by mark
I would use https://github.com/databricks/spark-csv
This will infer timestamps for you.
import com.databricks.spark.csv._

val rdd: RDD[String] = sc.textFile("csvfile.csv")
val df: DataFrame = new CsvParser()
  .withDelimiter('|')
  .withInferSchema(true)
  .withParseMode("DROPMALFORMED")
  .csvRdd(sqlContext, rdd)
Answered by ashwin319
I had some issues with to_timestamp where it was returning an empty string. After a lot of trial and error, I was able to get around it by casting to a timestamp and then casting back to a string. I hope this helps anyone else with the same issue:
import org.apache.spark.sql.functions.to_timestamp

df.columns.intersect(cols).foldLeft(df) { (newDf, col) =>
  val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
  newDf.withColumn(col, conversionFunc)
}
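For context, the snippet assumes df is the input DataFrame and cols lists the columns to convert; a hypothetical setup might be:

// Hypothetical column names: only the string columns carrying "MM/dd/yyyy HH:mm:ss" values
val cols = Seq("created_at", "updated_at")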

