概念
什么叫消息队列
消息 (Message) 是指在应用间传送的数据消息可以非常简单, 比如只包含文本字符串, 也可以更复杂, 可能包含嵌入对象
消息队列 (Message Queue) 是一种应用间的通信方式, 消息发送后可以立即返回, 由消息系统来确保消息的可靠传递消息发布者只管把消息发布到 MQ 中而不用管谁来取, 消息使用者只管从 MQ 中取消息而不管是谁发布的这样发布者和使用者都不用知道对方的存在
从上面的描述中可以看出消息队列是一种应用间的异步协作机制, 那什么时候需要使用 MQ 呢?
以常见的订单系统为例, 用户点击下单按钮之后的业务逻辑可能包括: 扣减库存生成相应单据发红包发短信通知在业务发展初期这些逻辑可能放在一起同步执行, 随着业务的发展订单量增长, 需要提升系统服务的性能, 这时可以将一些不需要立即生效的操作拆分出来异步执行, 比如发放红包发短信通知等这种场景下就可以用 MQ , 在下单的主流程 (比如扣减库存生成相应单据) 完成之后发送一条消息到 MQ 让主流程快速完结, 而由另外的单独线程拉取 MQ 的消息(或者由 MQ 推送消息), 当发现 MQ 中有发红包或发短信之类的消息时, 执行相应的业务逻辑
RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现
rabbitMQ 是一款基于 AMQP 协议的消息中间件, 它能够在应用之间提供可靠的消息传输在易用性, 扩展性, 高可用性上表现优秀使用消息中间件利于应用之间的解耦, 生产者 (客户端) 无需知道消费者 (服务端) 的存在而且两端可以使用不同的语言编写, 大大提供了灵活性
示例
生产者:
- # ######################### 生产者 #########################
- #!/usr/bin/env python
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='hello')
- channel.basic_publish(exchange='',
- routing_key='hello',
- body='Hello World!')
- print("[x] Sent'Hello World!'")
- connection.close()
消费者:
- # ########################## 消费者 ##########################
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='hello')
- def callback(ch, method, properties, body):
- print("[x] Received %r" % body)
- channel.basic_consume( callback,
- queue='hello',
- no_ack=True)
- print('[*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
基于 RabbitMQ 的 RPC
客户端发送请求: 某个应用将请求信息交给客户端, 然后客户端发送 RPC 请求, 在发送 RPC 请求到 RPC 请求队列时, 客户端至少发送带有 reply_to 以及 correlation_id 两个属性的信息
服务器端工作流: 等待接受客户端发来 RPC 请求, 当请求出现的时候, 服务器从 RPC 请求队列中取出请求, 然后处理后, 将响应发送到 reply_to 指定的回调队列中
客户端接受处理结果: 客户端等待回调队列中出现响应, 当响应出现时, 它会根据响应中 correlation_id 字段的值, 将其返回给对应的应用
服务端
- #!/usr/bin/env python
- import pika
- # 建立连接, 服务器地址为 localhost, 可指定 ip 地址
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- # 建立会话
- channel = connection.channel()
- # 声明 RPC 请求队列
- channel.queue_declare(queue='rpc_queue')
- # 数据处理方法
- def fib(n):
- if n == 0:
- return 0
- elif n == 1:
- return 1
- else:
- return fib(n-1) + fib(n-2)
- # 对 RPC 请求队列中的请求进行处理
- def on_request(ch, method, props, body):
- n = int(body)
- print("[.] fib(%s)" % n)
- # 调用数据处理方法
- response = fib(n)
- # 将处理结果 (响应) 发送到回调队列
- ch.basic_publish(exchange='',
- routing_key=props.reply_to,
- properties=pika.BasicProperties(correlation_id = \
- props.correlation_id),
- body=str(response))
- ch.basic_ack(delivery_tag = method.delivery_tag)
- # 负载均衡, 同一时刻发送给该服务器的请求不超过一个
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(on_request, queue='rpc_queue')
- print("[x] Awaiting RPC requests")
- channel.start_consuming()
客户端
- #!/usr/bin/env python
- import pika
- import uuid
- class FibonacciRpcClient(object):
- def __init__(self):
客户端启动时, 创建回调队列, 会开启会话用于发送 RPC 请求以及接受响应
- # 建立连接, 指定服务器的 ip 地址
- self.connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- # 建立一个会话, 每个 channel 代表一个会话任务
- self.channel = self.connection.channel()
- # 声明回调队列, 再次声明的原因是, 服务器和客户端可能先后开启, 该声明是幂等的, 多次声明, 但只生效一次
- result = self.channel.queue_declare(exclusive=True)
- # 将次队列指定为当前客户端的回调队列
- self.callback_queue = result.method.queue
- # 客户端订阅回调队列, 当回调队列中有响应时, 调用 `on_response` 方法对响应进行处理;
- self.channel.basic_consume(self.on_response, no_ack=True,
- queue=self.callback_queue)
- # 对回调队列中的响应进行处理的函数
- def on_response(self, ch, method, props, body):
- if self.corr_id == props.correlation_id:
- self.response = body
- # 发出 RPC 请求
- def call(self, n):
- # 初始化 response
- self.response = None
- #生成 correlation_id
- self.corr_id = str(uuid.uuid4())
- # 发送 RPC 请求内容到 RPC 请求队列 `rpc_queue`, 同时发送的还有 `reply_to` 和 `correlation_id`
- self.channel.basic_publish(exchange='',
- routing_key='rpc_queue',
- properties=pika.BasicProperties(
- reply_to = self.callback_queue,
- correlation_id = self.corr_id,
- ),
- body=str(n))
- while self.response is None:
- self.connection.process_data_events()
- return int(self.response)
- # 建立客户端
- fibonacci_rpc = FibonacciRpcClient()
- # 发送 RPC 请求
- print("[x] Requesting fib(30)")
- response = fibonacci_rpc.call(30)
- print("[.] Got %r" % response)
总结
概念
RabbitMQ
来源: http://www.jianshu.com/p/425b1ce3f9da