Spark resampling

2021-09-18 16:08:15 浏览数 (1)

对时间序列的index进行resample是很常见的操作。比如,按日、周、月、季度统计用户新增、活跃、累计等,就需要对用户表进行resample操作。 pandas 的resample函数可以轻松地对时间序列数据进行重采样,并按照一定的频率聚合数据。但是因为spark中没有index的概念,所以做起来并不容易。

以下介绍是如何在 spark 中进行重采样的示例。

1. 笨拙的方法

代码语言:txt复制
def resample(column, agg_interval=900, time_format='yyyy-MM-dd HH:mm:ss'):
    if type(column)==str:
        column = F.col(column)

    # Convert the timestamp to unix timestamp format.
    # Unix timestamp = number of seconds since 00:00:00 UTC, 1 January 1970.
    col_ut =  F.unix_timestamp(column, format=time_format)

    # Divide the time into dicrete intervals, by rounding. 
    col_ut_agg =  F.floor(col_ut / agg_interval) * agg_interval  

    # Convert to and return a human readable timestamp
    return F.from_unixtime(col_ut_agg)`

测试如下

导入数据:

代码语言:txt复制
sdf = spark.read.csv('production.csv', header=True, inferSchema=True) 
sdf = (
    sdf
    .withColumn('_c0',f.to_timestamp(f.col('_c0')))
)
sdf.show(2)

运行

代码语言:txt复制
sdf = sdf.withColumn('dt_resampled', resample(sdf._c0, agg_interval=3600)) # 1 hour
sdf.show(5)
  1. groupby window
代码语言:txt复制
group = sdf.groupBy('Cantons', f.window("_c0", "1 day")).agg(f.sum("Production").alias('Sum Production'))
group.show(5,truncate=False)
代码语言:txt复制
sdf_resampled = group.select(group.window.start.alias("Start"), group.window.end.alias("End"), "Cantons", "Sum Production").orderBy('Start', ascending=True)
sdf_resampled.show()

)

0 人点赞