天天看點

億級消息系統的核心存儲:Tablestore釋出Timeline 2.0模型背景2.0時代到來入手指南專家服務

背景

網際網路快速發展的今天,社交類應用、消息類功能大行其道,占據了大量網絡流量。大至釘釘、微信、微網誌、知乎,小至各類App的推送通知,消息類功能幾乎成為所有應用的标配。根據場景特點,我們可以将消息類場景歸納成三大類:IM(釘釘、微信)、Feed流(微網誌、知乎)以及正常消息隊列。是以,如何開發一個簡便而又高效IM或Feed流功能,成為了很多架構師、開發人員不得不面對的問題。

Timeline 1.0版模型

針對消息類場景,表格存儲團隊針對JAVA語言打造了一個TableStore-Timeline 1.0版資料模型模型(簡稱Timeline模型)。基于場景經驗與了解,将消息場景封裝成一個資料模型,提供了表結構設計,讀寫方式等解決方案供需求者使用。使用者隻需依托模型API,直接忽略Timeline到底層存儲系統之間的架構方案,直接基于接口實作業務邏輯。它能滿足消息資料場景對消息保序、海量消息存儲、實時同步等特殊需求。Timeline 1.0是定義在表格存儲之上抽象出來的資料模型,具體内容參見《

TableStore Timeline:輕松建構千萬級IM和Feed流系統

》。

全文檢索、模糊查詢需求

在表格存儲的Timeline模型受到廣泛使用的過程中,我們也逐漸發現消息類資料的全文檢索、模糊查詢這一很強需求。而原有模型的線上查詢能力存在一定短闆。随着表格存儲支援了SearchIndex能力,使得Timeline模型支援線上全文檢索、模糊查詢成為了可能。是以我們基于原有的架構設計,重新打造了Timeline 2.0模型,引入了強大的查詢能力與資料管理新方案。

項目代碼目前已經開源在了GitHub上:

Timeline@GitHub

2.0時代到來

此次推出的Timeline模型2.0版,沒有直接基于1.X版本直接改造。而是在相容原有模型架構之上,定義、封裝了新的使用接口。重新打造更新新的模型,增加了如下功能:

  • 增加了Timeline Meta的管理能力;
  • 基于多元索引功能,增加了Meta、Timeline的全文檢索、多元組合查詢能力;
  • 支援SequenceId兩種設定方式:自增列、手動設定;
  • 支援多列的Timeline Identifier設定,提供Timeline的分組管理能力;
  • 相容Timeline 1.X模型,提供的TimelineMessageForV1樣例可直接讀、寫1.X版本消息,使用者也可仿照實作。

架構解析

億級消息系統的核心存儲:Tablestore釋出Timeline 2.0模型背景2.0時代到來入手指南專家服務

Timeline做為表格存儲直接支援的一種資料模型,以『簡單』為設計目标,其存核心子產品構成比較清晰明了。Timeline盡量提升使用者的使用自由度,讓使用者能夠根據自身場景需求選擇更為合适的實作方案。模型的子產品架構如上圖,主要包括如下重要部分:

  • Store:存儲庫,類似資料庫的表的概念,模型包含兩類Store分别為Meta Store、Timeline Store。
  • Identifier:用于區分Timeline的唯一辨別,用以辨別單行Meta,以及相應的Timeline Queue。
  • Meta:用于描述Timeline的中繼資料,中繼資料描述采用free-schema結構,可自由包含任意列。
  • Queue:Queue為單Identifier對應的所有消息隊列,一個Timeline下Message按照Queue單元存儲。
  • SequenceId:Queue中消息體的序列号,需保證遞增、唯一,模型支援自增列、自定義兩種實作模式。
  • Message:Timeline内傳遞的消息體,是一個free-schema的結構,可自由包含任意列。
  • Index:基于SearchIndex實作的索引,針對不同Store分為Meta Index和Message Index兩類,可對Meta或Message内的任意列自定義索引,提供靈活的多條件組合查詢和搜尋。

性能優勢

Timeline 模型是基于Tablestore抽象、封裝出的一類場景資料模型,因而具有Tablestore自身的所有優點。同時結合場景設計的接口,讓使用者更直覺、清晰的實作業務邏輯,總結如下:

  • 支撐海量資料存儲:分布式架構,高可擴充,支援10PB級的消息。
  • 低存儲成本:表格存儲提供低成本的存儲方式,按量付費、資源包、預留Cu。
  • 資料生命周期管理:不同類型(表級别)消息資料,可自定義不同生命周期。
  • 極高的寫入吞吐:具備極高的寫入吞吐能力,可應對00M TPS的消息寫入。
  • 低延遲的讀:查詢消息延遲低,毫秒量級。
  • 接口設計:可讀性高,接口功能全面、清晰。

Maven位址

Timeline Lib

<dependency>
    <groupId>com.aliyun.openservices.tablestore</groupId>
    <artifactId>Timeline</artifactId>
    <version>2.0.0</version>
</dependency>           

TableStore Java SDK

Timeline模型在TableStore Java SDK >= 4.12.1作為基本資料模型直接提供,表格存儲老使用者可更新SDK直接使用

<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>tablestore</artifactId>
  <version>4.12.1</version>
</dependency>           

入手指南

初始化

初始化Factory

使用者将SyncClient作為參數,初始化StoreFactory,通過工廠建立Meta資料、Timeline資料的管理Store。錯誤重試的實作依賴SyncClient的重試政策,使用者通過設定SyncClient實作重試。如有特殊需求,可自定義政策(隻需實作RetryStrategy接口)。

/**
 * 重試政策設定
 * Code: configuration.setRetryStrategy(new DefaultRetryStrategy());
 * */
ClientConfiguration configuration = new ClientConfiguration();

SyncClient client = new SyncClient(
        "http://instanceName.cn-shanghai.ots.aliyuncs.com",
        "accessKeyId",
        "accessKeySecret",
        "instanceName", configuration);

TimelineStoreFactory factory = new TimelineStoreFactoryImpl(client);           

初始化MetaStore

建構meta表的Schema(包含Identifier、MetaIndex等參數),通過Store工廠建立并擷取Meta的管理Store;配置參數包含:Meta表名、索引,表名、主鍵字段、索引名、索引類型等參數。

TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
        .addStringField("timeline_id").build();

IndexSchema metaIndex = new IndexSchema();
metaIndex.addFieldSchema( //配置索引字段、類型
        new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord)
        new FieldSchema("create_time", FieldType.Long).setIndex(true)
);

TimelineMetaSchema metaSchema = new TimelineMetaSchema("groupMeta", idSchema)
        .withIndex("metaIndex", metaIndex); //設定索引

TimelineMetaStore timelineMetaStore = serviceFactory.createMetaStore(metaSchema);           

初始化TimelineStore

建構timeline表的Schema配置,包含Identifier、TimelineIndex等參數,通過Store工廠建立并擷取Timeline的管理Store;配置參數包含:Timeline表名、索引,表名、主鍵字段、索引名、索引類型等參數。

消息的批量寫入,基于Tablestore的DefaultTableStoreWriter提升并發,使用者可以根據自己需求設定線程池數目。

TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
        .addStringField("timeline_id").build();

IndexSchema timelineIndex = new IndexSchema();
timelineIndex.setFieldSchemas(Arrays.asList(//配置索引的字段、類型
        new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
        new FieldSchema("receivers", FieldType.KEYWORD).setIndex(true).setIsArray(true)
));

TimelineSchema timelineSchema = new TimelineSchema("timeline", idSchema)
        .autoGenerateSeqId() //SequenceId 設定為自增列方式
        .setCallbackExecuteThreads(5) //設定Writer初始線程數為5
        .withIndex("metaIndex", timelineIndex); //設定索引

TimelineStore timelineStore = serviceFactory.createTimelineStore(timelineSchema);           

Meta管理

Meta管理提供了增、删、改、單行讀、多條件組合查詢等接口。其中多條件組合查詢功能基于多元索引,隻有設定了IndexSchema的MetaStore才支援組合查詢功能。索引類型支援LONG、DOUBLE、BOOLEAN、KEYWORD、GEO_POINT等類型,屬性包含Index、Store和Array,其含義與多元索引相同。

TimelineIdentifer是區分Timeline的唯一辨別,重複的Identifier會被覆寫。

/**
 * 接口使用參數
 * */
TimelineIdentifier identifier = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group")
        .build();
TimelineMeta meta = new TimelineMeta(identifier)
        .setField("filedName", "fieldValue");

/**
 * 建立Meta表(如果設定索引則會建立索引)
 * */
timelineMetaStore.prepareTables();

/**
 * 插入Meta資料
 * */
timelineMetaStore.insert(meta);

/**
 * 根據id讀取單行Meta資料
 * */
timelineMetaStore.read(identifier);

/**
 * 更新Meta資料
 * */
meta.setField("fieldName", "newValue");
timelineMetaStore.update(meta);

/**
 * 根據id删除單行Meta資料
 * */
timelineMetaStore.delete(identifier);

/**
 * 通過SearchParameter參數檢索
 * */
SearchParameter parameter = new SearchParameter(
        field("fieldName").equals("fieldValue")
);
timelineMetaStore.search(parameter);

/**
 * 通過SearchQuery參數檢索(SearchQuery是SDK原生類型,支援所有多元索引檢索條件)
 * */
TermQuery query = new TermQuery();
query.setFieldName("fieldName");
query.setTerm(ColumnValue.fromString("fieldValue"));

SearchQuery searchQuery = new SearchQuery().setQuery(query);
timelineMetaStore.search(searchQuery);

/**
 * 删除Meta表(如果存在索引,同時删除索引)
 * */
timelineMetaStore.dropAllTables();           

Timeline管理

Timeline管理提供了消息模糊查詢、多條件組合查詢接口。消息的全文檢索依托多元索引,使用者隻需将相應字段索引類型設定為TEXT,即可通過Search接口實作消息的全文檢索。Timeline管理包含消息表的建立、檢索、删除等。

/**
 * 接口使用參數
 * */
SearchParameter searchParameter = new SearchParameter(
        field("text").equals("fieldValue")
);

TermQuery query = new TermQuery();
query.setFieldName("text");
query.setTerm(ColumnValue.fromString("fieldValue"));
SearchQuery searchQuery = new SearchQuery().setQuery(query).setLimit(10);

/**
 * 建立Meta表(如果設定索引則會建立索引)
 * */
timelineStore.prepareTables();


/**
 * 通過SearchParameter參數檢索
 * */
timelineStore.search(searchParameter);

/**
 * 通過SearchQuery參數檢索(SearchQuery是SDK原生類型,支援所有多元索引檢索條件)
 * */

timelineStore.search(searchQuery);

/**
 * 将Writer隊列中未發的請求主動觸發,同步等待直到所有消息完成存儲
 * */
timelineStore.flush();

/**
 * 關閉Writer與Writer中的線程池
 * */
timelineStore.close();

/**
 * 删除Timeline表(如果存在索引,同時删除索引)
 * */
timelineStore.dropAllTables();           

Queue管理

Queue是單個消息隊列的抽象概念,對應一個Store下單個Identifier的所有消息。通過Queue執行個體管理相應Identifer的消息隊列,支援基本的增、删、改、單行查、範圍查等接口。

/**
 * 接口使用參數
 * */
TimelineIdentifier identifier = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group")
        .build();
long sequenceId = 1557133858994L;
TimelineMessage message = new TimelineMessage().setField("text", "Timeline is fine.");
ScanParameter scanParameter = new ScanParameter().scanBackward(Long.MAX_VALUE, 0);
TimelineCallback callback = new TimelineCallback() {
    @Override
    public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
        // do something when succeed.
    }

    @Override
    public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
        // do something when failed.
    }
};

/**
 * 單個Identifier對應的消息隊列
 * */
timelineQueue = timelineStore.createTimelineQueue(identifier);

/**
 * 存儲消息
 * */
//同步
timelineQueue.store(message);
timelineQueue.store(sequenceId, message);
//異步,支援callback
timelineQueue.storeAsync(message, callback);
timelineQueue.storeAsync(sequenceId, message, callback);
//異步批量
timelineQueue.batchStore(message);
timelineQueue.batchStore(sequenceId, message);
//異步批量,支援callback
timelineQueue.batchStore(message, callback);
timelineQueue.batchStore(sequenceId, message, callback);

/**
 * 單行讀取、擷取最新一行、擷取最新SequenceId
 * */
timelineQueue.get(sequenceId);
timelineQueue.getLatestTimelineEntry();
timelineQueue.getLatestSequenceId();

/**
 * 根據SequenceId更新消息
 * */
message.setField("text", "newValue");
timelineQueue.update(sequenceId, message);
timelineQueue.updateAsync(sequenceId, message, callback);

/**
 * 根據SequenceId删除消息
 * */
timelineQueue.delete(sequenceId);

/**
 * 根據範圍參數、Filter擷取批量消息
 * */
timelineQueue.scan(scanParameter);           

專家服務

表格存儲有一批精通Timeline領域的技術專家,在打造IM、Feed流場景方面有着獨到的見解。如果您:

  • 渴望尋覓Timeline領域高手過招;
  • 調研Timeline場景解決方案;
  • 準備入門Timeline場景;
  • 對表格存儲(Tablestore)産品感興趣;

歡迎加入“表格存儲公開交流群”。表格存儲 (Tablestore) 提供專業的免費的技術咨詢服務,期待為您服務。群号 : 23307953

億級消息系統的核心存儲:Tablestore釋出Timeline 2.0模型背景2.0時代到來入手指南專家服務