樣例資料:
> select * from tmp.lanfz_log;
2020-09-14 13:47:12,771 [ForkJoinPool-1-worker-3] INFO cn.jpush.spark.parser.SqlStatisticsParser - queryId : e1a036de-3463-4ab9-a3e9-9ba6e6229227
usera lb 2020091410 60
usera la 2020091412 60
userb la 2020091409 60
usera la 2020091409 60
userb la 2020091411 60
userb lb 2020091410 60
usera la 2020091408 60
usera la 2020091407 30
usera lb 2020091413 60
userb la 2020091408 60
usera la 2020091411 60
Time taken: 5.622 seconds, Fetched 11 row(s)
spark-sql>
sql
-- Collapse each user's consecutive rows at the same location into one row:
-- the run's earliest time and the total of m over the run.
-- NOTE(review): sample rows suggest time is a yyyyMMddHH string and m a
-- duration; confirm against the table definition.
WITH with_prev_location AS (
    -- Previous location of the SAME user in time order (NULL on the user's
    -- first row). PARTITION BY keeps the lag from leaking across users and
    -- avoids pulling the whole table into a single window partition.
    SELECT
        user,
        location,
        time,
        m,
        LAG(location) OVER (PARTITION BY user ORDER BY time) AS lag_location
    FROM tmp.lanfz_log
),
with_run_start AS (
    -- mark = 1 on rows whose location differs from the previous row,
    -- i.e. the first row of a new run; 0 otherwise.
    SELECT
        user,
        location,
        time,
        m,
        CASE
            WHEN lag_location IS NOT NULL AND location <> lag_location THEN 1
            ELSE 0
        END AS mark
    FROM with_prev_location
),
with_run_id AS (
    -- Running count of run starts = per-user run number. The explicit ROWS
    -- frame stops rows that tie on time from sharing one running total.
    SELECT
        user,
        location,
        time,
        m,
        SUM(mark) OVER (
            PARTITION BY user
            ORDER BY time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS run_id
    FROM with_run_start
)
SELECT
    user,
    location,
    MIN(time) AS time,
    SUM(m) AS m
FROM with_run_id
GROUP BY user, location, run_id
-- Deterministic output: runs listed per user in chronological order.
ORDER BY user, time;
結果:
usera la 2020091407 150
usera lb 2020091410 60
usera la 2020091411 120
usera lb 2020091413 60
Time taken: 3.406 seconds, Fetched 4 row(s)
spark-sql>
pyspark版
# Sample visit log used by the PySpark example below.
# Each row is [user, location, time, m]; rows stay as lists.
rows = [
    ['usera', 'lcationA', '2020091407', 60],
    ['usera', 'lcationA', '2020091408', 30],
    ['usera', 'lcationA', '2020091409', 60],
    ['usera', 'lcationB', '2020091410', 60],
    ['usera', 'lcationA', '2020091411', 60],
    ['usera', 'lcationA', '2020091412', 60],
    ['usera', 'lcationB', '2020091413', 20],
]
# Distribute the sample rows as an RDD; `sc` is the shell's SparkContext.
data = sc.parallelize(rows)
def merger(x):
    """Merge consecutive rows that share a location into one row.

    Rows are sorted by their time field (index 2). Each maximal run of
    adjacent rows with the same location (index 1) is collapsed into a copy
    of the run's first row, whose index 3 is replaced by the sum of index 3
    over the run.

    Args:
        x: iterable of rows shaped like [user, location, time, m].

    Returns:
        A new list of merged rows (lists) in time order. Input rows are not
        modified; an empty input yields an empty list.
    """
    from itertools import groupby

    ordered = sorted(x, key=lambda row: row[2])
    res = []
    # groupby only groups adjacent equal keys, which is exactly "consecutive
    # rows at the same location" once the rows are in time order.
    for _, run in groupby(ordered, key=lambda row: row[1]):
        run = list(run)
        merged = list(run[0])  # copy so the caller's rows stay untouched
        merged[3] = sum(row[3] for row in run)
        res.append(merged)
    return res
# Key every row by user, merge each user's rows with merger(), drop the key
# again and convert the result to a DataFrame (default column names _1.._4).
d = (
    data.map(lambda line: (line[0], line))
    .groupByKey()
    .flatMapValues(merger)
    .values()
    .toDF()
)
# show() only prints and returns None, so it is called separately to keep
# the DataFrame itself in `d`.
d.show()
+-----+--------+----------+---+
| _1| _2| _3| _4|
+-----+--------+----------+---+
|usera|lcationA|2020091407|150|
|usera|lcationB|2020091410| 60|
|usera|lcationA|2020091411|120|
|usera|lcationB|2020091413| 20|
+-----+--------+----------+---+