@wddpct
2018-12-16T15:47:58.000000Z
字数 2612
阅读 3228
Golang 中对于协程池的实现相对简单,以至于开发者去查看比较知名的 Golang 开源协程池库时,会发现代码多是大同小异。本篇文章便是分享一个简版的带有超时和取消功能的协程池。对于一些必备的知识假设读者都已经掌握。
package main
import "fmt"
import (
"time"
"runtime"
)
// 协程池对象,抢占式执行任务
func worker(id int, tasks <-chan int) {
for {
select {
case t, ok := <-tasks:
if ok {
fmt.Println("worker", id, "started task", t)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished task", t)
}
default:
fmt.Println("worker", id, "is waiting for a task")
time.Sleep(time.Second)
}
}
}
func main() {
// 任务队列,非缓冲信道
// 可以将 int 换成其他任意类型,包括 function
tasks := make(chan int)
// 初始化逻辑核心数目的 worker
for wid := 1; wid <= runtime.NumCPU(); wid++ {
go worker(wid, tasks)
}
// 填充任务,由 worker 抢占执行
for t := 1; t <= 100; t++ {
tasks <- t
}
close(tasks)
}
上述 worker 代码中的 for...select... 可以用 range channel 的方式代替,但是这样实现超时和取消功能便有些不美观。
实际生产中我们一般都会为协程池和协程池执行对象分别创建 Pool 和 Worker 对象,然后加入超时和取消属性等,这次由于只是简版,所以统一参数传递和处理。
package main
import "fmt"
import (
"time"
"runtime"
)
// 协程池对象,抢占式执行任务
func worker(done chan interface{}, id int, timeout time.Duration, tasks <-chan int) {
go func() {
select {
// 在 id * 2 秒后传入结束标志
case <-time.After(timeout * time.Second):
done <- struct{}{}
return
}
}()
for {
select {
case t, ok := <-tasks:
if ok {
fmt.Println("worker", id, "started task", t)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished task", t)
}
// 收到结束标志,释放当前 worker
case <-done:
fmt.Println("worker", id, "is canceled")
return
default:
fmt.Println("worker", id, "is waiting for a task")
time.Sleep(time.Second)
}
}
}
func main() {
// 任务队列,非缓冲信道
// 可以将 int 换成其他任意类型,包括 function
tasks := make(chan int)
// 初始化逻辑核心数目的 worker
for wid := 1; wid <= runtime.NumCPU(); wid++ {
done := make(chan interface{})
go worker(done, wid, time.Duration(wid*2), tasks)
}
// 填充任务,由 worker 抢占执行
for t := 1; t <= 100; t++ {
tasks <- t
}
close(tasks)
}
上述代码中可能存在一个小 bug,比如笔者电脑的逻辑核心数目是 12,也就是意味着 24 秒之后所有 worker 将取消,但是此时 tasks 并未全部消化,所以会造成 main goroutine 中的死锁。这里就不予以解决了。
上述代码中使用 done 标识结束每一个 worker,但实际上我们很多时候并不会在意某一或某二执行对象的生命周期,对于整个协程池才是考虑的重点。Golang 在 1.7 之后的版本中加入的 context 包可以很方便的实现这个功能。
package main
import "fmt"
import (
"time"
"runtime"
"context"
)
// 协程池对象,抢占式执行任务
func worker(ctx context.Context, id int, timeout time.Duration, tasks <-chan int) {
for {
select {
case t, ok := <-tasks:
if ok {
fmt.Println("worker", id, "started task", t)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished task", t)
}
// 收到结束标志,释放当前 worker
case <-ctx.Done():
fmt.Println("worker", id, "is canceled")
return
default:
fmt.Println("worker", id, "is waiting for a task")
time.Sleep(time.Second)
}
}
}
func main() {
// 任务队列,非缓冲信道
// 可以将 int 换成其他任意类型,包括 function
tasks := make(chan int)
// 增加 context 信息控制所有 worker 退出
ctx, cancel := context.WithCancel(context.Background())
// 初始化逻辑核心数目的 worker
for wid := 1; wid <= runtime.NumCPU(); wid++ {
go worker(ctx, wid, time.Duration(wid), tasks)
}
// 填充任务,由 worker 抢占执行
for t := 1; t <= 100; t++ {
tasks <- t
}
close(tasks)
// 释放所有 worker
cancel();
// 保证所有的 worker 都能顺利退出
time.Sleep(5 * time.Second)
}