Python PySpark 从 TimeStampType 列向 DataFrame 添加一列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/30882268/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
PySpark add a column to a DataFrame from a TimeStampType column
提问by Wai Yip Tung
I have a DataFrame that look something like that. I want to operate on the day of the date_time
field.
我有一个看起来像这样的 DataFrame。我想在date_time
外地当天进行手术。
root
|-- host: string (nullable = true)
|-- user_id: string (nullable = true)
|-- date_time: timestamp (nullable = true)
I tried to add a column to extract the day. So far my attempts have failed.
我尝试添加一列来提取日期。到目前为止,我的尝试都失败了。
df = df.withColumn("day", df.date_time.getField("day"))
org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type TimestampType;
This has also failed
这也失败了
df = df.withColumn("day", df.select("date_time").map(lambda row: row.date_time.day))
AttributeError: 'PipelinedRDD' object has no attribute 'alias'
Any idea how this can be done?
知道如何做到这一点吗?
采纳答案by zero323
You can use simple map
:
您可以使用简单的map
:
df.rdd.map(lambda row:
Row(row.__fields__ + ["day"])(row + (row.date_time.day, ))
)
Another option is to register a function and run SQL query:
另一种选择是注册一个函数并运行 SQL 查询:
sqlContext.registerFunction("day", lambda x: x.day)
sqlContext.registerDataFrameAsTable(df, "df")
sqlContext.sql("SELECT *, day(date_time) as day FROM df")
Finally you can define udf like this:
最后,您可以像这样定义 udf:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
day = udf(lambda date_time: date_time.day, IntegerType())
df.withColumn("day", day(df.date_time))
EDIT:
编辑:
Actually if you use raw SQL day
function is already defined (at least in Spark 1.4) so you can omit udf registration. It also provides a number of different date processing functions including:
实际上,如果您使用原始 SQLday
函数已经定义(至少在 Spark 1.4 中),那么您可以省略 udf 注册。它还提供了许多不同的日期处理功能,包括:
getters like
year
,month
,dayofmonth
- parsers like
from_unixtime
and formatters likedate_format
吸气剂像
year
,month
,dayofmonth
- 解析器之类的
from_unixtime
和格式化器之类的date_format
It is also possible to use simple date expressions like:
也可以使用简单的日期表达式,例如:
current_timestamp() - expr("INTERVAL 1 HOUR")
It mean you can build relatively complex queries without passing data to Python. For example:
这意味着您可以构建相对复杂的查询,而无需将数据传递给 Python。例如:
df = sc.parallelize([
(1, "2016-01-06 00:04:21"),
(2, "2016-05-01 12:20:00"),
(3, "2016-08-06 00:04:21")
]).toDF(["id", "ts_"])
now = lit("2016-06-01 00:00:00").cast("timestamp")
five_months_ago = now - expr("INTERVAL 5 MONTHS")
(df
# Cast string to timestamp
# For Spark 1.5 use cast("double").cast("timestamp")
.withColumn("ts", unix_timestamp("ts_").cast("timestamp"))
# Find all events in the last five months
.where(col("ts").between(five_months_ago, now))
# Find first Sunday after the event
.withColumn("next_sunday", next_day(col("ts"), "Sun"))
# Compute difference in days
.withColumn("diff", datediff(col("ts"), col("next_sunday"))))