scala - Spark Structured Streaming automatically converts timestamp to local time
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/48767008/
Spark Structured Streaming automatically converts timestamp to local time
Asked by Martin Bri?iak
I have my timestamp in UTC and ISO8601, but using Structured Streaming, it gets automatically converted into the local time. Is there a way to stop this conversion? I would like to have it in UTC.
I'm reading JSON data from Kafka and then parsing it using the from_json Spark function.
Input:
{"Timestamp":"2015-01-01T00:00:06.222Z"}
Flow:
SparkSession
.builder()
.master("local[*]")
.appName("my-app")
.getOrCreate()
.readStream()
.format("kafka")
... //some magic
.writeStream()
.format("console")
.start()
.awaitTermination();
Schema:
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true),});
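For reference, a rough Scala sketch of what the elided "some magic" parsing step could look like with this schema (this is not the asker's actual code; the broker address and topic name are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

object ParseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("my-app")
      .getOrCreate()

    // Same schema as above, expressed in Scala
    val schema = StructType(Seq(StructField("Timestamp", TimestampType, nullable = true)))

    val parsed = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "events")                        // assumed topic name
      .load()
      .selectExpr("CAST(value AS STRING) AS json")           // the Kafka value arrives as bytes
      .select(from_json(col("json"), schema).as("data"))     // parse the JSON payload
      .select("data.*")                                      // flatten to the Timestamp column

    parsed.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}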
Output:
+--------------------+
| Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+
As you can see, the hour has incremented by itself.
PS: I tried to experiment with the from_utc_timestamp Spark function, but no luck.
Answered by astro_asz
For me, it worked to use:
spark.conf.set("spark.sql.session.timeZone", "UTC")
It tells Spark SQL to use UTC as the default time zone for timestamps. I used it in Spark SQL, for example:
select *, cast('2017-01-01 10:10:10' as timestamp) from someTable
I know it does not work in 2.0.1, but it works in Spark 2.2. I also used it in SQLTransformer and it worked.
I am not sure about streaming though.
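If it helps, here is a minimal sketch of applying the setting when the session is built (a plain batch example; the object and app names are just for illustration):

import org.apache.spark.sql.SparkSession

object UtcSessionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("utc-session")
      // same effect as spark.conf.set("spark.sql.session.timeZone", "UTC") after startup
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()

    // Timestamps are now parsed and rendered using UTC rather than the JVM default time zone
    spark.sql("select cast('2017-01-01 10:10:10' as timestamp) as ts").show(false)
  }
}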
回答by zero323
Note:
This answer is useful primarily in Spark < 2.2. For newer Spark versions, see the answer by astro_asz.
However, we should note that as of Spark 2.4.0, spark.sql.session.timeZone doesn't set user.timezone (java.util.TimeZone.getDefault). So setting spark.sql.session.timeZone alone can result in a rather awkward situation where SQL and non-SQL components use different timezone settings.
Therefore I still recommend setting user.timezone explicitly, even if spark.sql.session.timeZone is set.
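As a rough illustration of setting both on the driver JVM (on a real cluster the executors would still need -Duser.timezone passed via extraJavaOptions, as shown below):

import java.util.TimeZone
import org.apache.spark.sql.SparkSession

object BothTimeZonesSketch {
  def main(args: Array[String]): Unit = {
    // JVM default used by non-SQL code paths (only affects this driver process)
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
    System.setProperty("user.timezone", "UTC")

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("tz-consistency")
      // time zone used by Spark SQL for parsing and rendering timestamps
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()
  }
}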
TL;DR Unfortunately this is how Spark handles timestamps right now and there is really no built-in alternative, other than operating on epoch time directly, without using date/time utilities.
You can find an insightful discussion on the Spark developers list: SQL TIMESTAMP semantics vs. SPARK-18350
The cleanest workaround I've found so far is to set -Duser.timezone to UTC for both the driver and executors. For example, when launching spark-shell:
bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
--conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"
or by adjusting configuration files (spark-defaults.conf):
spark.driver.extraJavaOptions -Duser.timezone=UTC
spark.executor.extraJavaOptions -Duser.timezone=UTC
Answered by Chris Bedford
Although two very good answers have been provided, I found them both to be a bit of a heavy hammer to solve the problem. I did not want anything that would require modifying time zone parsing behavior across the whole app, or an approach that would alter the default time zone of my JVM. I did find a solution after much pain, which I will share below...
Parsing time[/date] strings into timestamps for date manipulations, then correctly rendering the result back
First, let's address the issue of how to get Spark SQL to correctly parse a date[/time] string (given a format) into a timestamp, and then properly render that timestamp back out so that it shows the same date[/time] as the original string input. The general approach is:
- convert a date[/time] string to a timestamp [via to_timestamp]
[ to_timestamp seems to assume the date[/time] string represents a time relative to UTC (GMT time zone) ]
- relativize that timestamp to the timezone we are in via from_utc_timestamp
The test code below implements this approach. The 'timezone we are in' is passed as the first argument to the timeTricks method. The code converts the input string "1970-01-01" to localizedTimestamp (via from_utc_timestamp) and verifies that the 'valueOf' of that timestamp is the same as "1970-01-01 00:00:00".
object TimeTravails {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark: SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExample")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._
    import java.sql.Timestamp

    def timeTricks(timezone: String): Unit = {
      val df2 = List("1970-01-01").toDF("timestr"). // can use to_timestamp even without time parts !
        withColumn("timestamp", to_timestamp('timestr, "yyyy-MM-dd")).
        withColumn("localizedTimestamp", from_utc_timestamp('timestamp, timezone)).
        withColumn("weekday", date_format($"localizedTimestamp", "EEEE"))
      val row = df2.first()
      println("with timezone: " + timezone)
      df2.show()
      val (timestamp, weekday) = (row.getAs[Timestamp]("localizedTimestamp"), row.getAs[String]("weekday"))
      timezone match {
        case "UTC" =>
          assert(timestamp == Timestamp.valueOf("1970-01-01 00:00:00") && weekday == "Thursday")
        case "PST" | "GMT-8" | "America/Los_Angeles" =>
          assert(timestamp == Timestamp.valueOf("1969-12-31 16:00:00") && weekday == "Wednesday")
        case "Asia/Tokyo" =>
          assert(timestamp == Timestamp.valueOf("1970-01-01 09:00:00") && weekday == "Thursday")
      }
    }

    timeTricks("UTC")
    timeTricks("PST")
    timeTricks("GMT-8")
    timeTricks("Asia/Tokyo")
    timeTricks("America/Los_Angeles")
  }
}
Solution to the problem of Structured Streaming interpreting incoming date[/time] strings as UTC (not local time)
The code below illustrates how to apply the above tricks (with a slight modification) so as to correct the problem of timestamps being shifted by the offset between local time and GMT.
object Struct {
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  def main(args: Array[String]): Unit = {
    val timezone = "PST"
    val spark: SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExample")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    import spark.implicits._

    val splitDf = df.select(split(df("value"), " ").as("arr")).
      select($"arr"(0).as("tsString"), $"arr"(1).as("count")).
      withColumn("timestamp", to_timestamp($"tsString", "yyyy-MM-dd"))
    val grouped = splitDf.groupBy(window($"timestamp", "1 day", "1 day").as("date_window")).count()

    val tunedForDisplay =
      grouped.
        withColumn("windowStart", to_utc_timestamp($"date_window.start", timezone)).
        withColumn("windowEnd", to_utc_timestamp($"date_window.end", timezone))

    tunedForDisplay.writeStream
      .format("console")
      .outputMode("update")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}
The code requires input to be fed via a socket... I use the program 'nc' (netcat), started like this:
nc -l 9999
Then I start the Spark program and provide netcat with one line of input:
1970-01-01 4
The output I get illustrates the problem with the offset shift:
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+-------------------+-------------------+
|date_window |count|windowStart |windowEnd |
+------------------------------------------+-----+-------------------+-------------------+
|[1969-12-31 16:00:00, 1970-01-01 16:00:00]|1 |1970-01-01 00:00:00|1970-01-02 00:00:00|
+------------------------------------------+-----+-------------------+-------------------+
Note that the start and end for date_window is shifted by eight hours from the input (because I am in the GMT-7/8 timezone, PST). However, I correct this shift using to_utc_timestamp to get the proper start and end date times for the one day window that subsumes the input: 1970-01-01 00:00:00,1970-01-02 00:00:00.
Note that in the first block of code presented we used from_utc_timestamp, whereas for the structured streaming solution we used to_utc_timestamp. I have yet to figure out which of these two to use in a given situation. (Please clue me in if you know!).
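For what it's worth, my reading of the docs is that from_utc_timestamp treats its input as a UTC instant and shifts it into the given zone's wall-clock time, while to_utc_timestamp treats its input as wall-clock time in the given zone and shifts it back to UTC. A small sketch of the difference (the object and column names are just for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_utc_timestamp, to_utc_timestamp}

object UtcShiftSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("utc-shift").getOrCreate()
    import spark.implicits._

    val df = Seq("1970-01-01 00:00:00").toDF("ts")
    df.select(
      // input read as UTC, shifted into PST wall-clock time -> 1969-12-31 16:00:00
      from_utc_timestamp($"ts", "PST").as("from_utc"),
      // input read as PST wall-clock time, shifted to UTC   -> 1970-01-01 08:00:00
      to_utc_timestamp($"ts", "PST").as("to_utc")
    ).show(false)
  }
}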

