Go 并发系列是根据我对晁岳攀老师的《Go 并发编程实战课》的吸收和理解整理而成,如有偏差,欢迎指正~
在上一篇 Go 并发基石之 channel(一) 中简单介绍了 channel 的由来、基本用法以及几种容易出错的场景。
这一篇,尝试通过 channel 源码的阅读,了解 channel 的实现思路。
说明:以下源码均基于 go1.16。
channel 的定义 channel 的数据类型是 runtime.hchan ,其详细定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq lock mutex }
整体上,channel 的设计思路可以概述为用一个循环队列存储数据,send 索引和 recv 索引记录发送和接收数据的节点。向 channel 发送的数据会被添加循环队列尾部;从 channel 接收的数据来自于队列的首部。
channel 的创建 创建 channel 的语句如下:
1 2 ch := make (chan int ) ch := make (chan int , 2 )
make 函数的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func walkMakeChan (n *ir.MakeExpr, init *ir.Nodes) ir .Node { size := n.Len fnname := "makechan64" argtype := types.Types[types.TINT64] if size.Type().IsKind(types.TIDEAL) || size.Type().Size() <= types.Types[types.TUINT].Size() { fnname = "makechan" argtype = types.Types[types.TINT] } return mkcall1(chanfn(fnname, 1 , n.Type()), n.Type(), init, reflectdata.TypePtr(n.Type()), typecheck.Conv(size, argtype)) }
仅仅看注释,也能猜出来,make 的时候,编译器会根据系统的位数选择对应的函数: makechan64 还是 makechan。makechan64 底层直接调用的 makechan。所以我们直接看 makechane 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 const ( maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr (-int (unsafe.Sizeof(hchan{}))&(maxAlign-1 )) )func makechan (t *chantype, size int ) *hchan { elem := t.elem if elem.size >= 1 <<16 { throw("makechan: invalid channel element type" ) } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment" ) } mem, overflow := math.MulUintptr(elem.size, uintptr (size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic (plainError("makechan: size out of range" )) } var c *hchan switch { case mem == 0 : c = (*hchan)(mallocgc(hchanSize, nil , true )) c.buf = c.raceaddr() case elem.ptrdata == 0 : c = (*hchan)(mallocgc(hchanSize+mem, nil , true )) c.buf = add(unsafe.Pointer(c), hchanSize) default : c = new (hchan) c.buf = mallocgc(mem, elem, true ) } c.elemsize = uint16 (elem.size) c.elemtype = elem c.dataqsiz = uint (size) lockInit(&c.lock, lockRankHchan) return c }
梳理 makechan 的逻辑之前,先看下常量 maxAlign 和 hchanSize。
maxAlign : 内存对齐最大值,这里8表示64位对齐。hchanSize : 大于 Sizeof(hchan{}) 且为8的倍数中最小的一个,其目的是提高 CPU 存取的速度(CPU 按块存取数据,块的大小可以是8、16、24等字节 )。
第 20 行,计算需要给循环队列 buf 分配的内存,如果超过最大限制,则抛出异常。
接下来的内存分配流程比较清晰。
如果 mem == 0,说明是无缓冲 channel,只需要分配 hchan 本身的内存;如果缓冲数据是值类型,则分配 hchanSize+mem 大小的连续内存,buf 指向循环队列;如果缓冲数据是指针类型,则分别分配 hchan 和循环队列的内存。
往 channel 发送数据 以下是往 channel 发送数据的语句:
其对应源码如下(解释见注释):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 func chansend1 (c *hchan, elem unsafe.Pointer) { chansend(c, elem, true , getcallerpc()) }func chansend (c *hchan, ep unsafe.Pointer, block bool , callerpc uintptr ) bool { if c == nil { if !block { return false } gopark(nil , nil , waitReasonChanSendNilChan, traceEvGoStop, 2 ) throw("unreachable" ) } if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic (plainError("send on closed channel" )) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil ) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1 ) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2 ) KeepAlive(ep) if mysg != gp.waiting { throw("G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2 ) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup" ) } panic (plainError("send on closed channel" )) } return true }
不深究细节,只看流程,chansend 还比较好理解,只是 if 分支比较多,不容易厘清,可以对着下面的流程图一起看:
从 channel 接收数据 从 channel 接收数据的写法有两种:
分别对应如下两个函数:
1 2 3 4 5 6 7 8 9 10 11 12 func chanrecv1 (c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true ) }func chanrecv2 (c *hchan, elem unsafe.Pointer) (received bool ) { _, received = chanrecv(c, elem, true ) return }
这两个函数都是通过调用 chanrecv 实现(主要解释见注释):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 func chanrecv (c *hchan, ep unsafe.Pointer, block bool ) (selected, received bool ) { if c == nil { if !block { return } gopark(nil , nil , waitReasonChanReceiveNilChan, traceEvGoStop, 2 ) throw("unreachable" ) } if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true , false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true , false } if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true , true } if c.qcount > 0 { qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil ) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true , true } if !block { unlock(&c.lock) return false , false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1 ) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2 ) if mysg != gp.waiting { throw("G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2 ) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true , success }
chanrecv 的 if 判断可以对着下面的流程图看:
close 一个 channel 关闭 一个 channel 的语法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 func closechan (c *hchan) { if c == nil { panic (plainError("close of nil channel" )) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic (plainError("close of closed channel" )) } c.closed = 1 var glist gList for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3 ) } }
close 一个 channel 之前,如果这是 channel 是 nil 或者已经被 close ,则抛出异常。
接下来,close 的动作有三块:
1)修改 c.closed 标志位 2)唤醒所有读协程:读到的都是零值 3)唤醒所有写协程:chansend 中会抛出异常
结尾 粗略的过了一遍 channel 的源码,还是有不少细节没看明白,但是目前水平所限,只能看到这一步。期待对 Go 有了进一步的了解之后,再回过头来重温,能有新的认识。
下一期过一遍基于 channel 的几种并发模式。
就这样,下一期再见~