这一章节我们将详细描述网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,我们选择忽略所有的错误。这样可以使得代码看起来更为简单。 而且我们只关注tcp协议实现,udp和unix socket不是我们关心的。
- func Listen(net, laddr string)(Listener, error) {
- la,
- err: =resolveAddr("listen", net, laddr, noDeadline)......
- switch la: =la.toAddr(). (type) {
- case * TCPAddr: l,
- err = ListenTCP(net, la)
- case * UnixAddr: ......
- }......
- }
- // 对于tcp协议,返回的的是TCPListener
- func ListenTCP(net string, laddr * TCPAddr)( * TCPListener, error) {......fd,
- err: =internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")......
- return & TCPListener {
- fd
- },
- nil
- }
- func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string)(fd * netFD, err error) {......
- return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)
- }
- func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time)(fd * netFD, err error) {
- // 创建底层socket,设置属性为O_NONBLOCK
- s,
- err: =sysSocket(family, sotype, proto)......setDefaultSockopts(s, family, sotype, ipv6only)
- // 创建新netFD结构
- fd,
- err = newFD(s, family, sotype, net)......
- if laddr != nil && raddr == nil {
- switch sotype {
- case syscall.SOCK_STREAM,
- syscall.SOCK_SEQPACKET:
- // 调用底层listen监听创建的套接字
- fd.listenStream(laddr, listenerBacklog) return fd,
- nil
- case syscall.SOCK_DGRAM:
- ......
- }
- }
- }
- // 最终调用该函数来创建一个socket
- // 并且将socket属性设置为O_NONBLOCK
- func sysSocket(family, sotype, proto int)(int, error) {
- syscall.ForkLock.RLock() s,
- err: =syscall.Socket(family, sotype, proto) if err == nil {
- syscall.CloseOnExec(s)
- }
- syscall.ForkLock.RUnlock() if err != nil {
- return - 1,
- err
- }
- if err = syscall.SetNonblock(s, true);
- err != nil {
- syscall.Close(s) return - 1,
- err
- }
- return s,
- nil
- }
- func(fd * netFD) listenStream(laddr sockaddr, backlog int) error {
- if err: =setDefaultListenerSockopts(fd.sysfd) if lsa,
- err: =laddr.sockaddr(fd.family);
- err != nil {
- return err
- } else if lsa != nil {
- // Bind绑定至该socket
- if err: =syscall.Bind(fd.sysfd, lsa);
- err != nil {
- return os.NewSyscallError("bind", err)
- }
- }
- // 监听该socket
- if err: =syscall.Listen(fd.sysfd, backlog);
- // 这里非常关键:初始化socket与异步IO相关的内容
- if err: =fd.init();
- err != nil {
- return err
- }
- lsa,
- _: =syscall.Getsockname(fd.sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil
- }
我们这里看到了如何实现Listen。流程基本都很简单,但是因为我们使用了异步编程,因此,我们在Listen完该socket后,还必须将其添加到监听队列中,以后该socket有事件到来时能够及时通知到。
对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那我们看对一个监听socket,是如何将其添加到epoll的监听队列中呢?
- func(fd * netFD) init() error {
- if err: =fd.pd.Init(fd);
- err != nil {
- return err
- }
- return nil
- }
- func(pd * pollDesc) Init(fd * netFD) error {
- // 利用了Once机制,保证一个进程只会执行一次
- // runtime_pollServerInit:
- // TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0
- // JMP runtime·netpollServerInit(SB)
- serverInit.Do(runtime_pollServerInit)
- // runtime_pollOpen:
- // TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0
- // JMP runtime·netpollOpen(SB)
- ctx,
- errno: =runtime_pollOpen(uintptr(fd.sysfd)) if errno != 0 {
- return syscall.Errno(errno)
- }
- pd.runtimeCtx = ctx
- return nil
- }
这里就是socket异步编程的关键:
netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;
- func netpollinit() {
- epfd = epollcreate1(_EPOLL_CLOEXEC)
- if epfd >= 0 {
- return
- }
- epfd = epollcreate(1024)
- if epfd >= 0 {
- closeonexec(epfd)
- return
- }
- ......
- }
netpollOpen则在socket被创建出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen。
- func netpollopen(fd uintptr, pd *pollDesc) int32 {
- var ev epollevent
- ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
- *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
- return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
- }
OK,看到这里,我们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,然后将该socket添加到 epoll的事件监听队列中。
既然我们描述的重点的tcp协议,因此,我们看看TCPListener的Accept方法是怎么实现的:
- func(l * TCPListener) Accept()(Conn, error) {
- c,
- err: =l.AcceptTCP()......
- }
- func(l * TCPListener) AcceptTCP()( * TCPConn, error) {......fd,
- err: =l.fd.accept()......
- // 返回给调用者一个新的TCPConn
- return newTCPConn(fd),
- nil
- }
- func(fd * netFD) accept()(netfd * netFD, err error) {
- // 为什么对该函数加读锁?
- if err: =fd.readLock();
- err != nil {
- return nil,
- err
- }
- defer fd.readUnlock()......
- for {
- // 这个accept是golang包装的系统调用
- // 用来处理跨平台
- s,
- rsa,
- err = accept(fd.sysfd) if err != nil {
- if err == syscall.EAGAIN {
- // 如果没有可用连接,WaitRead()阻塞该协程
- // 后面会详细分析WaitRead.
- if err = fd.pd.WaitRead();
- err == nil {
- continue
- }
- } else if err == syscall.ECONNABORTED {
- // 如果连接在Listen queue时就已经被对端关闭
- continue
- }
- }
- break
- }
- netfd,
- err = newFD(s, fd.family, fd.sotype, fd.net)......
- // 这个前面已经分析,将该fd添加到epoll队列中
- err = netfd.init()......lsa,
- _: =syscall.Getsockname(netfd.sysfd) netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) return netfd,
- nil
- }
OK,从前面的编程事例中我们知道,一般在主协程中会accept新的connection,使用异步编程我们知道,如果没有 新连接到来,该协程会一直被阻塞,直到新连接到来有人唤醒了该协程。
一般在主协程中调用accept,如果返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来时被唤醒,WaitRead以及唤醒过程我们会在后面仔细分析。
- func (c *conn) Read(b []byte) (int, error) {
- if !c.ok() {
- return 0, syscall.EINVAL
- }
- return c.fd.Read(b)
- }
- func (fd *netFD) Read(p []byte) (n int, err error) {
- // 为什么对函数调用加读锁
- if err := fd.readLock(); err != nil {
- return 0, err
- }
- defer fd.readUnlock()
- // 这个又是干嘛?
- if err := fd.pd.PrepareRead(); err != nil {
- return 0, &OpError{"read", fd.net, fd.raddr, err}
- }
- for {
- n, err = syscall.Read(int(fd.sysfd), p)
- if err != nil {
- n = 0
- // 如果返回EAGIN,阻塞当前协程直到有数据可读被唤醒
- if err == syscall.EAGAIN {
- if err = fd.pd.WaitRead(); err == nil {
- continue
- }
- }
- }
- // 检查错误,封装io.EOF
- err = chkReadErr(n, err, fd)
- break
- }
- if err != nil && err != io.EOF {
- err = &OpError{"read", fd.net, fd.raddr, err}
- }
- return
- }
- func chkReadErr(n int, err error, fd *netFD) error {
- if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
- return io.EOF
- }
- return err
- }
Read的流程与Accept流程极其一致,阅读起来也很简单。相信不用作过多解释,自己看吧。 需要注意的是每次Read不能保证可以读到想读的那么多内容,比如缓冲区大小是10,而实际可能只读到5,应用程序需要能够处理这种情况。
- func (fd *netFD) Write(p []byte) (nn int, err error) {
- // 为什么这里加写锁
- if err := fd.writeLock(); err != nil {
- return 0, err
- }
- defer fd.writeUnlock()
- // 这个是干什么?
- if err := fd.pd.PrepareWrite(); err != nil {
- return 0, &OpError{"write", fd.net, fd.raddr, err}
- }
- // nn记录总共写入的数据量,每次Write可能只能写入部分数据
- for {
- var n int
- n, err = syscall.Write(int(fd.sysfd), p[nn:])
- if n > 0 {
- nn += n
- }
- // 如果数组数据已经全部写完,函数返回
- if nn == len(p) {
- break
- }
- // 如果写入数据时被block了,阻塞当前协程
- if err == syscall.EAGAIN {
- if err = fd.pd.WaitWrite(); err == nil {
- continue
- }
- }
- if err != nil {
- n = 0
- break
- }
- // 如果返回值为0,代表了什么?
- if n == 0 {
- err = io.ErrUnexpectedEOF
- break
- }
- }
- if err != nil {
- err = &OpError{"write", fd.net, fd.raddr, err}
- }
- return nn, err
- }
注意Write语义与Read不一样的地方:
Write尽量将用户缓冲区的内容全部写入至底层socket,如果遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时立即返回,可能会导致读取的数据量少于用户缓冲区的大小; 为什么会在实现上有此不同,我想可能read的优先级比较高吧,应用程序可能一直在等着,我们不能等到数据一直读完才返回,会阻塞用户。 而写不一样,优先级相对较低,而且用户一般也不着急写立即返回,所以可以将所有的数据全部写入,而且这样 也能简化应用程序的写法。
上面我们基本说完了golang网络编程内的关键API流程,我们遗留了一个关键内容:当系统调用返回EAGAIN时,会 调用WaitRead/WaitWrite来阻塞当前协程,我会在接下来的章节中继续分析。
来源: https://juejin.im/entry/5a24c5456fb9a044fa19b086