PySpark: filtering a DataFrame by a date field in a range, where the date is a string

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/36115687/

Tags: python, date, datetime, dataframe, pyspark

Asked by mar tin

My dataframe contains one field which is a date, and it appears in string format, for example

'2015-07-02T11:22:21.050Z'

I need to filter the DataFrame on the date to get only the records in the last week. So, I was trying a map approach where I transformed the string dates to datetime objects with strptime:

from datetime import datetime

def map_to_datetime(row):
    format_string = '%Y-%m-%dT%H:%M:%S.%fZ'
    row.date = datetime.strptime(row.date, format_string)

df = df.map(map_to_datetime)

and then I would apply a filter as

df.filter(lambda row:
    row.date >= (datetime.today() - timedelta(days=7)))

I managed to get the mapping working, but the filter fails with

TypeError: condition should be string or Column

Is there a way to make the filtering work, or should I change my approach, and if so, how?

Accepted answer by zero323

You can solve this without using worker-side Python code or switching to RDDs. First of all, since you use ISO 8601 strings, your data can be cast directly to date or timestamp:

from pyspark.sql.functions import col

df = sc.parallelize([
    ('2015-07-02T11:22:21.050Z', ),
    ('2016-03-20T21:00:00.000Z', )
]).toDF(("d_str", ))

df_casted = df.select("*",
    col("d_str").cast("date").alias("dt"), 
    col("d_str").cast("timestamp").alias("ts"))

This will save one roundtrip between the JVM and Python. There are also a few ways you can approach the second part. Date only:

from pyspark.sql.functions import current_date, datediff, unix_timestamp

df_casted.where(datediff(current_date(), col("dt")) < 7)

Timestamps:

def days(i: int) -> int:
    # number of seconds in i days
    return 60 * 60 * 24 * i

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7))

You can also take a look at current_timestamp and date_sub.

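For example, a minimal sketch of a date_sub-based filter could look like this (an illustration using the dt column created above, not part of the original answer):

from pyspark.sql.functions import col, current_date, date_sub

# keep rows whose date falls within the last 7 days
df_casted.where(col("dt") >= date_sub(current_date(), 7))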

Note: I would avoid using DataFrame.map. It is better to use DataFrame.rdd.map instead. It will save you some work when switching to 2.0+.

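If Python-side parsing is still needed for some other reason, a minimal sketch of the RDD route could look like this (assuming the d_str column from the example above; purely illustrative):

from datetime import datetime

fmt = '%Y-%m-%dT%H:%M:%S.%fZ'

# go through the underlying RDD explicitly instead of DataFrame.map
parsed_dates = df.rdd.map(lambda row: datetime.strptime(row.d_str, fmt))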

Answered by mar tin

I figured out a way to solve my problem by using the Spark SQL API with dates in string format.

Here is an example:

from datetime import datetime, timedelta

last_week = (datetime.today() - timedelta(days=7)).strftime(format='%Y-%m-%d')

new_df = df.where(df.date >= last_week)
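
For comparison, the same filter can be written as a column expression (a sketch that assumes the column is named date, as in the question). The plain string comparison works because ISO 8601 dates in 'YYYY-MM-DD' format sort lexicographically in the same order as the dates themselves:

from pyspark.sql.functions import col, lit

new_df = df.where(col("date") >= lit(last_week))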