天天看点

Flink实战 - 利用SessionWindow处理心跳数据

Flink实战 - 利用SessionWindow处理直播间用户心跳数据

1.SessionWindow 介绍

先看看Flink提供的多种计算窗口

  • countWindow:基于数据,根据数据的条数来划定一次聚合计算范围。
  • timeWindow:根据时间划分聚合计算的范围。

timeWindow又分为三种:

  • Tumbling window 滚动窗口
  • Sliding Window 滑动窗口
  • Session Window 会话窗口

其他窗口不再叙述,主要说说SessionWindow。

先说下session,web开发中,服务器会给每个用户浏览器创建一个session(会话对象),用户的状态都会在这个session中保存,这样我们不需要重复登录就可以做一些操作,当用户的操作停止了一定的时间后,该session会失效,表示一个session的结束。

流计算的是无界的,但是很多场景下,数据的计算周期还是有范围的。根据上述session的特性,flink提供了一个Session Window,每个窗口的划分范围为一簇数据从开始出现到最后一次出现之间的所有数据。

2.数据需求

  1. 用户在观看直播间时,进入直播间的这个事件可以在业务逻辑中进行判断,退出直播间在正常情况下,也可以在业务逻辑中判断。但是会出现非正常的情况,比如我每次停止使用一个APP时,最简单的方式就是杀死这个APP,而不是正常的退出操作,还有其他的情况,比如没话费断网了,家里停电WiFi断掉了,就没法正常统计用户观看时长。
  2. 实时观看直播间的uv,就是实时统计直播间有多少用户在观看。这时如果有1中所述的情况,就会出现无法正常的判断观众退出,uv的数据会不准确。
  3. 用户足迹,当我观看一个直播间的时长超过1分钟,那么表示我可能喜欢这个直播间,就会把这个APP记录到我的观看记录中,方便后续直接进入。

3.需求分析

  1. 用户的每个心跳数据进行累计,但用户退出直播间时,可以计算他的停留时长,这样也可以解决第三个问题。
  2. 第二个需求是需要实时统计有多少人在,这时就需要统计有多少人还在直播间观看,第一步就是要先记录在观看的人数,然后还需要统计退出直播间的人数。这里需要注意的是怎么去获取用户进入直播间和退出直播间的事件。

4.数据的处理

  • 源数据:

心跳数据:每个用户在每个直播间,每10秒发送一次心跳数据,其中包含了用户、主播、直播间的信息,还有每次的心跳时间。

数据存储:心跳数据量很大,可以放在高吞吐的kafka中。

心跳数据格式:

{
	"userId": 10000,
	"liveRoomId": 123456,
	"broadcasterId": 100002,
	"scene": 2,
	"heartTimestamp": 1592106002035,
	"roomType": 2,
	"userIp": "192.168.1.1",
	"liveScene": 123
}
           
  • 程序处理

源数据对应的Java类,用户每次心跳的数据信息,这里只列举本次能用到的字段

public class UserHeart implements Serializable {
    /** 用户id */
    private Long userId;
    /** 直播间id */
    private Long liveRoomId;
    /** 直播间主播id */
    private Long broadcasterId;
    /** 心跳场景:1直播心跳、2观众心跳 */
    private Integer scene;
    /** 心跳时间:ms */
    private Long heartTimestamp;
    /** 直播间类型 */
    private Integer roomType;
    /** 客户ip */
    private String userIp;
    /** 直播间开播场次 */
    private Integer liveScene;
}
           

需要计算用户心跳数据,就需要其他的一些属性,比如每次进入直播间后第一次心跳为首次心跳时间,最近的一次心跳为最近的心跳时间(退出直播间为退出直播间时间)。

消费kafka中的数据并将json转为对象

// kafka 数据读取,具体配置依照具体情况来定
Properties kafkaSourcePro = new Properties();
kafkaSourcePro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092");
kafkaSourcePro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
kafkaSourcePro.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
kafkaSourcePro.put(ConsumerConfig.GROUP_ID_CONFIG, "heart-kafka-group");

// 获取心跳源数据
SingleOutputStreamOperator<String> odsUserHeartStream = env.addSource(new FlinkKafkaConsumer011<String>("user-heart-topic", new SimpleStringSchema(), kafkaSourcePro));

// 心跳源数据转换成UserHeart对象
SingleOutputStreamOperator<UserHeart> userHeartStream = odsUserHeartStream
        .filter(StringUtils::isBlank).name("filter-stringisnull")
        .map(a -> JSONObject.parseObject(a, UserHeart.class)).name("userheart-map").uid("userheart-map");
           

过滤出观众的数据,并转成方便统计心跳的对象(此处的逻辑也可以在UserHeart对象中完成,但是为了将每个步骤都体现出来,因此创建了UserStayTime对象)转换为UserStayTime 对象,UserStayTime 类继承了UserHeart 类。

UserStayTime 类:

public class UserStayTime extends UserHeart {
    /** 本次进入直播间的首次心跳时间 */
    private Long startTime;
    /** 本次进入直播间的最后一次心跳时间 */
    private Long endTime;
    /** 统计进入直播间后一共有多少次心跳 */
    private Long heartCount;
    /** 在直播间时长 */
    private Long stayTime;
	/** 用户进入直播间事件:1,用户退出直播间:2 */
    private Integer eventType;
}
           

所有的心跳数据过滤出观众的心跳数据并转换。此处只是将心跳数据格式作了个转换,每个单独的心跳数据,首次心跳时间和结束时间都用本次心跳时间来表示,心跳次数也设置为1次。

// 过滤观众心跳计算
SingleOutputStreamOperator<UserStayTime> audienceHeartStream = userHeartStream
        .flatMap(new FlatMapFunction<UserHeart, UserStayTime>() {
                     @Override
                     public void flatMap(UserHeart userHeart, Collector<UserStayTime> collector) throws Exception {
                         if (userHeart.getScene() == 2) {
                             UserStayTime userStayTime = new UserStayTime();
                             userStayTime.setUserId(userHeart.getUserId());
                             userStayTime.setLiveRoomId(userHeart.getLiveRoomId());
                             userStayTime.setBroadcasterId(userHeart.getBroadcasterId());
                             userStayTime.setLiveScene(userHeart.getLiveScene());
                             userStayTime.setStartTime(userHeart.getHeartTimestamp());   // 首次心跳时间设置为此次心跳时间
                             userStayTime.setEndTime(userHeart.getHeartTimestamp());     // 最后一次心跳时间也设置为心跳时间
                             userStayTime.setHeartCount(1L);     // 心跳次数设置为1
                             userStayTime.setRoomType(userHeart.getRoomType());
                             collector.collect(userStayTime);
                         }
                     }
                 }
        ).name("audience-heart-flatmap").uid("audience-heart-flatmap");
           

------------------------------------------------- 重点来啦 -------------------------------------------------

利用SessionWindow计算心跳数据

对数据打上水印,此处看下flink1.11中是怎么设置水位(Watermark水印)的。

// 此处是flink1.11.1版本中的写法
audienceHeartStream
	   	.assignTimestampsAndWatermarks(WatermarkStrategy
	           .<UserStayTime>forBoundedOutOfOrderness(Duration.ofSeconds(3))
	           .withTimestampAssigner(new SerializableTimestampAssigner<UserStayTime>() {
	               @Override
	               public long extractTimestamp(UserStayTime userStayTime, long l) {
	                   return userStayTime.getHeartTimestamp();
	               }
	           }));

// flink 1.11 之前可以这样打水印,此方式在1.11中已经设置为了过期
audienceHeartStream
	   .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserStayTime>(Time.seconds(3)) {
	       @Override
	       public long extractTimestamp(UserStayTime element) {
	           return element.getHeartTimestamp();
	       }
	   });
           

数据的维度是 “观众-直播间”,所有key为userid + roomid。

统计的地方多说下,此处使用了 ReduceFunction,有两个原因,一是因为 reduceFunction 是增量计算函数,在使用窗口计算时,利用增量计算可以大大减少在内存中数据的堆积,因为reduce会把每个数据计算完后丢弃,只留下最终计算结果。第二个原因是,reduce可以将数据进行交换操作,比如此处心跳计算,第一个心跳到来,reduce将心跳次数记为1,首次心跳时间也记为该次心跳发生时间,最后一次心跳时间也记为该次时间,保存到state中,这样,第二条心跳数据过来后,就可以拿到上条数据的首次心跳时间,心跳次数 +1,将自己的首次心跳时间设置为state中的开始时间,最后一次心跳时间为此心跳时间。依次可以看到,每次输出的数据中都会携带本次进入直播间的首次心跳时间,并且计算了心跳次数。

audienceHeartWMStream
        .keyBy(a -> Long.toString(a.getLiveRoomId()) + a.getUserId())
        .window(EventTimeSessionWindows.withGap(Time.seconds(30)))  // 心跳间隔超过30s表示已经退出此直播间
        .reduce((x, y) -> {
            y.setHeartCount(x.getHeartCount() + y.getHeartCount()); // 心跳次数累加
            y.setStartTime(x.getStartTime());    // 利用 reduce的特性,用户进入直播间开始
            y.setStayTime(y.getHeartTimestamp() - x.getStartTime());    // 停留时长计算:最后一次的心跳时间 - 首次心跳的时间
            return y;
        }).name("audience-heart-reduce");
           

得到的结果就是用户此次在直播间的首次心跳时间和最后一次心跳时间,时间相减就是观看直播间的时长。

完整的代码:

package com.zyx.bigdata.flink.streaming.userheart.userstay;

import com.alibaba.fastjson.JSONObject;
import com.zyx.bigdata.flink.streaming.userheart.entity.UserHeart;
import com.zyx.bigdata.flink.streaming.userheart.userstay.entity.UserStayTime;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.time.Duration;
import java.util.Properties;

/**
 * @author axin
 * @version 1.0
 * @date 2020/9/13 17:08
 * @describe 使用sessionwindow 计算用户在直播间停留时长统计
 */
public class UserRoomStayTimeJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(300000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(1);  // 本地测试一般都需要将并行度设置为1

        Properties kafkaSourcePro = new Properties();
        kafkaSourcePro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092");
        kafkaSourcePro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        kafkaSourcePro.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        kafkaSourcePro.put(ConsumerConfig.GROUP_ID_CONFIG, "heart-kafka-group");

        // 获取心跳源数据
        SingleOutputStreamOperator<String> odsUserHeartStream = env.addSource(new FlinkKafkaConsumer011<String>("user-heart-topic", new SimpleStringSchema(), kafkaSourcePro));

        // 心跳源数据转换
        SingleOutputStreamOperator<UserHeart> userHeartStream = odsUserHeartStream
                .filter(StringUtils::isNotBlank).name("filter-stringisnull")
                .map(a -> JSONObject.parseObject(a, UserHeart.class)).name("userheart-map").uid("userheart-map");

        // 过滤观众心跳计算
        SingleOutputStreamOperator<UserStayTime> audienceHeartStream = userHeartStream
                .flatMap(new FlatMapFunction<UserHeart, UserStayTime>() {
                             @Override
                             public void flatMap(UserHeart userHeart, Collector<UserStayTime> collector) throws Exception {
                                 if (userHeart.getScene() == 2) {
                                     UserStayTime userStayTime = new UserStayTime();
                                     userStayTime.setUserId(userHeart.getUserId());
                                     userStayTime.setLiveRoomId(userHeart.getLiveRoomId());
                                     userStayTime.setBroadcasterId(userHeart.getBroadcasterId());
                                     userStayTime.setLiveScene(userHeart.getLiveScene());
                                     userStayTime.setStartTime(userHeart.getHeartTimestamp());   // 首次心跳时间设置为此次心跳时间
                                     userStayTime.setEndTime(userHeart.getHeartTimestamp());     // 最后一次心跳时间也设置为心跳时间
                                     userStayTime.setHeartCount(1L);     // 心跳次数设置为1
                                     userStayTime.setRoomType(userHeart.getRoomType());
                                     userStayTime.setHeartTimestamp(userHeart.getHeartTimestamp());
                                     collector.collect(userStayTime);
                                 }
                             }
                         }
                ).name("audience-heart-flatmap").uid("audience-heart-flatmap");

        // 利用sessionwindow计算
        SingleOutputStreamOperator<UserStayTime> audienceHeartResultStream = audienceHeartStream
                .assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserStayTime>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<UserStayTime>() {
                    @Override
                    public long extractTimestamp(UserStayTime userStayTime, long l) {
                        return userStayTime.getHeartTimestamp();
                    }
                }))
                .keyBy(a -> Long.toString(a.getLiveRoomId()) + a.getUserId())
                .window(EventTimeSessionWindows.withGap(Time.seconds(30)))  // 心跳间隔超过30s表示已经退出此直播间
                .reduce((x, y) -> {
                    y.setHeartCount(x.getHeartCount() + y.getHeartCount()); // 心跳次数累加
                    y.setStartTime(x.getStartTime());    // 利用 reduce的特性,用户进入直播间开始
                    y.setStayTime(y.getHeartTimestamp() - x.getStartTime());    // 停留时长计算:最后一次的心跳时间 - 首次心跳的时间
                    return y;
                }).name("audience-heart-reduce");

        // 结果输出
        audienceHeartResultStream.print("all");
//        audienceHeartResultStream.addSink(...);
        env.execute("audience-heart");
    }
}
           

还有一个需求就是实时统计在直播间的观看人数

这时需要解决两个问题:进入直播间事件和退出直播间事件

上面的代码中需要再写一个逻辑获取首次心跳事件,后面将两个事件结合,即可求得。

继续阅读