天天看點

[源碼分析] 消息隊列 Kombu 之 啟動過程

本系列我們介紹消息隊列 Kombu。Kombu 的定位是一個相容 AMQP 協定的消息隊列抽象。通過本文,大家可以了解 Kombu 是如何啟動,以及如何搭建一個基本的架子。

[源碼分析] 消息隊列 Kombu 之 啟動過程

0x00 摘要

因為之前有一個綜述,是以大家會發現,一些概念講解文字會同時出現在後續文章和綜述之中。

0x01 示例

下面使用如下代碼來進行說明。

本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。

def main(arguments):
    hub = Hub()
    exchange = Exchange('asynt_exchange')
    queue = Queue('asynt_queue', exchange, 'asynt_routing_key')

    def send_message(conn):
        producer = Producer(conn)
        producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
        print('message sent')

    def on_message(message):
        print('received: {0!r}'.format(message.body))
        message.ack()
        # hub.stop()  # <-- exit after one message

    conn = Connection('redis://localhost:6379')
    conn.register_with_event_loop(hub)

    def p_message():
        print(' kombu ')

    with Consumer(conn, [queue], on_message=on_message):
        send_message(conn)
        hub.timer.call_repeatedly(3, p_message)
        hub.run_forever()

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))      

0x02 啟動

讓我們順着程式流程看看Kombu都做了些什麼,也可以對 Kombu 内部有所了解。

本文關注的重點是:Connection,Channel 和 Hub 是如何聯系在一起的。

2.1 Hub

在程式開始,我們建立了Hub。

Hub的作用是建立消息Loop,但是此時尚未建立,是以隻是一個靜态執行個體。

hub = Hub()      

其定義如下:

class Hub:
    """Event loop object.
    Arguments:
        timer (kombu.asynchronous.Timer): Specify custom timer instance.
    """
    def __init__(self, timer=None):
        self.timer = timer if timer is not None else Timer()

        self.readers = {}
        self.writers = {}
        self.on_tick = set()
        self.on_close = set()
        self._ready = set()

        self._running = False
        self._loop = None

        self.consolidate = set()
        self.consolidate_callback = None

        self.propagate_errors = ()
        self._create_poller()      

因為此時沒有建立loop,是以目前重要的步驟是建立Poll,其Stack如下:

_get_poller, eventio.py:321
poll, eventio.py:328
_create_poller, hub.py:113
__init__, hub.py:96
main, testUb.py:22, testUb.py:55      

在eventio.py中有如下,我們可以看到Kombu可以使用多種模型來進行核心消息處理:

def _get_poller():
    if detect_environment() != 'default':
        # greenlet
        return _select
    elif epoll:
        # Py2.6+ Linux
        return _epoll
    elif kqueue and 'netbsd' in sys.platform:
        return _kqueue
    elif xpoll:
        return _poll
    else:
        return _select      

因為本機情況,這裡選擇的是:_poll。

+------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+      

2.2 Exchange與Queue

其次建立了Exchange與Queue。

  • Exchange:交換機,消息發送者将消息發至 Exchange,Exchange 負責将消息分發至 Queue;
  • Queue:消息隊列,存儲着即将被應用消費掉的消息,Exchange 負責将消息分發 Queue,消費者從 Queue 接收消息;

因為此時也沒有具體消息,是以我們暫且無法探究Exchange機制。

exchange = Exchange('asynt')
queue = Queue('asynt', exchange, 'asynt')      

此時将把Exchange與Queue聯系起來。圖示如下:

+------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+


+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+      

2.3 Connection

第三步就是建立Connection。

Connection是對 MQ 連接配接的抽象,一個 Connection 就對應一個 MQ 的連接配接。現在就是對'redis://localhost:6379'連接配接進行抽象。

conn = Connection('redis://localhost:6379')      

2.3.1 定義

由定義注釋可知,Connection是到broker的連接配接。從具體代碼可以看出,Connection更接近是一個邏輯概念,具體功能都委托給别人完成。

消息從來不直接發送給隊列,甚至 Producers 都可能不知道隊列的存在。 Producer如何才能将消息發送給Consumer呢?這中間需要經過 Message Broker 的處理和傳遞。

AMQP中,承擔 Message Broker 功能的就是 AMQP Server。也正是從這個角度講,AMQP 的 Producer 和Consumer 都是 AMQP Client。

在Kombu 體系中,用 transport 對所有的 broker 進行了抽象,為不同的 broker 提供了一緻的解決方案。通過Kombu,開發者可以根據實際需求靈活的選擇或更換broker。

Connection主要成員變量是,但是此時沒有指派:

  • _connection:
  • _transport:就是上面提到的對 broker 的抽象。
  • cycle:與broker互動的排程政策。
  • failover_strategy:在連接配接失效時,選取其他hosts的政策。
  • heartbeat:用來實施心跳。

代碼如下:

class Connection:
    """A connection to the broker"""

    port = None
    virtual_host = '/'
    connect_timeout = 5

    _connection = None
    _default_channel = None
    _transport = None
    uri_prefix = None

    #: The cache of declared entities is per connection,
    #: in case the server loses data.
    declared_entities = None

    #: Iterator returning the next broker URL to try in the event
    #: of connection failure (initialized by :attr:`failover_strategy`).
    cycle = None

    #: Additional transport specific options,
    #: passed on to the transport instance.
    transport_options = None

    #: Strategy used to select new hosts when reconnecting after connection
    #: failure.  One of "round-robin", "shuffle" or any custom iterator
    #: constantly yielding new URLs to try.
    failover_strategy = 'round-robin'

    #: Heartbeat value, currently only supported by the py-amqp transport.
    heartbeat = None

    resolve_aliases = resolve_aliases
    failover_strategies = failover_strategies

    hostname = userid = password = ssl = login_method = None      

2.3.2 init 與 transport

Connection内部主要任務是建立了transport。

Stack大緻如下:

Transport, redis.py:1039, redis.py:1031
import_module, __init__.py:126
symbol_by_name, imports.py:56
resolve_transport, __init__.py:70
get_transport_cls, __init__.py:85
__init__, connection.py:183
main, testUb.py:40, testUb.py:55      

2.4 Transport

在Kombu體系中,用transport對所有的broker進行了抽象,為不同的broker提供了一緻的解決方案。通過Kombu,開發者可以根據實際需求靈活的選擇或更換broker。

Transport:真實的 MQ 連接配接,也是真正連接配接到 MQ(redis/rabbitmq) 的執行個體。就是存儲和發送消息的實體,用來區分底層消息隊列是用amqp、Redis還是其它實作的。

Transport負責具體操作,但是很多操作移交給 loop 與 MultiChannelPoller 進行。

2.4.1 定義

其主要成員變量為:

  • 本transport的驅動類型,名字;
  • 對應的 Channel;
  • cycle:MultiChannelPoller,具體下文提到;

定義如下:

class Transport(virtual.Transport):
    """Redis Transport."""

    Channel = Channel

    polling_interval = None  # disable sleep between unsuccessful polls.
    default_port = DEFAULT_PORT
    driver_type = 'redis'
    driver_name = 'redis'

    implements = virtual.Transport.implements.extend(
        asynchronous=True,
        exchange_type=frozenset(['direct', 'topic', 'fanout'])
    )

    def __init__(self, *args, **kwargs):
        if redis is None:
            raise ImportError('Missing redis library (pip install redis)')
        super().__init__(*args, **kwargs)

        # Get redis-py exceptions.
        self.connection_errors, self.channel_errors = self._get_errors()
        # All channels share the same poller.
        self.cycle = MultiChannelPoller()      

2.4.2 移交操作

Transport負責具體操作,但是很多操作移交給 loop 與 MultiChannelPoller 進行,具體從下面代碼可見。

def register_with_event_loop(self, connection, loop):
    cycle = self.cycle
    cycle.on_poll_init(loop.poller)
    cycle_poll_start = cycle.on_poll_start
    add_reader = loop.add_reader
    on_readable = self.on_readable

    def _on_disconnect(connection):
        if connection._sock:
            loop.remove(connection._sock)
    cycle._on_connection_disconnect = _on_disconnect

    def on_poll_start():
        cycle_poll_start()
        [add_reader(fd, on_readable, fd) for fd in cycle.fds]
        
    loop.on_tick.add(on_poll_start)
    loop.call_repeatedly(10, cycle.maybe_restore_messages)
    
    health_check_interval = connection.client.transport_options.get(
        'health_check_interval',
        DEFAULT_HEALTH_CHECK_INTERVAL
    )
    
    loop.call_repeatedly(
        health_check_interval,
        cycle.maybe_check_subclient_health
    )      

其中重點是MultiChannelPoller。一個Connection有一個Transport, 一個Transport有一個MultiChannelPoller,對poll操作都是由MultiChannelPoller完成,redis操作由channel完成。

2.4.3 MultiChannelPoller

定義如下,可以了解為執行engine,主要作用是:

  • 收集channel;
  • 建立fd到channel的映射;
  • 建立channel到socks的映射;
  • 使用poll;
class MultiChannelPoller:
    """Async I/O poller for Redis transport."""

    eventflags = READ | ERR

    def __init__(self):
        # active channels
        self._channels = set()
        # file descriptor -> channel map.
        self._fd_to_chan = {}
        # channel -> socket map
        self._chan_to_sock = {}
        # poll implementation (epoll/kqueue/select)
        self.poller = poll()
        # one-shot callbacks called after reading from socket.
        self.after_read = set()      

2.4.4 擷取

Transport是預先生成的,若需要,則依據名字取得。

TRANSPORT_ALIASES = {
    'amqp': 'kombu.transport.pyamqp:Transport',
    'amqps': 'kombu.transport.pyamqp:SSLTransport',
    'pyamqp': 'kombu.transport.pyamqp:Transport',
    'librabbitmq': 'kombu.transport.librabbitmq:Transport',
    'memory': 'kombu.transport.memory:Transport',
    'redis': 'kombu.transport.redis:Transport',
	......
    'pyro': 'kombu.transport.pyro:Transport'
}

_transport_cache = {}


def resolve_transport(transport=None):
    """Get transport by name. """
    if isinstance(transport, str):
        try:
            transport = TRANSPORT_ALIASES[transport]
        except KeyError:
            if '.' not in transport and ':' not in transport:
                from kombu.utils.text import fmatch_best
                alt = fmatch_best(transport, TRANSPORT_ALIASES)
        else:
            if callable(transport):
                transport = transport()
        return symbol_by_name(transport)
    return transport

def get_transport_cls(transport=None):
    """Get transport class by name.
    """
    if transport not in _transport_cache:
        _transport_cache[transport] = resolve_transport(transport)
    return _transport_cache[transport]      

此時Connection資料如下,注意其部分成員變量尚且沒有意義:

conn = {Connection}alt = {list: 0} []
 connect_timeout = {int} 5
 connection = {Transport}cycle = {NoneType} None
 declared_entities = {set: 0} set()
 default_channel = {Channel}failover_strategies = {dict: 2} {'round-robin':, 'shuffle':}
 failover_strategy = {type}heartbeat = {int} 0
 host = {str} 'localhost:6379'
 hostname = {str} 'localhost'
 manager = {Management}port = {int} 6379
 recoverable_channel_errors = {tuple: 0} ()
 resolve_aliases = {dict: 2} {'pyamqp': 'amqp', 'librabbitmq': 'amqp'}
 transport = {Transport}transport_cls = {str} 'redis'
 uri_prefix = {NoneType} None
 userid = {NoneType} None
 virtual_host = {str} '/'      

至此,Kombu的基本就建立完成,但是彼此之間沒有建立邏輯聯系。

是以此時示例如下,注意此時三者沒有聯系:

+-------------------+       +---------------------+       +--------------------+
| Connection        |       | redis.Transport     |       | MultiChannelPoller |
|                   |       |                     |       |                    |
|                   |       |                     |       |     _channels      |
|                   |       |        cycle +------------> |     _fd_to_chan    |
|     transport +---------> |                     |       |     _chan_to_sock  |
|                   |       |                     |       |     poller         |
+-------------------+       +---------------------+       |     after_read     |
                                                          |                    |
                                                          +--------------------+
+------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+      

0x03 Connection注冊hub

之前我們提到,基本架子已經建立起來,但是各個子產品之間彼此沒有聯系,下面我們就看看如何建立聯系。

示例代碼來到:

conn.register_with_event_loop(hub)      

這裡進行了注冊,此時作用是把hub與Connection聯系起來。随之調用到:

def register_with_event_loop(self, loop):
    self.transport.register_with_event_loop(self.connection, loop)      

進而調用到transport類:

具體代碼如下:

def register_with_event_loop(self, connection, loop):
    cycle = self.cycle
    cycle.on_poll_init(loop.poller)# 這裡建立聯系,loop就是hub
    cycle_poll_start = cycle.on_poll_start
    add_reader = loop.add_reader
    on_readable = self.on_readable

    def _on_disconnect(connection):
        if connection._sock:
            loop.remove(connection._sock)
    cycle._on_connection_disconnect = _on_disconnect

    def on_poll_start():
        cycle_poll_start()
        [add_reader(fd, on_readable, fd) for fd in cycle.fds]
        
    loop.on_tick.add(on_poll_start)
    loop.call_repeatedly(10, cycle.maybe_restore_messages)
    
    health_check_interval = connection.client.transport_options.get(
        'health_check_interval',
        DEFAULT_HEALTH_CHECK_INTERVAL
    )
    
    loop.call_repeatedly(
        health_check_interval,
        cycle.maybe_check_subclient_health
    )      

3.1 建立Channel

注冊最初是建立Channel。這裡有一個連接配接的動作,就是在這裡,建立了Channel。

@property
def connection(self):
    """The underlying connection object"""
    if not self._closed:
        if not self.connected:
            return self._ensure_connection(
                max_retries=1, reraise_as_library_errors=False
            )
        return self._connection      

具體建立是在 base.py 中完成,這是 Transport 基類。Stack 如下:

create_channel, base.py:920
establish_connection, base.py:938
_establish_connection, connection.py:801
_connection_factory, connection.py:866
retry_over_time, functional.py:325
_ensure_connection, connection.py:439
connection, connection.py:859
register_with_event_loop, connection.py:266
main, testUb.py:41, testUb.py:55      

3.2 Channel

Channel:與AMQP中概念類似,可以了解成共享一個Connection的多個輕量化連接配接。就是真正的連接配接。

可以認為是 redis 操作和連接配接的封裝。每個 Channel 都可以與 redis 建立一個連接配接,在此連接配接之上對 redis 進行操作,每個連接配接都有一個 socket,每個 socket 都有一個 file,從這個 file 可以進行 poll。

為了更好的說明,我們提前給出這個通訊流程大約如下:

            +---------------------------------------------------------------------------------------------------------------------------------------+
            |                                     +--------------+                                   6                       parse_response         |
            |                                +--> | Linux Kernel | +---+                                                                            |
            |                                |    +--------------+     |                                                                            |
            |                                |                         |                                                                            |
            |                                |                         |  event                                                                     |
            |                                |  1                      |                                                                            |
            |                                |                         |  2                                                                         |
            |                                |                         |                                                                            |
    +-------+---+    socket                  +                         |                                                                            |
    |   redis   |  port +-->  fd +--->+                  v                                                                            |
    |           |                                   |           +------+--------+                                                                   |
    |           |    socket                         |           |  Hub          |                                                                   |
    |           |  port +-->  fd +--->----------> |               |                                                                   |
    | port=6379 |                                   |           |               |                                                                   |
    |           |    socket                         |           |     readers +----->  Transport.on_readable                                        |
    |           |  port +-->  fd +--->+           |               |                     +                                             |
    +-----------+                                               +---------------+                     |                                             |
                                                                                                      |                                             |
                                                        3                                             |                                             |
             +----------------------------------------------------------------------------------------+                                             |
             |                                                                                                                                      v
             |                                                                                                                                                  _receive_callback
             |                                                                                                                            5    +-------------+                      +-----------+
+------------+------+                     +-------------------------+                                    'BRPOP' = Channel._brpop_read +-----> | Channel     | +------------------> | Consumer  |
|       Transport   |                     |  MultiChannelPoller     |      +------>  channel . handlers  'LISTEN' = Channel._receive           +-------------+                      +---+-------+
|                   |                     |                         |      |                                                                                           8                |
|                   | on_readable(fileno) |                         |      |                                                                         ^                                  |
|           cycle +---------------------> |          _fd_to_chan +---------------->  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  |
|                   |        4            |                         |      |                             'LISTEN' = Channel._receive                 |                                  |
|  _callbacks[queue]|                     |                         |      |                                                                         |                            on_m  |  9
|          +        |                     +-------------------------+      +------>  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  |
+-------------------+                                                                                    'LISTEN' = Channel._receive                 |                                  |
           |                                                                                                                                         |                                  v
           |                                                7           _callback                                                                    |
           +-----------------------------------------------------------------------------------------------------------------------------------------+                            User Function      

手機上如下:

[源碼分析] 消息隊列 Kombu 之 啟動過程

3.2.1 定義

Channel 主要成員是:

  • async_pool :redis異步連接配接池;
  • pool :redis連接配接池;
  • channel_id :Channel ID;
  • client :就是StrictRedis之類的driver;
  • connection :對應的Transport;
  • cycle = {FairCycle}
  • queue_order_strategy :擷取queue的政策;
  • state :BrokerState狀态;
  • subclient :PubSub所用的client;

    keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX) :bing用到的key;

比如_get_client可以看出來client。

def _get_client(self):
    if redis.VERSION < (3, 2, 0):
        raise VersionMismatch(
            'Redis transport requires redis-py versions 3.2.0 or later. '
            'You have {0.__version__}'.format(redis))
    return redis.StrictRedis      

簡化版定義如下:

class Channel(virtual.Channel):
    """Redis Channel."""

    QoS = QoS

    _client = None
    _subclient = None
    keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX)
    keyprefix_fanout = '/{db}.'
    sep = '\x06\x16'
    _fanout_queues = {}
    unacked_key = '{p}unacked'.format(p=KEY_PREFIX)
    unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX)
    unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX)
    unacked_mutex_expire = 300  # 5 minutes
    unacked_restore_limit = None
    visibility_timeout = 3600   # 1 hour
    max_connections = 10
    queue_order_strategy = 'round_robin'

    _async_pool = None
    _pool = None

    from_transport_options = (
        virtual.Channel.from_transport_options +
        ('sep',
         'ack_emulation',
         'unacked_key',
		 ......
         'max_connections',
         'health_check_interval',
         'retry_on_timeout',
         'priority_steps')  # <-- do not add comma here!
    )

    connection_class = redis.Connection if redis else None      

3.2.2 基類

基類定義如下:

class Channel(AbstractChannel, base.StdChannel):
    """Virtual channel.

    Arguments:
        connection (ConnectionT): The transport instance this
            channel is part of.
    """

    #: message class used.
    Message = Message

    #: QoS class used.
    QoS = QoS

    #: flag to restore unacked messages when channel
    #: goes out of scope.
    do_restore = True

    #: mapping of exchange types and corresponding classes.
    exchange_types = dict(STANDARD_EXCHANGE_TYPES)

    #: flag set if the channel supports fanout exchanges.
    supports_fanout = False

    #: Binary  ASCII codecs.
    codecs = {'base64': Base64()}

    #: Default body encoding.
    #: NOTE: ``transport_options['body_encoding']`` will override this value.
    body_encoding = 'base64'

    #: counter used to generate delivery tags for this channel.
    _delivery_tags = count(1)

    #: Optional queue where messages with no route is delivered.
    #: Set by ``transport_options['deadletter_queue']``.
    deadletter_queue = None

    # List of options to transfer from :attr:`transport_options`.
    from_transport_options = ('body_encoding', 'deadletter_queue')

    # Priority defaults
    default_priority = 0
    min_priority = 0
    max_priority = 9      

最終具體舉例如下:

self = {Channel}Client = {type}Message = {type}QoS = {type}active_fanout_queues = {set: 0} set()
 active_queues = {set: 0} set()
 async_pool = {ConnectionPool} ConnectionPool<Connection>
 auto_delete_queues = {set: 0} set()
 channel_id = {int} 1
 client = {Redis} Redis<ConnectionPool<Connection>>
 codecs = {dict: 1} {'base64':}
 connection = {Transport}connection_class = {type}cycle = {FairCycle}deadletter_queue = {NoneType} None
 exchange_types = {dict: 3} {'direct':, 'topic':, 
 handlers = {dict: 2} {'BRPOP':<bound method Channel._brpop_read of >, 'LISTEN':<bound method Channel._receive of >}
 pool = {ConnectionPool} ConnectionPool<Connection>
 qos = {QoS}queue_order_strategy = {str} 'round_robin'
 state = {BrokerState}subclient = {PubSub}      

3.2.3 redis消息回調函數

關于上面成員變量,這裡需要說明的是

 handlers = {dict: 2} 
  {
    'BRPOP':<bound method Channel._brpop_read of >, 
    'LISTEN':<bound method Channel._receive of >
  }      

這是redis有消息時的回調函數,即:

  • BPROP 有消息時候,調用 Channel._brpop_read;
  • LISTEN 有消息時候,調用 Channel._receive;

3.2.4  Redis 直接相關的主要成員

與Redis 直接相關的成員定義在:redis/client.py。

與 Redis 直接相關的主要成員是如下,會利用如下變量進行具體 redis操作:

分别對應如下類型:

channel = {Channel}Client = {type}async_pool = {ConnectionPool} ConnectionPool<Connection>
 client = {Redis} Redis<ConnectionPool<Connection>>
 connection = {Transport}connection_class = {type}connection_class_ssl = {type}pool = {ConnectionPool} ConnectionPool<Connection>
 subclient = {PubSub}      
def _create_client(self, asynchronous=False):
    if asynchronous:
        return self.Client(connection_pool=self.async_pool)
    return self.Client(connection_pool=self.pool)

def _get_pool(self, asynchronous=False):
    params = self._connparams(asynchronous=asynchronous)
    self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
    return redis.ConnectionPool(**params)

def _get_client(self):
    if redis.VERSION < (3, 2, 0):
        raise VersionMismatch(
            'Redis transport requires redis-py versions 3.2.0 or later. '
            'You have {0.__version__}'.format(redis))
    return redis.StrictRedis

@property
def pool(self):
    if self._pool is None:
        self._pool = self._get_pool()
    return self._pool

@property
def async_pool(self):
    if self._async_pool is None:
        self._async_pool = self._get_pool(asynchronous=True)
    return self._async_pool

@cached_property
def client(self):
    """Client used to publish messages, BRPOP etc."""
    return self._create_client(asynchronous=True)

@cached_property
def subclient(self):
    """Pub/Sub connection used to consume fanout queues."""
    client = self._create_client(asynchronous=True)
    return client.pubsub()      

因為添加了Channel,是以此時如下:

+-----------------+
| Channel         |
|                 |      +-----------------------------------------------------------+
|    client  +---------> | Redis<ConnectionPool<Connection|
|                 |      +-----------------------------------------------------------+
|                 |
|                 |      +---------------------------------------------------+-+
|    pool  +---------->  |ConnectionPool<Connection|
|                 |      +---------------------------------------------------+-+
|                 |
|                 |
|                 |
|    connection   |
|                 |
+-----------------+

+-------------------+       +---------------------+       +--------------------+
| Connection        |       | redis.Transport     |       | MultiChannelPoller |
|                   |       |                     |       |                    |
|                   |       |                     |       |     _channels      |
|                   |       |        cycle +------------> |     _fd_to_chan    |
|     transport +---------> |                     |       |     _chan_to_sock  |
|                   |       |                     |       |     poller         |
+-------------------+       +---------------------+       |     after_read     |
                                                          |                    |
+------------------+                                      +--------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+      

3.3 channel 與 Connection 聯系

講到這裡,基本道理大家都懂,但是具體兩者之間如何聯系,我們需要再剖析下。

3.3.1 從Connection得到channel

在Connection定義中有如下,原來 Connection 是通過 transport 來得到 channel:

def channel(self):
    """Create and return a new channel."""
    self._debug('create channel')
    chan = self.transport.create_channel(self.connection)
    return chan      

3.3.2 Transport具體建立

在Transport之中有:

def create_channel(self, connection):
    try:
        return self._avail_channels.pop()
    except IndexError:
        channel = self.Channel(connection)
        self.channels.append(channel)
        return channel      

原來在 Transport 有兩個channels 清單:

self._avail_channels
self.channels      

如果_avail_channels 有内容則直接擷取,否則生成一個新的Channel。

在真正連接配接時候,會調用 establish_connection 放入self._avail_channels。

def establish_connection(self):
    # creates channel to verify connection.
    # this channel is then used as the next requested channel.
    # (returned by ``create_channel``).
    self._avail_channels.append(self.create_channel(self))
    return self     # for drain events      

其堆棧如下:

__init__, redis.py:557
create_channel, base.py:921
establish_connection, base.py:939
_establish_connection, connection.py:801
_connection_factory, connection.py:866
retry_over_time, functional.py:313
_ensure_connection, connection.py:439
connection, connection.py:859
channel, connection.py:283, node.py:11      

3.3.3 建立聯系

在init中有:

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)

    if not self.ack_emulation:  # disable visibility timeout
        self.QoS = virtual.QoS

    self._queue_cycle = cycle_by_name(self.queue_order_strategy)()
    self.Client = self._get_client()
    self.ResponseError = self._get_response_error()
    self.active_fanout_queues = set()
    self.auto_delete_queues = set()
    self._fanout_to_queue = {}
    self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
 
    ......

    self.connection.cycle.add(self)  # add to channel poller.

    if register_after_fork is not None:
        register_after_fork(self, _after_fork_cleanup_channel)      

重點是:

self.connection.cycle.add(self)  # add to channel poller.      

這就是把 Channel與Transport 中的 poller 聯系起來,這樣Transport可以利用Channel去與真實的redis進行互動。

堆棧如下:

add, redis.py:277
__init__, redis.py:531
create_channel, base.py:920
establish_connection, base.py:938
_establish_connection, connection.py:801
_connection_factory, connection.py:866
retry_over_time, functional.py:325
_ensure_connection, connection.py:439
connection, connection.py:859
register_with_event_loop, connection.py:266
main, testUb.py:41      

因為已經聯系起來,是以此時如下:

+-----------------+
| Channel         |
|                 |      +-----------------------------------------------------------+
|    client  +---------> | Redis<ConnectionPool<Connection|
|                 |      +-----------------------------------------------------------+
|                 |
|                 |      +---------------------------------------------------+-+
|    pool  +---------->  |ConnectionPool<Connection|
|                 |      +---------------------------------------------------+-+
|                 |
|                 |    |     _fd_to_chan    |
|     transport +---------> |                     |       |     _chan_to_sock  |
|                   |       |                     |       |     poller         |
+-------------------+       +---------------------+       |     after_read     |
                                                          |                    |
+------------------+                                      +--------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+      

3.3 Transport 與 Hub 聯系

on_poll_init 這裡就是把 kombu.transport.redis.Transport 與 Hub 聯系起來。

self.poller = poller

把Transport與Hub的poll聯系起來。這樣 Transport 就可以利用 poll。

def on_poll_init(self, poller):
    self.poller = poller
    for channel in self._channels:
        return channel.qos.restore_visible(
            num=channel.unacked_restore_limit,
        )      

此時變量如下:

poller = {_poll}self = {MultiChannelPoller}after_read = {set: 0} set()
 eventflags = {int} 25
 fds = {dict: 0} {}
 poller = {_poll}      

是以,我們最終如下:

+-----------------+
| Channel         |
|                 |      +-----------------------------------------------------------+
|    client  +---------> | Redis<ConnectionPool<Connection|
|                 |      +-----------------------------------------------------------+
|                 |
|                 |      +---------------------------------------------------+-+
|    pool  +---------->  |ConnectionPool<Connection|
|                 |      +---------------------------------------------------+-+
|                 |
|                 |    |     _fd_to_chan    |
|     transport +---------> |                     |       |     _chan_to_sock  |
|                   |       |                     |    + | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+      

0x04 總結

具體如圖,可以看出來,上面三個基本子產品已經聯系到了一起。

可以看到,

  • 目前是以Transport為中心,把 Channel代表的真實 redis 與 Hub其中的poll聯系起來,但是具體如何使用則尚未得知。
  • 使用者是通過Connection來作為API入口,connection可以得到Transport。

既然基本架構已經搭好,是以從下文開始,我們看看 Consumer 是如何運作的。

0xFF 參考

celery 7 優秀開源項目kombu源碼分析之registry和entrypoint

(二)放棄pika,選擇kombu

kombu消息架構

AMQP中的概念

AMQP的基本概念

深入了解AMQP協定

繼續閱讀