哈喽,大家好,我是asong。在写上一篇文章请勿滥用goroutine时,发现Go语言扩展包提供了一个带权重的信号量库Semaphore,使用信号量我们可以实现一个"工作池"控制一定数量的goroutine并发工作。因为对源码抱有好奇的态度,所以在周末仔细看了一下这个库并进行了解析,在这里记录一下。
目前创新互联公司已为上千家的企业提供了网站建设、域名、虚拟空间、网站托管运营、企业网站设计、海东网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
要想知道一个东西是什么,我都爱去百度百科上搜一搜,输入"信号量",这答案不就来了。
百度百科解释:
信号量(Semaphore),有时被称为信号灯,是[多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。为了完成这个过程,需要创建一个信号量VI,然后将Acquire Semaphore VI以及Release Semaphore VI分别放置在每个关键代码段的首末端。确认这些信号量VI引用的是初始创建的信号量。
通过这段解释我们可以得知什么是信号量,其实信号量就是一种变量或者抽象数据类型,用于控制并发系统中多个进程对公共资源的访问,访问具有原子性。信号量主要分为两类:
信号量是由操作系统来维护的,信号量只能进行两种操作等待和发送信号,操作总结来说,核心就是PV操作:
在信号量进行PV操作时都为原子操作,并且在PV原语执行期间不允许有中断的发生。
PV原语对信号量的操作可以分为三种情况:
具体在什么场景使用本文就不在继续分析,接下来我们重点来看一下Go语言提供的扩展包Semaphore,看看它是怎样实现的。
我们之前在分析Go语言源码时总会看到这几个函数:
- func runtime_Semacquire(s *uint32)
- func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
- func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
这几个函数就是信号量的PV操作,不过他们都是给Go内部使用的,如果想使用信号量,那就可以使用官方的扩展包:Semaphore,这是一个带权重的信号量,接下来我们就重点分析一下这个库。
安装方法:go get -u golang.org/x/sync
- type Weighted struct {
- size int64 // 设置一个最大权值
- cur int64 // 标识当前已被使用的资源数
- mu sync.Mutex // 提供临界区保护
- waiters list.List // 阻塞等待的调用者列表
- }
semaphore库核心结构就是Weighted,主要有4个字段:
- type waiter struct {
- n int64 // 等待调用者权重值
- ready chan<- struct{} // close channel就是唤醒
- }
这里只有两个字段:
semaphore还提供了一个创建Weighted对象的方法,在初始化时需要给定最大权值:
- // NewWeighted为并发访问创建一个新的加权信号量,该信号量具有给定的最大权值。
- func NewWeighted(n int64) *Weighted {
- w := &Weighted{size: n}
- return w
- }
先直接看代码吧:
- func (s *Weighted) Acquire(ctx context.Context, n int64) error {
- s.mu.Lock() // 加锁保护临界区
- // 有资源可用并且没有等待获取权值的goroutine
- if s.size-s.cur >= n && s.waiters.Len() == 0 {
- s.cur += n // 加权
- s.mu.Unlock() // 释放锁
- return nil
- }
- // 要获取的权值n大于最大的权值了
- if n > s.size {
- // 先释放锁,确保其他goroutine调用Acquire的地方不被阻塞
- s.mu.Unlock()
- // 阻塞等待context的返回
- <-ctx.Done()
- return ctx.Err()
- }
- // 走到这里就说明现在没有资源可用了
- // 创建一个channel用来做通知唤醒
- ready := make(chan struct{})
- // 创建waiter对象
- w := waiter{n: n, ready: ready}
- // waiter按顺序入队
- elem := s.waiters.PushBack(w)
- // 释放锁,等待唤醒,别阻塞其他goroutine
- s.mu.Unlock()
- // 阻塞等待唤醒
- select {
- // context关闭
- case <-ctx.Done():
- err := ctx.Err() // 先获取context的错误信息
- s.mu.Lock()
- select {
- case <-ready:
- // 在context被关闭后被唤醒了,那么试图修复队列,假装我们没有取消
- err = nil
- default:
- // 判断是否是第一个元素
- isFront := s.waiters.Front() == elem
- // 移除第一个元素
- s.waiters.Remove(elem)
- // 如果是第一个元素且有资源可用通知其他waiter
- if isFront && s.size > s.cur {
- s.notifyWaiters()
- }
- }
- s.mu.Unlock()
- return err
- // 被唤醒了
- case <-ready:
- return nil
- }
- }
注释已经加到代码中了,总结一下这个方法主要有三个流程:
- func main() {
- s := semaphore.NewWeighted(3)
- ctx,cancel := context.WithTimeout(context.Background(), time.Second * 2)
- defer cancel()
- for i :=0; i < 3; i++{
- if i != 0{
- go func(num int) {
- if err := s.Acquire(ctx,3); err != nil{
- fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
- return
- }
- time.Sleep(2 * time.Second)
- fmt.Printf("goroutine: %d run over\n",num)
- s.Release(3)
- }(i)
- }else {
- go func(num int) {
- ct,cancel := context.WithTimeout(context.Background(), time.Second * 3)
- defer cancel()
- if err := s.Acquire(ct,3); err != nil{
- fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
- return
- }
- time.Sleep(3 * time.Second)
- fmt.Printf("goroutine: %d run over\n",num)
- s.Release(3)
- }(i)
- }
- }
- time.Sleep(10 * time.Second)
- }
上面的例子中goroutine:0 使用ct对象来做控制,超时时间为3s,goroutine:1和goroutine:2对象使用ctx对象来做控制,超时时间为2s,这三个goroutine占用的资源都等于最大资源数,也就是说只能有一个goruotine运行成功,另外两个goroutine都会被阻塞,因为goroutine是抢占式调度,所以我们不能确定哪个gouroutine会第一个被执行,这里我们假设第一个获取到信号量的是gouroutine:2,阻塞等待的调用者列表顺序是:goroutine:1 -> goroutine:0,因为在goroutine:2中有一个2s的延时,所以会触发ctx的超时,ctx会下发Done信号,因为goroutine:2和goroutine:1都是被ctx控制的,所以就会把goroutine:1从等待者队列中取消,但是因为goroutine:1属于队列的第一个队员,并且因为goroutine:2已经释放资源,那么就会唤醒goroutine:0继续执行,画个图表示一下:
使用这种方式可以避免goroutine永久失眠。
- func (s *Weighted) TryAcquire(n int64) bool {
- s.mu.Lock() // 加锁
- // 有资源可用并且没有等待获取资源的goroutine
- success := s.size-s.cur >= n && s.waiters.Len() == 0
- if success {
- s.cur += n
- }
- s.mu.Unlock()
- return success
- }
这个方法就简单很多了,不阻塞地获取权重为n的信号量,成功时返回true,失败时返回false并保持信号量不变。
- func (s *Weighted) Release(n int64) {
- s.mu.Lock()
- // 释放资源
- s.cur -= n
- // 释放资源大于持有的资源,则会发生panic
- if s.cur < 0 {
- s.mu.Unlock()
- panic("semaphore: released more than held")
- }
- // 通知其他等待的调用者
- s.notifyWaiters()
- s.mu.Unlock()
- }
这里就是很常规的操作,主要就是资源释放,同时进行安全性判断,如果释放资源大于持有的资源,则会发生panic。
在Acquire和Release方法中都调用了notifyWaiters,我们来分析一下这个方法:
- func (s *Weighted) notifyWaiters() {
- for {
- // 获取等待调用者队列中的队员
- next := s.waiters.Front()
- // 没有要通知的调用者了
- if next == nil {
- break // No more waiters blocked.
- }
- // 断言出waiter信息
- w := next.Value.(waiter)
- if s.size-s.cur < w.n {
- // 没有足够资源为下一个调用者使用时,继续阻塞该调用者,遵循先进先出的原则,
- // 避免需要资源数比较大的waiter被饿死
- //
- // 考虑一个场景,使用信号量作为读写锁,现有N个令牌,N个reader和一个writer
- // 每个reader都可以通过Acquire(1)获取读锁,writer写入可以通过Acquire(N)获得写锁定
- // 但不包括所有的reader,如果我们允许reader在队列中前进,writer将会饿死-总是有一个令牌可供每个reader
- break
- }
- // 获取资源
- s.cur += w.n
- // 从waiter列表中移除
- s.waiters.Remove(next)
- // 使用channel的close机制唤醒waiter
- close(w.ready)
- }
- }
这里只需要注意一个点:唤醒waiter采用先进先出的原则,避免需要资源数比较大的waiter被饿死。
到这里我们就把Semaphore的源代码看了一篇,代码行数不多,封装的也很巧妙,那么我们该什么时候选择使用它呢?
目前能想到一个场景就是Semaphore配合上errgroup实现一个"工作池",使用Semaphore限制goroutine的数量,配合上errgroup做并发控制,示例如下:
- const (
- limit = 2
- )
- func main() {
- serviceName := []string{
- "cart",
- "order",
- "account",
- "item",
- "menu",
- }
- eg,ctx := errgroup.WithContext(context.Background())
- s := semaphore.NewWeighted(limit)
- for index := range serviceName{
- name := serviceName[index]
- if err := s.Acquire(ctx,1); err != nil{
- fmt.Printf("Acquire failed and err is %s\n", err.Error())
- break
- }
- eg.Go(func() error {
- defer s.Release(1)
- return callService(name)
- })
- }
- if err := eg.Wait(); err != nil{
- fmt.Printf("err is %s\n", err.Error())
- return
- }
- fmt.Printf("run success\n")
- }
- func callService(name string) error {
- fmt.Println("call ",name)
- time.Sleep(1 * time.Second)
- return nil
- }
结果如下:
- call order
- call cart
- call account
- call item
- call menu
- run success
本文我们主要赏析了Go官方扩展库Semaphore的实现,他的设计思路简单,仅仅用几十行就完成了完美的封装,值得我们借鉴学习。不过在实际业务场景中,我们使用信号量的场景并不多,大多数场景我们都可以使用channel来替代,但是有些场景使用Semaphore来实现会更好,比如上篇文章【[警惕] 请勿滥用goroutine】我们使用channel+sync来控制goroutine数量,这种实现方式并不好,因为实际已经起来了多个goroutine,只不过控制了工作的goroutine数量,如果改用semaphore实现才是真正的控制了goroutine数量。
文中代码已上传github:https://github.com/asong2020/Golang_Dream/blob/master/code_demo/semaphore_demo/semaphore.go,欢迎star。
网页标题:Go官方设计了一个信号量库
URL网址:http://www.mswzjz.com/qtweb/news40/204240.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联