waitgroup
概念
go
标准库提供了waitgroup
原语, 可以用它来等待一批 goroutine 结束
底层数据结构
// a waitgroup must not be copied after first use. type waitgroup struct { nocopy nocopy state1 [3]uint32 }
其中 nocopy
是 golang 源码中检测禁止拷贝的技术。如果程序中有 waitgroup 的赋值行为,使用 go vet
检查程序时,就会发现有报错。但需要注意的是,nocopy 不会影响程序正常的编译和运行。
state1
主要是存储着状态和信号量,状态维护了 2 个计数器,一个是请求计数器counter ,另外一个是等待计数器waiter(已调用 waitgroup.wait
的 goroutine 的个数)
当数组的首地址是处于一个8
字节对齐的位置上时,那么就将这个数组的前8
个字节作为64
位值使用表示状态,后4
个字节作为32
位值表示信号量(semaphore
);同理如果首地址没有处于8
字节对齐的位置上时,那么就将前4
个字节作为semaphore
,后8
个字节作为64
位数值。
使用方法
在waitgroup里主要有3个方法:
waitgroup.add()
:可以添加或减少请求的goroutine数量,add(n)
将会导致 counter += n
waitgroup.done()
:相当于add(-1),done()
将导致 counter -=1
,请求计数器counter为0 时通过信号量调用runtime_semrelease
唤醒waiter线程
waitgroup.wait()
:会将 waiter++
,同时通过信号量调用 runtime_semacquire(semap)
阻塞当前 goroutine
func main() { var wg sync.waitgroup for i := 1; i <= 5; i++ { wg.add(1) go func() { defer wg.done() println("hello") }() } wg.wait() }
cond
概念
go
标准库提供了cond
原语,可以让 goroutine 在满足特定条件时被阻塞和唤醒
底层数据结构
type cond struct { nocopy nocopy // l is held while observing or changing the condition l locker notify notifylist checker copychecker } type notifylist struct { wait uint32 notify uint32 lock uintptr // key field of the mutex head unsafe.pointer tail unsafe.pointer }
主要有4
个字段:
nocopy
: golang 源码中检测禁止拷贝的技术。如果程序中有 waitgroup 的赋值行为,使用 go vet
检查程序时,就会发现有报错,但需要注意的是,nocopy 不会影响程序正常的编译和运行
checker
:用于禁止运行期间发生拷贝,双重检查(double check
)
l
:可以传入一个读写锁或互斥锁,当修改条件或者调用wait
方法时需要加锁
notify
:通知链表,调用wait()
方法的goroutine
会放到这个链表中,从这里获取需被唤醒的goroutine列表
使用方法
在cond里主要有3个方法:
sync.newcond(l locker)
: 新建一个 sync.cond 变量,注意该函数需要一个 locker 作为必填参数,这是因为在cond.wait()
中底层会涉及到 locker 的锁操作cond.wait()
: 阻塞等待被唤醒,调用wait函数前需要先加锁;并且由于wait函数被唤醒时存在虚假唤醒等情况,导致唤醒后发现,条件依旧不成立,因此需要使用 for 语句来循环地进行等待,直到条件成立为止cond.signal()
: 只唤醒一个最先 wait 的 goroutine,可以不用加锁cond.broadcast()
: 唤醒所有wait的goroutine,可以不用加锁
package main import ( "fmt" "sync" "sync/atomic" "time" ) var status int64 func main() { c := sync.newcond(&sync.mutex{}) for i := 0; i < 10; i++ { go listen(c) } go broadcast(c) time.sleep(1 * time.second) } func broadcast(c *sync.cond) { // 原子操作 atomic.storeint64(&status, 1) c.broadcast() } func listen(c *sync.cond) { c.l.lock() for atomic.loadint64(&status) != 1 { c.wait() // wait 内部会先调用 c.l.unlock(),来先释放锁,如果调用方不先加锁的话,会报错 } fmt.println("listen") c.l.unlock() }
以上就是go waitgroup及cond底层实现原理的详细内容,更多关于go waitgroup cond原理的资料请关注其它相关文章!