需求
生产者 Producer 向一个队列发送消息, 并且为消息打上不同的 Tag. 假设这个队列有 3 个消费者(Consumer #[1:3]),Consumer #1 只想消费 tag1 标记的消息, Consumer #2 只想消费 tag2 标记的消息, Consumer #3 只想消费 tag3 标记的消息.
实现分析
Producer 消息打 Tag
生产者 publish 消息时, 将 Tag 保存在 Map<String, Object> 类型的 header 字段, 作为构建 AMQP.BasicProperties 参数
Consumer 指定消费 Tag 类型
消费者如何告知 Broker 只消费特定 Tag?
假设 Consumer #1 只希望消费带 tag1 标记的消息, 那么 Consumer #1 可以在向 Broker 请求 Basic.Consume 指令时, 捎带自己期望的 Tag 字符串. Client 在具体生成 consumerTag 时可以用 Tag 关键字加上随机字符串(避免 consumerTag 重复):
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
服务端如何过滤
消费者通过 Basic.Consume 指令来监听队列的消息, 这些消费者信息服务端是如何存储的?
保存在队列主进程 (Pid) 的 state 中(具体调试可以通过 sys:get_state(Pid))
- %% Queue's state
- -record(q, {
- %% ...
- %% consumers state, see rabbit_queue_consumers
- consumers,
- %% ...
- }).
并且队列进程在初始化时, 会进行 consumers 初始化:
consumers = rabbit_queue_consumers:new(),
consumers 字段实际由 priority_queue:new()初始化. 当有新的 consumer 注册到队列进程, 那么会调用 rabbit_queue_consumer 模块的 add_consumer 方法来向 priority_queue 添加一个元素; 同理当有 consumer 下线时, 最终也会调用该模块的 remove_consumer 方法.
具体过程
Broker 向 Consumer 投递消息时, 底层是通过 rabbit_amqqueue_process 调用 rabbit_queue_consumers 模块的 deliver 方法. 默认采用
- deliver(FetchFun, QName, ConsumersChanged,
- State = #state{consumers = Consumers}) ->
- case priority_queue:out_p(Consumers) of
- {empty, _} ->
- {undelivered, ConsumersChanged,
- State#state{use = update_use(State#state.use, inactive)}};
- {{value, QEntry, Priority}, Tail} ->
- case deliver_to_consumer(FetchFun, QEntry, QName) of
- {delivered, R} ->
- {delivered, ConsumersChanged, R,
- State#state{consumers = priority_queue:in(QEntry, Priority,
- Tail)}};
- undelivered ->
- deliver(FetchFun, QName, true,
- State#state{consumers = Tail})
- end
- end.
从 priority_queue 从获取一个 QEntry({ChPid, Consumer}), 然后通过 FetchFun 从队列中获取消息, 发送到 Channel 进程(ChPid)
- deliver_to_consumer(FetchFun,
- #consumer{tag = CTag,
- ack_required = AckRequired},
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- unsent_message_count = Count},
- QName) ->
- {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
- rabbit_channel:deliver(ChPid, CTag, AckRequired,
- {QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 = case AckRequired of
- true -> queue:in({AckTag, CTag}, ChAckTags);
- false -> ChAckTags
- end,
- update_ch_record(C#cr{acktags = ChAckTags1,
- unsent_message_count = Count + 1}),
- R.
改造思路
在 consumers 不为空的情况下, 通过 FetchFun 获取消息, 此时可以获取该消息的 header, 取出 Tag 值(如果消息打了 Tag 标记), 然后通过 priority_queue 的 filter/2 方法
- filter(Pred, Q) -> fold(fun(V, P, Acc) ->
- case Pred(V) of
- true -> in(V, P, Acc);
- false -> Acc
- end
- end, new(), Q).
在 Pred 实现中, 我们可以判断当前消息 Tag 值是否被包含在 consumerTag 中, 从而可以过滤出消费特定 tag 的 consumers, 最后向这些 consumers 中的一个发送 Message 消息.
附(队列进程 state 中的 consumers 信息例子)
- {state,
- {queue,
- [{<7874.10509.1537>,
- {consumer,<<"amq.ctag-VGcFg-OvjkY9xv6jqjXySQ">>,true,10,[]}},
- {<7874.3960.1541>,
- {consumer,<<"amq.ctag-W52EK0jEsS51bnMdRuNivg">>,true,10,[]}},
- {<7874.859.1523>,
- {consumer,<<"amq.ctag-NWQwVx3nkUfXmebmmOYzpQ">>,true,10,[]}},
- {<7874.32052.1540>,
- {consumer,<<"amq.ctag-vE7sryPzzJGvwxMFTsulag">>,true,10,
- []}}],
- [{<7874.4076.1541>,
- {consumer,<<"amq.ctag-IFb6d2EnIUU7dFgqEqJs2g">>,true,10,[]}},
- {<7891.3104.1332>,
- {consumer,<<"amq.ctag-jakQZTbwWx3DZQracuuuOQ">>,true,10,[]}},
- {<7891.3047.1332>,
- {consumer,<<"amq.ctag-j-KUXd4rd_qfSE9h7N_rqQ">>,true,10,[]}},
- {<7891.27946.1203>,
- {consumer,<<"amq.ctag-zdc87xrG3f7Y3EFHjLTBhw">>,true,10,[]}},
- {<7891.29647.1475>,
- {consumer,<<"amq.ctag-f3h4rCE6prP9PbkL68rhsg">>,true,10,[]}},
- {<7891.2982.1332>,
- {consumer,<<"amq.ctag-j6McrXJC1KsLlx0XVjonoA">>,true,10,
- []}}],
- 10},
- {active,-573798462590564,0.009264803170815055}},
注: 上述思路建议在测试环境测试, 考虑到有可能出现的性能问题, 作为一个调研也会有很多工作要做, 整个过程会涉及 RabbitMQ 服务端源码改造, 编译, 打包以及客户端的相关改造, 如果能实际尝试下, 也会有不小的收获.
来源: http://www.jianshu.com/p/211d167d4288