
Flink Dynamic Tables

There is an article from Alibaba on this topic; reading it first will give you a rough idea of what a dynamic table is.

A dynamic table is a table that keeps changing as a stream evolves. From Alibaba's example you can see that whenever the Stream table changes, it triggers a change in the Keyed Table. The Stream table is append-only and cannot be retracted: records can only keep being added to it. The Keyed Table, however, updates its count values as data keeps arriving in the Stream table, and as those counts change, the second table, which is keyed by count, changes as well. That second table is the result we actually want; the first table is only an intermediate step, but without it we could not produce the second one.
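
Before turning to the code, here is a minimal sketch of the same idea as a single nested SQL query. It is not from the Alibaba article; it assumes the append-only stream has been registered as a table demo1 with columns word and num, as in the Java example below, and uses cnt as the alias to avoid the SQL keyword count:

// Sketch only: the inner query counts each word, the outer query counts
// how many words share each count value. Assumes tEnv and the registered
// table "demo1" from the example below.
Table result = tEnv.sqlQuery(
        "SELECT cnt, COUNT(word) AS frequency "
      + "FROM (SELECT word, SUM(num) AS cnt FROM demo1 GROUP BY word) AS t "
      + "GROUP BY cnt");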

Alibaba's first example, written out in Java:

package com.yjp.flink.retraction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RetractionITCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
        env.getConfig().disableSysoutLogging();
        // The append-only source stream: (word, 1) records.
        DataStream<Tuple2<String, Integer>> dataStream =
                env.fromElements(
                        new Tuple2<>("hello", 1),
                        new Tuple2<>("word", 1),
                        new Tuple2<>("hello", 1),
                        new Tuple2<>("bark", 1),
                        new Tuple2<>("bark", 1),
                        new Tuple2<>("bark", 1),
                        new Tuple2<>("bark", 1),
                        new Tuple2<>("bark", 1),
                        new Tuple2<>("bark", 1),
                        new Tuple2<>("flink", 1)
                );
        // Register the stream as the dynamic table "demo1" with columns word and num.
        tEnv.registerDataStream("demo1", dataStream, "word, num");
        // First aggregation: count per word; second aggregation: how many words share each count.
        Table table = tEnv.sqlQuery("select * from demo1")
                .groupBy("word")
                .select("word, num.sum AS count")
                .groupBy("count")
                .select("count, word.count AS frequency");
        // Emit as a retract stream: true = insert, false = retraction.
        tEnv.toRetractStream(table, Word.class).print();
        env.execute("demo");
    }
}

package com.yjp.flink.retraction;
 
public class Word {
    private Integer count;
    private Long frequency;
 
    public Word() {
    }
 
    public Integer getCount() {
        return count;
    }
 
    public void setCount(Integer count) {
        this.count = count;
    }
 
    public Long getFrequency() {
        return frequency;
    }
 
    public void setFrequency(Long frequency) {
        this.frequency = frequency;
    }
 
    @Override
    public String toString() {
        return "Word{" +
                "count=" + count +
                ", frequency=" + frequency +
                '}';
    }
}

Output:

2> (true,Word{count=1, frequency=1})
2> (false,Word{count=1, frequency=1})
2> (true,Word{count=1, frequency=2})
4> (true,Word{count=3, frequency=1})
4> (false,Word{count=3, frequency=1})
4> (true,Word{count=4, frequency=1})
4> (false,Word{count=4, frequency=1})
2> (false,Word{count=1, frequency=2})
2> (true,Word{count=1, frequency=3})
2> (false,Word{count=1, frequency=3})
3> (true,Word{count=6, frequency=1})
1> (true,Word{count=2, frequency=1})
1> (false,Word{count=2, frequency=1})
1> (true,Word{count=5, frequency=1})
1> (false,Word{count=5, frequency=1})
1> (true,Word{count=2, frequency=1})
2> (true,Word{count=1, frequency=2})
2> (false,Word{count=1, frequency=2})
2> (true,Word{count=1, frequency=3})
2> (false,Word{count=1, frequency=3})
2> (true,Word{count=1, frequency=2})

Analyzing the output, the result we expect is: (6, 1) for the six "bark" records, (2, 1) for the two "hello" records, and (1, 2) because "word" and "flink" each appear once.

Records with the same leading number were produced by the same parallel subtask, i.e. they belong to the same group of operations. true marks an insert and false marks a retraction; a matching true/false pair cancels out, and after cancelling them the remaining records are exactly the result we expected. What would happen without the retraction mechanism is already explained in the Alibaba article.
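
As a minimal sketch of how such a retract stream could be consumed (this helper is made up for illustration and is not part of the original example), inserts and retractions can be applied to an in-memory map so that only the net result remains:

package com.yjp.flink.retraction;

import org.apache.flink.api.java.tuple.Tuple2;

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not from the original code: keeps only the rows that
// survive after matching true (insert) and false (retract) records cancel out.
public class RetractConsumer {
    private final Map<String, Integer> state = new HashMap<>();

    public void apply(Tuple2<Boolean, Word> record) {
        String key = record.f1.toString();
        if (record.f0) {
            // true: add the emitted row
            state.merge(key, 1, Integer::sum);
        } else {
            // false: retract a previously emitted row
            state.computeIfPresent(key, (k, v) -> v > 1 ? v - 1 : null);
        }
    }

    public Map<String, Integer> currentResult() {
        return state;
    }
}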
Now look at Alibaba's second example. When reading it, you may wonder how the StringLast function could be implemented; a Java implementation follows.

package com.yjp.flink.retract;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ALiTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
        env.getConfig().disableSysoutLogging();
        // Order events: (order_id, delivery company, timestamp). The last record
        // updates order 0001 from 中通 to 圓通.
        DataStreamSource<Tuple3<String, String, Long>> dataStream = env.fromElements(
                new Tuple3<>("0001", "中通", 1L),
                new Tuple3<>("0002", "中通", 2L),
                new Tuple3<>("0003", "圓通", 3L),
                new Tuple3<>("0001", "圓通", 4L)
        );
        tEnv.registerDataStream("Ali", dataStream, "order_id, company, timestamp");
        // Register the user-defined "last value by timestamp" aggregate.
        tEnv.registerFunction("agg", new AliAggrete());
        // First aggregation: latest company per order; second: order count per company.
        Table table = tEnv.sqlQuery("select * from Ali")
                .groupBy("order_id")
                .select("order_id, agg(company, timestamp) AS company")
                .groupBy("company")
                .select("company, order_id.count AS order_cnt");
        tEnv.toRetractStream(table, ALi.class).print();
        env.execute("ALi");
    }
}

package com.yjp.flink.retract;

import org.apache.flink.table.functions.AggregateFunction;

// User-defined aggregate that keeps the company of the record with the largest
// timestamp, i.e. a "last value" function like StringLast in the Alibaba article.
public class AliAggrete extends AggregateFunction<String, ALiAccum> {
    @Override
    public ALiAccum createAccumulator() {
        return new ALiAccum();
    }

    @Override
    public String getValue(ALiAccum aLiAccum) {
        return aLiAccum.company;
    }

    // Update the intermediate result in the accumulator: keep the company with the
    // largest timestamp seen so far. The timestamp must be updated as well,
    // otherwise an out-of-order record could overwrite a newer value.
    public void accumulate(ALiAccum aLiAccum, String company, Long time) {
        if (time > aLiAccum.timeStamp) {
            aLiAccum.company = company;
            aLiAccum.timeStamp = time;
        }
    }

    // Optional methods (retract, resetAccumulator, merge) that are not required for this query:

//    public void retract(ALiAccum aLiAccum, String company, Long time) {
//        aLiAccum.company = company;
//        aLiAccum.timeStamp = time;
//    }

//    public void resetAccumulator(ALiAccum aLiAccum) {
//        aLiAccum.company = null;
//        aLiAccum.timeStamp = 0L;
//    }

//    public void merge(ALiAccum acc, Iterable<ALiAccum> it) {
//        Iterator<ALiAccum> iter = it.iterator();
//        while (iter.hasNext()) {
//            ALiAccum aLiAccum = iter.next();
//            if (aLiAccum.timeStamp > acc.timeStamp) {
//                acc.company = aLiAccum.company;
//            }
//        }
//    }
}

package com.yjp.flink.retract;

// Accumulator for AliAggrete: the company seen so far together with its timestamp.
public class ALiAccum {
    public String company = null;
    public Long timeStamp = 0L;
}

package com.yjp.flink.retract;
 
public class ALi {
    private String company;
    private Long order_cnt;
 
 
    public ALi() {
    }
 
    public String getCompany() {
        return company;
    }
 
    public void setCompany(String company) {
        this.company = company;
    }
 
    public Long getOrder_cnt() {
        return order_cnt;
    }
 
    public void setOrder_cnt(Long order_cnt) {
        this.order_cnt = order_cnt;
    }
 
    @Override
    public String toString() {
        return "ALi{" +
                "company='" + company + '\'' +
                ", order_cnt=" + order_cnt +
                '}';
    }
}


這個整個就是阿裡第二個例子用代碼去實作,timestamp這個字段其實可以不用給,因為每個流進入的時候就會自帶一個時間戳,但是會有亂序的考慮,如果不考慮亂序就用自帶的時間戳就可以了。

Walking through the logic of the code

tEnv.registerFunction("agg", new AliAggrete());

将我們自己實作的聚合的函數注冊, Table table = tEnv.sqlQuery(“select * from Ali “)将流轉換為第一張Stream表, .groupBy(“order_id”).select(” order_id,agg(company,timestamp) As company”)以訂單id分組,相同id的訂單會進入同一組,然後我們通過我們自定義的聚合函數去實作隻發送時間戳最大的那個記錄,實作的原理,ALiAccum這個類是為了将我們company,timestamp兩個字段形成映射關系,然後AggregateFunction String為傳回類型,我們這裡需要傳回的是公司的名字,是以為String類型,ALiAccum是我們傳入的兩個字段,之前将兩個字段映射為了POJP對象,首先會調用createAccumulator()方法,建立一個資料結構來儲存聚合的中間結果,然後通過accumulate()方法來該更中間結果的值,最後通過getValue()來傳回我們真正需要的值。最後對我們操作過的這張表進行查詢操作,就得到我們想要的結果了。主要就是自己需要實作Agg函數。