路由模式 (Routing): 一个消息可以被多个消费者获取, 并且生产者可以通过 routing key 指定消息投递到哪些目标队列。
- package RabbitMQ
- import (
- "fmt"
- "github.com/streadway/amqp"
- "log"
- )
// MQURL is the AMQP connection URI passed to amqp.Dial, in the form
// amqp://user:password@host:port/vhost.
//
// NOTE(review): the password and host part of this literal had been replaced
// by an e-mail obfuscation placeholder, which made the URI undialable. The
// default RabbitMQ guest account on 127.0.0.1 is assumed here; confirm the
// credentials and host against the real broker.
const MQURL = "amqp://guest:guest@127.0.0.1:5672/imooc"
- // 创建 rabbitmq 结构体实例
- type RabbitMQ struct {
- conn *amqp.Connection
- channel *amqp.Channel
- QueueName string
- Exchange string
- Key string
- Mqurl string
- }
- func NewRabbitMQ(queueName string, Exchange string, key string) *RabbitMQ {
- rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: Exchange, Key: key, Mqurl: MQURL}
- var err error
- rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
- rabbitmq.failOnErr(err, "创建连接错误")
- rabbitmq.channel, err = rabbitmq.conn.Channel()
- rabbitmq.failOnErr(err, "获取 channel 失败")
- return rabbitmq
- }
- // 断开 channel 和 connection
- func (r *RabbitMQ) Destroy() {
- r.channel.Close()
- r.conn.Close()
- }
- // 错误处理函数
- func (r *RabbitMQ) failOnErr(err error, message string) {
- if err != nil {
- log.Fatalf("%s:%s", message, err)
- panic(fmt.Sprintf("%s%s", message, err))
- }
- }
- // 路由模式
- // 创建 rabbitmq 实例
- func NewRabbitMQRouting(exchangeName string, routingkey string) *RabbitMQ {
- // 创建 rabbitmq 实例
- rabbitmq := NewRabbitMQ("", exchangeName, routingkey)
- var err error
- // 获取 connection
- rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
- rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
- // 获取 channel
- rabbitmq.channel, err = rabbitmq.conn.Channel()
- rabbitmq.failOnErr(err, "failed to open a channel")
- return rabbitmq
- }
- // 路由模式发送消息
- func (r *RabbitMQ) PublishRouting(message string) {
- // 创建交换机
- err := r.channel.ExchangeDeclare(
- r.Exchange,
- // 路由模式要改成 direct
- "direct",
- true,
- false,
- false,
- false,
- nil,
- )
- r.failOnErr(err, "Failed to declare an exchange")
- err = r.channel.Publish(
- r.Exchange,
- r.Key,
- false,
- false,
- amqp.Publishing{
- ContentType: "text/plain",
- Body: []byte(message),
- })
- r.failOnErr(err, "Failed to Publish msg")
- }
- // 路由模式接受消息
- func (r *RabbitMQ) ReceiveRouting() {
- // 创建交换机
- err := r.channel.ExchangeDeclare(
- r.Exchange,
- // 路由模式要改成 direct
- "direct",
- true,
- false,
- false,
- false,
- nil,
- )
- r.failOnErr(err, "Failed to declare an exchange")
- // 试探性创建队列, 这里注意队列名称不要写
- q, err := r.channel.QueueDeclare(
- "", // 随机生产队列名称
- false,
- false,
- true,
- false,
- nil,
- )
- r.failOnErr(err, "Failed to declare a queue")
- // 绑定队列到 exchange 中
- err = r.channel.QueueBind(
- q.Name,
- // 需要绑定 key
- r.Key,
- r.Exchange,
- false,
- nil,
- )
- // 消费消息
- messages, err := r.channel.Consume(
- q.Name,
- "",
- true,
- false,
- false,
- false,
- nil,
- )
- forever := make(chan bool)
- go func() {
- for d := range messages {
- log.Printf("Received a message:%s", d.Body)
- }
- }()
- fmt.Println("推出请按 CTRL+C\n")
- <-forever
- }
路由模式消费者一号代码
- package main
- import "immoc-rabbitmq/RabbitMQ"
- func main() {
- imoocOne := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one")
- imoocOne.ReceiveRouting()
- }
路由模式消费者二号代码
- package main
- import "immoc-rabbitmq/RabbitMQ"
- func main() {
- imoocTwo := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two")
- imoocTwo.ReceiveRouting()
- }
路由模式生产者代码
- package main
- import (
- "fmt"
- "immoc-rabbitmq/RabbitMQ"
- "strconv"
- "time"
- )
- func main() {
- imoocOne := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one")
- imoocTwo := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two")
- for i := 0; i <= 10; i++ {
- imoocOne.PublishRouting("Hello imooc one!" + strconv.Itoa(i))
- imoocTwo.PublishRouting("Hello imooc Two!" + strconv.Itoa(i))
- time.Sleep(time.Second)
- fmt.Println(i)
- }
- }
来自为知笔记 (Wiz) https://www.wiz.cn/i/c5b11ee0
路由模式
来源: http://www.bubuko.com/infodetail-3340401.html