天天看点

Flink之多维数据立方体的建立及自定义RedisSink

文章目录

    • 一、需求
    • 二、代码实现
        • 1、主线代码
        • 2、ActivityBean
        • 3、Constant 自定义的常量
        • 4、自定义的RedisSink

一、需求

有以下数据:

用户ID,活动ID,时间,事件类型,省份
u001,A1,2019-09-02 10:10:11,1,北京市
u001,A1,2019-09-02 14:10:11,1,北京市
u001,A1,2019-09-02 14:10:11,2,北京市
u002,A1,2019-09-02 14:10:11,1,北京市
u002,A2,2019-09-02 14:10:11,1,北京市
u002,A2,2019-09-02 15:10:11,1,北京市
u002,A2,2019-09-02 15:10:11,2,北京市
           
事件类型:
0:曝光
1:点击
2:参与
           

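按以下维度组合建立数据立方体:① 活动ID + 事件类型;② 活动ID + 事件类型 + 日期;③ 活动ID + 事件类型 + 日期 + 省份。将每种组合的统计次数写入Redis(使用Hash结构:大key为除事件类型外的维度组合,field为事件类型,value为次数)。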

二、代码实现

1、主线代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @date: 2020/3/13 19:44
 * @site: www.ianlou.cn
 * @author: lekko 六水
 * @qq: 496208110
 * @description: 根据上面的活动,构造数据立方体,进行多维度分组
 */
public class ActivityCountWithMultiDimension {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //u001,A1,2019-09-02 10:10:11,1,北京市 数据
        DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);

        // 一、数据的读取,做切割处理
        SingleOutputStreamOperator<ActivityBean> beanDS =
                lines.map(new MapFunction<String, ActivityBean>() {

                    @Override
                    public ActivityBean map(String lines) throws Exception {

                        String[] spArr = lines.split(",");
                        String uid = spArr[0];
                        String act = spArr[1];
                        String dt = spArr[2];
                        String type = spArr[3];
                        String province = spArr[4];

                        return ActivityBean.of(uid, act, dt, type, province);
                    }
                });

        // 二、根据条件进行聚合,生成多维数据立方体
        SingleOutputStreamOperator<ActivityBean> res1 = beanDS.keyBy("aid", "type").sum("count");
        SingleOutputStreamOperator<ActivityBean> res2 = beanDS.keyBy("aid", "type", "date").sum("count");
        SingleOutputStreamOperator<ActivityBean> res3 = beanDS.keyBy("aid", "type", "date", "province").sum("count");


        /**
         *   三、将数据写入到自定义的Redis中
         *      ① 将聚合中的事件类型type字段,作为里面的小key
         *      ② 将除了事件类型type的其他字段,进行组合拼接成大key
         *      ③ 将聚合的count次数,作为值 存储
         */

        res1.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                return Tuple3.of(Constant.ACTIVITY_COUNT + "-" + bean.getUid(), bean.getType(),
                        bean.getCount() + "");
            }
        }).addSink(new MySinkToRedis());

        res2.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                return Tuple3.of(Constant.DAILY_ACTIVITY_COUNT + "-" + bean.getUid() + "-" + bean.getDt(),
                        bean.getType(),
                        bean.getCount() + "");
            }
        }).addSink(new MySinkToRedis());

        res3.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                return Tuple3.of(Constant.PROVINCE_DAILY_ACTIVITY_COUNT + "-" + bean.getUid() + "-" + bean.getDt() +
                        "-" + bean.getProvince(), bean.getType(), bean.getCount() + "");
            }
        }).addSink(new MySinkToRedis());

        env.execute("ActivityCountWithMultiDimension");
    }
}
           

2、ActivityBean

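当要切割处理的字段很多时,Tuple的字段数有上限(Flink最多到Tuple25),而且反复书写各字段的泛型类型也很繁琐,此时可以自定义一个JavaBean。注意:要让Flink把它识别为POJO(从而能在keyBy/sum中按字段名引用),该类需为public,提供public无参构造器,且字段为public或带有getter/setter。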

  • 定义一个初始值为1的count字段,分组后对其求和(sum)即可得到计数
  • 仿照Tuple,自定义一个of的静态方法
/**
 * Mutable bean holding one parsed activity event.
 *
 * <p>The private field names ({@code uid}, {@code avtId}, {@code dt}, {@code type},
 * {@code province}, {@code count}) are referenced by name as grouping/aggregation keys
 * elsewhere in the job, so they should not be renamed.
 */
public class ActivityBean {

    private String uid;
    private String avtId;
    private String dt;
    private String type;
    private String province;

    /** Starts at 1 so that summing this field over a group yields the number of events. */
    private long count = 1L;

    /** Public no-arg constructor (presumably needed so Flink treats this class as a POJO). */
    public ActivityBean() {
    }

    /** Creates a bean with {@code count} = 1. */
    public ActivityBean(String uid, String avtId, String dt, String type, String province) {
        this();
        this.uid = uid;
        this.avtId = avtId;
        this.dt = dt;
        this.type = type;
        this.province = province;
    }

    /** Tuple-style factory; equivalent to the five-argument constructor. */
    public static ActivityBean of(String uid, String avtId, String dt, String type, String province) {
        ActivityBean bean = new ActivityBean(uid, avtId, dt, type, province);
        return bean;
    }

    public String getUid() {
        return uid;
    }

    public String getAvtId() {
        return avtId;
    }

    public String getDt() {
        return dt;
    }

    public String getType() {
        return type;
    }

    public String getProvince() {
        return province;
    }

    public long getCount() {
        return count;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public void setAvtId(String avtId) {
        this.avtId = avtId;
    }

    public void setDt(String dt) {
        this.dt = dt;
    }

    public void setType(String type) {
        this.type = type;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("ActivityBean{");
        sb.append("uid='").append(uid).append('\'');
        sb.append(", avtId='").append(avtId).append('\'');
        sb.append(", dt='").append(dt).append('\'');
        sb.append(", type='").append(type).append('\'');
        sb.append(", province='").append(province).append('\'');
        sb.append(", count=").append(count);
        sb.append('}');
        return sb.toString();
    }
}

           

3、Constant 自定义的常量

/**
 * Redis key prefixes for the activity-count rollups.
 *
 * <p>Holder for constants only; it is {@code final} with a private constructor so it can be
 * neither instantiated nor subclassed.
 */
public final class Constant {
    /** Prefix for counts grouped by activity + event type. */
    public static final String ACTIVITY_COUNT = "ACTIVITY_COUNT";
    /** Prefix for counts grouped by activity + event type + day. */
    public static final String DAILY_ACTIVITY_COUNT = "DAILY_ACTIVITY_COUNT";
    /** Prefix for counts grouped by activity + event type + day + province. */
    public static final String PROVINCE_DAILY_ACTIVITY_COUNT = "PROVINCE_DAILY_ACTIVITY_COUNT";

    private Constant() {
        // no instances
    }
}
           

4、自定义的RedisSink

  • Jedis是Redis的Java客户端,封装了Redis的各种命令操作,并提供了连接池管理(JedisPool)。
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

/**
 * Flink sink that writes {@code (key, field, value)} triples into Redis hashes via HSET.
 *
 * <p>Connection settings are read from the job's global parameters, which must be a
 * {@link ParameterTool}:
 * <ul>
 *   <li>{@code redis.host} - required</li>
 *   <li>{@code redis.password} - optional; AUTH is skipped when absent or empty</li>
 *   <li>{@code redis.port} - default 6379</li>
 *   <li>{@code redis.db} - default 0</li>
 * </ul>
 *
 * <p>Each sink instance opens its own {@link Jedis} connection in {@link #open} and closes it in
 * {@link #close}.
 */
public class MySinkToRedis extends RichSinkFunction<Tuple3<String, String, String>> {

    // Jedis is not serializable; it is created in open() rather than shipped with the function.
    private transient Jedis jedis;
    // Kept so that a reconnect can restore AUTH and the selected database.
    private transient String password;
    private transient int db;

    /**
     * Reads the Redis settings from the global job parameters and opens the connection.
     *
     * @throws IllegalStateException if the global job parameters are not a {@link ParameterTool}
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Object globalParams = getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        if (!(globalParams instanceof ParameterTool)) {
            throw new IllegalStateException(
                    "Global job parameters must be a ParameterTool containing redis.host; "
                            + "register them with env.getConfig().setGlobalJobParameters(...)");
        }
        ParameterTool params = (ParameterTool) globalParams;

        String host = params.getRequired("redis.host");
        int port = params.getInt("redis.port", 6379);
        this.password = params.get("redis.password");
        this.db = params.getInt("redis.db", 0);

        // Assign the field before talking to Redis so close() can release it if AUTH/SELECT fail.
        this.jedis = new Jedis(host, port);
        prepareSession();
    }

    /** Stores {@code input.f2} under field {@code input.f1} of hash {@code input.f0}. */
    @Override
    public void invoke(Tuple3<String, String, String> input, Context context) throws Exception {
        if (!jedis.isConnected()) {
            // A new connection starts unauthenticated on db 0, so AUTH/SELECT must be redone.
            jedis.connect();
            prepareSession();
        }
        jedis.hset(input.f0, input.f1, input.f2);
    }

    @Override
    public void close() throws Exception {
        // open() may have failed before the connection was created.
        if (jedis != null) {
            jedis.close();
            jedis = null;
        }
    }

    /** Authenticates (when a password is configured) and selects the configured database. */
    private void prepareSession() {
        if (password != null && !password.isEmpty()) {
            jedis.auth(password);
        }
        jedis.select(db);
    }
}