天天看點

Flink計算PV,UV的案例及問題分析

PV(通路量):即Page View, 即頁面浏覽量或點選量,使用者每次重新整理即被計算一次。

UV(獨立訪客):即Unique Visitor,通路您網站的一台電腦用戶端為一個訪客。00:00-24:00内相同的用戶端隻被計算一次。

一個UV可以用很多PV,一個PV也隻能對應一個IP

沒有這些資料的支援,意味着你不知道産品的發展情況,使用者擷取成本,UV,PV,注冊轉化率;沒有這些資料做參考,你不會知道接下來提供什麼建議給上司采納,也推測不出上司為啥煩憂,那麼就麼有任何表現的機會。

舉兩個UV計算的場景:

  1. 實時計算當天零點起,到目前時間的uv。
  2. 實時計算當天每個小時的UV。0點...12點...24點

請問這個用spark streaming如何實作呢?是不是很難有好的思路呢?

今天主要是想給大家用flink來實作一下,在這方面flink确實比較優秀了。

主要技術點就在group by的使用。

下面就是完整的案例:

package org.table.uv;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.TableEnvironment;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.table.descriptors.Json;

import org.apache.flink.table.descriptors.Kafka;

import org.apache.flink.table.descriptors.Rowtime;

import org.apache.flink.table.descriptors.Schema;

import org.apache.flink.types.Row;

public class ComputeUVDay {

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
    tEnv.registerFunction("DateUtil",new DateUtil());
    tEnv.connect(
            new Kafka()
                    .version("0.10")
                    //   "0.8", "0.9", "0.10", "0.11", and "universal"
                    .topic("jsontest")
                    .property("bootstrap.servers", "localhost:9092")
                    .property("group.id","test")
                    .startFromLatest()
    )
            .withFormat(
                    new Json()
                            .failOnMissingField(false)
                            .deriveSchema()
            )
            .withSchema(

                    new Schema()
                            .field("rowtime", Types.SQL_TIMESTAMP)
                            .rowtime(new Rowtime()
                                    .timestampsFromField("eventtime")
                                    .watermarksPeriodicBounded(2000)
                            )
                            .field("fruit", Types.STRING)
                            .field("number", Types.INT)
            )
            .inAppendMode()
            .registerTableSource("source");

    // 計算天級別的uv           

// Table table = tEnv.sqlQuery("select DateUtil(rowtime),count(distinct fruit) from source group by DateUtil(rowtime)");

// 計算小時級别uv
    Table table = tEnv.sqlQuery("select  DateUtil(rowtime,'yyyyMMddHH'),count(distinct fruit) from source group by DateUtil(rowtime,'yyyyMMddHH')");

    tEnv.toRetractStream(table, Row.class).addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
        @Override
        public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
            System.out.println(value.f1.toString());
        }
    });

    System.out.println(env.getExecutionPlan());
    env.execute("ComputeUVDay");
}           

}

其中DateUtil類如下:

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;

import java.text.DateFormat;

import java.text.SimpleDateFormat;

public class DateUtil extends ScalarFunction {

public static String eval(long timestamp){
    String result = "null";
    try {
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        result = sdf.format(new Timestamp(timestamp));
    } catch (Exception e) {
        e.printStackTrace();
    }
    return result;
}
public static String eval(long ts, String format) {

    String result = "null";
    try {
        DateFormat sdf = new SimpleDateFormat(format);
        result = sdf.format(ts);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return result;
}
public static void main(String[] args) {
    String eval = eval(System.currentTimeMillis(),"yyyyMMddHH");
    System.out.println(eval);
}           

代碼裡面的案例,是可以用于生産中的嗎?

假如資料量小可以直接使用,

遊戲轉讓平台

每秒資料量大的話,就比較麻煩。因為你看group by後面的次元,隻有當天date 這個次元,這樣就會導緻計算狀态超級集中而使得記憶體占用超大進而引發oom。

這種情況解決辦法就是将狀态打散,然後再次聚合即可,典型的分治思想。

具體做法作為福利分享給球友吧。

還有一個問題就是由于存在全局去重及分組操作,flink内部必然要維護一定的狀态資訊,那麼這些狀态資訊肯定不是要一直儲存的,比如uv,我們隻需要更新今天,最多昨天的狀态,這個點之前的狀态要删除的,不能讓他白白占着記憶體,而導緻任務記憶體消耗巨大,甚至因oom而挂掉。

StreamQueryConfig streamQueryConfig = tEnv.queryConfig();

streamQueryConfig.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));

tEnv.sqlUpdate(sql,streamQueryConfig);

再有就是能使用事件時間嗎?事件時間假如事件嚴重逾時了,比如,我們狀态保留時間設定的是兩天,兩天之後狀态清除,那麼這時候來了事件時間剛剛好是兩天之前的,由于已經沒有狀态就會重新計算uv覆寫已經生成的值,就導緻值錯誤了,這個問題如何解決呢?

這算是一個疑問吧?

繼續閱讀