PV(通路量):即Page View, 即頁面浏覽量或點選量,使用者每次重新整理即被計算一次。
UV(獨立訪客):即Unique Visitor,通路您網站的一台電腦用戶端為一個訪客。00:00-24:00内相同的用戶端隻被計算一次。
一個UV可以用很多PV,一個PV也隻能對應一個IP
沒有這些資料的支援,意味着你不知道産品的發展情況,使用者擷取成本,UV,PV,注冊轉化率;沒有這些資料做參考,你不會知道接下來提供什麼建議給上司采納,也推測不出上司為啥煩憂,那麼就麼有任何表現的機會。
舉兩個UV計算的場景:
- 實時計算當天零點起,到目前時間的uv。
- 實時計算當天每個小時的UV。0點...12點...24點
請問這個用spark streaming如何實作呢?是不是很難有好的思路呢?
今天主要是想給大家用flink來實作一下,在這方面flink确實比較優秀了。
主要技術點就在group by的使用。
下面就是完整的案例:
package org.table.uv;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class ComputeUVDay {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
tEnv.registerFunction("DateUtil",new DateUtil());
tEnv.connect(
new Kafka()
.version("0.10")
// "0.8", "0.9", "0.10", "0.11", and "universal"
.topic("jsontest")
.property("bootstrap.servers", "localhost:9092")
.property("group.id","test")
.startFromLatest()
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema()
)
.withSchema(
new Schema()
.field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(2000)
)
.field("fruit", Types.STRING)
.field("number", Types.INT)
)
.inAppendMode()
.registerTableSource("source");
// 計算天級別的uv
// Table table = tEnv.sqlQuery("select DateUtil(rowtime),count(distinct fruit) from source group by DateUtil(rowtime)");
// 計算小時級别uv
Table table = tEnv.sqlQuery("select DateUtil(rowtime,'yyyyMMddHH'),count(distinct fruit) from source group by DateUtil(rowtime,'yyyyMMddHH')");
tEnv.toRetractStream(table, Row.class).addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
System.out.println(value.f1.toString());
}
});
System.out.println(env.getExecutionPlan());
env.execute("ComputeUVDay");
}
}
其中DateUtil類如下:
import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
public class DateUtil extends ScalarFunction {
public static String eval(long timestamp){
String result = "null";
try {
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
result = sdf.format(new Timestamp(timestamp));
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
public static String eval(long ts, String format) {
String result = "null";
try {
DateFormat sdf = new SimpleDateFormat(format);
result = sdf.format(ts);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
public static void main(String[] args) {
String eval = eval(System.currentTimeMillis(),"yyyyMMddHH");
System.out.println(eval);
}
代碼裡面的案例,是可以用于生産中的嗎?
假如資料量小可以直接使用,
遊戲轉讓平台每秒資料量大的話,就比較麻煩。因為你看group by後面的次元,隻有當天date 這個次元,這樣就會導緻計算狀态超級集中而使得記憶體占用超大進而引發oom。
這種情況解決辦法就是将狀态打散,然後再次聚合即可,典型的分治思想。
具體做法作為福利分享給球友吧。
還有一個問題就是由于存在全局去重及分組操作,flink内部必然要維護一定的狀态資訊,那麼這些狀态資訊肯定不是要一直儲存的,比如uv,我們隻需要更新今天,最多昨天的狀态,這個點之前的狀态要删除的,不能讓他白白占着記憶體,而導緻任務記憶體消耗巨大,甚至因oom而挂掉。
StreamQueryConfig streamQueryConfig = tEnv.queryConfig();
streamQueryConfig.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));
tEnv.sqlUpdate(sql,streamQueryConfig);
再有就是能使用事件時間嗎?事件時間假如事件嚴重逾時了,比如,我們狀态保留時間設定的是兩天,兩天之後狀态清除,那麼這時候來了事件時間剛剛好是兩天之前的,由于已經沒有狀态就會重新計算uv覆寫已經生成的值,就導緻值錯誤了,這個問題如何解決呢?
這算是一個疑問吧?