文章目录
- 一、需求
- 二、代码实现
  - 1、主线代码
  - 2、ActivityBean
  - 3、Constant 自定义的常量
  - 4、自定义的RedisSink
一、需求
有以下数据:
用户ID,活动ID,时间,事件类型,省份
u001,A1,2019-09-02 10:10:11,1,北京市
u001,A1,2019-09-02 14:10:11,1,北京市
u001,A1,2019-09-02 14:10:11,2,北京市
u002,A1,2019-09-02 14:10:11,1,北京市
u002,A2,2019-09-02 14:10:11,1,北京市
u002,A2,2019-09-02 15:10:11,1,北京市
u002,A2,2019-09-02 15:10:11,2,北京市
事件类型:
0:曝光
1:点击
2:参与
建立上述各个维度的数据立方体,并将统计的次数,写入到Redis中
二、代码实现
1、主线代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @date: 2020/3/13 19:44
 * @site: www.ianlou.cn
 * @author: lekko 六水
 * @qq: 496208110
 * @description: Builds a data cube over the activity events by grouping them on several
 * dimension combinations and writes the running counts to Redis.
 *
 * <p>Redis connection settings are taken from the command line and published as global job
 * parameters, e.g. {@code --redis.host <host> --redis.password <pwd> [--redis.port 6379]
 * [--redis.db 0]}; {@code MySinkToRedis} reads them when it opens.
 */
public class ActivityCountWithMultiDimension {

    /** Number of comma-separated fields expected in every input line. */
    private static final int FIELD_COUNT = 5;

    /**
     * Job entry point.
     *
     * @param args command-line arguments in {@code --key value} form, parsed by
     *             {@link ParameterTool#fromArgs(String[])}
     * @throws Exception if the job cannot be built or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The Redis sink casts the global job parameters to ParameterTool, so they must be
        // registered here before the job graph is submitted.
        ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        // Sample record: u001,A1,2019-09-02 10:10:11,1,北京市
        DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);

        // 1. Parse every raw line into an ActivityBean.
        SingleOutputStreamOperator<ActivityBean> beanDS =
                lines.map(new MapFunction<String, ActivityBean>() {
                    @Override
                    public ActivityBean map(String line) throws Exception {
                        String[] fields = line.split(",");
                        if (fields.length < FIELD_COUNT) {
                            throw new IllegalArgumentException(
                                    "Expected " + FIELD_COUNT + " comma-separated fields but got "
                                            + fields.length + ": " + line);
                        }
                        String uid = fields[0];
                        String act = fields[1];
                        // Keep only the day so the "daily" groupings are per day, not per second.
                        String day = dayOf(fields[2]);
                        String type = fields[3];
                        String province = fields[4];
                        return ActivityBean.of(uid, act, day, type, province);
                    }
                });

        // 2. Aggregate per dimension combination. The key expressions must match the
        //    ActivityBean field names exactly (avtId, type, dt, province).
        SingleOutputStreamOperator<ActivityBean> res1 =
                beanDS.keyBy("avtId", "type").sum("count");
        SingleOutputStreamOperator<ActivityBean> res2 =
                beanDS.keyBy("avtId", "type", "dt").sum("count");
        SingleOutputStreamOperator<ActivityBean> res3 =
                beanDS.keyBy("avtId", "type", "dt", "province").sum("count");

        /*
         * 3. Write the results to Redis as hashes:
         *    - hash key   : constant prefix + every grouping dimension except the event type
         *    - hash field : the event type
         *    - hash value : the aggregated count
         * Anonymous classes (not lambdas) are used so Flink can read the Tuple3 type arguments.
         */
        res1.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                String key = Constant.ACTIVITY_COUNT + "-" + bean.getAvtId();
                return toRedisEntry(key, bean);
            }
        }).addSink(new MySinkToRedis());

        res2.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                String key = Constant.DAILY_ACTIVITY_COUNT + "-" + bean.getAvtId()
                        + "-" + bean.getDt();
                return toRedisEntry(key, bean);
            }
        }).addSink(new MySinkToRedis());

        res3.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                String key = Constant.PROVINCE_DAILY_ACTIVITY_COUNT + "-" + bean.getAvtId()
                        + "-" + bean.getDt() + "-" + bean.getProvince();
                return toRedisEntry(key, bean);
            }
        }).addSink(new MySinkToRedis());

        env.execute("ActivityCountWithMultiDimension");
    }

    /**
     * Returns the part of a timestamp before its first space, e.g. {@code 2019-09-02} for
     * {@code 2019-09-02 10:10:11}; a value without a space is returned unchanged.
     */
    private static String dayOf(String timestamp) {
        int space = timestamp.indexOf(' ');
        return space < 0 ? timestamp : timestamp.substring(0, space);
    }

    /** Builds the (hash key, hash field, hash value) triple consumed by the Redis sink. */
    private static Tuple3<String, String, String> toRedisEntry(String key, ActivityBean bean) {
        return Tuple3.of(key, bean.getType(), String.valueOf(bean.getCount()));
    }
}
2、ActivityBean
当要切割处理的字段很多的时候,Tuple可能放不下,并且反复书写各字段的数据类型也很麻烦。这时可以自定义一个JavaBean。
- 定义一个初始值为1的count字段,用于后面的分组聚合计数
- 仿照Tuple,自定义一个静态的of方法
/**
 * One activity event parsed from an input line.
 *
 * <p>Provides a public no-argument constructor plus a getter and setter for every field.
 * The field names themselves are referenced as strings by the job's {@code keyBy}/{@code sum}
 * calls, so they must not be renamed independently of those calls.
 */
public class ActivityBean {

    private String uid;
    private String avtId;
    private String dt;
    private String type;
    private String province;

    // Every event starts with a count of one; the grouped aggregation sums this field.
    private long count = 1L;

    /** Creates an empty bean whose count is 1. */
    public ActivityBean() {
    }

    /**
     * Creates a bean with every descriptive field set and a count of 1.
     *
     * @param uid      user id
     * @param avtId    activity id
     * @param dt       event time
     * @param type     event type
     * @param province province
     */
    public ActivityBean(String uid, String avtId, String dt, String type, String province) {
        this.uid = uid;
        this.avtId = avtId;
        this.dt = dt;
        this.type = type;
        this.province = province;
    }

    /** Static factory in the style of {@code Tuple.of}; equivalent to the full constructor. */
    public static ActivityBean of(String uid, String avtId, String dt, String type, String province) {
        ActivityBean bean = new ActivityBean(uid, avtId, dt, type, province);
        return bean;
    }

    public String getUid() {
        return this.uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getAvtId() {
        return this.avtId;
    }

    public void setAvtId(String avtId) {
        this.avtId = avtId;
    }

    public String getDt() {
        return this.dt;
    }

    public void setDt(String dt) {
        this.dt = dt;
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getProvince() {
        return this.province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public long getCount() {
        return this.count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    /** Renders all fields as {@code ActivityBean{uid='..', ..., count=N}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ActivityBean{");
        text.append("uid='").append(uid).append('\'');
        text.append(", avtId='").append(avtId).append('\'');
        text.append(", dt='").append(dt).append('\'');
        text.append(", type='").append(type).append('\'');
        text.append(", province='").append(province).append('\'');
        text.append(", count=").append(count);
        text.append('}');
        return text.toString();
    }
}
3、Constant 自定义的常量
/**
 * Key prefixes for the Redis hashes that hold the aggregated activity counts.
 *
 * <p>This is a constants holder only, so it is final and cannot be instantiated.
 */
public final class Constant {

    /** Prefix for counts grouped by activity. */
    public static final String ACTIVITY_COUNT = "ACTIVITY_COUNT";

    /** Prefix for counts grouped by activity and day. */
    public static final String DAILY_ACTIVITY_COUNT = "DAILY_ACTIVITY_COUNT";

    /** Prefix for counts grouped by activity, day and province. */
    public static final String PROVINCE_DAILY_ACTIVITY_COUNT = "PROVINCE_DAILY_ACTIVITY_COUNT";

    private Constant() {
        // Not instantiable: the class only exposes static constants.
    }
}
4、自定义的RedisSink
- Jedis 是 Redis 的 Java 客户端,封装了 Redis 的各种命令操作,并提供了连接池管理。
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
/**
 * Flink sink that stores each incoming {@code Tuple3} in a Redis hash via
 * {@code HSET f0 f1 f2} (f0 = hash key, f1 = hash field, f2 = value).
 *
 * <p>Connection settings are read from the job's global parameters, which must be a
 * {@link ParameterTool} containing {@code redis.host} and {@code redis.password}
 * (both required) and optionally {@code redis.port} (default 6379) and
 * {@code redis.db} (default 0).
 */
public class MySinkToRedis extends RichSinkFunction<Tuple3<String, String, String>> {

    // Transient so the client is not serialized with the function; it is created in open().
    private transient Jedis jedis;

    /**
     * Opens the Redis connection using the global job parameters.
     *
     * @param parameters Flink configuration passed to the function (not used here)
     * @throws IllegalStateException if the global job parameters are not a {@link ParameterTool}
     * @throws Exception if a required setting is missing or the connection cannot be set up
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // 1. Fetch the global job parameters and verify their type before casting, so a
        //    job that never registered them fails with an actionable message.
        Object globalParams = getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        if (!(globalParams instanceof ParameterTool)) {
            throw new IllegalStateException(
                    "Global job parameters must be a ParameterTool providing redis.host and "
                            + "redis.password; register it with "
                            + "env.getConfig().setGlobalJobParameters(...)");
        }
        ParameterTool params = (ParameterTool) globalParams;

        // 2. Read the relevant settings.
        String host = params.getRequired("redis.host");
        String password = params.getRequired("redis.password");
        int port = params.getInt("redis.port", 6379);
        int db = params.getInt("redis.db", 0);

        // 3. Connect, authenticate and select the database. If either step fails, release
        //    the client so the connection does not leak, then propagate the failure.
        Jedis client = new Jedis(host, port);
        try {
            client.auth(password);
            client.select(db);
        } catch (RuntimeException e) {
            client.close();
            throw e;
        }
        this.jedis = client;
    }

    /**
     * Writes one record to Redis, reconnecting first if the client reports it is disconnected.
     *
     * @param input   (hash key, hash field, value)
     * @param context sink context (not used here)
     */
    @Override
    public void invoke(Tuple3<String, String, String> input, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        jedis.hset(input.f0, input.f1, input.f2);
    }

    /** Closes the Redis connection if one was opened. */
    @Override
    public void close() throws Exception {
        // jedis is still null when open() failed before assigning it.
        if (jedis != null) {
            try {
                jedis.close();
            } finally {
                jedis = null;
            }
        }
    }
}