RabbitMQ 中 RPC 的实现: 客户端发送请求消息, 服务端回复响应消息, 为了接受响应 response, 客户端需要发送一个回调队列的地址来接受响应, 每条消息在发送的时候会带上一个唯一的 correlation_id, 相应的服务端处理计算后会将结果返回到对应的 correlation_id.
RPC 调用流程:
当生产者启动时, 它会创建一个匿名的独占回调队列, 对于一个 RPC 请求, 生产者发送一条具有两个属性的消息: reply_to(回调队列),correlation_id(每个请求的唯一值), 请求被发送到 rpc_queue 队列, 消费者等待该队列上的请求. 当一个请求出现时, 它会执行该任务, 将带有结果的消息发送回生产者. 生产者等待回调队列上的数据, 当消息出现时, 它检查相关 ID 属性, 如果它与请求中的值匹配, 则返回对应用程序的响应.
RabbitMQ 斐波拉契计算的 RPC, 消费者实现:
- """
- 基于 RabbitMQ 实现 RPC 通信机制 --> 服务端
- """
- import pika
- import uuid
- from functools import lru_cache
- class RabbitServer(object):
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 声明一个队列, 并进行持久化, exclusive 设置为 false
- self.channel.queue_declare(
- exclusive=False, durable=True, queue='task_queue'
- )
- # 声明一个 exhange 交换机, 类型为 topic
- self.channel.exchange_declare(
- exchange='logs_rpc', exchange_type='topic', durable=True
- )
- # 将队列与交换机进行绑定
- routing_keys = ['#'] # 接受所有的消息
- for routing_key in routing_keys:
- self.channel.queue_bind(
- exchange='logs_rpc', queue='task_queue', routing_key=routing_key
- )
- @lru_cache()
- def fib(self, n):
- """
- 斐波那契数列.===> 程序的处理逻辑
- 使用 lru_cache 优化递归
- :param n:
- :return:
- """
- if n == 0:
- return 0
- elif n == 1:
- return 1
- else:
- return self.fib(n - 1) + self.fib(n - 2)
- def call_back(self, channel, method, properties, body):
- print('------------------------------------------')
- print('接收到的消息为 (斐波那契数列的入参项为):{}'.format(str(body)))
- print('消息的相关属性为:')
- print(properties)
- value = self.fib(int(body))
- print('斐波那契数列的运行结果为:{}'.format(str(value)))
- # 交换机将消息发送到队列
- self.channel.basic_publish(
- exchange='',
- routing_key=properties.reply_to,
- body=str(value),
- properties=pika.BasicProperties(
- delivery_mode=2,
- correlation_id=properties.correlation_id,
- ))
- # 消费者对消息进行确认
- self.channel.basic_ack(delivery_tag=method.delivery_tag)
- def receive_msg(self):
- print('开始接受消息...')
- self.channel.basic_qos(prefetch_count=1)
- self.channel.basic_consume(
- consumer_callback=self.call_back,
- queue='task_queue',
- no_ack=False, # 消费者对消息进行确认
- consumer_tag=str(uuid.uuid4())
- )
- def consume(self):
- self.receive_msg()
- self.channel.start_consuming()
- if __name__ == '__main__':
- rabbit_consumer = RabbitServer()
- rabbit_consumer.consume()
生产者实现:
- """
- 基于 RabbitMQ 实现 RPC 通信机制 --> 客户端
- """
- import pika
- import uuid
- import time
- class RabbitClient(object):
- def __init__(self):
- # 与 RabbitMq 服务器建立连接
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 声明一个 exchange 交换机, 交换机的类型为 topic
- self.channel.exchange_declare(
- exchange='logs_rpc', exchange_type='topic', durable=True
- )
- # 声明一个回调队列, 用于接受 RPC 回调结果的运行结果
- result = self.channel.queue_declare(durable=True, exclusive=False)
- self.call_queue = result.method.queue
- # 从回调队列当中获取运行结果.
- self.channel.basic_consume(
- consumer_callback=self.on_response,
- queue=self.call_queue,
- no_ack=False
- )
- def on_response(self, channel, method, properties, body):
- """
- 对收到的消息进行确认
- 找到 correlation_id 与服务端的消息标识匹配的消息结果
- :param channel:
- :param method:
- :param properties:
- :param body:
- :return:
- """
- if self.corr_id == properties.correlation_id:
- self.response = body
- print('斐波那契数列的 RPC 返回结果是:{}'.format(body))
- print('相关属性信息:')
- print(properties)
- self.channel.basic_ack(delivery_tag=method.delivery_tag)
- def send_msg(self, routing_key, message):
- """
- exchange 交换机将根据消息的路由键将消息路由到对应的 queue 当中
- :param routing_key: 消息的路由键
- :param message: 生成者发送的消息
- :return:
- """
- self.response = None
- self.corr_id = str(uuid.uuid4())
- self.channel.basic_publish(
- exchange='logs_rpc',
- routing_key=routing_key,
- body=message,
- properties=pika.BasicProperties(
- delivery_mode=2,
- correlation_id=self.corr_id,
- reply_to=self.call_queue,
- ))
- while self.response is None:
- print('等待远程服务端的返回结果...')
- self.conn.process_data_events() # 非阻塞式的不断获取消息.
- return self.response
- def close(self):
- self.conn.close()
- if __name__ == "__main__":
- rabbit_producer = RabbitClient()
- routing_key = 'hello every one'
- start_time = int(time.time())
- for item in range(2000):
- num = str(item)
- print('生产者发送的消息为:{}'.format(num))
- rabbit_producer.send_msg(routing_key, num)
- end_time = int(time.time())
- print("耗时 {}s".format(str(end_time - start_time)))
计算 2000 以内的斐波拉契数列, 执行结果如下:
来源: https://www.cnblogs.com/FG123/p/10137411.html