这一篇开始讲解 moqutte 对 SUBSCRIBE 报文的处理
代码不复杂
- public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
- String clientID = NettyUtils.clientID(channel);// 从 channel 里面获取 clientId, 具体原理看下文
- int messageID = messageId(msg);
- LOG.info("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);
- RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
- SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
- if (currentStatus != null) {
- LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
- clientID, messageID);
- return;
- }
- String username = NettyUtils.userName(channel);
- List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
- MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
- if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
- LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}," +
- "messageId={}", clientID, messageID);
- return;
- }
- LOG.info("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
- List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);
- // save session, persist subscriptions from session
- for (Subscription subscription : newSubscriptions) {
- subscriptions.add(subscription.asClientTopicCouple());
- }
- LOG.info("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
- channel.writeAndFlush(ackMessage);
- // fire the persisted messages in session
- for (Subscription subscription : newSubscriptions) {
- publishRetainedMessagesInSession(subscription, username);
- }
- boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
- if (!success) {
- LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
- }
- }
1.channel 里面为什么会存在 clientid 呢? 这个问题也可以这样描述, 当连接建立之后, client 发布消息的时候, netty 接收到 socket 里面的数据之后, 他怎么知道是哪个 client 的数据呢? 这里面就需要确定 client 与 channel 的映射关系 moquette 是这么做的,
在处理 CONNECT 的第 5 步, 详见 http://blog.51cto.com/13579730/2073630 的时候会做如下处理
- private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, final String clientId) {
- int keepAlive = msg.variableHeader().keepAliveTimeSeconds();
- LOG.info("Configuring connection. CId={}", clientId);
- NettyUtils.keepAlive(channel, keepAlive);
- // session.attr(NettyUtils.ATTR_KEY_CLEANSESSION).set(msg.variableHeader().isCleanSession());
- NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
- // used to track the client in the subscription and publishing phases.
- // session.attr(NettyUtils.ATTR_KEY_CLIENTID).set(msg.getClientID());
- NettyUtils.clientID(channel, clientId);
- int idleTime = Math.round(keepAlive * 1.5f);
- setIdleTime(channel.pipeline(), idleTime);
- if(LOG.isDebugEnabled()){
- LOG.debug("The connection has been configured CId={}, keepAlive={}, cleanSession={}, idleTime={}",
- clientId, keepAlive, msg.variableHeader().isCleanSession(), idleTime);
- }
- }
这里面有一步 NettyUtils.clientID(channel, clientId); 这个不起眼的方法做了将 channel 与 clientId 映射的动作, 接着跟踪
- public static void clientID(Channel channel, String clientID) {
- channel.attr(NettyUtils.ATTR_KEY_CLIENTID).set(clientID);
- }
原来是把 clientId 作为一个属性存到了 channel 里面, 因为 channel 是集成 AttributeMap 的, 所以可以这么做
只要有 channel 与 clientId 的映射关系, 就好说了, 这也就是为什么 moquette 的 NettyMQTTHandler 是这样处理的
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object message) {
- MqttMessage msg = (MqttMessage) message;
- MqttMessageType messageType = msg.fixedHeader().messageType();
- if(LOG.isDebugEnabled())
- LOG.debug("Processing MQTT message, type={}", messageType);
- try {
- switch (messageType) {
- case CONNECT:
- m_processor.processConnect(ctx.channel(), (MqttConnectMessage) msg);
- break;
- case SUBSCRIBE:
- m_processor.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
- break;
- case UNSUBSCRIBE:
- m_processor.processUnsubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
- break;
- case PUBLISH:
- m_processor.processPublish(ctx.channel(), (MqttPublishMessage) msg);
- break;
- case PUBREC:
- m_processor.processPubRec(ctx.channel(), msg);
- break;
- case PUBCOMP:
- m_processor.processPubComp(ctx.channel(), msg);
- break;
- case PUBREL:
- m_processor.processPubRel(ctx.channel(), msg);
- break;
- case DISCONNECT:
- m_processor.processDisconnect(ctx.channel());
- break;
- case PUBACK:
- m_processor.processPubAck(ctx.channel(), (MqttPubAckMessage) msg);
- break;
- case PINGREQ:
- MqttFixedHeader pingHeader = new MqttFixedHeader(
- MqttMessageType.PINGRESP,
- false,
- AT_MOST_ONCE,
- false,
- 0);
- MqttMessage pingResp = new MqttMessage(pingHeader);
- ctx.writeAndFlush(pingResp);
- break;
- default:
- LOG.error("Unkonwn MessageType:{}", messageType);
- break;
哪个 tcp-socket 对应哪个 channel 由 netty 负责处理, 当 client 发送数据的时候, netty 负责从 ChannelHandlerContext 取出 channel 传给相应的业务自定义的 handler 进行处理
2. 创建一个正在运行中的 RunningSubscription 对象, 之所以要创建这个对象, 是为了防止重复订阅, 同时到存储了所有的 RunningSubscription 的 ConcurrentMap 里面查询所有已经存在这个对象, 如果存在, 说明是重复的订阅包, 则不处理, 这里面调用了 putIfAbsent 方法, 同时重写了 RunningSubscription 的 equals 方法 packetId 和 clientID 相同时代表是相同的 RunningSubscription
3. 从 channel 里面取出用户名, 验证该 client 下的该 username 是否有权利读取该 topic(订阅该 topic) 的权限, 这里贴一下相关的代码进行讲解
- rivate List<MqttTopicSubscription> doVerify(String clientID, String username, MqttSubscribeMessage msg) {
- ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
- List<MqttTopicSubscription> ackTopics = new ArrayList<>();
- final int messageId = messageId(msg);
- for (MqttTopicSubscription req : msg.payload().topicSubscriptions()) {
- Topic topic = new Topic(req.topicName());
- if (!m_authorizator.canRead(topic, username, clientSession.clientID)) {
- // send SUBACK with 0x80, the user hasnt credentials to read the topic
- LOG.error("Client does not have read permissions on the topic CId={}, username={}, messageId={}," +
- "topic={}", clientID, username, messageId, topic);
- ackTopics.add(new MqttTopicSubscription(topic.toString(), FAILURE));
- } else {
- MqttQoS qos;
- if (topic.isValid()) {
- LOG.info("Client will be subscribed to the topic CId={}, username={}, messageId={}, topic={}",
- clientID, username, messageId, topic);
- qos = req.qualityOfService();
- } else {
- LOG.error("Topic filter is not valid CId={}, username={}, messageId={}, topic={}", clientID,
- username, messageId, topic);
- qos = FAILURE;
- }
- ackTopics.add(new MqttTopicSubscription(topic.toString(), qos));
- }
- }
- return ackTopics;
- }
从报文的 payload 里面取出所有的订阅请求, 遍历, 然后验证是否有权限, 这个权限是在配置文件里面配置的, 详见 http://blog.51cto.com/13579730/2072467
如果没有权限, 返回 SUBACK 报文中标记该订阅状态为失败, 如果有权限, 检查 topic 是否有效如果有效, 获取 qos, 如果无效标记为失败
校验之后得到一个 List<MqttTopicSubscription>, 再根据这个 list 生成 SUBACK
4. 将 RunningSubscription 的状态从 VERIFIED 修改成 STORED, 这里面用到了 ConcurrentHashMap.replace(key,oldvalue,newvlaue) 这个原子操作, 如果修改失败表面, 这个订阅请求已经存在
5. 开始存储订阅请求, 这里存储订阅请求
- private List<Subscription> doStoreSubscription(List<MqttTopicSubscription> ackTopics, String clientID) {
- ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
- List<Subscription> newSubscriptions = new ArrayList<>();
- for (MqttTopicSubscription req : ackTopics) {
- // TODO this is SUPER UGLY
- if (req.qualityOfService() == FAILURE) {
- continue;
- }
- Subscription newSubscription =
- new Subscription(clientID, new Topic(req.topicName()), req.qualityOfService());
- clientSession.subscribe(newSubscription);// 存储到用户的 session 里面, 用以表明该 client 订阅了哪些请求
- newSubscriptions.add(newSubscription);
- }
- return newSubscriptions;
- }
我们先看存储到用户的 session 这一步
- public boolean subscribe(Subscription newSubscription) {
- LOG.info("Adding new subscription. ClientId={}, topics={}, qos={}", newSubscription.getClientId(),
- newSubscription.getTopicFilter(), newSubscription.getRequestedQos());
- boolean validTopic = newSubscription.getTopicFilter().isValid();
- if (!validTopic) {
- LOG.warn("The topic filter is not valid. ClientId={}, topics={}", newSubscription.getClientId(),
- newSubscription.getTopicFilter());
- // send SUBACK with 0x80 for this topic filter
- return false;
- }
- ClientTopicCouple matchingCouple = new ClientTopicCouple(this.clientID, newSubscription.getTopicFilter());
- Subscription existingSub = subscriptionsStore.getSubscription(matchingCouple);
- // update the selected subscriptions if not present or if has a greater qos
- if (existingSub == null || existingSub.getRequestedQos().value() < newSubscription.getRequestedQos().value()) {
- if (existingSub != null) {
- LOG.info("Subscription already existed with a lower QoS value. It will be updated. ClientId={}," +
- "topics={}, existingQos={}, newQos={}", newSubscription.getClientId(),
- newSubscription.getTopicFilter(), existingSub.getRequestedQos(), newSubscription.getRequestedQos());
- subscriptions.remove(newSubscription);
- }
- subscriptions.add(newSubscription);// 存储到内存的 session
- subscriptionsStore.addNewSubscription(newSubscription);// 存储到别的地方
- }
- return true;
- }
这里面先创建了一个 ClientTopicCouple 对, 然后从订阅集合里面查询是否已经存在这个订阅, 如果不存在或者新的订阅的 qos 要高于就的订阅的 qos, 则会把订阅添加到订阅集合里面, 这里有两个存储, 一个是 Set<Subscription>, 一个是 Map<Topic, Subscription> subscriptions(这个在 ISessionsStore 的具体实现里面)moquette 在这里面做了冗余, 即内存里面会存一分, 同时允许通过 ISessionsStore 存储到外部
6. 我们接着看 processSubscribe, 这个方法会返回一个新的 list
接着会遍历这个返回的 list, 存储到 SubscriptionsDirectory 里面, 这个维护所有的 client 直接的发布订阅关系, 是 moquette 里面一个非常重要的对象了, 里面维护者一颗 topic 树, 这个后面单独讲
7. 发送 SUBACK
8. 发布 retain 消息, 这里面也讲解一下, 这一步的作用在于, 如果一个 client 发布了新的订阅, 那么必须遍历那些 retain 消息, 如果这些新的订阅, 确实能够匹配这些 retain 消息, 必须将这些 retain 消息发送给他们 // 这里 moquette 的处理是遍历 map, 这样的话, 当 retain 消息特别大的时候, 效率是非常低的, 会很容易拖垮那些对吞吐率和性能要求比较高的系统的
- private void publishRetainedMessagesInSession(final Subscription newSubscription, String username) {
- LOG.info("Retrieving retained messages CId={}, topics={}", newSubscription.getClientId(),
- newSubscription.getTopicFilter());
- // scans retained messages to be published to the new subscription
- // TODO this is ugly, it does a linear scan on potential big dataset
- Collection<IMessagesStore.StoredMessage> messages = m_messagesStore.searchMatching(new IMatchingCondition() {
- @Override
- public boolean match(Topic key) {
- return key.match(newSubscription.getTopicFilter());
- }
- });
- if (!messages.isEmpty()) {
- LOG.info("Publishing retained messages CId={}, topics={}, messagesNo={}",
- newSubscription.getClientId(), newSubscription.getTopicFilter(), messages.size());
- }
- ClientSession targetSession = m_sessionsStore.sessionForClient(newSubscription.getClientId());
- this.internalRepublisher.publishRetained(targetSession, messages);
- // notify the Observables
- m_interceptor.notifyTopicSubscribed(newSubscription, username);
- }
另外, 用以匹配订阅的 topic 与 retain 消息的 topic 是否匹配的方法也非常不完善具体的原因大家可以看一下这里
io.moquette.spi.impl.subscriptions.Topic#match
9. 从 ConcurrentMap<RunningSubscription, SubscriptionState > 移除该订阅请求
整个 RunningSubscription 的状态会从 VERIFIED 到 STORED, 这代表了整个处理过程的最重要的两个步骤
来源: http://www.bubuko.com/infodetail-2510288.html