天天看點

知乎 AI 使用者模型服務性能優化實踐

作者:閃念基因

使用者模型簡介

知乎 AI 使用者模型服務于知乎兩億多使用者,主要為首頁、推薦、廣告、知識服務、想法、關注頁等業務場景提供資料和服務,例如首頁個性化 Feed 的召回和排序、相關回答等用到的使用者長期興趣特征,問題路由、回答排序中用到的 TPR「作者創作權威度」,廣告定向投放用到的基礎屬性等。

主要功能

提供的資料和功能主要有:

  • 使用者興趣:長期興趣、實時興趣、分類興趣、話題興趣、keyword 興趣、作者創作權威度等,
  • 使用者 Embedding 表示:最近鄰使用者、人群劃分、特定使用者圈定等,
  • 使用者社交屬性:使用者親密度、二度好友、共同好友、相似優秀回答者等,
  • 使用者實時屬性: LastN 行為、LastLogin 等,
  • 使用者基礎屬性:使用者性别預測、年齡段計算、職業預估等。

服務架構

整體主要分為 Streaming / 離線計算、線上服務和 HBase 多叢集同步三部分組成,下面将依次進行介紹。

知乎 AI 使用者模型服務性能優化實踐

使用者模型服務架構圖

Streaming / 離線計算

Streaming 計算主要涉及功能 LastRead、LastSearch、LastDisplay,實時話題/ Keyword 興趣、最後登入時間、最後活躍的省市等。

知乎 AI 使用者模型服務性能優化實踐

使用者模型實時興趣計算邏輯圖

實時興趣的計算流程

  1. 相應日志擷取。從 CardshowLog、PageshowLog、QueryLog 中抽取<使用者,contentToken,actionType >等内容。
  2. 映射到對應的内容次元。對于問題、回答、文章、搜尋分别擷取對應的 Topic 和 Keyword,搜尋内容對應的 Topic。在 Redis 中用 contentToken 置換 contentId 後,請求 ContentProfile 擷取其對應話題和關鍵詞;對于 Query,調用 TopicMatch 服務,傳遞搜尋内容給服務,服務傳回其對應的 Topic;調用 Znlp 的 KeywordExtractorJar 包,傳遞搜尋内容并獲得其對應的 Keyword 。
  3. 使用者-内容次元彙總。根據使用者的行為,在<使用者,topic,actionType>和<使用者,keyword,actionType>層面進行 groupBy 聚合彙總後,并以 hashmap 的格式存儲到 Redis,作為計算使用者實時興趣的基礎資料,按時間衰減系數 timeDecay 進行新舊興趣的 merge 後存儲。
  4. 計算興趣。在使用者的曆史基礎資料上,按一定的 decay 速度進行衰減,按威爾遜置信區間計算使用者興趣 score,并以 Sortedset 的格式存儲到 Redis。

關于興趣計算,已經優化的地方主要是:如何快速的計算平滑參數 alpha 和 beta,如何 daily_update 平滑參數,以及用卡方計算置信度時,是否加入平滑參數等都會對最終的興趣分值有很大的影響,當 display 為 1 曝光數量不足的情況下,興趣 score 和 confidence 計算出現 的 bias 問題等。

線上服務

随之知乎日益增加的使用者量,以及不斷豐富的業務場景和與之相對應出現的調用量上升等,對線上服務的穩定性和請求時延要求也越來越高。 舊服務本身也存在一些問題,比如:

  1. 線上服務直連 HBase,當資料熱點的時候,造成某些 Region Server 的負載很高,P95 上升,輕者造成服務抖動,監控圖偶發有「毛刺」現象,重者造成服務幾分鐘的不可用,需要平台技術人員将 Region 從負載較高的 RegionServer 上移走。
  2. 離線任務每次計算完成後一次大批量同時寫入離線和線上叢集,會加重 HBase 線上叢集Region Server 的負載,增大 HBase get 請求的時延,進而影響線上服務穩定性和 P95。

針對問題一,我們在原來的服務架構中增加緩存機制,以此來增強服務的穩定型、減小 Region Server 的負載。

針對問題二,修改了離線計算和多叢集資料同步的方式,詳見「HBase多叢集存儲機制」部分。

Cache機制具體實作

沒有 Cache 機制時,所有的 get 和 batchGet 方法直接請求到 HBase,具體如下圖:

知乎 AI 使用者模型服務性能優化實踐

使用者模型服務請求序列圖

  1. UserProfileServiceApp 啟動服務,将收到的請求交由 UserProfileServiceImpl 具體處理
  2. UserProfileServiceImp 根據請求參數,調用 GetTranslator 将 UserProfileRequest.GetRequest 轉化成 HBase 中的 Get Object(在 Map 中維護每個 requestField 對應 HBase 中的 tablename,cf,column,prefix 等資訊),以格式Map[String, util.List[(AvailField, Get)]]傳回。
  3. UserProfileServiceImp 用 Future 異步向 HBase 發送 get 請求,擷取到結果傳回。

增加 Cache 機制的具體方法,在上面的第二步中,增加一個 CacheMap,用來維護 get 中 AvailField 對應 Cache 中的 key,key 的組成格式為:「 tablename 縮寫| columnfamily 縮寫| columnname 縮寫| rowkey 全寫」。這裡使用的 Redis 資料結構主要有兩種,SortedSet 和 Key-Value對。服務端收到請求後先去轉化 requestField 為 Cache 中的 key,從 Cache 中擷取資料。對于沒有擷取到 requestField 的轉化成 GetObject,請求 HBase 擷取,将結果儲存到 Cache 中并傳回。

最終效果

使用者模型的通路量大概為 100K QPS,每個請求轉化為多個 get 請求。 增加 Cache 前 get 請求的 P95 為30ms,增加 Cache 後降低到小于 15ms,Cache 命中率 90% 以上。

HBase 多叢集存儲機制

離線任務和 Streaming 計算主要采用 Spark 計算實作, 結果儲存到 HBase 的有每次寫一條、批量寫入和BulkLoad 方式方式,前兩種比較簡單有相關的API直接調用就可以。

對于BulkLoad 方式,建立 HFiles,調用 LoadIncrementalHFiles 作業将它們移到 HBase 表中。首先需要根據表名 getRegionLocator 得到 RegionLocator,根據 RegionLocator 得到 partition,因為在 HFile 中是有序的是以,需要調用 rdd.repartitionAndSortWithinPartitions(partitioner) 将 rdd 重新排序。 HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator) 進行任務增量Load 到具體表的配置 實作并執行映射( 并減少) 作業,使用 HFileOutputFormat2 輸出格式将有序的放置或者 KeyValue 對象寫入 HFile 檔案。 Reduce 階段通過調用 HFileOutputFormat2.configureIncrementalLoad 配置在場景後面。執行LoadIncrementalHFiles 作業将 HFile 檔案移動到系統檔案。

static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator,
  Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();

job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(cls);

// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
  job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
  job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(job.getMapOutputValueClass())) {
  job.setReducerClass(TextSortReducer.class);
} else {
  LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}

conf.setStrings("io.serializations", conf.get("io.serializations"),
    MutationSerialization.class.getName(), ResultSerialization.class.getName(),
    KeyValueSerialization.class.getName());

configurePartitioner(job, startKeys);
// Set compression algorithms based on column families
configureCompression(table, conf);
configureBloomType(table, conf);
configureBlockSize(table, conf);
configureDataBlockEncoding(table, conf);

TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + table.getName() + " output configured.");
}

public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
  throws IOException {
configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class);
}
val hFileLoader = new LoadIncrementalHFiles(conf)
hFileLoader.doBulkLoad(hFilePath, new HTable(conf, table.getName))           

将 HFile 檔案 Bulk Load 到已存在的表中。 由于 HBase 的 BulkLoad 方式是繞過了 Write to WAL,Write to MemStore 及 Flush to disk 的過程,是以并不能通過 WAL 來進行一些複制資料的操作。 由于 Bulkload 方式還是對叢集 RegionServer 造成很高的負載,最終采用方案三,下面是兩個叢集進行資料同步。

存儲同步機制

技術選型 HBase 常見的 Replication 方法有 SnapShot、CopyTable/Export、BulkLoad、Replication、應用層并發讀寫等。 應用層并發讀寫 優點:應用層可以自由靈活控制對 HBase寫入速度,打開或關閉兩個叢集間的同步,打開或關閉兩個叢集間具體到表或者具體到列簇的同步,對 HBase 叢集性能的影響最小,缺點是增加了應用層的維護成本。 初期沒有更好的叢集資料同步方式的時候,使用者模型和内容模型自己負責兩叢集間的資料同步工作。

知乎 AI 使用者模型服務性能優化實踐

使用者模型存儲多機房同步架構圖

具體實作細節

第一步:定義用于在 Kafka 的 Producer 和 Consumer 中流轉的統一資料 Protobuf 格式

message ColumnValue {
required bytes qualifier = 1;
......
}
message PutMessage {
required string tablename = 1;
......
}           

第二步:發送需要同步的資料到 Kafka,(如果有必要,需要對資料做相應的格式處理),這裡對資料的處理,有兩種方式。 第一種:如果程式中有統一的存儲到 HBase 的工具(另一個項目是使用自定義的 HBaseHandler,業務層面隻生成 tableName,rowKey,columnFamily,column 等值,由 HBaseHandler 統一建構成 Put 對象,并儲存 HBase 中),這種方式在業務層面改動較小,理論上可以直接用原來的格式發給 Kafka,但是如果 HBaseHandler 處理的格式和 PutMessage 格式有不符的地方,做下适配即可。

/**
* tableName: hbase table name
* rdd: RDD[(rowkey, family, column, value)]
*/
def convert(tableName: String, rdd: RDD): RDD = {
rdd.map {
  case (rowKey: String, family: String, column: String, value: Array[Byte]) =>
    val message = KafkaMessages.newBuilder()
    val columnValue = ColumnValue.newBuilder()
    columnValue.set
     ......
    (rowKey, message.build().toByteArray)
 }
}           

第二種:程式在 RDD 中直接建構 HBase 的 Put 對象,調用 PairRDD 的 saveAsNewAPIHadoopDataset 方法儲存到 HBase 中。此種情況,為了相容已有的代碼,做到代碼和業務邏輯的改動最小,發送到 Kafka 時,需要将 Put 對象轉換為上面定義的 PutMessage Protobuf 格式,然後發送給 Kafka。

/**
* tableName: hbase table namne
* rdd: RDD[(rowKey, put)]
*/
def convert(tableName: String, familyNames: Array[String], rdd: RDD): RDD = {
rdd.map {
  case (_, put: Put) =>
    val message = PutMessage.newBuilder()
    for(familyName <- familyNames){
      if(put.getFamilyMap().get(Bytes.toBytes(familyName))!=null){
      val keyValueList = put.getFamilyMap()
        .asInstanceOf[java.util.ArrayList[KeyValue]].asScala
        for( keyvalue <- keyValueList){
          message.setRowkey(ByteString.copyFrom(keyvalue.getRow))
        ......
        }
        message.setTablename(tableName)
      }
    }
    (null, message.build().toByteArray)
 }
}           

第三步:發送到 Kafka,不同的表發送到不同的 Topic,對每個 Topic 的消費做監控。

/**
* 發送 rdd 中的内容到 brokers 的指定 topic 中
* tableName: hbase table namne
* rdd: RDD[(rowKey, put)]
*/
def send[T](brokers: String,
               rdd: RDD[(String, T)],
               topic: String)(implicit cTag: ClassTag[T]): Unit = {
  rdd.foreachPartition(partitionOfRecords => {
      val producer = getProducer[T](brokers)
      partitionOfRecords.map(r => new ProducerRecord[String, T](topic, r._1, r._2))
        .foreach(m => producer.send(m))
      producer.close()
  })
}           

第四步:另啟動 Streaming Consumer 或者服務消費 Kafka 中内容,将 putMessage 的 Protobuf 格式轉成 HBase 的 put 對象,同時寫入到線上 HBase 叢集中。 Streaming 消費Kafka ,不同的表發送到不同的 Topic,對每個 Topic 的消費做監控。

val toHBaseTagsTopic = validKafkaStreamTagsTopic.map {
      record =>
        val tableName_r = record.getTablename()
        val put = new Put(record.getRowkey.toByteArray)
        for (cv <- record.getColumnsList) {
          put.addColumn(record.getFamily.toByteArray)
          ......
        }
        if(put.isEmpty){
          (new ImmutableBytesWritable(), null)
        }else{
          (new ImmutableBytesWritable(), put)
        }
    }.filter(_._2!=null)
    if(!isClean) {
      toHbaseTagsTopic.foreachRDD { rdd =>
        rdd.saveAsNewAPIHadoopDataset(
          AccessUtils.createOutputTableConfiguration(
            constants.Constants.NAMESPACE + ":" + constants.Constants.TAGS_TOPIC_TABLE_NAME
          )
        )
      }
   }           

如下為另一種啟動服務消費 Kafka 的方式。

val consumer = new KafkaConsumer[String, Array[Byte]](probs)
consumer.subscribe(topics)
val records = consumer.poll(100)
for (p <- records.partitions) {
   val recordsOfPartition = records.records(p)
   recordsOfPartition.foreach { r =>
      Try(KafkaMessages.parseFrom(r.value())) match {
         case Success(record) =>
            val tableName = record.getTableName 
            if (validateTables.contains(tableName)) {
               val messageType = record.getType
               ......
               try {
                  val columns = record.getColumnsList.map(c => (c.getColumn, c.getValue.toByteArray)).toArray
                   HBaseHandler.write(tableName)
                ......
               } catch {
                  case ex: Throwable =>
                    LOG.error("write hbase fail")
                    HaloClient.increment(s"content_write_hbase_fail")
               }
            } else {
              LOG.error(s"table $tableName is valid")
            }
         }
      }
      //update offset
      val lastOffset = recordsOfPartition.get(recordsOfPartition.size - 1).offset()
      consumer.commitSync(java.util.Collections.singletonMap(p, new OffsetAndMetadata(lastOffset + 1)))
}           

結語

最後,目前采用的由應用控制和管理線上離線叢集的同步機制,在随着平台多機房項目的推動下,平台将推出 HBase 的統一同步機制 HRP (HBase Replication Proxy),屆時業務部門可以将更多的時間和精力集中在模型優化層面。

Reference

[1] HBase Cluster Replication

[2] 通過 BulkLoad 快速将海量資料導入到 HBase

[3] HBase Replication 源碼分析

[4] HBase 源碼之 TableRecordWriter

[5] HBase 源碼之 TableOutputFormat

[6] Spark2.1.1寫入 HBase 的三種方法性能對比

作者:王政英

出處:https://zhuanlan.zhihu.com/p/45907950

繼續閱讀