
Elasticsearch Source Code: Transport

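Elasticsearch (es) uses Netty to implement both its client and its server. Netty is started in NettyTransport.java, where a ChannelPipeline is used to initialize the ClientBootstrap and the ServerBootstrap. ChannelPipeline is a Netty feature and is not covered further here. Every es node is both a client and a server. We look at the client side first; its main jobs are sending requests (sendRequest) and handling responses.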

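Client

The client's main job is to send a request and process the response. This is usually a blocking process, but es separates sending the request from handling the response.

Sending data

When an es module needs to send a request, it calls the sendRequest method in TransportService.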

public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
                                                      final TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException {
    if (node == null) {
        throw new ElasticsearchIllegalStateException("can't send request to a null node");
    }
    final long requestId = newRequestId();  // generate a unique requestId from an atomic counter
    TimeoutHandler timeoutHandler = null;
    try {
        if (options.timeout() != null) {
            timeoutHandler = new TimeoutHandler(requestId);
            timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
        }
        clientHandlers.put(requestId, new RequestHolder<T>(handler, node, action, timeoutHandler));
        transport.sendRequest(node, requestId, action, request, options);
    } catch (final Throwable e) {

        final RequestHolder holderToNotify = clientHandlers.remove(requestId);
        if (timeoutHandler != null) {
            timeoutHandler.future.cancel(false);
        }
        if (holderToNotify != null) {
            final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
            threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
                @Override
                public void run() {
                    holderToNotify.handler().handleException(sendRequestException);
                }
            });
        }

        if (throwConnectException) {
            if (e instanceof ConnectTransportException) {
                throw (ConnectTransportException) e;
            }
        }
    }
}
           

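DiscoveryNode is the node object, action is the action type of this request, TransportRequest holds the request data, TransportRequestOptions holds the request options, including the request type (used later to pick among different connections) and the timeout, and TransportResponseHandler is the object that handles the response to the request.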

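1. An AtomicLong is used to generate a requestId.

2. If a timeout is set, a scheduled task is started and a timeoutHandler handles this requestId.

3. The mapping from requestId to a RequestHolder object is stored in clientHandlers (used when handling the response). The RequestHolder wraps the response handler, the action, the timeoutHandler, and the node.

4. Finally, the sendRequest method of Transport is called; the transport here is actually NettyTransport.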
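The bookkeeping in steps 1 to 3 can be sketched in isolation. The following is a minimal illustration, not Elasticsearch code: the class PendingRequests, its Holder type, and the use of plain String responses are all assumptions made for the example. It shows an atomic counter producing ids, a concurrent map holding the pending handler, and an optional timeout task that removes the entry if no response arrives.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for the requestId -> RequestHolder bookkeeping.
final class PendingRequests {
    // Pairs the callbacks with the optional timeout task of one request.
    static final class Holder {
        final Consumer<String> onResponse;
        final Consumer<Exception> onFailure;
        volatile ScheduledFuture<?> timeout; // stays null when no timeout was requested

        Holder(Consumer<String> onResponse, Consumer<Exception> onFailure) {
            this.onResponse = onResponse;
            this.onFailure = onFailure;
        }
    }

    private final AtomicLong requestIds = new AtomicLong();
    private final ConcurrentMap<Long, Holder> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer;

    PendingRequests(ScheduledExecutorService timer) {
        this.timer = timer;
    }

    // Registers the callbacks and returns the id to embed in the outgoing message.
    long register(Consumer<String> onResponse, Consumer<Exception> onFailure, long timeoutMillis) {
        final long id = requestIds.incrementAndGet();
        Holder holder = new Holder(onResponse, onFailure);
        pending.put(id, holder);
        if (timeoutMillis > 0) {
            holder.timeout = timer.schedule(() -> {
                Holder expired = pending.remove(id);
                if (expired != null) { // null means the response already arrived
                    expired.onFailure.accept(new TimeoutException("request " + id + " timed out"));
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
        }
        return id;
    }

    // Called when a response carrying this id arrives; returns false for unknown ids.
    boolean complete(long id, String response) {
        Holder holder = pending.remove(id);
        if (holder == null) {
            return false;
        }
        if (holder.timeout != null) {
            holder.timeout.cancel(false);
        }
        holder.onResponse.accept(response);
        return true;
    }
}
```

Because both the timeout task and complete go through pending.remove, only one of them can win for a given id, so a handler is notified at most once.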

Now look at the sendRequest method in NettyTransport.

public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
    Channel targetChannel = nodeChannel(node, options);

    if (compress) {
        options.withCompress(true);
    }

    byte status = 0;
    status = TransportStatus.setRequest(status);

    BytesStreamOutput bStream = new BytesStreamOutput();
    bStream.skip(NettyHeader.HEADER_SIZE);
    StreamOutput stream = bStream;
    // only compress if asked, and, the request is not bytes, since then only
    // the header part is compressed, and the "body" can't be extracted as compressed
    if (options.compress() && (!(request instanceof BytesTransportRequest))) {
        status = TransportStatus.setCompress(status);
        stream = CompressorFactory.defaultCompressor().streamOutput(stream);
    }
    stream = new HandlesStreamOutput(stream);

    // we pick the smallest of the 2, to support both backward and forward compatibility
    // note, this is the only place we need to do this, since from here on, we use the serialized version
    // as the version to use also when the node receiving this request will send the response with
    Version version = Version.smallest(this.version, node.version());

    stream.setVersion(version);
    stream.writeString(action);

    ChannelBuffer buffer;
    // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
    // that create paged channel buffers, but its tricky to know when to do it (where this option is
    // more explicit).
    if (request instanceof BytesTransportRequest) {
        BytesTransportRequest bRequest = (BytesTransportRequest) request;
        assert node.version().equals(bRequest.version());
        bRequest.writeThin(stream);
        stream.close();
        ChannelBuffer headerBuffer = bStream.bytes().toChannelBuffer();
        ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
        // false on gathering, cause gathering causes the NIO layer to combine the buffers into a single direct buffer....
        buffer = new CompositeChannelBuffer(headerBuffer.order(), ImmutableList.<ChannelBuffer>of(headerBuffer, contentBuffer), false);
    } else {
        request.writeTo(stream);
        stream.close();
        buffer = bStream.bytes().toChannelBuffer();
    }
    NettyHeader.writeHeader(buffer, requestId, status, version);
    targetChannel.write(buffer);
}
           

1. A Channel is obtained from the node and the TransportRequestOptions. NettyTransport maintains a map named connectedNodes:

final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
           

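The key is the node and the value is a NodeChannels object. NodeChannels maintains the different kinds of Channel for a node, that is, the types in TransportRequestOptions mentioned above. Each type maps to an array of Channels, meaning several connections. When the cluster starts or a new node is discovered, the connectToNode method in TransportService is called to populate connectedNodes; this method ultimately calls connectToNode in NettyTransport.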

public void connectToNode(DiscoveryNode node, boolean light)
           

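If light is true, each per-type channel array in the new NodeChannels has length 1.

If it is false, the length of each channel array is initialized from the configuration file. The array elements are obtained by calling clientBootstrap.connect(address).getChannel(), so a channel array can be understood as several connections to the same address. The resulting NodeChannels object is stored, together with the node, in the connectedNodes map.

Within each type's channel array, the channels are used in round-robin fashion. For example, for the bulk channel array: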

bulk[Math.abs(bulkCounter.incrementAndGet()) % bulk.length]
           

2. The rest of the code converts the TransportRequest into a ChannelBuffer, adds a header, and sends the data with targetChannel.write(buffer).
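The round-robin selection described in step 1 can be sketched on its own. This is an illustration with a hypothetical RoundRobin class, not the NodeChannels implementation. One detail worth noting: if the counter is a 32-bit int, Math.abs(Integer.MIN_VALUE) is still negative, so an expression of the form Math.abs(counter) % length can produce a negative index once the counter overflows. The sketch uses Math.floorMod, which always returns a value in [0, length).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative round-robin selector; class and method names are hypothetical.
final class RoundRobin<T> {
    private final T[] items;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobin(T[] items) {
        if (items.length == 0) {
            throw new IllegalArgumentException("need at least one item");
        }
        this.items = items.clone();
    }

    // Starts the counter at a chosen value, which makes overflow easy to exercise.
    RoundRobin(T[] items, int start) {
        this(items);
        counter.set(start);
    }

    // Returns the next item; safe to call from several threads.
    T next() {
        // floorMod keeps the index non-negative even after the counter wraps around.
        return items[Math.floorMod(counter.incrementAndGet(), items.length)];
    }
}
```

With three channels, a counter that wraps from Integer.MAX_VALUE to Integer.MIN_VALUE still selects a valid slot here, whereas Math.abs(Integer.MIN_VALUE) % 3 evaluates to -2.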

Handling the response

When the Netty client is initialized, a channel handler is defined in the ChannelPipeline:

pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
           

Consider the messageReceived method in MessageChannelHandler. When it receives a MessageEvent, it checks whether the message is a ChannelBuffer instance; if so, it is data that the handler needs to process.

long requestId = buffer.readLong();
byte status = buffer.readByte();
           

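The requestId and status are parsed from the buffer, and a StreamInput object is created that carries the transmitted data. The status is then used to decide whether the data is a request or a response, because the server side also uses MessageChannelHandler. For a response, the handling is as follows: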

TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
    if (TransportStatus.isError(status)) {
        handlerResponseError(wrappedStream, handler);
    } else {
        handleResponse(wrappedStream, handler);
    }
} else {
    // if its null, skip those bytes
    buffer.readerIndex(markedReaderIndex + size);
}
if (buffer.readerIndex() != expectedIndexReader) {
    if (buffer.readerIndex() < expectedIndexReader) {
        logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
    } else {
        logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
    }
    buffer.readerIndex(expectedIndexReader);
}
           

1. transportServiceAdapter.remove is called. It takes the RequestHolder for the requestId out of the clientHandlers map described above, and the holder's handler property gives the TransportResponseHandler for that requestId.

2. If the status is not an error, handleResponse(wrappedStream, handler) is executed:

private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
    final TransportResponse response = handler.newInstance();
    try {
        response.readFrom(buffer);
    } catch (Throwable e) {
        handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
        return;
    }
    try {
        if (handler.executor() == ThreadPool.Names.SAME) {
            //noinspection unchecked
            handler.handleResponse(response);
        } else {
            threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
        }
    } catch (Throwable e) {
        handleException(handler, new ResponseHandlerFailureTransportException(e));
    }
}
           

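A TransportResponse object is created (it holds data, just like a TransportRequest), and the data is read from the StreamInput into it. The executor type of the TransportResponseHandler is then checked: if it is SAME, handler.handleResponse(response) is called directly; otherwise the call is executed on a thread pool.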
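The choice between running inline and handing off to a pool can be shown with a small sketch. ResponseDispatch, its dispatch method, and the map of named executors are hypothetical; they only mirror the shape of the SAME check above. The sketch compares names with equals rather than ==, which does not depend on both sides being the same String instance.

```java
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical dispatcher mirroring the "SAME or thread pool" decision.
final class ResponseDispatch {
    static final String SAME = "same";

    // Runs the task on the calling thread for SAME, otherwise on the named pool.
    static void dispatch(String executorName, Map<String, Executor> pools, Runnable task) {
        if (SAME.equals(executorName)) {
            task.run(); // stays on the thread that read the message
            return;
        }
        Executor pool = pools.get(executorName);
        if (pool == null) {
            throw new IllegalArgumentException("no executor named [" + executorName + "]");
        }
        pool.execute(task);
    }
}
```

Running inline avoids a thread hand-off but occupies the thread that read the message, so it only suits handlers that finish quickly.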

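As this shows, the es client implementation is non-blocking. A message is sent directly over a channel and its requestId is recorded; the requestId is included in the message. MessageChannelHandler processes the received response and uses the requestId to find the matching TransportResponseHandler.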
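To make concrete how the requestId travels inside the message, here is a sketch of the "reserve the header, write the body, then back-fill the header" pattern that sendRequest follows with bStream.skip(NettyHeader.HEADER_SIZE) and NettyHeader.writeHeader. The byte layout used here (two marker bytes, a 4-byte length, an 8-byte request id, a 1-byte status, a 4-byte version id) is an assumption for illustration and should be checked against NettyHeader; FrameSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical frame encoder; the header layout is assumed, not taken from the source.
final class FrameSketch {
    // marker (2) + length (4) + request id (8) + status (1) + version id (4)
    static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4;

    static ByteBuffer encode(long requestId, byte status, int versionId, String action) {
        byte[] body = action.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.position(HEADER_SIZE); // leave room for the header, then write the body
        buf.put(body);
        // Back-fill the header with absolute writes; these do not move the position.
        buf.put(0, (byte) 'E');
        buf.put(1, (byte) 'S');
        buf.putInt(2, buf.capacity() - 6); // number of bytes after the marker and length field
        buf.putLong(6, requestId);
        buf.put(14, status);
        buf.putInt(15, versionId);
        buf.flip(); // make the whole frame readable from the start
        return buf;
    }

    // The receiver reads the id back from the fixed header offset.
    static long readRequestId(ByteBuffer frame) {
        return frame.getLong(6);
    }

    static byte readStatus(ByteBuffer frame) {
        return frame.get(14);
    }
}
```

Writing the header last lets the length field be filled in once the body size is known, without copying the body into a second buffer.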

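Server

On the server, the last channel handler in the ChannelPipeline is also MessageChannelHandler. As described in the client response section, messageReceived parses the requestId, status, and StreamInput from the ChannelBuffer. It then checks whether the status marks a request, and if so, handles it as follows: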

if (TransportStatus.isRequest(status)) {  // check whether this is a request
    String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
    if (buffer.readerIndex() != expectedIndexReader) {
        if (buffer.readerIndex() < expectedIndexReader) {
            logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
        } else {
            logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
        }
        buffer.readerIndex(expectedIndexReader);
    }
} else { ....}
           

This in effect calls the handleRequest method.

private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
        final String action = buffer.readString(); 
        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);
        try {
            final TransportRequestHandler handler = transportServiceAdapter.handler(action);
            if (handler == null) {
                throw new ActionNotFoundTransportException(action);
            }
            final TransportRequest request = handler.newInstance();
            request.readFrom(buffer);
            if (handler.executor() == ThreadPool.Names.SAME) {
                //noinspection unchecked
                handler.messageReceived(request, transportChannel);
            } else {
                threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
            }
        } catch (Throwable e) {
            try {
                transportChannel.sendResponse(e);
            } catch (IOException e1) {
                logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                logger.warn("Actual Exception", e1);
            }
        }
        return action;
    }
           

Analysis of the code above:

1. The action is parsed from the StreamInput; it identifies which TransportRequestHandler to use. transportServiceAdapter.handler(action) is then called to get the TransportRequestHandler. This method actually reads the serverHandlers property of TransportService, a map whose key is the action and whose value is the corresponding TransportRequestHandler. Entries are added by the es modules through the registerHandler(String action, TransportRequestHandler handler) method in TransportService.

volatile ImmutableMap<String, TransportRequestHandler> serverHandlers = ImmutableMap.of();
           

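2. A NettyTransportChannel object is created; it is used to send the response.

3. A TransportRequest object is created and the data is read into it.

4. Finally, the executor type of the TransportRequestHandler is checked: if it is SAME, handler.messageReceived(request, transportChannel) is called directly; otherwise the call is executed on a thread pool.

As this shows, the server side uses MessageChannelHandler as a dispatcher: it parses the action, finds the matching TransportRequestHandler for it, and that handler processes the request and responds.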
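The action-to-handler lookup from step 1 rests on a volatile reference to an immutable map, which is replaced as a whole on each registration. A minimal sketch of that copy-on-write idea follows. HandlerRegistry is a hypothetical name, handlers are modelled as plain functions from String to String, and Collections.unmodifiableMap stands in for the ImmutableMap used in the source.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical copy-on-write registry keyed by action name.
final class HandlerRegistry {
    // Readers see either the old or the new map, never a half-updated one.
    private volatile Map<String, Function<String, String>> handlers = Collections.emptyMap();

    // Writers copy the current map, add the entry, and publish the copy.
    synchronized void register(String action, Function<String, String> handler) {
        Map<String, Function<String, String>> copy = new HashMap<>(handlers);
        copy.put(action, handler);
        handlers = Collections.unmodifiableMap(copy);
    }

    // Lock-free lookup and dispatch, in the spirit of handleRequest.
    String handle(String action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalArgumentException("No handler for action [" + action + "]");
        }
        return handler.apply(request);
    }
}
```

This trades more expensive registration for cheap reads, which fits a table that is filled at startup and then consulted on every incoming request.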

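Summary

The core of es Transport consists of three classes: TransportService, NettyTransport, and MessageChannelHandler.

1. TransportService

It exposes the service to the rest of es: sendRequest sends a request (actually calling the method in NettyTransport), connectToNode connects to a node (again calling the method in NettyTransport), and registerHandler registers a TransportRequestHandler. It also provides the inner class TransportServiceAdapter, which gets the TransportResponseHandler by requestId and the TransportRequestHandler by action. Its main properties are listed below.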

// maps each action name to its RequestHandler
volatile ImmutableMap<String, TransportRequestHandler> serverHandlers = ImmutableMap.of();
// maps requestId to RequestHolder; used when handling a response
final ConcurrentMapLong<RequestHolder> clientHandlers = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
final AtomicLong requestIds = new AtomicLong();  // atomic requestId counter
final CopyOnWriteArrayList<TransportConnectionListener> connectionListeners = new CopyOnWriteArrayList<TransportConnectionListener>();
           

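2. NettyTransport

It initializes the client and the server and provides the low-level methods used by TransportService, namely sendRequest and connectToNode. It maintains the NodeChannels objects; each NodeChannels maintains the five types of channel for a node. The main properties of NettyTransport are: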

// maps each node to its channels
final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private volatile ClientBootstrap clientBootstrap; // client bootstrap
private volatile ServerBootstrap serverBootstrap; // server bootstrap
// lock that guards connections per node
private final KeyedLock<String> connectionLock = new KeyedLock<String>();
// used when connecting to nodes or stopping
private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
           

3. MessageChannelHandler

It handles response messages on the client side and request messages on the server side.
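One handler can serve both roles because every message carries a status byte whose bits say what kind of message it is. The sketch below shows how such flags can be packed and tested. The bit positions are an assumption for illustration (bit 0 for request or response, bit 1 for error, bit 2 for compression) and should be checked against TransportStatus; StatusFlags is a hypothetical class.

```java
// Hypothetical status-byte helper; bit assignments are assumed, not taken from the source.
final class StatusFlags {
    static final byte REQRES = 1 << 0;   // clear = request, set = response
    static final byte ERROR = 1 << 1;    // set = the response carries an error
    static final byte COMPRESS = 1 << 2; // set = the body is compressed

    static byte setRequest(byte status) {
        return (byte) (status & ~REQRES);
    }

    static byte setResponse(byte status) {
        return (byte) (status | REQRES);
    }

    static byte setError(byte status) {
        return (byte) (status | ERROR);
    }

    static byte setCompress(byte status) {
        return (byte) (status | COMPRESS);
    }

    static boolean isRequest(byte status) {
        return (status & REQRES) == 0;
    }

    static boolean isError(byte status) {
        return (status & ERROR) != 0;
    }

    static boolean isCompress(byte status) {
        return (status & COMPRESS) != 0;
    }
}
```

Under these assumed bit positions, a compressed request has status 4 and an error response has status 3, so the receiver can branch on isRequest first and on isError second, as messageReceived does.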
