1、简介
Pub/Sub 是一种全托管式实时消息传递服务,可让您在独立的应用之间发送和接收消息,它是一个PAAS服务。
2、概览
- 主题(Topic):相当于一个消息的中转站,发布者发布消息后,消息存储在主题中。
- 发布者(Publisher):发布消息的应用
- 订阅者(Subscriber):接收消息者
如下图,首先对主题创建了两个订阅者(subscriber1, subscriber2),发布者(publisher)向主题中发布一条消息(Hello,World!), 接着,这两个订阅者都收到了此消息(Hello,World!)。
3、订阅消息
(1)订阅是如何工作的
- 多个订阅者订阅一个主题的情况,如果在一个主题中发布一条消息,那所有的订阅者都会收到此消息。如果想要多个订阅者分工处理不同的内容,可以在消息中加自定义特性 (Attribute),在订阅者逻辑中可以根据此特性只处理当前订阅者感兴趣的消息。
- 只有经过确认过的消息`message.ack()`,才不会再被传送,如果您在确认时限之前未确认消息,Pub/Sub 会重新发送该消息。因此,Pub/Sub 可能会发送重复的消息,如果订阅者处理消息发生异常,且消息未被确认,那Pub/Sub 会重新发送该消息。
- 确认过的消息,会在Pub/Sub被删除。
- 在给定订阅者创建之前发布的消息通常不会针对此订阅进行传送。因此,如果某主题没有订阅,则发布到该主题的消息将不会传送给任何订阅者。
(2)至少传送一次
通常,Pub/Sub 会按照消息发布的顺序将每条消息传送一次,但有时可能并不按顺序传送消息,或者会将消息传送多次。 一般来说,如果要实施多次传送,订阅者需要在处理消息时遵循幂等原则。您可以使用 Cloud Dataflow PubsubIO 将 Pub/Sub 消息流只处理一次。PubsubIO 会根据自定义消息标识符或由 Pub/Sub 分配的消息标识符来删除重复的消息。所以,处理消息的逻辑必须是幂等的(所谓幂等,通俗点说,就是函数执行一次,和执行数次,产生的结/效果是一样的)。
(3)对消息排序
通常情况下,pub/sub不完全像队列一样严格地保证消息先进先出,因为保证消息顺序会对吞吐量产生严重限制,pub/sub仅保证第一次传送消息时是按顺序进行的,后序的消息不一定是按顺序排列的,所有消息都允许随时尝试重新传送,这样允许一次向订阅者发送多条消息。
如果想使消息有顺序的话,可以在自定义特性 (Attribute)中加时间戳或序列号。如果主题有10条这样的消息0,1,2,3,4,5 那收到消息顺序,下面几种情况都可能会发生。
- 0,1,5,4,2,3 # 第一条消息始终是0,顺序不能保证
- 0,1,2,4,2,3,0 # 0 也可能被发送了2次
- 0,1,1,2,4,3,5 ,6 # 后序的消息可能被发送多次
4、订阅者接收消息的两种方式
订阅者可以有两种方式拿到消息,一是设置public endpoint,让pub/sub被动地推送消息给你,二是主要向pub/sub发送拉取(pull)请求。
4.1 推送传送
(1)关于推送
Pub/Sub 将根据收到成功响应的速率来动态调整推送请求的速率, 推送订阅受一组配额和资源限制的约束,系统会自动调整推送传送的速率,以最大限度地提高传送速率,同时不会使推送端点过载,它是通过一套算法来控制的。
(2)Cloud Function和App Engine通常都是使用推送传送的,也是遵循至少传送一次的原则。
(3)对于推送订阅,Pub/Sub 不会发送否定确认(有时称为 NACK)。如果 Webhook 未返回成功代码,则 Pub/Sub 会重试传送,直到消息在订阅的消息保留期限过后失效为止。
(4)推送不能一次处理多条消息,没法使用批处理,但拉取和发布消息可以。
4.2 拉取传送
(1)异步拉取(用得最多,实时处理消息那种)
可以在应用中使用长时间运行的消息侦听器接收消息,并且一次确认一条消息,不建议使用cron job,效率不高。
使用异步拉取不需要应用阻止新消息,从而在应用中实现更高的吞吐量。
如果订阅者客户端处理和确认消息的速度可能比 Pub/Sub 将消息发送到客户端的速度要慢,可考虑使用订阅者的流控制功能来控制订阅者接收消息的速率。https://cloud.google.com/pubsub/docs/pull
(2)同步拉取
在某些情况下,异步拉取并不非常适合您的应用。例如,应用逻辑可能依赖轮询模式来检索消息,或者需要对客户端在任何给定时间检索的消息数量进行精确限制。为了支持此类应用,该服务支持同步拉取方法,用于拉取和确认固定数量的消息,但这会带来一些消息传送的延迟。
如下是例子的代码
from google.cloud import pubsub_v1
def sub_data():
project_id = 'cong-proj'
# topic_name = 'hello_topic'
subscription_name = 'sub_one'
subscription1 = pubsub_v1.SubscriberClient()
# topic_path = subscription1.topic_path(project_id, topic_name)
subscription_path = subscription1.subscription_path(project_id, subscription_name)
def callback(message):
print("Received message: {}".format(message))
message.ack()
streaming_pull_future = subscription1.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))
try:
streaming_pull_future.result()
except: # noqa
streaming_pull_future.cancel()
5、发布消息
5.1 批处理以平衡延迟和吞吐量
消息可以根据请求大小(以字节为单位)、消息数量和时间分批。
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, max_latency=1 # One kilobyte # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
5.2 重试请求
如果发布失败,系统会自动重试,但无法保证能够重试的错误除外。
publisher = pubsub_v1.PublisherClient(client_config=retry_settings)
如下是例子的代码
def publish_data():
project_id = 'cong-proj'
topic_name = 'hello_topic'
# batch_settings = pubsub_v1.types.BatchSettings(max_messages=10, max_latency=30)
publisher = pubsub_v1.PublisherClient() # batch_settings
# publisher.from_service_account_file("""C:\CongStudy\pubsub-demo\cong-pubsub.json""")
topic_path = publisher.topic_path(project_id, topic_name)
data = "Message number 1".encode('utf-8')
future = publisher.publish(topic_path, data)
print(future.result())
def get_message(message):
# 消息实际上是以Base64的字符串的形式存于主题中,如要在订阅器中使用消息,可以使用如下代码
pubsub_message = base64.b64decode(message).decode('utf-8')
# hell,world base64加密后为 aGVsbCx3b3JsZA==, 它就会存于主题中
# print(base64.b64encode('hell,world'.encode('utf-8')).decode('utf-8')) # aGVsbCx3b3JsZA==
# aGVsbCx3b3JsZA == 解密后为 hell,world
# print(base64.b64decode('aGVsbCx3b3JsZA==').decode('utf-8')) # hell,world
6、重放和完全清除消息
- 还原至某一时间戳
- 还原至快照
7、参考链接
- https://cloud.google.com/pubsub/docs/quickstarts
- https://blog.gcp.expert/google-cloud-pub-sub-aws-sqs-comparison/