在 Spark 中创建分箱直方图
声明:本页面是 Stack Overflow 热门问题的中英对照翻译,遵循 CC BY-SA 4.0 协议。如果您需要使用它,必须同样遵循 CC BY-SA 许可,注明原文地址和作者信息,并将它归于原作者(不是我):Stack Overflow
原文地址: http://stackoverflow.com/questions/34505529/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me): Stack Overflow
Creating binned histograms in Spark
提问by Neel
Suppose I have a dataframe (df) (Pandas) or RDD (Spark) with the following two columns:
假设我有一个包含以下两列的数据框 (df) (Pandas) 或 RDD (Spark):
timestamp, data
12345.0 10
12346.0 12
In Pandas, I can easily create a binned histogram with different bin lengths. For example, to create a histogram with 1-hour bins, I do the following:
在 Pandas 中,我可以很容易地创建不同 bin 长度的分箱直方图。例如,要创建以 1 小时为 bin 的直方图,我执行以下操作:
df = df[['timestamp', 'data']].set_index('timestamp')
df.resample('1H', how=sum).dropna()
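Note that the how= argument was removed in later pandas releases; assuming the index has been converted to a DatetimeIndex, the modern equivalent would be something like:
# assumes df.index is a DatetimeIndex (e.g. pd.to_datetime(df.index, unit='s'))
df.resample('1H').sum().dropna()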
Moving to Pandas df from Spark RDD is pretty expensive for me (considering the dataset). Consequently, I prefer to stay within the Spark domain as much as possible.
从 Spark RDD 迁移到 Pandas df 对我来说非常昂贵(考虑到数据集)。因此,我更愿意尽可能地留在 Spark 领域内。
Is there a way to do the equivalent in Spark RDD or dataframes?
有没有办法在 Spark RDD 或数据帧中做等效的事情?
回答by zero323
In this particular case all you need is Unix timestamps and basic arithmetic:
在这种特殊情况下,您只需要 Unix 时间戳和基本算术:
from pyspark.sql.functions import floor, unix_timestamp

def resample_to_minute(c, interval=1):
    t = 60 * interval
    return (floor(c / t) * t).cast("timestamp")

def resample_to_hour(c, interval=1):
    return resample_to_minute(c, 60 * interval)
df = sc.parallelize([
    ("2000-01-01 00:00:00", 0), ("2000-01-01 00:01:00", 1),
    ("2000-01-01 00:02:00", 2), ("2000-01-01 00:03:00", 3),
    ("2000-01-01 00:04:00", 4), ("2000-01-01 00:05:00", 5),
    ("2000-01-01 00:06:00", 6), ("2000-01-01 00:07:00", 7),
    ("2000-01-01 00:08:00", 8)
]).toDF(["timestamp", "data"])
(df.groupBy(resample_to_minute(unix_timestamp("timestamp"), 3).alias("ts"))
    .sum().orderBy("ts").show(3, False))
## +---------------------+---------+
## |ts |sum(data)|
## +---------------------+---------+
## |2000-01-01 00:00:00.0|3 |
## |2000-01-01 00:03:00.0|12 |
## |2000-01-01 00:06:00.0|21 |
## +---------------------+---------+
(df.groupBy(resample_to_hour(unix_timestamp("timestamp")).alias("ts"))
    .sum().orderBy("ts").show(3, False))
## +---------------------+---------+
## |ts |sum(data)|
## +---------------------+---------+
## |2000-01-01 00:00:00.0|36 |
## +---------------------+---------+
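If what you need is a count per bin rather than a sum, the same grouping can be combined with count() instead of sum(); a minimal sketch:
(df.groupBy(resample_to_hour(unix_timestamp("timestamp")).alias("ts"))
    .count().orderBy("ts").show(3, False))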
Example data from the pandas.DataFrame.resample documentation.
示例数据来自 pandas.DataFrame.resample 文档。
In the general case, see Making histogram with Spark DataFrame column
在一般情况下,请参阅使用 Spark DataFrame 列制作直方图
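For a plain value histogram over a numeric column (without time-based resampling), the RDD histogram helper is one option; a minimal sketch, assuming the values live in the data column:
# returns (bucket boundaries, counts) for 10 evenly spaced buckets
df.select("data").rdd.flatMap(lambda x: x).histogram(10)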
回答by Ben. B.
Here is an answer using RDDs and not dataframes:
这是使用 RDD 而不是数据帧的答案:
# Generating some data to test with
import random
import datetime
startTS = 12345.0
array = [(startTS+60*k, random.randrange(10, 20)) for k in range(150)]
# Initializing a RDD
rdd = sc.parallelize(array)
# I first map the timestamps to datetime objects so I can use the datetime.replace
# method to round the times
formattedRDD = (rdd
                .map(lambda x: (datetime.datetime.fromtimestamp(int(x[0])), x[1]))
                .cache())
# Setting the minute and second fields to zero in the datetime objects is
# exactly like rounding down to the hour. You can then reduceByKey to aggregate the bins.
hourlyRDD = (formattedRDD
             .map(lambda x: (x[0].replace(minute=0, second=0), 1))
             .reduceByKey(lambda a, b: a + b))
hourlyHisto = hourlyRDD.collect()
print(hourlyHisto)
> [(datetime.datetime(1970, 1, 1, 4, 0), 60), (datetime.datetime(1970, 1, 1, 5, 0), 55), (datetime.datetime(1970, 1, 1, 3, 0), 35)]
In order to do daily aggregates you can use time.date() instead of time.replace(...). Also, to bin per hour starting from a datetime that is not on a round hour, you can shift the original times by the offset to the nearest round hour.
为了进行按天聚合,您可以使用 time.date() 而不是 time.replace(...)。此外,要从非整点的时间开始按小时分箱,可以先把原始时间加上到最近整点的偏移量。
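Following the same pattern, a minimal sketch of the daily variant (dailyRDD/dailyHisto are hypothetical names, reusing formattedRDD from above):
dailyRDD = (formattedRDD
            .map(lambda x: (x[0].date(), 1))
            .reduceByKey(lambda a, b: a + b))
dailyHisto = dailyRDD.collect()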