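(Version 1.8) Communication architecture of the block module between client and worker
The block is the smallest unit in which Alluxio reads or stores file data, and all block I/O goes through BlockOutStream and BlockInStream.
The actual packet transfer has two implementations, short circuit and netty:
- Short circuit: block data is transferred through the local ramdisk, alongside netty
- TCP transfer: everything is transferred through netty only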
In the input stream code, the logic that chooses between the two implementations is:

    public static BlockInStream create(FileSystemContext context, BlockInfo info,
        WorkerNetAddress dataSource, BlockInStreamSource dataSourceType, InStreamOptions options)
        throws IOException {
      URIStatus status = options.getStatus();
      OpenFileOptions readOptions = options.getOptions();

      boolean promote = readOptions.getReadType().isPromote();

      long blockId = info.getBlockId();
      long blockSize = info.getLength();

      // Construct the partial read request
      Protocol.ReadRequest.Builder builder =
          Protocol.ReadRequest.newBuilder().setBlockId(blockId).setPromote(promote);
      // Add UFS fallback options
      builder.setOpenUfsBlockOptions(options.getOpenUfsBlockOptions(blockId));

      boolean shortCircuit = Configuration.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED);
      boolean sourceSupportsDomainSocket = NettyUtils.isDomainSocketSupported(dataSource);
      boolean sourceIsLocal = dataSourceType == BlockInStreamSource.LOCAL;

      // Short circuit: only when the source is local, short circuit is enabled,
      // and the source does not support a domain socket
      if (sourceIsLocal && shortCircuit && !sourceSupportsDomainSocket) {
        LOG.debug("Creating short circuit input stream for block {} @ {}", blockId, dataSource);
        try {
          return createLocalBlockInStream(context, dataSource, blockId, blockSize, options);
        } catch (NotFoundException e) {
          // Failed to do short circuit read because the block is not available in Alluxio.
          // We will try to read via netty. So this exception is ignored.
          LOG.warn("Failed to create short circuit input stream for block {} @ {}. Falling back to "
              + "network transfer", blockId, dataSource);
        }
      }

      // Netty
      LOG.debug("Creating netty input stream for block {} @ {} from client {} reading through {}",
          blockId, dataSource, NetworkAddressUtils.getClientHostName(), dataSource);
      return createNettyBlockInStream(context, dataSource, dataSourceType, builder.buildPartial(),
          blockSize, options);
    }
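
The fallback in the short-circuit branch above can be isolated in a small sketch. Everything named here (`FallbackSketch`, `StreamFactory`, `openWithFallback`) is hypothetical and not part of Alluxio; `FileNotFoundException` stands in for Alluxio's `NotFoundException`, and the factories return a label instead of a real stream.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

/** Hypothetical sketch of "try short circuit, fall back to netty"; not Alluxio code. */
final class FallbackSketch {
  /** Stand-in for a method that opens a block stream and reports which path it used. */
  interface StreamFactory {
    String open() throws IOException;
  }

  private FallbackSketch() {}

  static String openWithFallback(boolean useShortCircuit, StreamFactory local,
      StreamFactory netty) throws IOException {
    if (useShortCircuit) {
      try {
        return local.open();
      } catch (FileNotFoundException e) {
        // The block is not available locally: ignore the error and fall through to netty.
      }
    }
    // Reached when short circuit is not selected or when the local open failed.
    return netty.open();
  }
}
```

Only the "not found" case is swallowed; any other `IOException` from the local path still propagates to the caller, which mirrors the single `catch` clause in the excerpt.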
In the output stream code, the logic that chooses between the two implementations is:

    /**
     * @param context the file system context
     * @param blockId the block ID
     * @param blockSize the block size in bytes
     * @param address the Alluxio worker address
     * @param options the out stream options
     * @return the {@link PacketWriter} instance
     */
    public static PacketWriter create(FileSystemContext context, long blockId, long blockSize,
        WorkerNetAddress address, OutStreamOptions options) throws IOException {
      // Short circuit: local worker, short circuit enabled, no domain socket support
      if (CommonUtils.isLocalHost(address) && Configuration
          .getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED) && !NettyUtils
          .isDomainSocketSupported(address)) {
        LOG.debug("Creating short circuit output stream for block {} @ {}", blockId, address);
        return LocalFilePacketWriter.create(context, address, blockId, options);
      } else {
        LOG.debug("Creating netty output stream for block {} @ {} from client {}", blockId, address,
            NetworkAddressUtils.getClientHostName());
        return NettyPacketWriter
            .create(context, address, blockId, blockSize, Protocol.RequestType.ALLUXIO_BLOCK,
                options);
      }
    }
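
Both factory methods guard the short-circuit path with the same three conditions. As a rough model of that predicate only, the sketch below reduces it to a pure function; `TransportChoice`, `Transport` and `choose` are illustrative names, not Alluxio API.

```java
/** Hypothetical model of the transport choice shown in the excerpts; not Alluxio API. */
final class TransportChoice {
  enum Transport { SHORT_CIRCUIT, NETTY }

  private TransportChoice() {}

  /**
   * @param workerIsLocal whether the worker runs on the same host as the client
   * @param shortCircuitEnabled value of the short-circuit user property
   * @param domainSocketSupported whether the worker address supports a domain socket
   * @return the transport the excerpts would select for these inputs
   */
  static Transport choose(boolean workerIsLocal, boolean shortCircuitEnabled,
      boolean domainSocketSupported) {
    if (workerIsLocal && shortCircuitEnabled && !domainSocketSupported) {
      return Transport.SHORT_CIRCUIT;
    }
    // Every other combination goes through netty.
    return Transport.NETTY;
  }
}
```

Of the eight input combinations, only `choose(true, true, false)` yields `SHORT_CIRCUIT`. In particular, a local worker with short circuit enabled is still routed to the netty path when domain socket support is detected.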
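Short-circuit strategy
Block creation and destruction are handled over the network through netty.
The actual block contents are written directly to local storage through the ramdisk, which avoids netty network communication for the data itself.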
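
The split described above, control messages over the network and data straight to a local file, might look roughly like the sketch below. It is an in-process illustration under stated assumptions: `WorkerControl` and `FakeWorker` are hypothetical stand-ins for the netty exchange with the worker, a temporary directory plays the role of the ramdisk, and it is assumed that the worker answers a create request with a local path the client can write to.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the short-circuit write path; not Alluxio's actual classes. */
final class ShortCircuitSketch {
  /** Stand-in for the control messages that would travel over netty. */
  interface WorkerControl {
    Path createBlock(long blockId) throws IOException; // assumed: returns a local path
    void commitBlock(long blockId) throws IOException;
  }

  /** In-process fake worker that allocates block files under one directory. */
  static final class FakeWorker implements WorkerControl {
    private final Path mDir;
    private final Set<Long> mCommitted = new HashSet<>();

    FakeWorker(Path dir) {
      mDir = dir;
    }

    @Override
    public Path createBlock(long blockId) throws IOException {
      return Files.createFile(mDir.resolve("block-" + blockId));
    }

    @Override
    public void commitBlock(long blockId) {
      mCommitted.add(blockId);
    }

    boolean isCommitted(long blockId) {
      return mCommitted.contains(blockId);
    }
  }

  private ShortCircuitSketch() {}

  static void writeBlock(WorkerControl worker, long blockId, byte[] data) throws IOException {
    // Control plane: ask the worker to create the block.
    Path path = worker.createBlock(blockId);
    // Data plane: write the bytes directly to the local file, bypassing the network.
    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
      ByteBuffer buffer = ByteBuffer.wrap(data);
      while (buffer.hasRemaining()) {
        channel.write(buffer);
      }
    }
    // Control plane again: tell the worker the block is complete.
    worker.commitBlock(blockId);
  }
}
```

The point of the design is that only small create and commit messages cross the client-worker channel, while the payload never leaves the local filesystem.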
Short-circuit communication, client side:
The figure below shows the call flow of the Netty-version client together with LocalFileBlockWriter and LocalFileBlockReader.
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISM9AnYldnJwAzN9c3PnBnauQ0MlQ0MlcnW3BXbMJTQU90dNRUT0UEVOhHMT9kMNR1TxMGVOFTRE1EeNRUT1UERNlHMT9UeJRlTyEEVNZXTE1UNFRUT5hzUPlXSU5kMBRVT2NmMiNnSywEd5ITW110MaZHetlVdO1GT0UERNl3YXJGc5kHT20ESjBjUIF2Lc12bj5SYphXa5VWen5WY35iclN3Ztl2Lc9CX6MHc0RHaiojIsJye.jpg)
Short-circuit communication, server side:
The figure below shows the server-side calls in the Netty version.
[Figure: Alluxio RPC call overview, communication architecture of the block module between client and worker (netty version) (2)]
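Non-short-circuit
Non-short-circuit communication, client side:
The figure below shows the client-side call flow in the Netty version.
Non-short-circuit communication, server side:
The figure below shows the server-side calls in the Netty version.
The next article will cover the functionality related to BlockWorker.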