这里有新鲜出炉的GO语言教程,程序狗速度看过来!
Go是一种新的语言,一种并发的、带垃圾回收的、快速编译的语言。Go是谷歌2009年发布的第二款编程语言。2009年7月份,谷歌曾发布了Simple语言,它是用来开发Android应用的一种BASIC语言。
常见的IO模型有阻塞、非阻塞、IO多路复用,异,下面这篇文章主要给大家介绍了关于golang将多路复异步io转成阻塞io的方法,文中给出了详细的示例代码,需要的朋友可以参考借鉴,下面随着小编来一起学习学习吧。
前言
本文主要给大家介绍了关于golang 如何将多路复异步io转变成阻塞io的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍:
- package main
- import (
- "net"
- )
- func handleConnection(c net.Conn) {
- //读写数据
- buffer := make([]byte, 1024)
- c.Read(buffer)
- c.Write([]byte("Hello from server"))
- }
- func main() {
- l, err := net.Listen("tcp", "host:port")
- if err != nil {
- return
- }
- defer l.Close()
- for {
- c, err := l.Accept()
- if err!= nil {
- return
- }
- go handleConnection(c)
- }
- }
对于我们都会写上面的代码,很简单,的确golang的网络部分对于我们隐藏了太多东西,我们不用像c++一样去调用底层的socket函数,也不用去使用epoll等复杂的io多路复用相关的逻辑,但是上面的代码真的就像我们看起来的那样在调用accept和read时阻塞吗?
- // Multiple goroutines may invoke methods on a Conn simultaneously.
- //官方注释:多个goroutines可能同时调用方法在一个连接上,我的理解就是所谓的惊群效应吧
- //换句话说就是你多个goroutines监听同一个连接同一个事件,所有的goroutines都会触发,
- //这只是我的猜测,有待验证。
- type Conn interface {
- Read(b []byte) (n int, err error)
- Write(b []byte) (n int, err error)
- Close() error
- LocalAddr() Addr
- RemoteAddr() Addr
- SetDeadline(t time.Time) error
- SetReadDeadline(t time.Time) error
- SetWriteDeadline(t time.Time) error
- }
- type conn struct {
- fd *netFD
- }
这里面又一个Conn接口,下面conn实现了这个接口,里面只有一个成员netFD.
- // Network file descriptor.
- type netFD struct {
- // locking/lifetime of sysfd + serialize access to Read and Write methods
- fdmu fdMutex
- // immutable until Close
- sysfd int family int sotype int isConnected bool net string laddr Addr raddr Addr
- // wait server
- pd pollDesc
- }
- func(fd * netFD) accept()(netfd * netFD, err error) {
- //................
- for {
- s,
- rsa,
- err = accept(fd.sysfd) if err != nil {
- nerr,
- ok: =err. ( * os.SyscallError) if ! ok {
- return nil,
- err
- }
- switch nerr.Err {
- /* 如果错误是EAGAIN说明Socket的缓冲区为空,未读取到任何数据
- 则调用fd.pd.WaitRead,*/
- case syscall.EAGAIN:
- if err = fd.pd.waitRead();
- err == nil {
- continue
- }
- case syscall.ECONNABORTED:
- continue
- }
- return nil,
- err
- }
- break
- }
- //.........
- //代码过长不再列出,感兴趣看go的源码,runtime 下的fd_unix.go
- return netfd,
- nil
- }
上面代码段是accept部分,这里我们注意当accept有错误发生的时候,会检查这个错误是否是
,如果是,则调用WaitRead将当前读这个fd的goroutine在此等待,直到这个fd上的读事件再次发生为止。当这个socket上有新数据到来的时候,WaitRead调用返回,继续for循环的执行,这样以来就让调用netFD的Read的地方变成了同步“阻塞”。有兴趣的可以看netFD的读和写方法,都有同样的实现。
- syscall.EAGAIN
到这里所有的疑问都集中到了pollDesc上,它到底是什么呢?
- const (
- pdReady uintptr = 1
- pdWait uintptr = 2
- )
- // Network poller descriptor.
- type pollDesc struct {
- link *pollDesc // in pollcache, protected by pollcache.lock
- lock mutex // protects the following fields
- fd uintptr
- closing bool
- seq uintptr // protects from stale timers and ready notifications
- rg uintptr // pdReady, pdWait, G waiting for read or nil
- rt timer // read deadline timer (set if rt.f != nil)
- rd int64 // read deadline
- wg uintptr // pdReady, pdWait, G waiting for write or nil
- wt timer // write deadline timer
- wd int64 // write deadline
- user uint32 // user settable cookie
- }
- type pollCache struct {
- lock mutex
- first *pollDesc
- }
pollDesc网络轮询器是Golang中针对每个socket文件描述符建立的轮询机制。 此处的轮询并不是一般意义上的轮询,而是Golang的runtime在调度goroutine或者GC完成之后或者指定时间之内,调用epoll_wait获取所有产生IO事件的socket文件描述符。当然在runtime轮询之前,需要将socket文件描述符和当前goroutine的相关信息加入epoll维护的数据结构中,并挂起当前goroutine,当IO就绪后,通过epoll返回的文件描述符和其中附带的goroutine的信息,重新恢复当前goroutine的执行。这里我们可以看到pollDesc中有两个变量wg和rg,其实我们可以把它们看作信号量,这两个变量有几种不同的状态:
继续接着上面的WaitRead调用说起,go在这里到底做了什么让当前的goroutine挂起了呢。
- func net_runtime_pollWait(pd * pollDesc, mode int) int {
- err: =netpollcheckerr(pd, int32(mode)) if err != 0 {
- return err
- }
- // As for now only Solaris uses level-triggered IO.
- if GOOS == "solaris" {
- netpollarm(pd, mode)
- }
- for ! netpollblock(pd, int32(mode), false) {
- err = netpollcheckerr(pd, int32(mode)) if err != 0 {
- return err
- }
- // Can happen if timeout has fired and unblocked us,
- // but before we had a chance to run, timeout has been reset.
- // Pretend it has not happened and retry.
- }
- return 0
- }
- // returns true if IO is ready, or false if timedout or closed
- // waitio - wait only for completed IO, ignore errors
- func netpollblock(pd * pollDesc, mode int32, waitio bool) bool {
- //根据读写模式获取相应的pollDesc中的读写信号量
- gpp: =&pd.rg
- if mode == 'w' {
- gpp = &pd.wg
- }
- for {
- old: =*gpp
- //已经准备好直接返回true
- if old == pdReady { * gpp = 0
- return true
- }
- if old != 0 {
- throw ("netpollblock: double wait")
- }
- //设置gpp pdWait
- if atomic.Casuintptr(gpp, 0, pdWait) {
- break
- }
- }
- if waitio || netpollcheckerr(pd, mode) == 0 {
- gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
- }
- old: =atomic.Xchguintptr(gpp, 0) if old > pdWait {
- throw ("netpollblock: corrupted state")
- }
- return old == pdReady
- }
当调用WaitRead时经过一段汇编最重调用了上面的net_runtime_pollWait函数,该函数循环调用了netpollblock函数,返回true表示io已准备好,返回false表示错误或者超时,在netpollblock中调用了gopark函数,gopark函数调用了mcall的函数,该函数用汇编来实现,具体功能就是把当前的goroutine挂起,然后去执行其他可执行的goroutine。到这里整个goroutine挂起的过程已经结束,那当goroutine可读的时候是如何通知该goroutine呢,这就是epoll的功劳了。
- func netpoll(block bool) * g {
- if epfd == -1 {
- return nil
- }
- waitms: =int32( - 1) if ! block {
- waitms = 0
- }
- var events[128] epollevent retry:
- //每次最多监听128个事件
- n: =epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 {
- if n != -_EINTR {
- println("runtime: epollwait on fd", epfd, "failed with", -n) throw ("epollwait failed")
- }
- goto retry
- }
- var gp guintptr
- for i: =int32(0);
- i < n;
- i++{
- ev: =&events[i]
- if ev.events == 0 {
- continue
- }
- var mode int32
- //读事件
- if ev.events & (_EPOLLIN | _EPOLLRDHUP | _EPOLLHUP | _EPOLLERR) != 0 {
- mode += 'r'
- }
- //写事件
- if ev.events & (_EPOLLOUT | _EPOLLHUP | _EPOLLERR) != 0 {
- mode += 'w'
- }
- if mode != 0 {
- //把epoll中的data转换成pollDesc
- pd: =*( * *pollDesc)(unsafe.Pointer( & ev.data)) netpollready( & gp, pd, mode)
- }
- }
- if block && gp == 0 {
- goto retry
- }
- return gp.ptr()
- }
这里就是熟悉的代码了,epoll的使用,看起来亲民多了。
这是最关键的一句,我们在这里拿到当前可读时间的pollDesc,上面我们已经说了,当pollDesc的读写信号量保存为G pointer时当前goroutine就会挂起。而在这里我们调用了netpollready函数,函数中把相应的读写信号量G指针擦出,置为pdReady,G-pointer状态被抹去,当前goroutine的G指针就放到可运行队列中,这样goroutine就被唤醒了。
- pd:=*(**pollDesc)(unsafe.Pointer(&ev.data))
可以看到虽然我们在写tcp server看似一个阻塞的网络模型,在其底层实际上是基于异步多路复用的机制来实现的,只是把它封装成了跟阻塞io相似的开发模式,这样是使得我们不用去关注异步io,多路复用等这些复杂的概念以及混乱的回调函数。
总结
来源: http://www.phperz.com/article/17/1107/352018.html