//Go语言中暴露的semaphore实现//具体的用法是提供sleep和wakeup原语//以使其能够在其它同步原语中的竞争情况下使用//因此这里的semaphore和Linux中的futex目标是一致的//只不过语义上更简单一些////也就是说,不要认为这些是信号量//把这里的东西看作sleep和wakeup实现的一种方式//每一个sleep都会和一个wakeup配对//即使在发生race时,wakeup在sleep之前时也是如此////SeeMullenderandCox,``SemaphoresinPlan9,''//http://swtch.com/semaphore.pdf//为sync.Mutex准备的异步信号量//semaRoot持有一棵地址各不相同的sudog(s.elem)的平衡树//每一个sudog都反过来指向(通过s.waitlink)一个在同一个地址上等待的其它sudog们//同一地址的sudog的内部列表上的操作时间复杂度都是O(1)。顶层semaRoot列表的扫描//的时间复杂度是O(logn),n是被哈希到同一个semaRoot的不同地址的总数,每一个地址上都会有一些goroutine被阻塞。//访问golang.org/issue/17953来查看一个在引入二级列表之前性能较差的程序样例,test/locklinear.go//中有一个复现这个样例的测试typesemaRootstruct{lockmutextreap*sudog//rootofbalancedtreeofuniquewaiters.nwaituint32//Numberofwaiters.Readw/othelock.}//Primetonotcorrelatewithanyuserpatterns.constsemTabSize=251varsemtable[semTabSize]struct{rootsemaRootpad[sys.CacheLineSize-unsafe.Sizeof(semaRoot{})]byte}funcsemroot(addr*uint32)*semaRoot{return&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root}
┌─────┬─────┬─────┬─────┬─────┬────────────────────────┬─────┐│0│1│2│3│4│.....│250│└─────┴─────┴─────┴─────┴─────┴────────────────────────┴─────┘││││└──┐└─┐││││▼▼┌─────────┐┌─────────┐│struct││struct│├─────────┴─────────┐├─────────┴─────────┐│rootsemaRoot│──┐│rootsemaRoot│──┐├───────────────────┤│├───────────────────┤││pad│││pad││└───────────────────┘│└───────────────────┘│││┌────────────────┘┌────────────────┘││││▼▼┌──────────┐┌──────────┐│semaRoot││semaRoot│├──────────┴────────┐├──────────┴────────┐│lockmutex││lockmutex│├───────────────────┤├───────────────────┤│treap*sudog││treap*sudog│├───────────────────┤├───────────────────┤│nwaituint32││nwaituint32│└───────────────────┘└───────────────────┘
treap 结构:
┌──────────┐┌─┬─▶│sudog│││├──────────┴────────────┐┌─────────────────────┼─┼──│prev*sudog││││├───────────────────────┤││││next*sudog│────┐│││├───────────────────────┤│││││parent*sudog│││││├───────────────────────┤│││││elemunsafe.Pointer│││││├───────────────────────┤│││││ticketuint32│││││└───────────────────────┘│││││││││││││││││││││││││▼││▼┌──────────┐││┌──────────┐│sudog││││sudog│├──────────┴────────────┐││├──────────┴────────────┐│prev*sudog││││prev*sudog│├───────────────────────┤││├───────────────────────┤│next*sudog││││next*sudog│├───────────────────────┤││├───────────────────────┤│parent*sudog│───┘└─────────────────────────│parent*sudog│├───────────────────────┤├───────────────────────┤│elemunsafe.Pointer││elemunsafe.Pointer│├───────────────────────┤├───────────────────────┤│ticketuint32││ticketuint32│└───────────────────────┘└───────────────────────┘
在这个 treap 结构里,从 elem 的视角(其实就是 lock 的 addr)来看,这个结构是个二叉搜索树。从 ticket 的角度来看,整个结构就是一个小顶堆。
所以才叫树堆(treap)。
相同 addr,即对同一个 mutex 上锁的 g,会阻塞在同一个地址上。这些阻塞在同一个地址上的 goroutine 会被打包成 sudog,组成一个链表。用 sudog 的 waitlink 相连:
┌──────────┐┌──────────┐┌──────────┐│sudog│┌─────▶│sudog│┌─────▶│sudog│├──────────┴────────────┐│├──────────┴────────────┐│├──────────┴────────────┐│waitlink*sudog│─────┘│waitlink*sudog│──────┘│waitlink*sudog│├───────────────────────┤├───────────────────────┤├───────────────────────┤│waittail*sudog││waittail*sudog││waittail*sudog│└───────────────────────┘└───────────────────────┘└───────────────────────┘
中间的元素的 waittail 都会指向最后一个元素:
┌──────────┐│sudog│├──────────┴────────────┐│waitlink*sudog│├───────────────────────┤│waittail*sudog│───────────────────────────────────────────────────────────┐└───────────────────────┘│┌──────────┐││sudog││├──────────┴────────────┐││waitlink*sudog││├───────────────────────┤││waittail*sudog│─────────────────────────┤└───────────────────────┘▼┌──────────┐│sudog│├──────────┴────────────┐│waitlink*sudog│├───────────────────────┤│waittail*sudog│└───────────────────────┘
对外封装
在 sema.go 里实现的内容,用 go:linkname 导出给 sync、poll 库来使用,也是在链接期做了些手脚:
//go:linknamesync_runtime_Semacquiresync.runtime_Semacquirefuncsync_runtime_Semacquire(addr*uint32){semacquire1(addr,false,semaBlockProfile)}//go:linknamepoll_runtime_Semacquireinternal/poll.runtime_Semacquirefuncpoll_runtime_Semacquire(addr*uint32){semacquire1(addr,false,semaBlockProfile)}//go:linknamesync_runtime_Semreleasesync.runtime_Semreleasefuncsync_runtime_Semrelease(addr*uint32,handoffbool){semrelease1(addr,handoff)}//go:linknamesync_runtime_SemacquireMutexsync.runtime_SemacquireMutexfuncsync_runtime_SemacquireMutex(addr*uint32,lifobool){semacquire1(addr,lifo,semaBlockProfile|semaMutexProfile)}//go:linknamepoll_runtime_Semreleaseinternal/poll.runtime_Semreleasefuncpoll_runtime_Semrelease(addr*uint32){semrelease(addr)}
sem 本身支持 acquire 和 release,其实就是 OS 里常说的 P 操作和 V 操作。
funccansemacquire(addr*uint32)bool{for{v:=atomic.Load(addr)ifv==0{returnfalse}ifatomic.Cas(addr,v,v-1){returntrue}}}
typesemaProfileFlagsintconst(semaBlockProfilesemaProfileFlags=1<<iotasemaMutexProfile)//Calledfromruntime.funcsemacquire(addr*uint32){semacquire1(addr,false,0)}funcsemacquire1(addr*uint32,lifobool,profilesemaProfileFlags){gp:=getg()ifgp!=gp.m.curg{throw("semacquirenotontheGstack")}//低成本的情况ifcansemacquire(addr){return}//高成本的情况://增加waitercount的值//再尝试调用一次cansemacquire,成本了就直接返回//没成功就把自己作为一个waiter入队//sleep//(之后waiter的descriptor被signaler用dequeue踢出)s:=acquireSudog()root:=semroot(addr)t0:=int64(0)s.releasetime=0s.acquiretime=0s.ticket=0for{lock(&root.lock)//给nwait加一,这样后来的就不会在semrelease中进低成本的路径了atomic.Xadd(&root.nwait,1)//检查cansemacquire避免错过了唤醒ifcansemacquire(addr){atomic.Xadd(&root.nwait,-1)unlock(&root.lock)break}//在cansemacquire之后的semrelease都可以知道我们正在等待//(上面设置了nwait),所以会直接进入sleep//注:这里说的sleep其实就是goparkunlockroot.queue(addr,s,lifo)goparkunlock(&root.lock,"semacquire",traceEvGoBlockSync,4)ifs.ticket!=0||cansemacquire(addr){break}}ifs.releasetime>0{blockevent(s.releasetime-t0,3)}releaseSudog(s)}
funcsemrelease(addr*uint32){semrelease1(addr,false)}funcsemrelease1(addr*uint32,handoffbool){root:=semroot(addr)atomic.Xadd(addr,1)//低成本情况:没有waiter?//这个atomic的检查必须发生在xadd之前,以避免错误唤醒//(具体参见semacquire中的循环)ifatomic.Load(&root.nwait)==0{return}//高成本情况:搜索waiter并唤醒它lock(&root.lock)ifatomic.Load(&root.nwait)==0{//count值已经被另一个goroutine消费了//所以我们不需要唤醒其它goroutine了unlock(&root.lock)return}s,t0:=root.dequeue(addr)ifs!=nil{atomic.Xadd(&root.nwait,-1)}unlock(&root.lock)ifs!=nil{//可能会很慢,所以先解锁acquiretime:=s.acquiretimeifacquiretime!=0{mutexevent(t0-acquiretime,3)}ifs.ticket!=0{throw("corruptedsemaphoreticket")}ifhandoff&&cansemacquire(addr){s.ticket=1}readyWithTime(s,5)}}funcreadyWithTime(s*sudog,traceskipint){ifs.releasetime!=0{s.releasetime=cputicks()}goready(s.g,traceskip)}
sudog 按照地址 hash 到 251 个 bucket 中的其中一个,每一个 bucket 都是一棵 treap。而相同 addr 上的 sudog 会形成一个链表。
为啥同一个地址的 sudog 不需要展开放在 treap 中呢?显然,sudog 唤醒的时候,block 在同一个 addr 上的 goroutine,说明都是加的同一把锁,这些 goroutine 被唤醒肯定是一起被唤醒的,相同地址的 g 并不需要查找才能找到,只要决定是先进队列的被唤醒(fifo)还是后进队列的被唤醒(lifo)就可以了。
//queue函数会把s添加到semaRoot上阻塞的goroutine们中//实际上就是把s添加到其地址对应的treap上func(root*semaRoot)queue(addr*uint32,s*sudog,lifobool){s.g=getg()s.elem=unsafe.Pointer(addr)s.next=nils.prev=nilvarlast*sudogpt:=&root.treapfort:=*pt;t!=nil;t=*pt{ift.elem==unsafe.Pointer(addr){//Alreadyhaveaddrinlist.iflifo{//treap中在t的位置用s覆盖掉t*pt=ss.ticket=t.tickets.acquiretime=t.acquiretimes.parent=t.parents.prev=t.prevs.next=t.nextifs.prev!=nil{s.prev.parent=s}ifs.next!=nil{s.next.parent=s}//把t放在s的waitlist的第一个位置s.waitlink=ts.waittail=t.waittailifs.waittail==nil{s.waittail=t}t.parent=nilt.prev=nilt.next=nilt.waittail=nil}else{//把s添加到t的等待列表的末尾ift.waittail==nil{t.waitlink=s}else{t.waittail.waitlink=s}t.waittail=ss.waitlink=nil}return}last=tifuintptr(unsafe.Pointer(addr))<uintptr(t.elem){pt=&t.prev}else{pt=&t.next}}//把s作为树的新的叶子插入进去//平衡树使用ticket作为堆的权重值,这个ticket是随机生成的//也就是说,这个结构以元素地址来看的话,是一个二叉搜索树//同时用ticket值使其同时又是一个小顶堆,满足//s.ticket<=boths.prev.ticketands.next.ticket.//https://en.wikipedia.org/wiki/Treap//http://faculty.washington.edu/aragon/pubs/rst89.pdf////s.ticket会在一些地方和0相比,因此只设置最低位的bit//这样不会明显地影响treap的质量?s.ticket=fastrand()|1s.parent=last*pt=s//按照ticket来进行旋转,以满足treap的性质fors.parent!=nil&&s.parent.ticket>s.ticket{ifs.parent.prev==s{root.rotateRight(s.parent)}else{ifs.parent.next!=s{panic("semaRootqueue")}root.rotateLeft(s.parent)}}}//dequeue会搜索到阻塞在addr地址的semaRoot中的第一个goroutine//如果这个sudog需要进行profile,dequeue会返回它被唤醒的时间(now),否则的话now为0func(root*semaRoot)dequeue(addr*uint32)(found*sudog,nowint64){ps:=&root.treaps:=*psfor;s!=nil;s=*ps{ifs.elem==unsafe.Pointer(addr){gotoFound}ifuintptr(unsafe.Pointer(addr))<uintptr(s.elem){ps=&s.prev}else{ps=&s.next}}returnnil,0Found:now=int64(0)ifs.acquiretime!=0{now=cputicks()}ift:=s.waitlink;t!=nil{//替换掉同样在addr上等待的t。*ps=tt.ticket=s.tickett.parent=s.parentt.prev=s.previft.prev!=nil{t.prev.parent=t}t.next=s.nextift.next!=nil{t.next.parent=t}ift.waitlink!=nil{t.waittail=s.waittail}else{t.waittail=nil}t.acquiretime=nows.waitlink=nils.waittail=nil}else{//向下旋转s到叶节点,以进行删除,同时要考虑优先级fors.next!=nil||s.prev!=nil{ifs.next==nil||s.prev!=nil&&s.prev.ticket<s.next.ticket{root.rotateRight(s)}else{root.rotateLeft(s)}}//Removes,nowaleaf.//删除s,现在是叶子节点了ifs.parent!=nil{ifs.parent.prev==s{s.parent.prev=nil}else{s.parent.next=nil}}else{root.treap=nil}}s.parent=nils.elem=nils.next=nils.prev=nils.ticket=0returns,now}
“Semaphore的原理和实现方法”的内容就介绍到这里了,感谢大家的阅读。