- NetMQ是ZeroMQ的C#移植版本,
- 它是对标准socket接口的扩展。它提供了一种异步消息队列,
- 多消息模式,
- 消息过滤(订阅),
- 对多种传输协议的无缝访问。当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.0 - rc5。本文档使用4.0.0 - rc5进行开发。
对 NetMQ 的源码进行学习并分析理解, 因此写下该系列文章, 本系列文章暂定编写计划如下:
NetMQ 有一个 Context 对象, 用于初始化并保存当前 NetMQ 底层的对象状态, 如 IO 线程、回收线程、进程间传输节点字典、插槽 m_slots(用于保存 IO 对象,回收对象和 socket 对象的 Mailbox)、初始化但未用到的 Socket 对象指针数组以及当前 Mailbox(用于接收终止信号) 等。
当创建第一个 Socket 对象时会初始化 IO 线程,回收线程以及工作线程。默认 Socket 数量 1024 个, IO 线程 1 个, 回收线程 1 个。
- m_slots = new Mailbox[m_slotCount];//m_soltCount = 1 + 1 + 1024
保存的是 Context 的 Mailbox。
- m_slots[0]
- m_slots[TermTid] = m_termMailbox;//用于当前Context接收终止信号
保存的是回收对象的 Mailbox, 保存完毕后就会启动回收对象轮询线程。
- m_slots[1]
- m_reaper = new Reaper(this, ReaperTid);//ReaperTid = 1
- m_slots[ReaperTid] = m_reaper.Mailbox;
- m_reaper.Start();
保存的是 IO 线程对象的 Mailbox。
- m_slots[2]
- for (int i = 2; i != ios
- /*ios = 1,默认用1个io线程*/
- + 2; i++) {
- IOThread ioThread = new IOThread(this, i);
- m_ioThreads.Add(ioThread);
- m_slots[i] = ioThread.Mailbox;
- ioThread.Start();
- }
其余 1024 个 slot 保存的是 socket 对象的 Mailbox, 当 socket 还没使用是, slots 保存的是 null, 占个位置, 同时 m_emptySlots。
- //m_soltCount = 1 + 1 + 1024
- for (int i = (int)m_slotCount - 1; >= (int)ios + 2; i--)
- {
- m_emptySlots.Push(i);
- m_slots[i] = null;
- }
无论是什么类型的 Socket 全都是在 Context 中进行创建或释放的。NetMQ 中不同 Socket 都继承自
, 在 Context 未中止且 Socket 未满时, 会从 m_emptySlots 栈中 Pop 出一个未使用的指针。若创建失败, 则重新加回到栈中, 否则更新当前使用的 Socket 的集合加入该 Socket 并更新
- SocketBase
的 Mailbox
- m_slots
- //slot是当前socket在s_slots中的位置,也用于生成SocketBase的`ThreadId`
- int slot = m_emptySlots.Pop();
- // sid是生成并递增的唯一的socket ID,用于SocketBase创建MailBox命名用,并无实际其他作用。
- int sid = Interlocked.Increment(ref s_maxSocketId);
- s = SocketBase.Create(type, this, slot, sid);
- if (s == null)
- {
- m_emptySlots.Push(slot);
- return null;
- }
- m_sockets.Add(s);
- m_slots[slot] = s.Mailbox;
当
要释放某个 SocketBase 时, 最终会调用 Context 的
- Reaper
方法。
- DestroySocket
- tid = socket.ThreadId;
- //重新加入到可用socket栈中
- m_emptySlots.Push(tid);
- //关闭连接
- m_slots[tid].Close();
- //清空引用
- m_slots[tid] = null;
- // 从当前使用socket集合移除
- m_sockets.Remove(socket);
- //若当前接收到中止信号且当前socket全部已释放时停止回收线程
- if (m_terminating && m_sockets.Count == 0)
- m_reaper.Stop();
NetMQ 除了支持 TCP 以外还支持 inproc(进程内通讯),ipc(进程间通讯),pgm 和 epgm(多路广播) 等传输协议。
Context 会用一个字典管理当前使用 inpoc 的 socket。
当 inpoc 的 socket 进行绑定时会加入到字典缓存中。释放时会从字典缓存中移除。当使用 inpoc 协议连接时, 增加当前绑定 inpoc 地址的连接数。
ZObject 是 NetMQ 的
(状态),
- Session
(IO 线程),
- IOThread
(回收线程),
- Repear
(管道),
- Pipe
(所属关系) 对象的基类, 它是包含 2 个信息, 当前全局 Context 对象, 以及当前对象处理的线程 Id。所有 socket 最终都是继承自该对象。因此 ZObject 对象需要知道 IO 对象接收到不同命令时如何进行处理命令。 NetMQ 中一共定义了一下的命令类型
- Own
- public enum CommandType
- {
- // 发送给IO线程表示当前对象需要停止
- Stop,
- // 发送给IO线程表示当前对象需要注册到IO线程中
- Plug,
- // 将创建的对象Session的加入到当前Socket的所属集合中
- Own,
- // 附加engine到Session中
- Attach,
- // 建立session到Socket之间的管道,在握手之前调用inc_seqnum.
- Bind,
- // 通过写管道发送通知给读管道多少信息可读
- ActivateRead,
- // 通过读管道发送通知给写读管道多少信息可写
- ActivateWrite,
- // 创建一个新的管道后通过读管道发送给写管道
- // 参数是管道类型,然而,他的目的地是私有的,因此我们必须用void指针, however,
- Hiccup,
- // 通过读管道发送到写管道告诉他中止所有管道
- PipeTerm,
- // 写管道对PipeTerm命令响应
- PipeTermAck,
- // 通过IO对象发送给socket请求终端IO对象
- TermReq,
- // 通过socket发送给IO对象他自己开始关闭
- Term,
- // 通过IO对象发送给socket让它知道已经关闭
- TermAck,
- // 将关闭套接字的所有权转移给回收线程.
- Reap,
- // 关闭套接字通知回收线程他已经释放
- Reaped,
- // 当所有socket都被释放通过回收线程发送给 term 线程
- Done
- }
根据不同命令类型进行处理, 处理方式由具体的 Socket 子类去重载。
- public void ProcessCommand(Command cmd)
- {
- switch (cmd.CommandType)
- {
- case CommandType.ActivateRead:
- ProcessActivateRead();
- break;
- case CommandType.ActivateWrite:
- ProcessActivateWrite((long)cmd.Arg);
- break;
- case CommandType.Stop:
- ProcessStop();
- break;
- case CommandType.Plug:
- ProcessPlug();
- ProcessSeqnum();
- break;
- case CommandType.Own:
- ProcessOwn((Own)cmd.Arg);
- ProcessSeqnum();
- break;
- case CommandType.Attach:
- ProcessAttach((IEngine)cmd.Arg);
- ProcessSeqnum();
- break;
- case CommandType.Bind:
- ProcessBind((Pipe)cmd.Arg);
- ProcessSeqnum();
- break;
- case CommandType.Hiccup:
- ProcessHiccup(cmd.Arg);
- break;
- case CommandType.PipeTerm:
- ProcessPipeTerm();
- break;
- case CommandType.PipeTermAck:
- ProcessPipeTermAck();
- break;
- case CommandType.TermReq:
- ProcessTermReq((Own)cmd.Arg);
- break;
- case CommandType.Term:
- ProcessTerm((int)cmd.Arg);
- break;
- case CommandType.TermAck:
- ProcessTermAck();
- break;
- case CommandType.Reap:
- ProcessReap((SocketBase)cmd.Arg);
- break;
- case CommandType.Reaped:
- ProcessReaped();
- break;
- default:
- throw new ArgumentException();
- }
- }
当创建进程间通信 socket 时, 会调用 ZObejct 的
将 socket 对象加入到 Context 的使用 inpoc 协议的 socket 字段缓存中, 而 ZObject 实际是调用 Context 的方法
- RegisterEndpoint
, 释放使用 inpoc 协议的 socket 和使用 inpoc 进行连接方式和
- RegisterEndpoint
一样。
- RegisterEndpoint
- protected void RegisterEndpoint(String addr, Ctx.Endpoint endpoint)
- {
- //m_ctx是在ZObejct初始化是传进来的Context引用
- m_ctx.RegisterEndpoint(addr, endpoint);
- }
默认的 IO 线程数量是 1 个, 当然也可以使用多个 IO 线程并发去处理, 因此当创建监听对象或创建连接时则需要进行负载均衡, 平分到多个 IO 线程去处理, 切换 IO 线程也是在 Context 中实现的。
- protected IOThread ChooseIOThread(long affinity)
- {
- return m_ctx.ChooseIOThread(affinity);
- }
- public IOThread ChooseIOThread(long affinity)
- {
- //affinity表示哪些IO线程有资格,默认为0表示所有IO线程都可以处理。
- if (m_ioThreads.Count == 0)
- return null;
- // Find the I/O thread with minimum load.
- int minLoad = -1;
- IOThread selectedIOThread = null;
- for (int i = 0; i != m_ioThreads.Count; i++)
- {
- if (affinity == 0 || (affinity & (1L << i)) > 0)
- {
- //获取IO线程socket载入次数
- int load = m_ioThreads[i].Load;
- //这里对IO线程进行负载均衡
- if (selectedIOThread == null || load < minLoad)
- {
- minLoad = load;
- selectedIOThread = m_ioThreads[i];
- }
- }
- }
- return selectedIOThread;
- }
该篇介绍了 Context 和 ZObject。NetMQ 所有的 socket 对象创建, 释放都离不开 Context, 由于 Context 内部对必要操作都加了锁, 因此它是线程安全的。
来源: