天天看點

《Flink應用實戰》(四)--合并流-IntervalJoin算子

目錄

一、基本概念

1.流合并條件

2.Flink 中支援 雙流join 的算子

二、IntervalJoin介紹

1.IntervalJoin說明

2.文法格式

3. 原碼解析

三、IntervalJoin開發實踐

1.訂單主表和明細表合成訂單寬表

一、基本概念

1.流合并條件

Flink 中的兩個流要實作 Join 操作,必須滿足以下兩點:

  1. 流需要能夠等待,即:兩個流必須在同一個視窗中;
  2. 雙流等值 Join,即:兩個流中,必須有一個字段相等才能夠 Join 上。

2.Flink 中支援 雙流join 的算子

Flink 中支援雙流 Join 的算子目前已知有5種,如下:

  1. union

    :union 支援雙流 Join,也支援多流 Join。多個流類型必須一緻;
  2. connector

    :connector 支援雙流 Join,兩個流的類型可以不一緻;
  3. join

    :該方法隻支援 inner join,即:相同視窗下,兩個流中,Key都存在且相同時才會關聯成功;
  4. coGroup

    :同樣能夠實作雙流 Join。即:将同一 Window 視窗内的兩個DataStream 聯合起來,兩個流按照 Key 來進行關聯,并通過 apply()方法 new CoGroupFunction() 的形式,重寫 join() 方法進行邏輯處理。
  5. intervalJoin

    :Interval Join 沒有 Window 視窗的概念,直接用時間戳作為關聯的條件,更具表達力。

join() 和 coGroup() 都是 Flink 中用于連接配接多個流的算子,但是兩者也有一定的差別,推薦能使用 coGroup 不要使用Join,因為coGroup更強大(

**inner join 除外。就 inner join 的話推薦使用 join ,因為在 join 的政策上做了優化,更高效**

二、IntervalJoin介紹

1.IntervalJoin說明

Flink中基于DataStream的join,隻能實作在同一個視窗的兩個資料流進行join,但是在實際中常常會存在資料亂序或者延時的情況,導緻兩個流的資料進度不一緻,就會出現資料跨視窗的情況,那麼資料就無法在同一個視窗内join。

Flink基于KeyedStream提供的interval join機制,intervaljoin 連接配接兩個keyedStream, 按照相同的key在一個相對資料時間的時間段内進行連接配接。

2.文法格式

LeftDStream.keyBy(訂單主表::Id)
    .intervalJoin(RightDStream.keyBy(訂單明細表::Order_id))
    .between(Time.seconds(-5), Time.seconds(5))
    .process();
           
  1. 分别對LeftDStream和RightDStream通過訂單id進行keyBy操作,得到兩個KeyedStream,再進行intervalJoin操作。
  2. between方法傳遞的兩個參數lowerBound和upperBound,用來控制右邊的流可以與哪個時間範圍内的左邊的流進行關聯,即:
    leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound

相當于左邊的流可以晚到lowerBound(lowerBound為負的話)時間,也可以早到upperBound(upperBound為正的話)時間。

  1. 使用Interval Join時,必須要指定的時間類型為EventTime。
  2. 兩個KeyedStream在進行intervalJoin并調用between方法後,跟着使用process方法
  3. process方法傳遞一個自定義的 ProcessJoinFunction 作為參數,ProcessJoinFunction的三個參數就是左邊流的元素類型,右邊流的元素類型,輸出流的元素類型。
  4. intervalJoin,底層是将兩個KeyedStream進行connect操作,得到ConnectedStreams,這樣的兩個資料流之間就可以實作狀态共享,對于intervalJoin來說就是兩個流相同key的資料可以互相通路。

概念圖:

《Flink應用實戰》(四)--合并流-IntervalJoin算子

3. 原碼解析

  1. intervaljoin首先會将兩個KeyedStream 進行connect操作得到一個ConnectedStreams, ConnectedStreams表示的是連接配接兩個資料流,并且這兩個資料流之前可以實作狀态共享, 對于intervaljoin 來說就是兩個流相同key的資料可以互相通路
  2. 在ConnectedStreams之上進行IntervalJoinOperator算子操作,該算子是intervaljoin 的核心,接下來分析一下其實作:

    a. 定義了兩個MapState<Long, List<BufferEntry<T1>>>類型的狀态對象,分别用來存儲兩個流的資料,其中Long對應資料的時間戳,List<BufferEntry<T1>>對應相同時間戳的資料

    b. 包含processElement1、processElement2兩個方法,這兩個方法都會調用processElement方法,真正資料處理的地方

private <THIS, OTHER> void processElement(
		final StreamRecord<THIS> record,
		final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
		final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
		final long relativeLowerBound,
		final long relativeUpperBound,
		final boolean isLeft) throws Exception {

	final THIS ourValue = record.getValue();
	final long ourTimestamp = record.getTimestamp();

	if (ourTimestamp == Long.MIN_VALUE) {
		throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
				"interval stream joins need to have timestamps meaningful timestamps.");
	}

	if (isLate(ourTimestamp)) {
		return;
	}

	addToBuffer(ourBuffer, ourValue, ourTimestamp);

	for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
		final long timestamp  = bucket.getKey();

		if (timestamp < ourTimestamp + relativeLowerBound ||
				timestamp > ourTimestamp + relativeUpperBound) {
			continue;
		}

		for (BufferEntry<OTHER> entry: bucket.getValue()) {
			if (isLeft) {
				collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
			} else {
				collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
			}
		}
	}

	long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
	if (isLeft) {
		internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
	} else {
		internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
	}
}
           
  • 判斷延時,資料時間小于目前的watermark值認為資料延時,則不處理
  • 将資料添加到對應的MapState<Long, List<BufferEntry<T1>>>緩存狀态中,key為資料的時間
  • 循環周遊另外一個狀态,如果滿足ourTimestamp + relativeLowerBound <=timestamp<= ourTimestamp + relativeUpperBound , 則将資料輸出給ProcessJoinFunction調用,ourTimestamp表示流入的資料時間,timestamp表示對應join的資料時間
  • 注冊一個資料清理時間方法,會調用onEventTime方法清理對應狀态資料。對于例子中orderStream比addressStream早到1到5秒,那麼orderStream的資料清理時間就是5秒之後,也就是orderStream.time+5,當watermark大于該時間就需要清理,對于addressStream是晚來的資料不需要等待,當watermark大于資料時間就可以清理掉。

整個處理邏輯都是基于資料時間的,也就是intervaljoin 必須基于EventTime語義,在between 中有做TimeCharacteristic是否為EventTime校驗, 如果不是則抛出異常。

三、IntervalJoin開發實踐

1.訂單主表和明細表合成訂單寬表

(1)需求:

  1. 實作訂單基本資訊表和訂單明細表合成訂單寬表
  2. 實作類似以下SQL功能
select a.*,b.* from OrderInfo a
left join OrderDetail b on a.id = b.order_id
           

(2)代碼實作(主要代碼):

//将orderInf資料流轉換為JavaBean并提取時間戳生成WaterMark
SingleOutputStreamOperator<OrderInfo> orderInfoWithWMDS = orderInfoStrDS.map(line -> {
            OrderInfo orderInfo = JSON.parseObject(line, OrderInfo.class);

            //yyyy-MM-dd HH:mm:ss
            String create_time = orderInfo.getCreate_time();
            String[] dateHourArr = create_time.split(" ");
            orderInfo.setCreate_date(dateHourArr[0]);
            orderInfo.setCreate_hour(dateHourArr[1].split(":")[0]);

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long ts = sdf.parse(create_time).getTime();
            orderInfo.setCreate_ts(ts);

            return orderInfo;
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
            @Override
            public long extractTimestamp(OrderInfo element, long recordTimestamp) {
                return element.getCreate_ts();
            }
        }));

//将orderDetail資料流轉換為JavaBean并提取時間戳生成WaterMark
SingleOutputStreamOperator<OrderDetail> orderDetailWithWMDS = orderDetailStrDS.map(line -> {
            OrderDetail orderDetail = JSON.parseObject(line, OrderDetail.class);

            String create_time = orderDetail.getCreate_time();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long ts = sdf.parse(create_time).getTime();

            orderDetail.setCreate_ts(ts);
            return orderDetail;
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
            @Override
            public long extractTimestamp(OrderDetail element, long recordTimestamp) {
                return element.getCreate_ts();
            }
        }));

//将兩個流進行JOIN
SingleOutputStreamOperator<OrderWide> orderWideDS = orderInfoWithWMDS.keyBy(OrderInfo::getId)
        .intervalJoin(orderDetailWithWMDS.keyBy(OrderDetail::getOrder_id))
        .between(Time.seconds(-5), Time.seconds(5))//生産環境,為了不丢資料,設定時間為最大網絡延遲,這裡設定了正負5秒,以防止在業務系統中主表與從表儲存的時間差
        .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
            @Override
            public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                out.collect(new OrderWide(orderInfo, orderDetail));
            }
        });


//Bean:OrderWide
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.commons.lang3.ObjectUtils;
import java.math.BigDecimal;

@Data
@AllArgsConstructor
public class OrderWide {
    Long detail_id;
    Long order_id;
    Long sku_id;
    BigDecimal order_price;
    Long sku_num;
    String sku_name;
    Long province_id;
    String order_status;
    Long user_id;

    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    BigDecimal split_feight_fee;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    BigDecimal split_total_amount;

    String expire_time;
    String create_time; //yyyy-MM-dd HH:mm:ss
    String operate_time;
    String create_date; // 把其他字段處理得到
    String create_hour;

    String province_name;//查詢維表得到
    String province_area_code;
    String province_iso_code;
    String province_3166_2_code;

    Integer user_age;
    String user_gender;

    Long spu_id;     //作為次元資料 要關聯進來
    Long tm_id;
    Long category3_id;
    String spu_name;
    String tm_name;
    String category3_name;

    public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail) {
        mergeOrderInfo(orderInfo);
        mergeOrderDetail(orderDetail);
    }

    public void mergeOrderInfo(OrderInfo orderInfo) {
        if (orderInfo != null) {
            this.order_id = orderInfo.id;
            this.order_status = orderInfo.order_status;
            this.create_time = orderInfo.create_time;
            this.create_date = orderInfo.create_date;
            this.create_hour = orderInfo.create_hour;
            this.activity_reduce_amount = orderInfo.activity_reduce_amount;
            this.coupon_reduce_amount = orderInfo.coupon_reduce_amount;
            this.original_total_amount = orderInfo.original_total_amount;
            this.feight_fee = orderInfo.feight_fee;
            this.total_amount = orderInfo.total_amount;
            this.province_id = orderInfo.province_id;
            this.user_id = orderInfo.user_id;
        }
    }

    public void mergeOrderDetail(OrderDetail orderDetail) {
        if (orderDetail != null) {
            this.detail_id = orderDetail.id;
            this.sku_id = orderDetail.sku_id;
            this.sku_name = orderDetail.sku_name;
            this.order_price = orderDetail.order_price;
            this.sku_num = orderDetail.sku_num;
            this.split_activity_amount = orderDetail.split_activity_amount;
            this.split_coupon_amount = orderDetail.split_coupon_amount;
            this.split_total_amount = orderDetail.split_total_amount;
        }
    }

    public void mergeOtherOrderWide(OrderWide otherOrderWide) {
        this.order_status = ObjectUtils.firstNonNull(this.order_status, otherOrderWide.order_status);
        this.create_time = ObjectUtils.firstNonNull(this.create_time, otherOrderWide.create_time);
        this.create_date = ObjectUtils.firstNonNull(this.create_date, otherOrderWide.create_date);
        this.coupon_reduce_amount = ObjectUtils.firstNonNull(this.coupon_reduce_amount, otherOrderWide.coupon_reduce_amount);
        this.activity_reduce_amount = ObjectUtils.firstNonNull(this.activity_reduce_amount, otherOrderWide.activity_reduce_amount);
        this.original_total_amount = ObjectUtils.firstNonNull(this.original_total_amount, otherOrderWide.original_total_amount);
        this.feight_fee = ObjectUtils.firstNonNull(this.feight_fee, otherOrderWide.feight_fee);
        this.total_amount = ObjectUtils.firstNonNull(this.total_amount, otherOrderWide.total_amount);
        this.user_id = ObjectUtils.<Long>firstNonNull(this.user_id, otherOrderWide.user_id);
        this.sku_id = ObjectUtils.firstNonNull(this.sku_id, otherOrderWide.sku_id);
        this.sku_name = ObjectUtils.firstNonNull(this.sku_name, otherOrderWide.sku_name);
        this.order_price = ObjectUtils.firstNonNull(this.order_price, otherOrderWide.order_price);
        this.sku_num = ObjectUtils.firstNonNull(this.sku_num, otherOrderWide.sku_num);
        this.split_activity_amount = ObjectUtils.firstNonNull(this.split_activity_amount);
        this.split_coupon_amount = ObjectUtils.firstNonNull(this.split_coupon_amount);
        this.split_total_amount = ObjectUtils.firstNonNull(this.split_total_amount);
    }
}

//Bean:OrderInfo
import lombok.Data;
import java.math.BigDecimal;

@Data
public class OrderInfo {
    Long id;
    Long province_id;
    String order_status;
    Long user_id;
    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    String expire_time;
    String create_time;
    String operate_time;
    String create_date; // 把其他字段處理得到
    String create_hour;
    Long create_ts;
}

//Bean:OrderDetail
import lombok.Data;
import java.math.BigDecimal;

@Data
public class OrderDetail {
    Long id;
    Long order_id;
    Long sku_id;
    BigDecimal order_price;
    Long sku_num;
    String sku_name;
    String create_time;
    BigDecimal split_total_amount;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    Long create_ts;
}