天天看点

GCP: Pub/Sub的使用

1、简介

Pub/Sub 是一种全托管式实时消息传递服务,可让您在独立的应用之间发送和接收消息,它是一个PAAS服务。

2、概览

  • 主题(Topic):相当于一个消息的中转站,发布者发布消息后,消息存储在主题中。
  • 发布者(Publisher):发布消息的应用
  • 订阅者(Subscriber):接收消息者

如下图,首先对主题创建了两个订阅者(subscriber1, subscriber2),发布者(publisher)向主题中发布一条消息(Hello,World!), 接着,这两个订阅者都收到了此消息(Hello,World!)。

GCP: Pub/Sub的使用

3、订阅消息

(1)订阅是如何工作的

  • 多个订阅者订阅一个主题的情况,如果在一个主题中发布一条消息,那所有的订阅者都会收到此消息。如果想要多个订阅者分工处理不同的内容,可以在消息中加自定义特性 (Attribute),在订阅者逻辑中可以根据此特性只处理当前订阅者感兴趣的消息。
  • 只有经过确认过的消息`message.ack()`,才不会再被传送,如果您在确认时限之前未确认消息,Pub/Sub 会重新发送该消息。因此,Pub/Sub 可能会发送重复的消息,如果订阅者处理消息发生异常,且消息未被确认,那Pub/Sub 会重新发送该消息。
  • 确认过的消息,会在Pub/Sub被删除。
  • 在给定订阅者创建之前发布的消息通常不会针对此订阅进行传送。因此,如果某主题没有订阅,则发布到该主题的消息将不会传送给任何订阅者。

(2)至少传送一次

通常,Pub/Sub 会按照消息发布的顺序将每条消息传送一次,但有时可能并不按顺序传送消息,或者会将消息传送多次。 一般来说,如果要实施多次传送,订阅者需要在处理消息时遵循幂等原则。您可以使用 Cloud Dataflow PubsubIO 将 Pub/Sub 消息流只处理一次。PubsubIO 会根据自定义消息标识符或由 Pub/Sub 分配的消息标识符来删除重复的消息。所以,处理消息的逻辑必须是幂等的(所谓幂等,通俗点说,就是函数执行一次,和执行数次,产生的结/效果是一样的)。

(3)对消息排序

通常情况下,pub/sub不完全像队列一样严格地保证消息先进先出,因为保证消息顺序会对吞吐量产生严重限制,pub/sub仅保证第一次传送消息时是按顺序进行的,后序的消息不一定是按顺序排列的,所有消息都允许随时尝试重新传送,这样允许一次向订阅者发送多条消息。

如果想使消息有顺序的话,可以在自定义特性 (Attribute)中加时间戳或序列号。如果主题有10条这样的消息0,1,2,3,4,5 那收到消息顺序,下面几种情况都可能会发生。

  • 0,1,5,4,2,3  # 第一条消息始终是0,顺序不能保证
  • 0,1,2,4,2,3,0  # 0 也可能被发送了2次
  • 0,1,1,2,4,3,5 ,6  # 后序的消息可能被发送多次

4、订阅者接收消息的两种方式

订阅者可以有两种方式拿到消息,一是设置public endpoint,让pub/sub被动地推送消息给你,二是主要向pub/sub发送拉取(pull)请求。

4.1 推送传送

(1)关于推送

Pub/Sub 将根据收到成功响应的速率来动态调整推送请求的速率, 推送订阅受一组配额和资源限制的约束,系统会自动调整推送传送的速率,以最大限度地提高传送速率,同时不会使推送端点过载,它是通过一套算法来控制的。

(2)Cloud Function和App Engine通常都是使用推送传送的,也是遵循至少传送一次的原则。

(3)对于推送订阅,Pub/Sub 不会发送否定确认(有时称为 NACK)。如果 Webhook 未返回成功代码,则 Pub/Sub 会重试传送,直到消息在订阅的消息保留期限过后失效为止。

(4)推送不能一次处理多条消息,没法使用批处理,但拉取和发布消息可以。

4.2 拉取传送

(1)异步拉取(用得最多,实时处理消息那种)

可以在应用中使用长时间运行的消息侦听器接收消息,并且一次确认一条消息,不建议使用cron job,效率不高。

使用异步拉取不需要应用阻止新消息,从而在应用中实现更高的吞吐量。

如果订阅者客户端处理和确认消息的速度可能比 Pub/Sub 将消息发送到客户端的速度要慢,可考虑使用订阅者的流控制功能来控制订阅者接收消息的速率。https://cloud.google.com/pubsub/docs/pull

(2)同步拉取

在某些情况下,异步拉取并不非常适合您的应用。例如,应用逻辑可能依赖轮询模式来检索消息,或者需要对客户端在任何给定时间检索的消息数量进行精确限制。为了支持此类应用,该服务支持同步拉取方法,用于拉取和确认固定数量的消息,但这会带来一些消息传送的延迟。

如下是例子的代码

from google.cloud import pubsub_v1


def sub_data():
    project_id = 'cong-proj'
    # topic_name = 'hello_topic'
    subscription_name = 'sub_one'
    subscription1 = pubsub_v1.SubscriberClient()
    # topic_path = subscription1.topic_path(project_id, topic_name)
    subscription_path = subscription1.subscription_path(project_id, subscription_name)

    def callback(message):
        print("Received message: {}".format(message))
        message.ack()

    streaming_pull_future = subscription1.subscribe(subscription_path, callback=callback)
    print("Listening for messages on {}..\n".format(subscription_path))
    try:
        streaming_pull_future.result()
    except:  # noqa
        streaming_pull_future.cancel()           

5、发布消息

5.1 批处理以平衡延迟和吞吐量

消息可以根据请求大小(以字节为单位)、消息数量和时间分批。

batch_settings = pubsub_v1.types.BatchSettings(

    max_bytes=1024, max_latency=1  # One kilobyte  # One second

)

publisher = pubsub_v1.PublisherClient(batch_settings)           

5.2 重试请求

如果发布失败,系统会自动重试,但无法保证能够重试的错误除外。

publisher = pubsub_v1.PublisherClient(client_config=retry_settings)           

如下是例子的代码

def publish_data():
    project_id = 'cong-proj'
    topic_name = 'hello_topic'

    # batch_settings = pubsub_v1.types.BatchSettings(max_messages=10, max_latency=30)

    publisher = pubsub_v1.PublisherClient() # batch_settings 
    # publisher.from_service_account_file("""C:\CongStudy\pubsub-demo\cong-pubsub.json""")
    topic_path = publisher.topic_path(project_id, topic_name)

    data = "Message number 1".encode('utf-8')
    future = publisher.publish(topic_path, data)
    print(future.result())



 def get_message(message):
    # 消息实际上是以Base64的字符串的形式存于主题中,如要在订阅器中使用消息,可以使用如下代码
    pubsub_message = base64.b64decode(message).decode('utf-8')


    # hell,world base64加密后为 aGVsbCx3b3JsZA==, 它就会存于主题中
    # print(base64.b64encode('hell,world'.encode('utf-8')).decode('utf-8')) # aGVsbCx3b3JsZA==

    # aGVsbCx3b3JsZA == 解密后为 hell,world
    # print(base64.b64decode('aGVsbCx3b3JsZA==').decode('utf-8')) # hell,world
    


           

6、重放和完全清除消息

  • 还原至某一时间戳
  • 还原至快照

7、参考链接

  • https://cloud.google.com/pubsub/docs/quickstarts
  • https://blog.gcp.expert/google-cloud-pub-sub-aws-sqs-comparison/
gcp

继续阅读