- [NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
- 当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。
zeromq 的英文文档 NetMQ 的英文文档
对 NetMQ 的源码进行学习并分析理解, 因此写下该系列文章, 本系列文章暂定编写计划如下:
友情提示: 看本系列文章时最好获取源码, 更有助于理解。
Command 定义如下
- internal struct Command
- {
- public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] object arg = null) : this()
- {
- Destination = destination;
- CommandType = type;
- Arg = arg;
- }
- [CanBeNull]
- public ZObject Destination { get; }
- public CommandType CommandType { get; }
- [CanBeNull]
- public object Arg { get; private set; }
- public override string ToString()
- {
- return base.ToString() + "[" + CommandType + ", " + Destination + "]";
- }
- }
其包含了 3 个信息:调用者, 命令类型和命令参数。
还记的《消息队列 NetMQ 原理分析 1-Context 和 ZObject》中我们介绍过 NetMQ 中的命令类型吗? 待处理命令全部会存放着
的信箱中。当
- Socket
有命令 (连接完成、发送完成或接受完成等) 需要处理时调用基类
- Socket
的
- ZObject
方法。
- SendCommand
- private void SendCommand([NotNull] Command cmd)
- {
- m_ctx.SendCommand(cmd.Destination.ThreadId, cmd);
- }
实际调用
- ZObject
的 SendCommand 方法
- Context
- public void SendCommand(int threadId, [NotNull] Command command)
- {
- m_slots[threadId].Send(command);
- }
保存的是当前 IO 线程的 IO 信箱
- m_slots[threadId]
, 在《消息队列 NetMQ 原理分析 2-IO 线程和完成端口》 我们简单介绍了
- IOThreadMailbox
的结构。
- IOThreadMailbox
- [NotNull] private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");
中维护这一个
- IOThreadMailbox
管道, 该管道实际就是一个先进先出队列,详细解析会在第四章进行介绍。
- Command
- public void Send(Command command)
- {
- bool ok;
- lock (m_sync)
- {
- //向管道写入命令
- m_commandPipe.Write(ref command, false);
- //成功写入会返回false,表示有命令需要处理
- ok = m_commandPipe.Flush();
- }
- if (!ok)
- {
- //向完成端口传递信号
- m_proactor.SignalMailbox(this);
- }
- }
- public bool TryRecv(out Command command)
- {
- return m_commandPipe.TryRead(out command);
- }
- public void RaiseEvent()
- {
- if (!m_disposed)
- {
- m_mailboxEvent.Ready();
- }
- }
的主要就是这三个方法
- IOThreadMailbox
方法向管道 (队列) 写入命令。写完时, 会向完成端口传递信号。
- Send
方法读取
- TryRecv
(实际是信箱的 IO 线程的
- RaiseEvent
方法) 进行处理命令。
- RaiseEvent
- public void SignalMailbox(IOThreadMailbox mailbox)
- {
- //该方法会向完成端口的队列中插入一个信号状态
- m_completionPort.Signal(mailbox);
- }
有关于完成端口介绍请查看《消息队列 NetMQ 原理分析 2-IO 线程和完成端口》
当有命令需要处理时, 完成端口会接收到信号。
- private void Loop()
- {
- ...
- int timeout = ExecuteTimers();
- int removed;
- if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
- continue;
- for (int i = 0; i < removed; i++)
- {
- try
- {
- if (completionStatuses[i].OperationType == OperationType.Signal)
- {
- var mailbox = (IOThreadMailbox)completionStatuses[i].State;
- mailbox.RaiseEvent();
- }
- ...
- }
- ...
- }
- ...
- }
在线程轮询方法
中, 当接收到需要处理的数据时, 首先会判断是否是信号, 若为信号, 则将状态 (参数) 转化为
- Loop
类型, 同时调用
- IOThreadMailbox
方法处理命令。
- RaiseEvent
- public void Ready()
- {
- Command command;
- while (m_mailbox.TryRecv(out command))
- command.Destination.ProcessCommand(command);
- }
当有命令需要处理时, 会调用
的
- IOThreadMailbox
方法从管道 (队列, 先进先出) 中获取第一个命令进行处理。
- TryRecv
在介绍回收线程工作之前, 我们先看下创建一个新的
做了哪些工作, 这里的
- Socket
实际是 NetMQ 中的
- Socket
。
- SocketBase
- RequestSocket socket = new RequestSocket();
- socket.Connect("tcp://127.0.0.1:12345");
是 NetMQ 的
- NetMQSocket
的基类。
- Socket
- public RequestSocket(string connectionString = null) : base(ZmqSocketType.Req, connectionString, DefaultAction.Connect)
- {
- }
- internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultAction defaultAction)
- {
- m_socketHandle = NetMQConfig.Context.CreateSocket(socketType);
- m_netMqSelector = new NetMQSelector();
- Options = new SocketOptions(this);
- m_socketEventArgs = new NetMQSocketEventArgs(this);
- Options.Linger = NetMQConfig.Linger;
- if (!string.IsNullOrEmpty(connectionString))
- {
- var endpoints =
- connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
- .Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));
- foreach (string endpoint in endpoints)
- {
- if (endpoint[0] == '@')
- {
- Bind(endpoint.Substring(1));
- }
- else if (endpoint[0] == '>')
- {
- Connect(endpoint.Substring(1));
- }
- else if (defaultAction == DefaultAction.Connect)
- {
- Connect(endpoint);
- }
- else
- {
- Bind(endpoint);
- }
- }
- }
- }
首先会根据
的类型创建对应的
- Socket
, 调用的是
- Socket
的
- Context
方法。具体的请看创建 SocketBase。最终创建方法是调用
- CreateSocket
的
- SocketBase
方法
- Create
- public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
- {
- switch (type)
- {
- ...
- case ZmqSocketType.Req:
- return new Req(parent, threadId, socketId);
- ...
- default:
- throw new InvalidException("SocketBase.Create called with invalid type of " + type);
- }
- }
创建完后, 就对地址进行解析。若有多个地址, 则可用, 分隔。
- var endpoints = connectionString.Split(new[] {
- ','
- },
- StringSplitOptions.RemoveEmptyEntries).Select(a = >a.Trim()).Where(a = >!string.IsNullOrEmpty(a));
解析完成后则用默认的方式进行绑定或连接,如
默认为连接, 而
- RequestSocket
则为绑定。
- ResponseSocket
- private static void DecodeAddress([NotNull] string addr, out string address, out string protocol)
- {
- const string protocolDelimeter = "://";
- int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);
- protocol = addr.Substring(0, protocolDelimeterIndex);
- address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);
- }
,
- Session
和
- Socket
的关系如图所示
- Session
和
- Socket
,如上图所示。创建管道完毕后需要设置管道的回调事件,管道 1 设置回调为
- Session
的回调方法,管道 2 设置为
- Socket
的回调方法。
- Session
具体关于
和
- Session
的内容请查看《消息队列 NetMQ 原理分析 4-Session、Option 和 Pipe》。
- Pipe
和
- Socket
的关系
- Session
- protected void LaunchChild([NotNull] Own obj)
- {
- // Specify the owner of the object.
- obj.SetOwner(this);
- // Plug the object into the I/O thread.
- SendPlug(obj);
- // Take ownership of the object.
- SendOwn(this, obj);
- }
的宿主设置为该
- Session
- Socket
- private void SetOwner([NotNull] Own owner)
- {
- Debug.Assert(m_owner == null);
- m_owner = owner;
- }
, 当管道有数据交互时,
- Session
的回调方法就会触发。
- Session
- protected void SendPlug([NotNull] Own destination, bool incSeqnum = true)
- {
- if (incSeqnum)
- destination.IncSeqnum();
- SendCommand(new Command(destination, CommandType.Plug));
- }
的
- SessionBase
会被触发
- ProcessPlug
- protected override void ProcessPlug()
- {
- m_ioObject.SetHandler(this);
- if (m_connect)
- StartConnecting(false);
- }
加入到
- Session
的
- Socket
集合中,
- Session
- protected void SendOwn([NotNull] Own destination, [NotNull] Own obj)
- {
- destination.IncSeqnum();
- SendCommand(new Command(destination, CommandType.Own, obj));
- }
的父类方法
- SocketBase
(Own 方法) 方法会被触发,将
- SendOwn
加入到集合中
- Session
- protected override void ProcessOwn(Own obj)
- {
- ...
- // Store the reference to the owned object.
- m_owned.Add(obj);
- }
, 具体的协议类型分析请查看《消息队列 NetMQ 原理分析 6-TCP 和 Inpoc 实现》
- Socket
- private static void DecodeAddress([NotNull] string addr, out string address, out string protocol)
- {
- const string protocolDelimeter = "://";
- int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);
- protocol = addr.Substring(0, protocolDelimeterIndex);
- address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);
- }
和
- Socket
的关系
- Session
- protected void LaunchChild([NotNull] Own obj)
- {
- // Specify the owner of the object.
- obj.SetOwner(this);
- // Plug the object into the I/O thread.
- SendPlug(obj);
- // Take ownership of the object.
- SendOwn(this, obj);
- }
的宿主设置为该
- Listener
- Socket
- private void SetOwner([NotNull] Own owner)
- {
- Debug.Assert(m_owner == null);
- m_owner = owner;
- }
, 当管道有数据交互是,
- Listener
的回调方法就会触发。
- Listener
- protected void SendPlug([NotNull] Own destination, bool incSeqnum = true)
- {
- if (incSeqnum)
- destination.IncSeqnum();
- SendCommand(new Command(destination, CommandType.Plug));
- }
的
- Listener
会被触发
- ProcessPlug
- protected override void ProcessPlug()
- {
- m_ioObject.SetHandler(this);
- m_ioObject.AddSocket(m_handle);
- //接收异步socket
- Accept();
- }
加入到
- Listener
的
- Socket
集合中,
- Listener
- protected void SendOwn([NotNull] Own destination, [NotNull] Own obj)
- {
- destination.IncSeqnum();
- SendCommand(new Command(destination, CommandType.Own, obj));
- }
的父类方法
- SocketBase
(Own 方法) 方法会被触发,将
- SendOwn
加入到集合中
- Listener
- protected override void ProcessOwn(Own obj)
- {
- ...
- // Store the reference to the owned object.
- m_owned.Add(obj);
- }
的创建处理就完成了
- SocketBase
(垃圾)回收线程是专门处理 (清理) 异步关闭的
的线程, 它在 NetMQ 中起到至关重要的作用。
- Socket
- internal class Reaper : ZObject, IPollEvents
- {
- ...
- }
是一个 ZObject 对象, 同时实现了
- Reaper
接口, 该接口的作用是当有信息接收或发送时进行处理。回收线程实现了
- IPollEvents
方法。
- InEvent
- internal interface IPollEvents : ITimerEvent
- {
- void InEvent();
- void OutEvent();
- }
方法实现和 IO 线程的
- InEvent
方法很像, 都是遍历需要处理的命令进行处理。
- Ready
- public void InEvent()
- {
- while (true)
- {
- Command command;
- if (!m_mailbox.TryRecv(0, out command))
- break;
- command.Destination.ProcessCommand(command);
- }
- }
当有
需要释放时, 会向完成端口发送
- SocketBase
信号。
- Reap
- public void Close()
- {
- // Mark the socket as disposed
- m_disposed = true;
- //工作线程向Socket邮箱发送Reap信号
- //回收线程会做剩下的工作
- SendReap(this);
- }
向回收线程的邮箱发送当前
的回收命令
- SocketBase
- protected void SendReap([NotNull] SocketBase socket)
- {
- SendCommand(new Command(m_ctx.GetReaper(), CommandType.Reap, socket));
- }
接收到释放信号进行处理
- Reap
- protected override void ProcessReap(SocketBase socket)
- {
- // Add the socket to the poller.
- socket.StartReaping(m_poller);
- ++m_sockets;
- }
的加入到回收线程的中,当
- Socket
接收到数据时,由回收线程回调该 Socket 的处理事件进行处理。
- Socket
- internal void StartReaping([NotNull] Poller poller)
- {
- m_poller = poller;
- m_handle = m_mailbox.Handle;
- m_poller.AddHandle(m_handle, this);
- m_poller.SetPollIn(m_handle);
- Terminate();
- CheckDestroy();
- }
时,直接终止即可
- Socket
默认情况下
的
- NetMQ
值被设置为 - 1,就是说如果网络读写没有进行完是不能退出的。如果
- Linger
被设置为 0, 那么中断时会丢弃一切未完成的网络操作。如果
- Linger
被设置的大于 0,那么将等待
- Linger
毫秒用来完成未完成的网络读写,在指定的时间里完成或者超时都会立即返回。
- Linger
, 则需要发送请求清理关联 Socket 的当前
- Session
对象
- Session
- protected void Terminate()
- {
- ...
- if (m_owner == null)
- {
- // 释放的是Socket,Owner为空
- ProcessTerm(m_options.Linger);
- }
- else
- {
- // 释放的是Session则会关联一个Socket
- SendTermReq(m_owner, this);
- }
- }
时,需要先中断当前
- SocketBase
关联的
- SocketBase
- SessionBase
集合
- Session
全部终止后发送给当前
- Session
宿主终端响应 (TermAck)
- Socket
- protected override void ProcessTerm(int linger)
- {
- ...
- // 断开所有session的连接
- foreach (Own it in m_owned)
- {
- SendTerm(it, linger);
- }
- RegisterTermAcks(m_owned.Count);
- m_owned.Clear();
- CheckTermAcks();
- }
- Session
大于 0 则等到 N 毫秒后再终止终止
- Ligner
和
- Socket
之间的管道
- Session
- protected override void ProcessTerm(int linger)
- {
- if (m_pipe == null)
- {
- ProceedWithTerm();
- return;
- }
- m_pending = true;
- if (linger > 0)
- {
- Debug.Assert(!m_hasLingerTimer);
- m_ioObject.AddTimer(linger, LingerTimerId);
- m_hasLingerTimer = true;
- }
- // 是否需要等待一定时间后消息处理完再终止管道.
- m_pipe.Terminate(linger != 0);
- // TODO: Should this go into pipe_t::terminate ?
- // In case there's no engine and there's only delimiter in the
- // pipe it wouldn't be ever read. Thus we check for it explicitly.
- m_pipe.CheckRead();
- }
- private enum State
- {
- /// <summary> Active 表示在中断命令开始前的状态 </summary>
- Active,
- /// <summary> Delimited 表示在终端命令接收前从管道接收到分隔符</summary>
- Delimited,
- /// <summary> Pending 表示中断命令已经从管道接收,但是仍有待定消息可读</summary>
- Pending,
- /// <summary> Terminating 表示所有待定消息都已经读取等待管道终止确认信号返回 </summary>
- Terminating,
- /// <summary> Terminated 表示终止命令是由用户显示调用 </summary>
- Terminated,
- /// <summary> Double_terminated 表示用户调用了终止命令同时管道也调用了终止命令 </summary>
- DoubleTerminated
- }
、
- Terminated
和
- DoubleTerminated
不再处理终止命令
- Terminating
- public void Terminate(bool delay)
- {
- //判断当前状态是否可处理终止命令
- ...
- if (m_state == State.Active)
- {
- // 向另一个管道发送终止命令然后等待确认终止
- SendPipeTerm(m_peer);
- m_state = State.Terminated;
- }
- else if (m_state == State.Pending && !m_delay)
- {
- // 若有待处理数据,但是不等待直接终止,则向另一个管道发送确认终止.
- m_outboundPipe = null;
- SendPipeTermAck(m_peer);
- m_state = State.Terminating;
- }
- else if (m_state == State.Pending)
- {
- //若有待处理数据但是需要等到则不处理.
- }
- else if (m_state == State.Delimited)
- {
- //若已经获取到限定符但是还没有收到终止命令则忽略定界符,然后发送终止命令给另一个管道
- SendPipeTerm(m_peer);
- m_state = State.Terminated;
- }
- else
- {
- // 没有其他状态
- Debug.Assert(false);
- }
- //停止向外发送的消息
- m_outActive = false;
- if (m_outboundPipe != null)
- {
- //抛弃未发送出的消息.
- Rollback();
- // 这里不会再先查水位,所以即使管道满了也可再写入,向管道写入定界符 .
- var msg = new Msg();
- msg.InitDelimiter();
- m_outboundPipe.Write(ref msg, false);
- Flush();
- }
- }
- protected override void ProcessPipeTerm()
- {
- // 这是一个简单的例子有道管道终止
- //若没有更多待处理消息需要读取,或者这个管道已经丢去待处理数据,我们直接将状态设置为正在终止(terminating),否则我们搁置待处理状态直到所有待处理消息被发送
- if (m_state == State.Active)
- {
- if (!m_delay)
- {
- //不需要等到消息处理
- m_state = State.Terminating;
- m_outboundPipe = null;
- //发送终止确认
- SendPipeTermAck(m_peer);
- }
- else
- m_state = State.Pending;
- return;
- }
- // 若定界符碰巧在终止命令之前到达,将状态改为正在终止
- if (m_state == State.Delimited)
- {
- m_state = State.Terminating;
- m_outboundPipe = null;
- SendPipeTermAck(m_peer);
- return;
- }
- // 当管道并发关闭,则状态改为DoubleTerminated
- if (m_state == State.Terminated)
- {
- m_state = State.DoubleTerminated;
- m_outboundPipe = null;
- SendPipeTermAck(m_peer);
- return;
- }
- // pipe_term is invalid in other states.
- Debug.Assert(false);
- }
整体回收
- protected override void ProcessPipeTermAck()
- {
- // 通知Socket或Session中断当前管道 .
- Debug.Assert(m_sink != null);
- m_sink.Terminated(this);
- // 若正则处理或double_terminated这里不做任何事
- // 简化释放管道,在已终止状态,我们必须在释放这个管道之前确认
- //其他状态都是非法的
- if (m_state == State.Terminated)
- {
- m_outboundPipe = null;
- SendPipeTermAck(m_peer);
- }
- else
- Debug.Assert(m_state == State.Terminating || m_state == State.DoubleTerminated);
- // 删除所有管道中的未读消息,然后释放流入管道
- var msg = new Msg();
- while (m_inboundPipe.TryRead(out msg))
- {
- msg.Close();
- }
- m_inboundPipe = null;
- }
流程图如下:
- Socket
- public virtual void InEvent()
- {
- // 回收线程命令会调用此事件
- try
- {
- ProcessCommands(0, false);
- }
- catch
- {
- // ignored
- }
- finally
- {
- CheckDestroy();
- }
- }
- private void CheckDestroy()
- {
- // socket释放完则做最后的清除和释放工作.
- if (m_destroyed)
- {
- // 从回收线程移除轮询
- m_poller.RemoveHandle(m_handle);
- // 释放socke.
- DestroySocket(this);
- // 通知已释放.
- SendReaped();
- // Deallocate.
- base.ProcessDestroy();
- }
- }
该篇介绍命令处理方式和回收线程回收
,顺便介绍了下创建
- Socket
的细节性问题。以便对释放
- SocketBase
有更清晰的认识。
- Socket
来源: http://www.cnblogs.com/Jack-Blog/p/6774902.html