[关闭]
@wddpct 2018-12-16T07:47:58.000000Z 字数 2612 阅读 3108

Golang 中的协程池,超时和取消


Golang 中对于协程池的实现相对简单,以至于开发者去查看比较知名的 Golang 开源协程池库时,会发现代码多是大同小异。本篇文章便是分享一个简版的带有超时和取消功能的协程池。对于一些必备的知识假设读者都已经掌握。

1. 协程池

  1. package main
  2. import "fmt"
  3. import (
  4. "time"
  5. "runtime"
  6. )
  7. // 协程池对象,抢占式执行任务
  8. func worker(id int, tasks <-chan int) {
  9. for {
  10. select {
  11. case t, ok := <-tasks:
  12. if ok {
  13. fmt.Println("worker", id, "started task", t)
  14. time.Sleep(time.Second)
  15. fmt.Println("worker", id, "finished task", t)
  16. }
  17. default:
  18. fmt.Println("worker", id, "is waiting for a task")
  19. time.Sleep(time.Second)
  20. }
  21. }
  22. }
  23. func main() {
  24. // 任务队列,非缓冲信道
  25. // 可以将 int 换成其他任意类型,包括 function
  26. tasks := make(chan int)
  27. // 初始化逻辑核心数目的 worker
  28. for wid := 1; wid <= runtime.NumCPU(); wid++ {
  29. go worker(wid, tasks)
  30. }
  31. // 填充任务,由 worker 抢占执行
  32. for t := 1; t <= 100; t++ {
  33. tasks <- t
  34. }
  35. close(tasks)
  36. }

上述 worker 代码中的 for...select... 可以用 range channel 的方式代替,但是这样实现超时和取消功能便有些不美观。

2. 超时和释放

实际生产中我们一般都会为协程池和协程池执行对象分别创建 Pool 和 Worker 对象,然后加入超时和取消属性等,这次由于只是简版,所以统一参数传递和处理。

  1. package main
  2. import "fmt"
  3. import (
  4. "time"
  5. "runtime"
  6. )
  7. // 协程池对象,抢占式执行任务
  8. func worker(done chan interface{}, id int, timeout time.Duration, tasks <-chan int) {
  9. go func() {
  10. select {
  11. // 在 id * 2 秒后传入结束标志
  12. case <-time.After(timeout * time.Second):
  13. done <- struct{}{}
  14. return
  15. }
  16. }()
  17. for {
  18. select {
  19. case t, ok := <-tasks:
  20. if ok {
  21. fmt.Println("worker", id, "started task", t)
  22. time.Sleep(time.Second)
  23. fmt.Println("worker", id, "finished task", t)
  24. }
  25. // 收到结束标志,释放当前 worker
  26. case <-done:
  27. fmt.Println("worker", id, "is canceled")
  28. return
  29. default:
  30. fmt.Println("worker", id, "is waiting for a task")
  31. time.Sleep(time.Second)
  32. }
  33. }
  34. }
  35. func main() {
  36. // 任务队列,非缓冲信道
  37. // 可以将 int 换成其他任意类型,包括 function
  38. tasks := make(chan int)
  39. // 初始化逻辑核心数目的 worker
  40. for wid := 1; wid <= runtime.NumCPU(); wid++ {
  41. done := make(chan interface{})
  42. go worker(done, wid, time.Duration(wid*2), tasks)
  43. }
  44. // 填充任务,由 worker 抢占执行
  45. for t := 1; t <= 100; t++ {
  46. tasks <- t
  47. }
  48. close(tasks)
  49. }

上述代码中可能存在一个小 bug,比如笔者电脑的逻辑核心数目是 12,也就是意味着 24 秒之后所有 worker 将取消,但是此时 tasks 并未全部消化,所以会造成 main goroutine 中的死锁。这里就不予以解决了。

3. 用 context 取代普通的结束标识

上述代码中使用 done 标识结束每一个 worker,但实际上我们很多时候并不会在意某一或某二执行对象的生命周期,对于整个协程池才是考虑的重点。Golang 在 1.7 之后的版本中加入的 context 包可以很方便的实现这个功能。

  1. package main
  2. import "fmt"
  3. import (
  4. "time"
  5. "runtime"
  6. "context"
  7. )
  8. // 协程池对象,抢占式执行任务
  9. func worker(ctx context.Context, id int, timeout time.Duration, tasks <-chan int) {
  10. for {
  11. select {
  12. case t, ok := <-tasks:
  13. if ok {
  14. fmt.Println("worker", id, "started task", t)
  15. time.Sleep(time.Second)
  16. fmt.Println("worker", id, "finished task", t)
  17. }
  18. // 收到结束标志,释放当前 worker
  19. case <-ctx.Done():
  20. fmt.Println("worker", id, "is canceled")
  21. return
  22. default:
  23. fmt.Println("worker", id, "is waiting for a task")
  24. time.Sleep(time.Second)
  25. }
  26. }
  27. }
  28. func main() {
  29. // 任务队列,非缓冲信道
  30. // 可以将 int 换成其他任意类型,包括 function
  31. tasks := make(chan int)
  32. // 增加 context 信息控制所有 worker 退出
  33. ctx, cancel := context.WithCancel(context.Background())
  34. // 初始化逻辑核心数目的 worker
  35. for wid := 1; wid <= runtime.NumCPU(); wid++ {
  36. go worker(ctx, wid, time.Duration(wid), tasks)
  37. }
  38. // 填充任务,由 worker 抢占执行
  39. for t := 1; t <= 100; t++ {
  40. tasks <- t
  41. }
  42. close(tasks)
  43. // 释放所有 worker
  44. cancel();
  45. // 保证所有的 worker 都能顺利退出
  46. time.Sleep(5 * time.Second)
  47. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注