本系列我們介紹消息隊列 Kombu。Kombu 的定位是一個相容 AMQP 協定的消息隊列抽象。通過本文,大家可以了解 Kombu 是如何啟動,以及如何搭建一個基本的架子。
[源碼分析] 消息隊列 Kombu 之 啟動過程
0x00 摘要
因為之前有一個綜述,是以大家會發現,一些概念講解文字會同時出現在後續文章和綜述之中。
0x01 示例
下面使用如下代碼來進行說明。
本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。
def main(arguments):
hub = Hub()
exchange = Exchange('asynt_exchange')
queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print(' kombu ')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(3, p_message)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
0x02 啟動
讓我們順着程式流程看看Kombu都做了些什麼,也可以對 Kombu 内部有所了解。
本文關注的重點是:Connection,Channel 和 Hub 是如何聯系在一起的。
2.1 Hub
在程式開始,我們建立了Hub。
Hub的作用是建立消息Loop,但是此時尚未建立,是以隻是一個靜态執行個體。
hub = Hub()
其定義如下:
class Hub:
"""Event loop object.
Arguments:
timer (kombu.asynchronous.Timer): Specify custom timer instance.
"""
def __init__(self, timer=None):
self.timer = timer if timer is not None else Timer()
self.readers = {}
self.writers = {}
self.on_tick = set()
self.on_close = set()
self._ready = set()
self._running = False
self._loop = None
self.consolidate = set()
self.consolidate_callback = None
self.propagate_errors = ()
self._create_poller()
因為此時沒有建立loop,是以目前重要的步驟是建立Poll,其Stack如下:
_get_poller, eventio.py:321
poll, eventio.py:328
_create_poller, hub.py:113
__init__, hub.py:96
main, testUb.py:22, testUb.py:55
在eventio.py中有如下,我們可以看到Kombu可以使用多種模型來進行核心消息處理:
def _get_poller():
if detect_environment() != 'default':
# greenlet
return _select
elif epoll:
# Py2.6+ Linux
return _epoll
elif kqueue and 'netbsd' in sys.platform:
return _kqueue
elif xpoll:
return _poll
else:
return _select
因為本機情況,這裡選擇的是:_poll。
+------------------+
| Hub |
| |
| | +-------------+
| poller +---------------> | _poll |
| | | | +-------+
| | | _poller+---------> | poll |
+------------------+ | | +-------+
+-------------+
2.2 Exchange與Queue
其次建立了Exchange與Queue。
- Exchange:交換機,消息發送者将消息發至 Exchange,Exchange 負責将消息分發至 Queue;
- Queue:消息隊列,存儲着即将被應用消費掉的消息,Exchange 負責将消息分發 Queue,消費者從 Queue 接收消息;
因為此時也沒有具體消息,是以我們暫且無法探究Exchange機制。
exchange = Exchange('asynt')
queue = Queue('asynt', exchange, 'asynt')
此時将把Exchange與Queue聯系起來。圖示如下:
+------------------+
| Hub |
| |
| | +-------------+
| poller +---------------> | _poll |
| | | | +-------+
| | | _poller+---------> | poll |
+------------------+ | | +-------+
+-------------+
+----------------+ +-------------------+
| Exchange | | Queue |
| | | |
| | | |
| channel | <------------+ exchange |
| | | |
| | | |
+----------------+ +-------------------+
2.3 Connection
第三步就是建立Connection。
Connection是對 MQ 連接配接的抽象,一個 Connection 就對應一個 MQ 的連接配接。現在就是對'redis://localhost:6379'連接配接進行抽象。
conn = Connection('redis://localhost:6379')
2.3.1 定義
由定義注釋可知,Connection是到broker的連接配接。從具體代碼可以看出,Connection更接近是一個邏輯概念,具體功能都委托給别人完成。
消息從來不直接發送給隊列,甚至 Producers 都可能不知道隊列的存在。 Producer如何才能将消息發送給Consumer呢?這中間需要經過 Message Broker 的處理和傳遞。
AMQP中,承擔 Message Broker 功能的就是 AMQP Server。也正是從這個角度講,AMQP 的 Producer 和Consumer 都是 AMQP Client。
在Kombu 體系中,用 transport 對所有的 broker 進行了抽象,為不同的 broker 提供了一緻的解決方案。通過Kombu,開發者可以根據實際需求靈活的選擇或更換broker。
Connection主要成員變量是,但是此時沒有指派:
- _connection:
- _transport:就是上面提到的對 broker 的抽象。
- cycle:與broker互動的排程政策。
- failover_strategy:在連接配接失效時,選取其他hosts的政策。
- heartbeat:用來實施心跳。
代碼如下:
class Connection:
"""A connection to the broker"""
port = None
virtual_host = '/'
connect_timeout = 5
_connection = None
_default_channel = None
_transport = None
uri_prefix = None
#: The cache of declared entities is per connection,
#: in case the server loses data.
declared_entities = None
#: Iterator returning the next broker URL to try in the event
#: of connection failure (initialized by :attr:`failover_strategy`).
cycle = None
#: Additional transport specific options,
#: passed on to the transport instance.
transport_options = None
#: Strategy used to select new hosts when reconnecting after connection
#: failure. One of "round-robin", "shuffle" or any custom iterator
#: constantly yielding new URLs to try.
failover_strategy = 'round-robin'
#: Heartbeat value, currently only supported by the py-amqp transport.
heartbeat = None
resolve_aliases = resolve_aliases
failover_strategies = failover_strategies
hostname = userid = password = ssl = login_method = None
2.3.2 init 與 transport
Connection内部主要任務是建立了transport。
Stack大緻如下:
Transport, redis.py:1039, redis.py:1031
import_module, __init__.py:126
symbol_by_name, imports.py:56
resolve_transport, __init__.py:70
get_transport_cls, __init__.py:85
__init__, connection.py:183
main, testUb.py:40, testUb.py:55
2.4 Transport
在Kombu體系中,用transport對所有的broker進行了抽象,為不同的broker提供了一緻的解決方案。通過Kombu,開發者可以根據實際需求靈活的選擇或更換broker。
Transport:真實的 MQ 連接配接,也是真正連接配接到 MQ(redis/rabbitmq) 的執行個體。就是存儲和發送消息的實體,用來區分底層消息隊列是用amqp、Redis還是其它實作的。
Transport負責具體操作,但是很多操作移交給 loop 與 MultiChannelPoller 進行。
2.4.1 定義
其主要成員變量為:
- 本transport的驅動類型,名字;
- 對應的 Channel;
- cycle:MultiChannelPoller,具體下文提到;
定義如下:
class Transport(virtual.Transport):
"""Redis Transport."""
Channel = Channel
polling_interval = None # disable sleep between unsuccessful polls.
default_port = DEFAULT_PORT
driver_type = 'redis'
driver_name = 'redis'
implements = virtual.Transport.implements.extend(
asynchronous=True,
exchange_type=frozenset(['direct', 'topic', 'fanout'])
)
def __init__(self, *args, **kwargs):
if redis is None:
raise ImportError('Missing redis library (pip install redis)')
super().__init__(*args, **kwargs)
# Get redis-py exceptions.
self.connection_errors, self.channel_errors = self._get_errors()
# All channels share the same poller.
self.cycle = MultiChannelPoller()
2.4.2 移交操作
Transport負責具體操作,但是很多操作移交給 loop 與 MultiChannelPoller 進行,具體從下面代碼可見。
def register_with_event_loop(self, connection, loop):
cycle = self.cycle
cycle.on_poll_init(loop.poller)
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
on_readable = self.on_readable
def _on_disconnect(connection):
if connection._sock:
loop.remove(connection._sock)
cycle._on_connection_disconnect = _on_disconnect
def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
loop.call_repeatedly(10, cycle.maybe_restore_messages)
health_check_interval = connection.client.transport_options.get(
'health_check_interval',
DEFAULT_HEALTH_CHECK_INTERVAL
)
loop.call_repeatedly(
health_check_interval,
cycle.maybe_check_subclient_health
)
其中重點是MultiChannelPoller。一個Connection有一個Transport, 一個Transport有一個MultiChannelPoller,對poll操作都是由MultiChannelPoller完成,redis操作由channel完成。
2.4.3 MultiChannelPoller
定義如下,可以了解為執行engine,主要作用是:
- 收集channel;
- 建立fd到channel的映射;
- 建立channel到socks的映射;
- 使用poll;
class MultiChannelPoller:
"""Async I/O poller for Redis transport."""
eventflags = READ | ERR
def __init__(self):
# active channels
self._channels = set()
# file descriptor -> channel map.
self._fd_to_chan = {}
# channel -> socket map
self._chan_to_sock = {}
# poll implementation (epoll/kqueue/select)
self.poller = poll()
# one-shot callbacks called after reading from socket.
self.after_read = set()
2.4.4 擷取
Transport是預先生成的,若需要,則依據名字取得。
TRANSPORT_ALIASES = {
'amqp': 'kombu.transport.pyamqp:Transport',
'amqps': 'kombu.transport.pyamqp:SSLTransport',
'pyamqp': 'kombu.transport.pyamqp:Transport',
'librabbitmq': 'kombu.transport.librabbitmq:Transport',
'memory': 'kombu.transport.memory:Transport',
'redis': 'kombu.transport.redis:Transport',
......
'pyro': 'kombu.transport.pyro:Transport'
}
_transport_cache = {}
def resolve_transport(transport=None):
"""Get transport by name. """
if isinstance(transport, str):
try:
transport = TRANSPORT_ALIASES[transport]
except KeyError:
if '.' not in transport and ':' not in transport:
from kombu.utils.text import fmatch_best
alt = fmatch_best(transport, TRANSPORT_ALIASES)
else:
if callable(transport):
transport = transport()
return symbol_by_name(transport)
return transport
def get_transport_cls(transport=None):
"""Get transport class by name.
"""
if transport not in _transport_cache:
_transport_cache[transport] = resolve_transport(transport)
return _transport_cache[transport]
此時Connection資料如下,注意其部分成員變量尚且沒有意義:
conn = {Connection}alt = {list: 0} []
connect_timeout = {int} 5
connection = {Transport}cycle = {NoneType} None
declared_entities = {set: 0} set()
default_channel = {Channel}failover_strategies = {dict: 2} {'round-robin':, 'shuffle':}
failover_strategy = {type}heartbeat = {int} 0
host = {str} 'localhost:6379'
hostname = {str} 'localhost'
manager = {Management}port = {int} 6379
recoverable_channel_errors = {tuple: 0} ()
resolve_aliases = {dict: 2} {'pyamqp': 'amqp', 'librabbitmq': 'amqp'}
transport = {Transport}transport_cls = {str} 'redis'
uri_prefix = {NoneType} None
userid = {NoneType} None
virtual_host = {str} '/'
至此,Kombu的基本就建立完成,但是彼此之間沒有建立邏輯聯系。
是以此時示例如下,注意此時三者沒有聯系:
+-------------------+ +---------------------+ +--------------------+
| Connection | | redis.Transport | | MultiChannelPoller |
| | | | | |
| | | | | _channels |
| | | cycle +------------> | _fd_to_chan |
| transport +---------> | | | _chan_to_sock |
| | | | | poller |
+-------------------+ +---------------------+ | after_read |
| |
+--------------------+
+------------------+
| Hub |
| |
| | +-------------+
| poller +---------------> | _poll |
| | | | +-------+
| | | _poller+---------> | poll |
+------------------+ | | +-------+
+-------------+
+----------------+ +-------------------+
| Exchange | | Queue |
| | | |
| | | |
| channel | <------------+ exchange |
| | | |
| | | |
+----------------+ +-------------------+
0x03 Connection注冊hub
之前我們提到,基本架子已經建立起來,但是各個子產品之間彼此沒有聯系,下面我們就看看如何建立聯系。
示例代碼來到:
conn.register_with_event_loop(hub)
這裡進行了注冊,此時作用是把hub與Connection聯系起來。随之調用到:
def register_with_event_loop(self, loop):
self.transport.register_with_event_loop(self.connection, loop)
進而調用到transport類:
具體代碼如下:
def register_with_event_loop(self, connection, loop):
cycle = self.cycle
cycle.on_poll_init(loop.poller)# 這裡建立聯系,loop就是hub
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
on_readable = self.on_readable
def _on_disconnect(connection):
if connection._sock:
loop.remove(connection._sock)
cycle._on_connection_disconnect = _on_disconnect
def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
loop.call_repeatedly(10, cycle.maybe_restore_messages)
health_check_interval = connection.client.transport_options.get(
'health_check_interval',
DEFAULT_HEALTH_CHECK_INTERVAL
)
loop.call_repeatedly(
health_check_interval,
cycle.maybe_check_subclient_health
)
3.1 建立Channel
注冊最初是建立Channel。這裡有一個連接配接的動作,就是在這裡,建立了Channel。
@property
def connection(self):
"""The underlying connection object"""
if not self._closed:
if not self.connected:
return self._ensure_connection(
max_retries=1, reraise_as_library_errors=False
)
return self._connection
具體建立是在 base.py 中完成,這是 Transport 基類。Stack 如下:
create_channel, base.py:920
establish_connection, base.py:938
_establish_connection, connection.py:801
_connection_factory, connection.py:866
retry_over_time, functional.py:325
_ensure_connection, connection.py:439
connection, connection.py:859
register_with_event_loop, connection.py:266
main, testUb.py:41, testUb.py:55
3.2 Channel
Channel:與AMQP中概念類似,可以了解成共享一個Connection的多個輕量化連接配接。就是真正的連接配接。
可以認為是 redis 操作和連接配接的封裝。每個 Channel 都可以與 redis 建立一個連接配接,在此連接配接之上對 redis 進行操作,每個連接配接都有一個 socket,每個 socket 都有一個 file,從這個 file 可以進行 poll。
為了更好的說明,我們提前給出這個通訊流程大約如下:
+---------------------------------------------------------------------------------------------------------------------------------------+
| +--------------+ 6 parse_response |
| +--> | Linux Kernel | +---+ |
| | +--------------+ | |
| | | |
| | | event |
| | 1 | |
| | | 2 |
| | | |
+-------+---+ socket + | |
| redis | port +--> fd +--->+ v |
| | | +------+--------+ |
| | socket | | Hub | |
| | port +--> fd +--->----------> | | |
| port=6379 | | | | |
| | socket | | readers +-----> Transport.on_readable |
| | port +--> fd +--->+ | | + |
+-----------+ +---------------+ | |
| |
3 | |
+----------------------------------------------------------------------------------------+ |
| v
| _receive_callback
| 5 +-------------+ +-----------+
+------------+------+ +-------------------------+ 'BRPOP' = Channel._brpop_read +-----> | Channel | +------------------> | Consumer |
| Transport | | MultiChannelPoller | +------> channel . handlers 'LISTEN' = Channel._receive +-------------+ +---+-------+
| | | | | 8 |
| | on_readable(fileno) | | | ^ |
| cycle +---------------------> | _fd_to_chan +----------------> channel . handlers 'BRPOP' = Channel._brpop_read | |
| | 4 | | | 'LISTEN' = Channel._receive | |
| _callbacks[queue]| | | | | on_m | 9
| + | +-------------------------+ +------> channel . handlers 'BRPOP' = Channel._brpop_read | |
+-------------------+ 'LISTEN' = Channel._receive | |
| | v
| 7 _callback |
+-----------------------------------------------------------------------------------------------------------------------------------------+ User Function
手機上如下:
3.2.1 定義
Channel 主要成員是:
- async_pool :redis異步連接配接池;
- pool :redis連接配接池;
- channel_id :Channel ID;
- client :就是StrictRedis之類的driver;
- connection :對應的Transport;
- cycle = {FairCycle}
- queue_order_strategy :擷取queue的政策;
- state :BrokerState狀态;
-
subclient :PubSub所用的client;
keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX) :bing用到的key;
比如_get_client可以看出來client。
def _get_client(self):
if redis.VERSION < (3, 2, 0):
raise VersionMismatch(
'Redis transport requires redis-py versions 3.2.0 or later. '
'You have {0.__version__}'.format(redis))
return redis.StrictRedis
簡化版定義如下:
class Channel(virtual.Channel):
"""Redis Channel."""
QoS = QoS
_client = None
_subclient = None
keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX)
keyprefix_fanout = '/{db}.'
sep = '\x06\x16'
_fanout_queues = {}
unacked_key = '{p}unacked'.format(p=KEY_PREFIX)
unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX)
unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX)
unacked_mutex_expire = 300 # 5 minutes
unacked_restore_limit = None
visibility_timeout = 3600 # 1 hour
max_connections = 10
queue_order_strategy = 'round_robin'
_async_pool = None
_pool = None
from_transport_options = (
virtual.Channel.from_transport_options +
('sep',
'ack_emulation',
'unacked_key',
......
'max_connections',
'health_check_interval',
'retry_on_timeout',
'priority_steps') # <-- do not add comma here!
)
connection_class = redis.Connection if redis else None
3.2.2 基類
基類定義如下:
class Channel(AbstractChannel, base.StdChannel):
"""Virtual channel.
Arguments:
connection (ConnectionT): The transport instance this
channel is part of.
"""
#: message class used.
Message = Message
#: QoS class used.
QoS = QoS
#: flag to restore unacked messages when channel
#: goes out of scope.
do_restore = True
#: mapping of exchange types and corresponding classes.
exchange_types = dict(STANDARD_EXCHANGE_TYPES)
#: flag set if the channel supports fanout exchanges.
supports_fanout = False
#: Binary ASCII codecs.
codecs = {'base64': Base64()}
#: Default body encoding.
#: NOTE: ``transport_options['body_encoding']`` will override this value.
body_encoding = 'base64'
#: counter used to generate delivery tags for this channel.
_delivery_tags = count(1)
#: Optional queue where messages with no route is delivered.
#: Set by ``transport_options['deadletter_queue']``.
deadletter_queue = None
# List of options to transfer from :attr:`transport_options`.
from_transport_options = ('body_encoding', 'deadletter_queue')
# Priority defaults
default_priority = 0
min_priority = 0
max_priority = 9
最終具體舉例如下:
self = {Channel}Client = {type}Message = {type}QoS = {type}active_fanout_queues = {set: 0} set()
active_queues = {set: 0} set()
async_pool = {ConnectionPool} ConnectionPool<Connection>
auto_delete_queues = {set: 0} set()
channel_id = {int} 1
client = {Redis} Redis<ConnectionPool<Connection>>
codecs = {dict: 1} {'base64':}
connection = {Transport}connection_class = {type}cycle = {FairCycle}deadletter_queue = {NoneType} None
exchange_types = {dict: 3} {'direct':, 'topic':,
handlers = {dict: 2} {'BRPOP':<bound method Channel._brpop_read of >, 'LISTEN':<bound method Channel._receive of >}
pool = {ConnectionPool} ConnectionPool<Connection>
qos = {QoS}queue_order_strategy = {str} 'round_robin'
state = {BrokerState}subclient = {PubSub}
3.2.3 redis消息回調函數
關于上面成員變量,這裡需要說明的是
handlers = {dict: 2}
{
'BRPOP':<bound method Channel._brpop_read of >,
'LISTEN':<bound method Channel._receive of >
}
這是redis有消息時的回調函數,即:
- BPROP 有消息時候,調用 Channel._brpop_read;
- LISTEN 有消息時候,調用 Channel._receive;
3.2.4 Redis 直接相關的主要成員
與Redis 直接相關的成員定義在:redis/client.py。
與 Redis 直接相關的主要成員是如下,會利用如下變量進行具體 redis操作:
分别對應如下類型:
channel = {Channel}Client = {type}async_pool = {ConnectionPool} ConnectionPool<Connection>
client = {Redis} Redis<ConnectionPool<Connection>>
connection = {Transport}connection_class = {type}connection_class_ssl = {type}pool = {ConnectionPool} ConnectionPool<Connection>
subclient = {PubSub}
def _create_client(self, asynchronous=False):
if asynchronous:
return self.Client(connection_pool=self.async_pool)
return self.Client(connection_pool=self.pool)
def _get_pool(self, asynchronous=False):
params = self._connparams(asynchronous=asynchronous)
self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
return redis.ConnectionPool(**params)
def _get_client(self):
if redis.VERSION < (3, 2, 0):
raise VersionMismatch(
'Redis transport requires redis-py versions 3.2.0 or later. '
'You have {0.__version__}'.format(redis))
return redis.StrictRedis
@property
def pool(self):
if self._pool is None:
self._pool = self._get_pool()
return self._pool
@property
def async_pool(self):
if self._async_pool is None:
self._async_pool = self._get_pool(asynchronous=True)
return self._async_pool
@cached_property
def client(self):
"""Client used to publish messages, BRPOP etc."""
return self._create_client(asynchronous=True)
@cached_property
def subclient(self):
"""Pub/Sub connection used to consume fanout queues."""
client = self._create_client(asynchronous=True)
return client.pubsub()
因為添加了Channel,是以此時如下:
+-----------------+
| Channel |
| | +-----------------------------------------------------------+
| client +---------> | Redis<ConnectionPool<Connection|
| | +-----------------------------------------------------------+
| |
| | +---------------------------------------------------+-+
| pool +----------> |ConnectionPool<Connection|
| | +---------------------------------------------------+-+
| |
| |
| |
| connection |
| |
+-----------------+
+-------------------+ +---------------------+ +--------------------+
| Connection | | redis.Transport | | MultiChannelPoller |
| | | | | |
| | | | | _channels |
| | | cycle +------------> | _fd_to_chan |
| transport +---------> | | | _chan_to_sock |
| | | | | poller |
+-------------------+ +---------------------+ | after_read |
| |
+------------------+ +--------------------+
| Hub |
| |
| | +-------------+
| poller +---------------> | _poll |
| | | | +-------+
| | | _poller+---------> | poll |
+------------------+ | | +-------+
+-------------+
+----------------+ +-------------------+
| Exchange | | Queue |
| | | |
| | | |
| channel | <------------+ exchange |
| | | |
| | | |
+----------------+ +-------------------+
3.3 channel 與 Connection 聯系
講到這裡,基本道理大家都懂,但是具體兩者之間如何聯系,我們需要再剖析下。
3.3.1 從Connection得到channel
在Connection定義中有如下,原來 Connection 是通過 transport 來得到 channel:
def channel(self):
"""Create and return a new channel."""
self._debug('create channel')
chan = self.transport.create_channel(self.connection)
return chan
3.3.2 Transport具體建立
在Transport之中有:
def create_channel(self, connection):
try:
return self._avail_channels.pop()
except IndexError:
channel = self.Channel(connection)
self.channels.append(channel)
return channel
原來在 Transport 有兩個channels 清單:
self._avail_channels
self.channels
如果_avail_channels 有内容則直接擷取,否則生成一個新的Channel。
在真正連接配接時候,會調用 establish_connection 放入self._avail_channels。
def establish_connection(self):
# creates channel to verify connection.
# this channel is then used as the next requested channel.
# (returned by ``create_channel``).
self._avail_channels.append(self.create_channel(self))
return self # for drain events
其堆棧如下:
__init__, redis.py:557
create_channel, base.py:921
establish_connection, base.py:939
_establish_connection, connection.py:801
_connection_factory, connection.py:866
retry_over_time, functional.py:313
_ensure_connection, connection.py:439
connection, connection.py:859
channel, connection.py:283, node.py:11
3.3.3 建立聯系
在init中有:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.ack_emulation: # disable visibility timeout
self.QoS = virtual.QoS
self._queue_cycle = cycle_by_name(self.queue_order_strategy)()
self.Client = self._get_client()
self.ResponseError = self._get_response_error()
self.active_fanout_queues = set()
self.auto_delete_queues = set()
self._fanout_to_queue = {}
self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
......
self.connection.cycle.add(self) # add to channel poller.
if register_after_fork is not None:
register_after_fork(self, _after_fork_cleanup_channel)
重點是:
self.connection.cycle.add(self) # add to channel poller.
這就是把 Channel與Transport 中的 poller 聯系起來,這樣Transport可以利用Channel去與真實的redis進行互動。
堆棧如下:
add, redis.py:277
__init__, redis.py:531
create_channel, base.py:920
establish_connection, base.py:938
_establish_connection, connection.py:801
_connection_factory, connection.py:866
retry_over_time, functional.py:325
_ensure_connection, connection.py:439
connection, connection.py:859
register_with_event_loop, connection.py:266
main, testUb.py:41
因為已經聯系起來,是以此時如下:
+-----------------+
| Channel |
| | +-----------------------------------------------------------+
| client +---------> | Redis<ConnectionPool<Connection|
| | +-----------------------------------------------------------+
| |
| | +---------------------------------------------------+-+
| pool +----------> |ConnectionPool<Connection|
| | +---------------------------------------------------+-+
| |
| | | _fd_to_chan |
| transport +---------> | | | _chan_to_sock |
| | | | | poller |
+-------------------+ +---------------------+ | after_read |
| |
+------------------+ +--------------------+
| Hub |
| |
| | +-------------+
| poller +---------------> | _poll |
| | | | +-------+
| | | _poller+---------> | poll |
+------------------+ | | +-------+
+-------------+
+----------------+ +-------------------+
| Exchange | | Queue |
| | | |
| | | |
| channel | <------------+ exchange |
| | | |
| | | |
+----------------+ +-------------------+
3.3 Transport 與 Hub 聯系
on_poll_init 這裡就是把 kombu.transport.redis.Transport 與 Hub 聯系起來。
用
self.poller = poller
把Transport與Hub的poll聯系起來。這樣 Transport 就可以利用 poll。
def on_poll_init(self, poller):
self.poller = poller
for channel in self._channels:
return channel.qos.restore_visible(
num=channel.unacked_restore_limit,
)
此時變量如下:
poller = {_poll}self = {MultiChannelPoller}after_read = {set: 0} set()
eventflags = {int} 25
fds = {dict: 0} {}
poller = {_poll}
是以,我們最終如下:
+-----------------+
| Channel |
| | +-----------------------------------------------------------+
| client +---------> | Redis<ConnectionPool<Connection|
| | +-----------------------------------------------------------+
| |
| | +---------------------------------------------------+-+
| pool +----------> |ConnectionPool<Connection|
| | +---------------------------------------------------+-+
| |
| | | _fd_to_chan |
| transport +---------> | | | _chan_to_sock |
| | | | + | _poll |
| | | | +-------+
| | | _poller+---------> | poll |
+------------------+ | | +-------+
+-------------+
+----------------+ +-------------------+
| Exchange | | Queue |
| | | |
| | | |
| channel | <------------+ exchange |
| | | |
| | | |
+----------------+ +-------------------+
0x04 總結
具體如圖,可以看出來,上面三個基本子產品已經聯系到了一起。
可以看到,
- 目前是以Transport為中心,把 Channel代表的真實 redis 與 Hub其中的poll聯系起來,但是具體如何使用則尚未得知。
- 使用者是通過Connection來作為API入口,connection可以得到Transport。
既然基本架構已經搭好,是以從下文開始,我們看看 Consumer 是如何運作的。
0xFF 參考
celery 7 優秀開源項目kombu源碼分析之registry和entrypoint
(二)放棄pika,選擇kombu
kombu消息架構
AMQP中的概念
AMQP的基本概念
深入了解AMQP協定