目錄
一、基本概念
1.流合并條件
2.Flink 中支援 雙流join 的算子
二、IntervalJoin介紹
1.IntervalJoin說明
2.文法格式
3. 原碼解析
三、IntervalJoin開發實踐
1.訂單主表和明細表合成訂單寬表
一、基本概念
1.流合并條件
Flink 中的兩個流要實作 Join 操作,必須滿足以下兩點:
- 流需要能夠等待,即:兩個流必須在同一個視窗中;
- 雙流等值 Join,即:兩個流中,必須有一個字段相等才能夠 Join 上。
2.Flink 中支援 雙流join 的算子
Flink 中支援雙流 Join 的算子目前已知有5種,如下:
-
:union 支援雙流 Join,也支援多流 Join。多個流類型必須一緻;union
-
:connector 支援雙流 Join,兩個流的類型可以不一緻;connector
-
:該方法隻支援 inner join,即:相同視窗下,兩個流中,Key都存在且相同時才會關聯成功;join
-
:同樣能夠實作雙流 Join。即:将同一 Window 視窗内的兩個DataStream 聯合起來,兩個流按照 Key 來進行關聯,并通過 apply()方法 new CoGroupFunction() 的形式,重寫 join() 方法進行邏輯處理。coGroup
-
:Interval Join 沒有 Window 視窗的概念,直接用時間戳作為關聯的條件,更具表達力。intervalJoin
join() 和 coGroup() 都是 Flink 中用于連接配接多個流的算子,但是兩者也有一定的差別,推薦能使用 coGroup 不要使用Join,因為coGroup更強大(
**inner join 除外。就 inner join 的話推薦使用 join ,因為在 join 的政策上做了優化,更高效**
)
二、IntervalJoin介紹
1.IntervalJoin說明
Flink中基于DataStream的join,隻能實作在同一個視窗的兩個資料流進行join,但是在實際中常常會存在資料亂序或者延時的情況,導緻兩個流的資料進度不一緻,就會出現資料跨視窗的情況,那麼資料就無法在同一個視窗内join。
Flink基于KeyedStream提供的interval join機制,intervaljoin 連接配接兩個keyedStream, 按照相同的key在一個相對資料時間的時間段内進行連接配接。
2.文法格式
LeftDStream.keyBy(訂單主表::Id)
.intervalJoin(RightDStream.keyBy(訂單明細表::Order_id))
.between(Time.seconds(-5), Time.seconds(5))
.process();
- 分别對LeftDStream和RightDStream通過訂單id進行keyBy操作,得到兩個KeyedStream,再進行intervalJoin操作。
- between方法傳遞的兩個參數lowerBound和upperBound,用來控制右邊的流可以與哪個時間範圍内的左邊的流進行關聯,即:
leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
相當于左邊的流可以晚到lowerBound(lowerBound為負的話)時間,也可以早到upperBound(upperBound為正的話)時間。
- 使用Interval Join時,必須要指定的時間類型為EventTime。
- 兩個KeyedStream在進行intervalJoin并調用between方法後,跟着使用process方法
- process方法傳遞一個自定義的 ProcessJoinFunction 作為參數,ProcessJoinFunction的三個參數就是左邊流的元素類型,右邊流的元素類型,輸出流的元素類型。
- intervalJoin,底層是将兩個KeyedStream進行connect操作,得到ConnectedStreams,這樣的兩個資料流之間就可以實作狀态共享,對于intervalJoin來說就是兩個流相同key的資料可以互相通路。
概念圖:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL5I2NhRWMmJGNlZjY2EWZ5IzMhRjYyEGO5IDM0czNjJ2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
3. 原碼解析
- intervaljoin首先會将兩個KeyedStream 進行connect操作得到一個ConnectedStreams, ConnectedStreams表示的是連接配接兩個資料流,并且這兩個資料流之前可以實作狀态共享, 對于intervaljoin 來說就是兩個流相同key的資料可以互相通路
-
在ConnectedStreams之上進行IntervalJoinOperator算子操作,該算子是intervaljoin 的核心,接下來分析一下其實作:
a. 定義了兩個MapState<Long, List<BufferEntry<T1>>>類型的狀态對象,分别用來存儲兩個流的資料,其中Long對應資料的時間戳,List<BufferEntry<T1>>對應相同時間戳的資料
b. 包含processElement1、processElement2兩個方法,這兩個方法都會調用processElement方法,真正資料處理的地方
private <THIS, OTHER> void processElement(
final StreamRecord<THIS> record,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
final long relativeLowerBound,
final long relativeUpperBound,
final boolean isLeft) throws Exception {
final THIS ourValue = record.getValue();
final long ourTimestamp = record.getTimestamp();
if (ourTimestamp == Long.MIN_VALUE) {
throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
"interval stream joins need to have timestamps meaningful timestamps.");
}
if (isLate(ourTimestamp)) {
return;
}
addToBuffer(ourBuffer, ourValue, ourTimestamp);
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
final long timestamp = bucket.getKey();
if (timestamp < ourTimestamp + relativeLowerBound ||
timestamp > ourTimestamp + relativeUpperBound) {
continue;
}
for (BufferEntry<OTHER> entry: bucket.getValue()) {
if (isLeft) {
collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
} else {
collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
}
}
}
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
if (isLeft) {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
}
- 判斷延時,資料時間小于目前的watermark值認為資料延時,則不處理
- 将資料添加到對應的MapState<Long, List<BufferEntry<T1>>>緩存狀态中,key為資料的時間
- 循環周遊另外一個狀态,如果滿足ourTimestamp + relativeLowerBound <=timestamp<= ourTimestamp + relativeUpperBound , 則将資料輸出給ProcessJoinFunction調用,ourTimestamp表示流入的資料時間,timestamp表示對應join的資料時間
- 注冊一個資料清理時間方法,會調用onEventTime方法清理對應狀态資料。對于例子中orderStream比addressStream早到1到5秒,那麼orderStream的資料清理時間就是5秒之後,也就是orderStream.time+5,當watermark大于該時間就需要清理,對于addressStream是晚來的資料不需要等待,當watermark大于資料時間就可以清理掉。
整個處理邏輯都是基于資料時間的,也就是intervaljoin 必須基于EventTime語義,在between 中有做TimeCharacteristic是否為EventTime校驗, 如果不是則抛出異常。
三、IntervalJoin開發實踐
1.訂單主表和明細表合成訂單寬表
(1)需求:
- 實作訂單基本資訊表和訂單明細表合成訂單寬表
- 實作類似以下SQL功能
select a.*,b.* from OrderInfo a
left join OrderDetail b on a.id = b.order_id
(2)代碼實作(主要代碼):
//将orderInf資料流轉換為JavaBean并提取時間戳生成WaterMark
SingleOutputStreamOperator<OrderInfo> orderInfoWithWMDS = orderInfoStrDS.map(line -> {
OrderInfo orderInfo = JSON.parseObject(line, OrderInfo.class);
//yyyy-MM-dd HH:mm:ss
String create_time = orderInfo.getCreate_time();
String[] dateHourArr = create_time.split(" ");
orderInfo.setCreate_date(dateHourArr[0]);
orderInfo.setCreate_hour(dateHourArr[1].split(":")[0]);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long ts = sdf.parse(create_time).getTime();
orderInfo.setCreate_ts(ts);
return orderInfo;
}).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
@Override
public long extractTimestamp(OrderInfo element, long recordTimestamp) {
return element.getCreate_ts();
}
}));
//将orderDetail資料流轉換為JavaBean并提取時間戳生成WaterMark
SingleOutputStreamOperator<OrderDetail> orderDetailWithWMDS = orderDetailStrDS.map(line -> {
OrderDetail orderDetail = JSON.parseObject(line, OrderDetail.class);
String create_time = orderDetail.getCreate_time();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long ts = sdf.parse(create_time).getTime();
orderDetail.setCreate_ts(ts);
return orderDetail;
}).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
@Override
public long extractTimestamp(OrderDetail element, long recordTimestamp) {
return element.getCreate_ts();
}
}));
//将兩個流進行JOIN
SingleOutputStreamOperator<OrderWide> orderWideDS = orderInfoWithWMDS.keyBy(OrderInfo::getId)
.intervalJoin(orderDetailWithWMDS.keyBy(OrderDetail::getOrder_id))
.between(Time.seconds(-5), Time.seconds(5))//生産環境,為了不丢資料,設定時間為最大網絡延遲,這裡設定了正負5秒,以防止在業務系統中主表與從表儲存的時間差
.process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
@Override
public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
out.collect(new OrderWide(orderInfo, orderDetail));
}
});
//Bean:OrderWide
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.commons.lang3.ObjectUtils;
import java.math.BigDecimal;
@Data
@AllArgsConstructor
public class OrderWide {
Long detail_id;
Long order_id;
Long sku_id;
BigDecimal order_price;
Long sku_num;
String sku_name;
Long province_id;
String order_status;
Long user_id;
BigDecimal total_amount;
BigDecimal activity_reduce_amount;
BigDecimal coupon_reduce_amount;
BigDecimal original_total_amount;
BigDecimal feight_fee;
BigDecimal split_feight_fee;
BigDecimal split_activity_amount;
BigDecimal split_coupon_amount;
BigDecimal split_total_amount;
String expire_time;
String create_time; //yyyy-MM-dd HH:mm:ss
String operate_time;
String create_date; // 把其他字段處理得到
String create_hour;
String province_name;//查詢維表得到
String province_area_code;
String province_iso_code;
String province_3166_2_code;
Integer user_age;
String user_gender;
Long spu_id; //作為次元資料 要關聯進來
Long tm_id;
Long category3_id;
String spu_name;
String tm_name;
String category3_name;
public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail) {
mergeOrderInfo(orderInfo);
mergeOrderDetail(orderDetail);
}
public void mergeOrderInfo(OrderInfo orderInfo) {
if (orderInfo != null) {
this.order_id = orderInfo.id;
this.order_status = orderInfo.order_status;
this.create_time = orderInfo.create_time;
this.create_date = orderInfo.create_date;
this.create_hour = orderInfo.create_hour;
this.activity_reduce_amount = orderInfo.activity_reduce_amount;
this.coupon_reduce_amount = orderInfo.coupon_reduce_amount;
this.original_total_amount = orderInfo.original_total_amount;
this.feight_fee = orderInfo.feight_fee;
this.total_amount = orderInfo.total_amount;
this.province_id = orderInfo.province_id;
this.user_id = orderInfo.user_id;
}
}
public void mergeOrderDetail(OrderDetail orderDetail) {
if (orderDetail != null) {
this.detail_id = orderDetail.id;
this.sku_id = orderDetail.sku_id;
this.sku_name = orderDetail.sku_name;
this.order_price = orderDetail.order_price;
this.sku_num = orderDetail.sku_num;
this.split_activity_amount = orderDetail.split_activity_amount;
this.split_coupon_amount = orderDetail.split_coupon_amount;
this.split_total_amount = orderDetail.split_total_amount;
}
}
public void mergeOtherOrderWide(OrderWide otherOrderWide) {
this.order_status = ObjectUtils.firstNonNull(this.order_status, otherOrderWide.order_status);
this.create_time = ObjectUtils.firstNonNull(this.create_time, otherOrderWide.create_time);
this.create_date = ObjectUtils.firstNonNull(this.create_date, otherOrderWide.create_date);
this.coupon_reduce_amount = ObjectUtils.firstNonNull(this.coupon_reduce_amount, otherOrderWide.coupon_reduce_amount);
this.activity_reduce_amount = ObjectUtils.firstNonNull(this.activity_reduce_amount, otherOrderWide.activity_reduce_amount);
this.original_total_amount = ObjectUtils.firstNonNull(this.original_total_amount, otherOrderWide.original_total_amount);
this.feight_fee = ObjectUtils.firstNonNull(this.feight_fee, otherOrderWide.feight_fee);
this.total_amount = ObjectUtils.firstNonNull(this.total_amount, otherOrderWide.total_amount);
this.user_id = ObjectUtils.<Long>firstNonNull(this.user_id, otherOrderWide.user_id);
this.sku_id = ObjectUtils.firstNonNull(this.sku_id, otherOrderWide.sku_id);
this.sku_name = ObjectUtils.firstNonNull(this.sku_name, otherOrderWide.sku_name);
this.order_price = ObjectUtils.firstNonNull(this.order_price, otherOrderWide.order_price);
this.sku_num = ObjectUtils.firstNonNull(this.sku_num, otherOrderWide.sku_num);
this.split_activity_amount = ObjectUtils.firstNonNull(this.split_activity_amount);
this.split_coupon_amount = ObjectUtils.firstNonNull(this.split_coupon_amount);
this.split_total_amount = ObjectUtils.firstNonNull(this.split_total_amount);
}
}
//Bean:OrderInfo
import lombok.Data;
import java.math.BigDecimal;
@Data
public class OrderInfo {
Long id;
Long province_id;
String order_status;
Long user_id;
BigDecimal total_amount;
BigDecimal activity_reduce_amount;
BigDecimal coupon_reduce_amount;
BigDecimal original_total_amount;
BigDecimal feight_fee;
String expire_time;
String create_time;
String operate_time;
String create_date; // 把其他字段處理得到
String create_hour;
Long create_ts;
}
//Bean:OrderDetail
import lombok.Data;
import java.math.BigDecimal;
@Data
public class OrderDetail {
Long id;
Long order_id;
Long sku_id;
BigDecimal order_price;
Long sku_num;
String sku_name;
String create_time;
BigDecimal split_total_amount;
BigDecimal split_activity_amount;
BigDecimal split_coupon_amount;
Long create_ts;
}