概述
本文主要介紹如何使用 Python 3.6 操作阿里雲 AMQP。阿里雲的 AMQP 完全相容開源社群的 AMQP,使用過程中只需要在建立連接階段參考官方示例配置連接資訊,之後的使用方式與開源社群的 AMQP 完全一致,使用的 SDK 也是開源社群的 SDK:pika。
Code Sample
1、計算username、password
# -*- coding: utf-8 -*
import base64
import hashlib
import hmac
from datetime import datetime
class AliyunCredentialsProvider:
    """Compute the AMQP username and password from Alibaba Cloud credentials.

    Works on Python 3.6+. The username and password are derived from the
    accessKey, the accessSecret and the UID. The UID is the resource owner
    ID, which is usually the first segment of the endpoint host name.
    """

    # First field of the username payload.
    # NOTE(review): presumably marks "access by user" -- confirm with the
    # vendor documentation.
    ACCESS_FROM_USER: int = 0

    def __init__(self, access_key: str, access_secret: str, uid: int, security_token: str = None) -> None:
        """Store the credentials used to derive the username and password.

        Args:
            access_key: Alibaba Cloud accessKey.
            access_secret: Alibaba Cloud accessSecret.
            uid: Resource owner ID.
            security_token: Optional security token; when falsy it is left
                out of the username.
        """
        self.accessKey = access_key
        self.accessSecret = access_secret
        self.UID = uid
        self.securityToken = security_token

    def get_username(self) -> str:
        """Return the Base64-encoded AMQP username.

        The encoded payload is ``<ACCESS_FROM_USER>:<UID>:<accessKey>``,
        followed by ``:<securityToken>`` when a token was supplied.
        """
        payload = f'{self.ACCESS_FROM_USER}:{self.UID}:{self.accessKey}'
        if self.securityToken:
            # The token has to be joined *before* encoding: formatting the
            # already-encoded bytes into a str made the final decode raise
            # TypeError whenever a token was set.
            payload = f'{payload}:{self.securityToken}'
        return base64.b64encode(payload.encode('UTF-8')).decode('UTF-8')

    def get_password(self) -> str:
        """Return the Base64-encoded AMQP password.

        The encoded payload is ``<SIGNATURE>:<timestamp>``: ``timestamp`` is
        the current time in milliseconds and ``SIGNATURE`` is the upper-case
        hex HMAC-SHA1 digest described below. A new value is produced on
        every call because the timestamp changes.
        """
        timestamp = int(datetime.now().timestamp() * 1000)
        # The timestamp is the HMAC key and the accessSecret is the message.
        # NOTE(review): this ordering is kept exactly as found; presumably
        # the server expects it -- do not swap without confirming.
        key = bytes(str(timestamp), 'UTF-8')
        message = bytes(self.accessSecret, 'UTF-8')
        signature = hmac.new(key, message, hashlib.sha1).hexdigest().upper()
        encoded = base64.b64encode(f'{signature}:{timestamp}'.encode('UTF-8'))
        return str(encoded, 'UTF-8')
2、擷取認證需要的參數
# -*- coding: utf-8 -*
import pika
from AMQP.AliyunCredentialsProvider3 import AliyunCredentialsProvider
# Endpoint (host name) of the AMQP instance
host = "1848217816617278.mq-amqp.cn-hangzhou-a.aliyuncs.com"
# Default port
port = 5672
# Virtual host, used for resource isolation
virtualHost = "yutaoamqptest"
# Alibaba Cloud accessKey
accessKey = "********"
# Alibaba Cloud accessSecret
accessSecret = "********"
# Main account ID. The masked literal that used to be here was not valid
# Python, so the ID is read from the first label of the endpoint instead.
# Set it explicitly if your endpoint does not start with the account ID.
resourceOwnerId = int(host.split(".", 1)[0])
provider = AliyunCredentialsProvider(accessKey, accessSecret, resourceOwnerId)
def getConnectionParam():
    """Build the pika connection parameters for the configured endpoint.

    The username and password are requested from the module-level
    ``provider`` on every call.

    Returns:
        A ``pika.ConnectionParameters`` built from the module-level ``host``,
        ``port`` and ``virtualHost`` plus the freshly computed credentials.
    """
    username = provider.get_username()
    password = provider.get_password()
    # erase_on_connect=True is passed through to pika unchanged.
    plain_credentials = pika.PlainCredentials(username, password, erase_on_connect=True)
    return pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=virtualHost,
        credentials=plain_credentials,
    )
3、發送Code
import pika
from AMQP import connection
# Open the connection. Note that this rebinds the name ``connection``, which
# until this line referred to the imported helper module.
connection = pika.BlockingConnection(connection.getConnectionParam())
try:
    # Create a new channel with the next available channel number.
    channel = connection.channel()
    # Declare the queue, creating it if needed.
    channel.queue_declare(queue='hello')
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    print("[x] Sent 'Hello World!'")
finally:
    # Always release the connection, even when declaring or publishing fails.
    # Skip close() when the connection is already closed so that a failure
    # during cleanup does not hide the original error.
    if connection.is_open:
        connection.close()
4、接收Code
import pika
from AMQP import connection
# Open the connection. Note that this rebinds the name ``connection``, which
# until this line referred to the imported helper module.
connection = pika.BlockingConnection(connection.getConnectionParam())


def callback(ch, method, properties, body):
    """Print the repr of each delivered message body."""
    # Wrap body in a tuple so %-formatting is safe whatever its type.
    print(" [x] Received %r" % (body,))


try:
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # CTRL+C is the advertised way to exit: stop consuming instead of
        # ending with a traceback.
        channel.stop_consuming()
finally:
    # Release the connection on every exit path.
    if connection.is_open:
        connection.close()
5、項目目錄結構
6、接收測試結果