github.com/marusama/cyclicbarrier
是一个 Go 语言库,用于实现 循环屏障(Cyclic Barrier) 的同步机制。它的主要功能是协调多个 goroutine 在某个点等待,直到所有 goroutine 都到达该点后,才能继续执行后续操作。以下是它的核心功能和用途:
1. 核心功能
- 同步多个 goroutine:
屏障会阻塞所有调用Await()
的 goroutine,直到预设数量的 goroutine 都到达屏障点。 - 循环使用:
屏障可以重复使用(即“循环”),每次所有 goroutine 通过后,屏障会自动重置,可以再次用于下一轮同步。 - 支持超时和取消:
通过context.Context
支持超时或取消操作,避免 goroutine 无限等待。
2. 主要用途
- 任务分阶段执行:
例如,多个任务需要分阶段执行,每个阶段完成后需要等待其他任务,才能进入下一阶段。 - 模拟多人游戏:
所有玩家加载完资源后,游戏才能开始(如代码示例中的场景)。 - 并行计算:
在并行计算中,多个计算任务需要同步中间结果后再继续。 - 测试并发逻辑:
用于测试多 goroutine 的同步行为。
3. 关键方法
New(n int)
:
创建一个屏障,n
是需要等待的 goroutine 数量。Await(ctx context.Context) error
:
调用此方法的 goroutine 会阻塞,直到所有 goroutine 都调用了Await()
。- 返回
nil
:所有 goroutine 成功通过屏障。 - 返回错误:可能是超时(
context.DeadlineExceeded
)或屏障被破坏(ErrBrokenBarrier
)。
- 返回
4. 与 sync.WaitGroup
的区别
sync.WaitGroup
:
用于等待一组 goroutine 完成,但无法重复使用。cyclicbarrier
:
支持重复使用,且可以嵌入业务逻辑(如屏障通过后执行特定动作)。
5. 代码示例中的使用
在用户提供的代码中:
- 创建了一个屏障,参与者数量为玩家数(4)加裁判(1)。
- 每个玩家调用
Await()
等待其他玩家和裁判。 - 裁判调用
Await()
后,打印“游戏开始”作为屏障通过后的动作。
代码
package mainimport ("context""fmt""github.com/marusama/cyclicbarrier""math/rand""sync""time"
)func player(ctx context.Context, id int, b cyclicbarrier.CyclicBarrier, wg *sync.WaitGroup) {defer wg.Done()// 模拟加载资源loadTime := time.Duration(rand.Intn(3)+1) * time.Secondfmt.Printf("Player %d is loading resources (%v)...\n", id, loadTime)time.Sleep(loadTime)fmt.Printf("Player %d finished loading and is waiting...\n", id)err := b.Await(ctx)if err != nil {fmt.Printf("Player %d couldn't start the game: %v\n", id, err)return}fmt.Printf("==> Player %d has entered the game! <--\n", id)
}func main() {const numPlayers = 6// 我们增加一个特殊的goroutine,在所有玩家准备好后,由它来宣布游戏开始// 因此参与者总数是 玩家数 + 1b := cyclicbarrier.New(numPlayers)var wg sync.WaitGroupwg.Add(numPlayers)ctx := context.Background()// 启动玩家goroutinefor i := 1; i <= numPlayers; i++ {go player(ctx, i, b, &wg)}// 启动游戏裁判goroutinego func() {defer wg.Done()fmt.Println("Game Master: Waiting for all players to be ready...")// 裁判也加入等待err := b.Await(ctx)if err != nil {fmt.Printf("Game Master: Could not start the game: %v\n", err)return}// 当所有参与者(玩家+裁判)都到达后,裁判被唤醒,执行这个“栅栏动作”fmt.Println("Game Master: All players are ready. Game Start!")}()wg.Wait()
}
带超时的
package mainimport ("context""errors""fmt""github.com/marusama/cyclicbarrier""time"
)func main() {const numTasks = 3b := cyclicbarrier.New(numTasks)// 创建一个2秒后自动取消的contextctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)defer cancel()for i := 0; i < numTasks; i++ {go func(id int) {// 模拟一个任务超时if id == 0 {fmt.Printf("Task %d will take a long time...\n", id)time.Sleep(5 * time.Second) // 这个任务耗时超过了context的超时时间} else {time.Sleep(1 * time.Second)}fmt.Printf("Task %d reached barrier.\n", id)err := b.Await(ctx)// 关键的错误处理if err != nil {// 使用 errors.Is 来正确判断错误类型if errors.Is(err, context.DeadlineExceeded) {fmt.Printf("Task %d: Waiting timed out! The barrier is broken.\n", id)} else if errors.Is(err, cyclicbarrier.ErrBrokenBarrier) {fmt.Printf("Task %d: The barrier was broken by another goroutine.\n", id)} else {fmt.Printf("Task %d: An unexpected error occurred: %v\n", id, err)}} else {fmt.Printf("Task %d passed the barrier.\n", id)}}(i)}time.Sleep(6 * time.Second) // 等待演示结束
}