天天看点

Hive按特定时间窗口分组求和实例

样例数据:

> select * from tmp.lanfz_log;
2020-09-14 13:47:12,771 [ForkJoinPool-1-worker-3] INFO  cn.jpush.spark.parser.SqlStatisticsParser - queryId : e1a036de-3463-4ab9-a3e9-9ba6e6229227
usera    lb    2020091410    60                                              
usera    la    2020091412    60
userb    la    2020091409    60
usera    la    2020091409    60
userb    la    2020091411    60
userb    lb    2020091410    60
usera    la    2020091408    60
usera    la    2020091407    30
usera    lb    2020091413    60
userb    la    2020091408    60
usera    la    2020091411    60
Time taken: 5.622 seconds, Fetched 11 row(s)
spark-sql>
           

sql

-- Collapse each user's consecutive rows at the same location into one
-- "session", keeping the earliest time and the summed minutes.
with cte1 as (
    -- Previous location of the SAME user. PARTITION BY user is required:
    -- without it, the first row of each user would be compared against the
    -- preceding user's last location instead of getting NULL.
    select
        user,
        location,
        time,
        m,
        lag(location) over (partition by user order by time) as lag_location
    from tmp.lanfz_log
),
cte2 as (
    -- mark = 1 exactly when the location changes, i.e. a new session starts.
    select
        user,
        location,
        time,
        m,
        if(lag_location is not null and location != lag_location, 1, 0) as mark
    from cte1
),
cte3 as (
    -- Running sum of the marks gives every row of a session the same label.
    select
        user,
        location,
        time,
        m,
        sum(mark) over (partition by user order by time) as session
    from cte2
)
select
    user,
    location,
    min(time) as time,   -- session start (time is yyyyMMddHH, so string order == time order)
    sum(m) as m          -- total minutes spent in the session
from cte3
group by user, location, session;
           

结果:

usera	la	2020091407	150
usera	lb	2020091410	60
usera	la	2020091411	120
usera	lb	2020091413	60
Time taken: 3.406 seconds, Fetched 4 row(s)
spark-sql> 
           

pyspark版

# Sample rows: [user, location, hour bucket as yyyyMMddHH string, minutes].
data = [
['usera', 'lcationA', '2020091407', 60],
['usera', 'lcationA', '2020091408', 30],
['usera', 'lcationA', '2020091409', 60],
['usera', 'lcationB', '2020091410', 60],
['usera', 'lcationA', '2020091411', 60],
['usera', 'lcationA', '2020091412', 60],
['usera', 'lcationB', '2020091413', 20]
]


# Distribute the local rows as an RDD; assumes an active SparkContext `sc`
# (e.g. a pyspark shell).
data = sc.parallelize(data)


def merger(x):
    """Merge consecutive same-location rows of one user.

    x: iterable of rows [user, location, time, minutes]. Rows are sorted by
    time; runs of identical locations are collapsed into a single row that
    keeps the run's earliest time and the sum of its minutes.

    Returns a list of merged rows. The input rows are NOT mutated (the
    original version updated x[3] in place, corrupting the caller's data),
    and empty input returns [] instead of raising IndexError.
    """
    rows = sorted(x, key=lambda r: r[2])
    if not rows:
        return []
    res = []
    cur = list(rows[0])  # copy so we never mutate a caller-owned row
    for nxt in rows[1:]:
        if cur[1] == nxt[1]:
            # Same location as the current run: accumulate minutes.
            cur[3] += nxt[3]
        else:
            # Location changed: close the current run, start a new one.
            res.append(cur)
            cur = list(nxt)
    res.append(cur)
    return res


# Key each row by its user, gather all of a user's rows with groupByKey,
# collapse consecutive same-location rows via merger(), drop the key, and
# display the result as a DataFrame. NOTE(review): groupByKey pulls one
# user's full history onto a single executor — fine for this toy dataset.
d = data.map(lambda line: ((line[0], line))).groupByKey().flatMapValues(lambda x: merger(x)).values().toDF().show()

+-----+--------+----------+---+                                                 
|   _1|      _2|        _3| _4|
+-----+--------+----------+---+
|usera|lcationA|2020091407|150|
|usera|lcationB|2020091410| 60|
|usera|lcationA|2020091411|120|
|usera|lcationB|2020091413| 20|
+-----+--------+----------+---+
           

继续阅读