对时间序列的index进行resample是很常见的操作。比如,按日、周、月、季度统计用户新增、活跃、累计等,就需要对用户表进行resample操作。 pandas 的resample函数可以轻松地对时间序列数据进行重采样,并按照一定的频率聚合数据。但是因为spark中没有index的概念,所以做起来并不容易。
以下介绍是如何在 spark 中进行重采样的示例。
1. 笨拙的方法
代码语言:txt复制def resample(column, agg_interval=900, time_format='yyyy-MM-dd HH:mm:ss'):
if type(column)==str:
column = F.col(column)
# Convert the timestamp to unix timestamp format.
# Unix timestamp = number of seconds since 00:00:00 UTC, 1 January 1970.
col_ut = F.unix_timestamp(column, format=time_format)
# Divide the time into dicrete intervals, by rounding.
col_ut_agg = F.floor(col_ut / agg_interval) * agg_interval
# Convert to and return a human readable timestamp
return F.from_unixtime(col_ut_agg)`
测试如下
导入数据:
代码语言:txt复制sdf = spark.read.csv('production.csv', header=True, inferSchema=True)
sdf = (
sdf
.withColumn('_c0',f.to_timestamp(f.col('_c0')))
)
sdf.show(2)
运行
代码语言:txt复制sdf = sdf.withColumn('dt_resampled', resample(sdf._c0, agg_interval=3600)) # 1 hour
sdf.show(5)
- groupby window
group = sdf.groupBy('Cantons', f.window("_c0", "1 day")).agg(f.sum("Production").alias('Sum Production'))
group.show(5,truncate=False)
代码语言:txt复制sdf_resampled = group.select(group.window.start.alias("Start"), group.window.end.alias("End"), "Cantons", "Sum Production").orderBy('Start', ascending=True)
sdf_resampled.show()
)