天天看點

離線批量資料通道Tunnel的最佳實踐及常見問題基本介紹及應用場景SDK上傳最佳實踐常見問題

基本介紹及應用場景

Tunnel是MaxCompute提供的離線批量資料通道服務,主要提供大批量離線資料上傳和下載下傳,

僅提供每次批量大于等于64MB資料的場景,小批量流式資料場景請使用DataHub實時資料通道以獲得更好的性能和體驗。

SDK上傳最佳實踐

import java.io.IOException;
import java.util.Date;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;

public class UploadSample {
 private static String accessId = "<your access id>";
 private static String accessKey = "<your access Key>";
 private static String odpsUrl = "http://service.odps.aliyun.com/api";

 private static String project = "<your project>";
 private static String table = "<your table name>";
 private static String partition = "<your partition spec>";

 public static void main(String args[]) {
   // 準備工作,僅需做一次
   Account account = new AliyunAccount(accessId, accessKey);
   Odps odps = new Odps(account);
   odps.setEndpoint(odpsUrl);
   odps.setDefaultProject(project);
   TableTunnel tunnel = new TableTunnel(odps);

   try {
     // 确定寫入分區
     PartitionSpec partitionSpec = new PartitionSpec(partition);
     // 在服務端建立一個在本表本分區上有效期24小時的session,24小時内該session可以共計上傳20000個Block資料
     // 建立Session的時耗為秒級,會在服務端使用部分資源、建立臨時目錄等,操作較重,是以強烈建議同一個分區資料盡可能複用Session上傳。
     UploadSession uploadSession = tunnel.createUploadSession(project,
         table, partitionSpec);
     System.out.println("Session Status is : "
         + uploadSession.getStatus().toString());
     TableSchema schema = uploadSession.getSchema();
     // 準備資料後打開Writer開始寫入資料,準備資料後寫入一個Block,每個Block僅能成功上傳一次,不可重複上傳,CloseWriter成功代表該Block上傳完成,失敗可以重新上傳該Block,同一個Session下最多允許20000個BlockId,即0-19999,若超出請CommitSession并且再建立一個新Session使用,以此類推。
     // 單個Block内寫入資料過少會産生大量小檔案 嚴重影響計算性能, 強烈建議每次寫入64MB以上資料(100GB以内資料均可寫入同一Block)
     // 可通過資料的平均大小與記錄數量大緻計算總量即 64MB < 平均記錄大小*記錄數 < 100GB

     // maxBlockID服務端限制為20000,使用者可以根據自己業務需求,每個Session使用一定數量的block例如100個,但是建議每個Session内使用block越多越好,因為建立Session是一個很重的操作
     // 如果建立一個Session後僅僅上傳少量資料,不僅會造成小檔案、空目錄等問題,還會嚴重影響上傳整體性能(建立Session花費秒級,真正上傳可能僅僅用了十幾毫秒)
     int maxBlockID = 20000;
     for (int blockId = 0; blockId < maxBlockID; blockId++) {
       // 準備好至少64MB以上資料,準備完成後方可寫入
       // 例如:讀取若幹檔案或者從資料庫中讀取資料
       try {
         // 在該Block上建立一個Writer,writer建立後任意一段時間内,若某連續2分鐘沒有寫入4KB以上資料,則會逾時斷開連接配接
         // 是以建議在建立writer前在記憶體中準備可以直接進行寫入的資料
         RecordWriter recordWriter = uploadSession.openRecordWriter(blockId);

         // 将讀取到的所有資料轉換為Tunnel Record格式并切入
         int recordNumber = 1000000;
         for (int index = 0; i < recordNumber; i++) {
           // 将第index條原始資料轉化為odps record
           Record record = uploadSession.newRecord();
           for (int i = 0; i < schema.getColumns().size(); i++) {
             Column column = schema.getColumn(i);
             switch (column.getType()) {
               case BIGINT:
                 record.setBigint(i, 1L);
                 break;
               case BOOLEAN:
                 record.setBoolean(i, true);
                 break;
               case DATETIME:
                 record.setDatetime(i, new Date());
                 break;
               case DOUBLE:
                 record.setDouble(i, 0.0);
                 break;
               case STRING:
                 record.setString(i, "sample");
                 break;
               default:
                 throw new RuntimeException("Unknown column type: "
                     + column.getType());
             }
           }
           // Write本條資料至服務端,每寫入4KB資料會進行一次網絡傳輸
           // 若120s沒有網絡傳輸服務端将會關閉連接配接,屆時該Writer将不可用,需要重新寫入
           recordWriter.write(record);
         }
         // close成功即代表該block上傳成功,但是在整個Session Commit前,這些資料是在odps 臨時目錄中不可見的
         recordWriter.close();
       } catch (TunnelException e) {
         // 建議重試一定次數
         e.printStackTrace();
         System.out.println("write failed:" + e.getMessage());
       } catch (IOException e) {
         // 建議重試一定次數
         e.printStackTrace();
         System.out.println("write failed:" + e.getMessage());
       }
     }
     // 送出所有Block,uploadSession.getBlockList()可以自行指定需要送出的Block,Commit成功後資料才會正式寫入Odps分區,Commit失敗建議重試10次
     for (int retry = 0; retry < 10; ++retry) {
       try {
         // 秒級操作,正式送出資料
         uploadSession.commit(uploadSession.getBlockList());
         break;
       } catch (TunnelException e) {
         System.out.println("uploadSession commit failed:" + e.getMessage());
       } catch (IOException e) {
         System.out.println("uploadSession commit failed:" + e.getMessage());
       }
     }
     System.out.println("upload success!");

   } catch (TunnelException e) {
     e.printStackTrace();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
}           

構造器舉例說明:

PartitionSpec(String spec):通過字元串構造此類對象。

參數:

spec: 分區定義字元串,比如: pt='1',ds='2'。

是以程式中應該這樣配置:private static String partition = "pt='XXX',ds='XXX'";

常見問題

MaxCompute Tunnel是什麼?

Tunnel是MaxCompute的資料通道,使用者可以通過Tunnel向MaxCompute中上傳或者下載下傳資料。目前Tunnel僅支援表(不包括視圖View)資料的上傳下載下傳。

BlockId是否可以重複?

同一個UploadSession裡的blockId不能重複。也就是說,對于同一個UploadSession,用一個blockId打開RecordWriter,寫入一批資料後,調用close,

然後再commit完成後,寫入成功後不可以重新再用該blockId打開另一個RecordWriter寫入資料。 Block預設最多20000個,即0-19999。

Block大小是否存在限制?

一個block大小上限 100GB,強烈建議大于64M的資料,每一個Block對應一個檔案,小于64MB的檔案統稱為小檔案,小檔案過多将會影響使用性能。

使用新版BufferedWriter可以更簡單的進行上傳功能避免小檔案等問題 Tunnel-SDK-BufferedWriter

Session是否可以共享使用,存在生命周期嗎?

每個Session在服務端的生命周期為24小時,建立後24小時内均可使用,也可以跨程序/線程共享使用,但是必須保證同一個BlockId沒有重複使用,分布式上傳可以按照如下步驟:

建立Session->資料量估算->配置設定Block(例如線程1使用0-100,線程2使用100-200)->準備資料->上傳資料->Commit所有寫入成功的Block。

Session建立後不使用是否對系統有消耗?

每個Session在建立時會生成兩個檔案目錄,如果大量建立而不使用,會導緻臨時目錄增多,大量堆積時可能造成系統負擔,請一定避免此類行為,盡量共享利用session。

遇到Write/Read逾時或IOException怎麼處理?

上傳資料時,Writer每寫入8KB資料會觸發一次網絡動作,如果120秒内沒有網絡動作,服務端将主動關閉連接配接,屆時Writer将不可用,請重新打開一個新的Writer寫入。

建議使用 [Tunnel-SDK-BufferedWriter]接口上傳資料,該接口對使用者屏蔽了blockId的細節,并且内部帶有資料緩存區,會自動進行失敗重試。

下載下傳資料時,Reader也有類似機制,若長時間沒有網絡IO會被斷開連接配接,建議Read過程連續進行中間不穿插其他系統的接口。

MaxCompute Tunnel目前有哪些語言的SDK?

MaxCompute Tunnel目前提供Java版的SDK。

MaxCompute Tunnel 是否支援多個用戶端同時上傳同一張表?

支援。

MaxCompute Tunnel适合批量上傳還是流式上傳

MaxCompute Tunnel用于批量上傳,不适合流式上傳,流式上傳可以使用[DataHub高速流式資料通道],毫秒級延時寫入。

MaxCompute Tunnel上傳資料時一定要先存在分區嗎?

是的,Tunnel不會自動建立分區。

Dship 與 MaxCompute Tunnel的關系?

dship是一個工具,通過MaxCompute Tunnel來上傳下載下傳。

Tunnel upload資料的行為是追加還是覆寫?

追加的模式。

Tunnel路由功能是怎麼回事?

路由功能指的是Tunnel SDK通過設定MaxCompute擷取Tunnel Endpoint的功能。是以,SDK可以隻設定MaxCompute的endpoint來正常工作。

用MaxCompute Tunnel上傳資料時,一個block的資料量大小多大比較合适

沒有一個絕對最優的答案,要綜合考慮網絡情況,實時性要求,資料如何使用以及叢集小檔案等因素。一般,如果數量較大是持續上傳的模式,可以在64M - 256M,

如果是每天傳一次的批量模式,可以設大一些到1G左右

使用MaxCompute Tunnel 下載下傳, 總是提示timeout

一般是endpoint錯誤,請檢查Endpoint配置,簡單的判斷方法是通過telnet等方法檢測網絡連通性。

通過MaxCompute Tunnel下載下傳,抛出You have NO privilege ‘odps:Select‘ on {acs:odps:*:projects/XXX/tables/XXX}. project ‘XXX‘ is protected的異常

該project開啟了資料保護功能,使用者操作這是從一個項目的資料導向另一個項目,這需要該project的owner操作。

Tunnel上傳抛出異常ErrorCode=FlowExceeded, ErrorMessage=Your flow quota is exceeded.**

Tunnel對請求的并發進行了控制,預設上傳和下載下傳的并發Quota為2000,任何相關的請求發出到結束過程中均會占用一個Quota機關。若出現類似錯誤,有如下幾種建議的解決方案:

1 sleep一下再重試;

2 将project的tunnel并發quota調大,需要聯系管理者評估流量壓力;

3 報告project owner調查誰占用了大量并發quota,控制一下。

離線批量資料通道Tunnel的最佳實踐及常見問題基本介紹及應用場景SDK上傳最佳實踐常見問題