概述
本文主要介绍如何使用 Python 3.6 操作阿里云 AMQP。阿里云的 AMQP 完全兼容开源社区的 AMQP，使用时只需在创建连接阶段参考官方示例配置连接信息，之后的用法与开源社区的 AMQP 完全一致，使用的 SDK 也是开源社区的 SDK：pika。
Code Sample
1, 计算 username,password
- # -*- coding: utf-8 -*
- import base64
- import hashlib
- import hmac
- from datetime import datetime
class AliyunCredentialsProvider:
    """Compute the AMQP username and password from Alibaba Cloud credentials.

    Works on Python 3.6+. The username and password used for the AMQP
    connection are derived from the account's accessKey, accessSecret and
    UID. The UID is the resource owner id, which is usually the first
    segment of the endpoint host name.
    """

    # First field of the username payload.
    ACCESS_FROM_USER: int = 0

    def __init__(self, access_key: str, access_secret: str, uid: int, security_token: str = None) -> None:
        """Store the credentials used to derive the username and password.

        Args:
            access_key: Alibaba Cloud accessKey.
            access_secret: Alibaba Cloud accessSecret.
            uid: Resource owner id.
            security_token: Optional security token; omitted from the
                username when empty or ``None``.
        """
        self.accessKey = access_key
        self.accessSecret = access_secret
        self.UID = uid
        self.securityToken = security_token

    def get_username(self) -> str:
        """Return the base64-encoded AMQP username.

        The payload is ``ACCESS_FROM_USER:UID:accessKey``, with
        ``:securityToken`` appended when a token was supplied.
        """
        payload = f'{self.ACCESS_FROM_USER}:{self.UID}:{self.accessKey}'
        if self.securityToken:
            # The token must be joined while the payload is still text:
            # formatting the base64 bytes into a str and then decoding that
            # str raised TypeError.
            # NOTE(review): placing the token inside the encoded payload is
            # assumed to match the server's expected layout - confirm against
            # the Alibaba Cloud AMQP documentation.
            payload = f'{payload}:{self.securityToken}'
        return base64.b64encode(payload.encode('utf-8')).decode('utf-8')

    def get_password(self) -> str:
        """Return the base64-encoded AMQP password.

        The password is ``SIGNATURE:timestamp`` where ``timestamp`` is the
        current time in milliseconds and ``SIGNATURE`` is the upper-case hex
        HMAC-SHA1 digest computed with the timestamp as key and the
        accessSecret as message. A new value is produced on every call.
        """
        timestamp = int(datetime.now().timestamp() * 1000)
        # The timestamp is the HMAC key and the secret is the message.
        digester = hmac.new(
            str(timestamp).encode('utf-8'),
            self.accessSecret.encode('utf-8'),
            hashlib.sha1,
        )
        signature = digester.hexdigest().upper()
        return base64.b64encode(f'{signature}:{timestamp}'.encode('utf-8')).decode('utf-8')
2, 获取认证需要的参数
- # -*- coding: utf-8 -*
- import pika
- from AMQP.AliyunCredentialsProvider3 import AliyunCredentialsProvider
# Endpoint of the AMQP instance.
host = "1848217816617278.mq-amqp.cn-hangzhou-a.aliyuncs.com"
# Default port.
port = 5672
# Virtual host, used for resource isolation.
virtualHost = "yutaoamqptest"
# Alibaba Cloud accessKey (masked placeholder - replace with a real value).
accessKey = "********"
# Alibaba Cloud accessSecret (masked placeholder - replace with a real value).
accessSecret = "********"
# Primary account id. The masked literal that stood here was not valid Python,
# so the id is taken from the first label of the endpoint host name instead.
# NOTE(review): set this explicitly if the account id is not the endpoint prefix.
resourceOwnerId = int(host.split(".", 1)[0])
provider = AliyunCredentialsProvider(accessKey, accessSecret, resourceOwnerId)
def getConnectionParam():
    """Build the pika connection parameters for the configured endpoint.

    The username and password are requested from ``provider`` on every call.
    The credentials are created with ``erase_on_connect=True``.

    Returns:
        A ``pika.ConnectionParameters`` for ``host``, ``port`` and
        ``virtualHost`` carrying those credentials.
    """
    credentials = pika.PlainCredentials(
        username=provider.get_username(),
        password=provider.get_password(),
        erase_on_connect=True,
    )
    return pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=virtualHost,
        credentials=credentials,
    )
3, 发送 Code
- import pika
- from AMQP import connection
# Open the connection. Note that this rebinds ``connection`` from the imported
# module to the live connection object.
connection = pika.BlockingConnection(connection.getConnectionParam())
try:
    # Create a new channel with the next available channel number.
    channel = connection.channel()
    # Declare the queue; this creates it if needed, otherwise checks it.
    channel.queue_declare(queue='hello')
    # Publish to the default exchange, routed by queue name.
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    print("[x] Sent'Hello World!'")
finally:
    # Close the connection even when declaring or publishing fails.
    connection.close()
4, 接收 Code
- import pika
- from AMQP import connection
# Build the connection parameters first, then open the connection. The second
# assignment rebinds ``connection`` from the imported module to the live
# connection object.
connection_params = connection.getConnectionParam()
connection = pika.BlockingConnection(connection_params)

# Open a channel and declare the queue that will be consumed.
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    """Print the body of a received message.

    Only ``body`` is used; ``ch``, ``method`` and ``properties`` are accepted
    and ignored.
    """
    line = "[x] Received %r" % body
    print(line)
# Register ``callback`` for the 'hello' queue with automatic acknowledgement.
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print('[*] Waiting for messages. To exit press CTRL+C')
try:
    # Blocks and dispatches deliveries to ``callback`` until consuming stops.
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to exit, so stop consuming quietly
    # instead of ending with a traceback.
    channel.stop_consuming()
finally:
    # Always close the connection, whichever way the loop ended.
    connection.close()
5, 项目目录结构
6, 接收测试结果
参考链接
Getting Started With RabbitMQ: Python
来源: https://yq.aliyun.com/articles/701365