RabbitMQ 是一个消息代理, 它接受和转发消息, 是一个由 Erlang 语言开发的遵循 AMQP 协议的开源实现. 在 RabbitMQ 中生产者不会将消息直接发送到队列当中, 而是将消息直接发送到交换机 (exchange), 交换机用来接受生产者发送的消息并将这些消息发送给绑定的队列, 即: 生产者 --> 交换机 -->队列.
在 RabbitMQ 中最主要的三种交换机: 1. fanout(广播交换机) 2. direct(直连交换机) 3. topic(话题交换机)
1. fanout(广播交换机)
fanout 会将接受到的所有消息广播到它所绑定的所有队列当中(每个消费者都会收到所有的消息), 对于广播交换机, 消息路由键 routing_key 和队列绑定键 routing_key 的作用都会被忽略.
fanout 生产者:
- import pika
- class RabbitProducer(object):
- """
- 与 RabbitMq 服务器建立连接
- """
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 声明一个 exchange 交换机, 交换机的类型为 fanout 广播.
- self.channel.exchange_declare(
- exchange='fanout_exchange', exchange_type='fanout', durable=True
- )
- def send_msg(self, message):
- """
- routing_key: 绑定的 key
- :param message:
- :return:
- """
- self.channel.basic_publish(
- exchange='fanout_exchange',
- routing_key='', # 因为 exchange 的类型为 fanout, 所以 routing_key 的数值在这里将被忽略
- body=message,
- properties=pika.BasicProperties(
- delivery_mode=2,
- # 消息进行持久化(防止服务器挂掉.)===> 如果没有 queue 绑定到这个 exchange 交换机, 这个参数是没有的.
- ))
- def close(self):
- self.conn.close()
- if __name__ == "__main__":
- rabbit_producer = RabbitProducer()
- for i in range(10):
- message = 'hello world {}!'.format(i)
- rabbit_producer.send_msg(message)
- View Code
消费者 consumer1:
- import pika
- import uuid
- class RabbitConsumer(object):
- """
- fanout 消费者 1
- """
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 声明一个队列 queue_consumer1, 并进行持久化(防止服务器挂掉),exclusive 设置为 false
- self.channel.queue_declare(
- exclusive=False, durable=True, queue='queue_consumer1'
- )
- # 声明一个 exhange 交换机, 其类型为 fanout 广播类型 与生产者的交换机一致
- self.channel.exchange_declare(
- exchange='fanout_exchange', exchange_type='fanout', durable=True
- )
- # 将队列 queue_consumer1 与该 exchange 交换机进行绑定
- self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer1')
- def call_back(self, method, body):
- """
- 消费者对消息进行确认, 防止消费者挂掉.
- :param method:
- :param body:
- :return:
- """
- self.channel.basic_ack(delivery_tag=method.delivery_tag)
- print('接收到的消息为:{}'.format(str(body)))
- def receive_msg(self):
- print('consumer1 开始接受消息...')
- # 当上一条消息未确认时, 会告知 RabbitMQ 不要再发送消息给这个消费者了 可以控制流量
- self.channel.basic_qos(prefetch_count=1)
- self.channel.basic_consume(
- consumer_callback=self.call_back,
- queue='queue_consumer1',
- no_ack=False, # 消费者对消息进行确认, 防止消费者挂掉
- consumer_tag=str(uuid.uuid4())
- )
- def consume(self):
- self.receive_msg()
- self.channel.start_consuming()
- if __name__ == '__main__':
- rabbit_consumer = RabbitConsumer()
- rabbit_consumer.consume()
- View Code
消费者 consumer2:
- import pika
- import uuid
- class RabbitConsumer(object):
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 声明一个队列 queue_consumer2, 并进行持久化(防止服务器挂掉),exclusive 设置为 false
- self.channel.queue_declare(
- exclusive=False, durable=True, queue='queue_consumer2'
- )
- # T 声明一个 exhange 交换机, 其类型为 fanout 广播类型
- self.channel.exchange_declare(
- exchange='fanout_exchange', exchange_type='fanout', durable=True
- )
- # 将队列 queue_consumer2 与该 exchange 交换机进行绑定
- self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer2')
- def call_back(self, method, body):
- """
- 消费者对消息进行确认, 防止消费者挂掉.
- :param method:
- :param body:
- :return:
- """
- self.channel.basic_ack(delivery_tag=method.delivery_tag)
- print('接收到的消息为:{}'.format(str(body)))
- def receive_msg(self):
- print('consumer2 开始接受消息...')
- self.channel.basic_consume(
- consumer_callback=self.call_back,
- queue='queue_consumer2',
- no_ack=False,
- consumer_tag=str(uuid.uuid4())
- )
- def consume(self):
- self.receive_msg()
- self.channel.start_consuming()
- if __name__ == '__main__':
- rabbit_consumer = RabbitConsumer()
- rabbit_consumer.consume()
- View Code
fanout 会将接受到的所有消息广播到消费者 consumer1 和消费者 consumer2, 交换机的缺陷: 它只能无意识的播放, 不够灵活地控制消息广播给指定的消费者
2. direct(直连交换机)
对于 direct, 根据绑定键判定应该将数据发送至哪个队列, 消息进入队列, 其绑定秘钥 (routing_key) 与消息的路由秘钥要完全匹配, 当 exchange 使用相同的绑定秘钥 (routing_key) 去绑定多个队列也是合法的, 在这种情况下 direct exchange 的效果等同于 fanout exchange, 交换机会将消息广播到所有匹配的队列当中.
direct 生产者:
- import pika
- class RabbitProducer(object):
- """
- 与 RabbitMq 服务器建立连接
- """
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 声明一个 exchange 交换机, 交换机的类型为 direct
- self.channel.exchange_declare(
- exchange='direct_exchange', exchange_type='direct', durable=True
- )
- def send_msg(self, routing_key, message):
- """
- :param routing_key: 消息的路由键 本例中为 routing_info
- :param message: 生成者发送的消息
- :return:
- """
- self.channel.basic_publish(
- exchange='direct_exchange',
- routing_key=routing_key,
- body=message,
- properties=pika.BasicProperties(
- delivery_mode=2,
- # 消息进行持久化(防止服务器挂掉.)===> 如果没有 queue 绑定到这个 exchange 交换机, 这个参数是没有的.
- ))
- def close(self):
- self.conn.close()
- if __name__ == "__main__":
- rabbit_producer = RabbitProducer()
- routing_key = 'routing_info'
- for i in range(10):
- message = 'hello world {}!'.format(i)
- print('生产者发送的消息为:{}'.format(message))
- rabbit_producer.send_msg(routing_key, message)
- View Code
direct 消费者:
- import pika
- import uuid
- class RabbitConsumer(object):
- """
- 消费者(订阅者)
- """
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 消息持久化
- self.channel.queue_declare(
- exclusive=False, durable=True, queue='task_queue'
- )
- # 交换机类型为 direct.
- self.channel.exchange_declare(
- exchange='direct_exchange', exchange_type='direct', durable=True
- )
- # 将队列与该 exchange 交换机进行绑定
- routing_keys = ['routing_info', 'aaa']
- for routing_key in routing_keys:
- self.channel.queue_bind(
- exchange='direct_exchange', queue='task_queue', routing_key=routing_key
- ) # 如果生产者发生消息的 routing_key 与消费者绑定队列的 routing_key 相同则成功发送
- def call_back(self, channel, method, properties, body):
- """
- 消费者对消息进行确认, 防止消费者挂掉
- :param channel:
- :param method:
- :param properties:
- :param body:
- :return:
- """
- self.channel.basic_ack(delivery_tag=method.delivery_tag)
- print('接收到的消息为:{}'.format(str(body)))
- def receive_msg(self):
- print('开始接受消息...')
- self.channel.basic_qos(prefetch_count=1) # TODO 告诉 RabbitMQ, 不要向我发送新的消息.
- self.channel.basic_consume(
- consumer_callback=self.call_back,
- queue='task_queue',
- no_ack=False,
- consumer_tag=str(uuid.uuid4())
- )
- def consume(self):
- self.receive_msg()
- self.channel.start_consuming()
- if __name__ == '__main__':
- rabbit_consumer = RabbitConsumer()
- rabbit_consumer.consume()
- View Code
direct 直连交换机相当于是 fanout 的升级版, 当消费者的队列绑定的秘钥 routing_key 与生产者的 routing_key 相同时, 消费者就会收到消息; 当所有消费者的队列所绑定的 routing_key 都一样且与生产者相同时, 就相当于 fanout 交换机
3. topic(话题交换机)
direct(直连交换机)虽然相当于 fanout 的升级版, 但它仍然有局限性, 它不能根据多个标准进行路由; topic(话题交换机)可以很好地解决这一问题:
(1) 如果消息的路由秘钥与队列的绑定秘钥符合匹配规则, topic 就会将消息发送到相应的队列当中
(2) 对于绑定键 (routing_key) 有两个特殊的情况: * (星号)可以代替一个单词,#(散列)可以替代零个或多个单词
(3) 对于发送到 topic 交换机消息的 routing_key 如果包含特殊字符, 只能是由 "." 分割的单词表, 如("zhangsan.lisi")
topic 生产者:
- import pika
- class RabbitProducer(object):
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 声明交换机, 交换机的类型为 topic
- self.channel.exchange_declare(
- exchange='logs_topic', exchange_type='topic', durable=True
- )
- def send_msg(self, routing_key, message):
- """
- :param routing_key: 消息的路由键
- :param message: 生成者发送的消息
- :return:
- """
- self.channel.basic_publish(
- exchange='logs_topic',
- routing_key=routing_key,
- body=message,
- properties=pika.BasicProperties(
- delivery_mode=2,
- # 消息进行持久化
- ))
- def close(self):
- self.conn.close()
- if __name__ == "__main__":
- rabbit_producer = RabbitProducer()
- routing_keys = ['info', "debug", "a.debug.b", "a.info.b"]
- for routing_key in routing_keys:
- message = 'hello world! {}'.format(routing_key)
- print('生产者发送的消息为:{}'.format(message))
- rabbit_producer.send_msg(routing_key, message)
- View Code
topic 消费者 1 --> 实现 fanout 交换机:
- """
- 当 topic 交换机使用 #绑定键绑定队列时, 此时 topic 交换机就会将消息广播到所有的队列当中,
- 不管消息的路由秘钥如何, 此时 topic 交换机的效果等同于 fanout: 发送所有消息都会接受到
- """
- import pika
- import uuid
- class RabbitConsumer(object):
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 消息持久化
- self.channel.queue_declare(
- exclusive=False, durable=True, queue='task_queue'
- )
- # 声明交换机, 其类型为 topic
- self.channel.exchange_declare(
- exchange='logs_topic', exchange_type='topic', durable=True
- )
- # 将队列与该交换机进行绑定
- routing_keys = ['#'] # 使用 #绑定键时, 它将接受所有的消息, 同 fanout 效果一样.
- for routing_key in routing_keys:
- self.channel.queue_bind(
- exchange='logs_topic', queue='task_queue', routing_key=routing_key
- )
- def call_back(self, channel, method, properties, body):
- """
- 消费者对消息进行确认, 防止消费者挂掉
- :param channel:
- :param method:
- :param properties:
- :param body:
- :return:
- """
- self.channel.basic_ack(delivery_tag=method.delivery_tag)
- print('接收到的消息为:{}'.format(str(body)))
- def receive_msg(self):
- print('开始接受消息...')
- self.channel.basic_qos(prefetch_count=1)
- self.channel.basic_consume(
- consumer_callback=self.call_back,
- queue='task_queue',
- no_ack=False, # 消费者对消息进行确认
- consumer_tag=str(uuid.uuid4())
- )
- def consume(self):
- self.receive_msg()
- self.channel.start_consuming()
- if __name__ == '__main__':
- rabbit_consumer = RabbitConsumer()
- rabbit_consumer.consume()
- View Code
topic 消费者 2 --> 实现 direct 交换机:
- """
- 当 topic 交换机没有使用 * 和 #匹配符绑定键绑定队列时, 此时 topic 交换机的效果等同于 direct,
- 会收到 key 相匹配的消息 如: info debug
- """
- import pika
- import uuid
- class RabbitConsumer(object):
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 消息持久化
- self.channel.queue_declare(
- exclusive=False, durable=True, queue='work_queue'
- )
- #
- # 声明交换机, 其类型为 topic
- self.channel.exchange_declare(
- exchange='logs_topic', exchange_type='topic', durable=True
- )
- # 将队列与交换机进行绑定
- routing_keys = ['info', 'debug']
- for routing_key in routing_keys:
- self.channel.queue_bind(
- exchange='logs_topic', queue='work_queue', routing_key=routing_key
- )
- def call_back(self, channel, method, properties, body):
- """
- 消费者对消息进行确认, 防止消费者挂掉
- :param channel:
- :param method:
- :param properties:
- :param body:
- :return:
- """
- self.channel.basic_ack(delivery_tag=method.delivery_tag)
- print('接收到的消息为:{}'.format(str(body)))
- def receive_msg(self):
- print('开始接受消息...')
- self.channel.basic_qos(prefetch_count=1)
- self.channel.basic_consume(
- consumer_callback=self.call_back,
- queue='work_queue',
- no_ack=False, # 消费者对消息进行确认
- consumer_tag=str(uuid.uuid4())
- )
- def consume(self):
- self.receive_msg()
- self.channel.start_consuming()
- if __name__ == '__main__':
- rabbit_consumer = RabbitConsumer()
- rabbit_consumer.consume()
- View Code
topic 消费者 3 --> 实现 *.x.* 消息匹配:
- """ 匹配任意点分割的单词 生产者发送的: a.debug.b 则匹配了'*.debug.*' 生产者发送的: a.info.b 则匹配了'*.info.*'"""
- import pika
- import uuid
- class RabbitConsumer(object):
- def __init__(self):
- self.conn = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost', port=5672)
- )
- self.channel = self.conn.channel()
- # 消息持久化
- self.channel.queue_declare(
- exclusive=False, durable=True, queue='other_queue'
- )
- # 声明交换机, 其类型为 topic
- self.channel.exchange_declare(
- exchange='logs_topic', exchange_type='topic', durable=True
- )
- # 将队列与交换机进行绑定
- routing_keys = ['*.info.*', '*.debug.*', 'dfdf*']
- for routing_key in routing_keys:
- self.channel.queue_bind(
- exchange='logs_topic', queue='other_queue', routing_key=routing_key
- )
- def call_back(self, channel, method, properties, body):
- """
- 消费者对消息进行确认, 防止消费者挂掉
- :param channel:
- :param method:
- :param properties:
- :param body:
- :return:
- """
- self.channel.basic_ack(delivery_tag=method.delivery_tag)
- print('接收到的消息为:{}'.format(str(body)))
- def receive_msg(self):
- print('开始接受消息...')
- self.channel.basic_qos(prefetch_count=1)
- self.channel.basic_consume(
- consumer_callback=self.call_back,
- queue='other_queue',
- no_ack=False, # 消费者对消息进行确认
- consumer_tag=str(uuid.uuid4())
- )
- def consume(self):
- self.receive_msg()
- self.channel.start_consuming()
- if __name__ == '__main__':
- rabbit_consumer = RabbitConsumer()
- rabbit_consumer.consume()
- View Code
topic 消费者执行结果:
消费者 1:
消费者 2:
消费者 3:
来源: https://www.cnblogs.com/FG123/p/10137383.html