rabbitMQ 是一款基于 AMQP 协议的消息中间件, 它能够在应用之间提供可靠的消息传输. 在易用性, 扩展性, 高可用性上表现优秀. 使用消息中间件利于应用之间的解耦, 生产者 (客户端) 无需知道消费者 (服务端) 的存在. 而且两端可以使用不同的语言编写, 大大提供了灵活性.
中文文档
安装
- # 安装配置 epel 源
- rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
- # 安装 erlang
- yum -y install erlang
- # 安装 RabbitMQ
- yum -y install rabbitmq-server
- # 启动 / 停止
- service rabbitmq-server start/stop
rabbitMQ 工作模型
简单模式
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='hello')
- channel.basic_publish(exchange='',
- routing_key='hello',
- body='Hello World!')
- print("[x] Sent'Hello World!'")
- connection.close()
生产者
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='hello')
- def callback(ch, method, properties, body):
- print("[x] Received %r" % body)
- channel.basic_consume( callback,
- queue='hello',
- no_ack=True)
- print('[*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
消费者
相关参数
1,no-ack = False
如果消费者遇到情况 (its channel is closed, connection is closed, or TCP connection is lost) 挂掉了, 那么, RabbitMQ 会重新将该任务添加到队列中.
回调函数中的 ch.basic_ack(delivery_tag=method.delivery_tag)
basic_comsume 中的 no_ack=False
接收消息端应该这么写:
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='10.211.55.4'))
- channel = connection.channel()
- channel.queue_declare(queue='hello')
- def callback(ch, method, properties, body):
- print("[x] Received %r" % body)
- import time
- time.sleep(10)
- print 'ok'
- ch.basic_ack(delivery_tag = method.delivery_tag)
- channel.basic_consume(callback,
- queue='hello',
- no_ack=False)
- print('[*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
2,durable : 消息不丢失
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
- channel = connection.channel()
- # make message persistent
- channel.queue_declare(queue='hello', durable=True)
- channel.basic_publish(exchange='',
- routing_key='hello',
- body='Hello World!',
- properties=pika.BasicProperties(
- delivery_mode=2, # make message persistent
- ))
- print("[x] Sent'Hello World!'")
- connection.close()
生产者
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
- channel = connection.channel()
- # make message persistent
- channel.queue_declare(queue='hello', durable=True)
- def callback(ch, method, properties, body):
- print("[x] Received %r" % body)
- import time
- time.sleep(10)
- print 'ok'
- ch.basic_ack(delivery_tag = method.delivery_tag)
- channel.basic_consume(callback,
- queue='hello',
- no_ack=False)
- print('[*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
消费者
3, 消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走, 例如: 消费者 1 去队列中获取 奇数 序列的任务, 消费者 1 去队列中获取 偶数 序列的任务.
channel.basic_qos(prefetch_count=1) 表示谁来谁取, 不再按照奇偶数排列
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
- channel = connection.channel()
- # make message persistent
- channel.queue_declare(queue='hello')
- def callback(ch, method, properties, body):
- print("[x] Received %r" % body)
- import time
- time.sleep(10)
- print 'ok'
- ch.basic_ack(delivery_tag = method.delivery_tag)
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(callback,
- queue='hello',
- no_ack=False)
- print('[*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
exchange 模型
1, 发布订阅
发布订阅和简单的消息队列区别在于, 发布订阅会将消息发送给所有的订阅者, 而消息队列中的数据被消费一次便消失. 所以, RabbitMQ 实现发布和订阅时, 会为每一个订阅者创建一个队列, 而发布者发布消息时, 会将消息放置在所有相关队列中.
exchange type = fanout
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='logs',
- type='fanout')
- message = ''.join(sys.argv[1:]) or"info: Hello World!"channel.basic_publish(exchange='logs',
- routing_key='',
- body=message)
- print("[x] Sent %r" % message)
- connection.close()
生产者
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='logs',
- type='fanout')
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
- channel.queue_bind(exchange='logs',
- queue=queue_name)
- print('[*] Waiting for logs. To exit press CTRL+C')
- def callback(ch, method, properties, body):
- print("[x] %r" % body)
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
- channel.start_consuming()
消费则
2, 关键字发送
之前事例, 发送消息时明确指定某个队列并向其中发送消息, RabbitMQ 还支持根据关键字发送, 即: 队列绑定关键字, 发送者将数据根据关键字发送到消息 exchange,exchange 根据 关键字 判定应该将数据发送至指定队列.
exchange type = direct
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='direct_logs',
- type='direct')
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
- severities = sys.argv[1:]
- if not severities:
- sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
- sys.exit(1)
- for severity in severities:
- channel.queue_bind(exchange='direct_logs',
- queue=queue_name,
- routing_key=severity)
- print('[*] Waiting for logs. To exit press CTRL+C')
- def callback(ch, method, properties, body):
- print("[x] %r:%r" % (method.routing_key, body))
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
- channel.start_consuming()
- View Code
3, 模糊匹配
exchange type = topic
发送者路由值 队列中
old.boy.python old.* -- 不匹配
old.boy.python old.# -- 匹配
在 topic 类型下, 可以让队列绑定几个模糊的关键字, 之后发送者将数据发送到 exchange,exchange 将传入 "路由值" 和 "关键字" 进行匹配, 匹配成功, 则将数据发送到指定队列.
# 表示可以匹配 0 个 或 多个 单词
* 表示只能匹配 一个 单词
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.exchange_declare(exchange='topic_logs',
- type='topic')
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
- binding_keys = sys.argv[1:]
- if not binding_keys:
- sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
- sys.exit(1)
- for binding_key in binding_keys:
- channel.queue_bind(exchange='topic_logs',
- queue=queue_name,
- routing_key=binding_key)
- print('[*] Waiting for logs. To exit press CTRL+C')
- def callback(ch, method, properties, body):
- print("[x] %r:%r" % (method.routing_key, body))
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
- channel.start_consuming()
栗子
基于 rabbitMQ 的 RPC
Callback queue 回调队列
一个客户端向服务器发送请求, 服务器端处理请求后, 将其处理结果保存在一个存储体中. 而客户端为了获得处理结果, 那么客户在向服务器发送请求时, 同时发送一个回调队列地址 reply_to.
Correlation id 关联标识
一个客户端可能会发送多个请求给服务器, 当服务器处理完后, 客户端无法辨别在回调队列中的响应具体和那个请求时对应的. 为了处理这种情况, 客户端在发送每个请求时, 同时会附带一个独有 correlation_id 属性, 这样客户端在回调队列中根据 correlation_id 字段的值就可以分辨此响应属于哪个请求.
客户端发送请求:
某个应用将请求信息交给客户端, 然后客户端发送 RPC 请求, 在发送 RPC 请求到 RPC 请求队列时, 客户端至少发送带有 reply_to 以及 correlation_id 两个属性的信息
服务端工作流:
等待接受客户端发来 RPC 请求, 当请求出现的时候, 服务器从 RPC 请求队列中取出请求, 然后处理后, 将响应发送到 reply_to 指定的回调队列中
客户端接受处理结果:
客户端等待回调队列中出现响应, 当响应出现时, 它会根据响应中 correlation_id 字段的值, 将其返回给对应的应用
来源: https://www.cnblogs.com/peng104/p/10555541.html