semaphore原理

信号量的概念是荷兰计算机科学家 Edsger Dijkstra 在 1963 年左右提出来的,广泛应用在 不同的操作系统中。在系统中,会给每一个进程一个信号量,代表每个进程目前的状态。 未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。 最简单的信号量就是一个变量加一些并发控制的能力,这个变量是 0 到 n 之间的一个数 值。当 goroutine 完成对此信号量的等待(wait)时,该计数值就减 1,当 goroutine 完 成对此信号量的释放(release)时,该计数值就加 1。当计数值为 0 的时候,goroutine 调用 wait 等待该信号量是不会成功的,除非计数器又大于 0,等待的 goroutine 才有可能 成功返回。 更复杂的信号量类型,就是使用抽象数据类型代替变量,用来代表复杂的资源类型。实际 上,大部分的信号量都使用一个整型变量来表示一组资源,并没有实现太复杂的抽象数据 类型,所以你只要知道有更复杂的信号量就行了,我们这节课主要是学习最简单的信号 量。

P/V 操作 Dijkstra 在他的论文中为信号量定义了两个操作 P 和 V。P 操作(descrease、wait、 acquire)是减少信号量的计数值,而 V 操作(increase、signal、release)是增加信号 量的计数值

Go 在它的扩展包中提供了信号量 semaphore,不过这个信号量的类型名并不叫 Semaphore,而是叫 Weighted

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
var (
	sema = semaphore.NewWeighted(int64(maxWorkers)) //信号量
	task = make([]int, maxWorkers*4) // 任务数,是worker的四
)
func main() {
    ctx := context.Background()
    for i := range task {
    // 如果没有worker可用,会阻塞在这里,直到某个worker被释放
        if err := sema.Acquire(ctx, 1); err != nil {
       		 break
    	}
    // 启动worker goroutine
        go func(i int) {
            defer sema.Release(1)
            time.Sleep(100 * time.Millisecond) // 模拟一个耗时操作
            task[i] = i + 1
        }(i)
    }
    // 请求所有的worker,这样能确保前面的worker都执行完
    if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
    	log.Printf("获取所有的worker失败: %v", err)
    }
    fmt.Println(task)
}

思考题 欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你 把今天的内容分享给你的朋友或同事。

  1. 你能用 Channel 实现信号量并发原语吗?你能想到几种实现方式?

  2. 为什么信号量的资源数设计成 int64 而不是 uint64 呢?

在初始化这个信号量的时候,我们设置它的初始容量,代表有多少个资源可以使用。它使 用 Lock 和 Unlock 方法实现请求资源和释放资源,正好实现了 Locker 接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Semaphore 数据结构,并且还实现了Locker接口
type semaphore struct {
    sync.Locker
    ch chan struct{}
}
// 创建一个新的信号量
func NewSemaphore(capacity int) sync.Locker {
    if capacity <= 0 {
    	capacity = 1 // 容量为1就变成了一个互斥锁
    }
	return &semaphore{ch: make(chan struct{}, capacity)}
}
// 请求一个资源
func (s *semaphore) Lock() {
	s.ch <- struct{}{}
}
// 释放资源
func (s *semaphore) Unlock() {
	<-s.ch
}

除了 Channel, marusama/semaphore 也实现了一个可以动态更改资源容量的信号 量,也是一个非常有特色的实现。如果你的资源数量并不是固定的,而是动态变化的,我 建议你考虑一下这个信号量库。

不管怎样,信号量这个并发原语在多资源共享的并发控制的场景中被广泛使用,有时候也 会被 Channel 类型所取代,因为一个 buffered chan 也可以代表 n 个资源。 但是,官方扩展的信号量也有它的优势,就是可以一次获取多个资源。在批量获取资源的 场景中, 建议你尝试使用官方扩展的信号量。