我使用线程池函数构建管道,并将 context.Context 作为参数传入其中。当调用 cancel() 函数或超时时,管道必须正常终止,这样就不会剩下任何可用的 goroutine。
我使用的职能:
func generate(amount int) <-chan int {
result := make(chan int)
go func() {
defer close(result)
for i := 0; i < amount; i++ {
result <- i
}
}()
return result
}
func sum(input <-chan int) int {
result := 0
for el := range input {
result += el
}
return result
}
func process[T any, R any](ctx context.Context, workers int, input <-chan T, do func(T) R) <-chan R {
wg := new(sync.WaitGroup)
result := make(chan R)
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case val, ok := <-input:
if !ok {
return
}
select {
case <-ctx.Done():
return
case result <- do(val):
}
}
}
}()
}
go func() {
defer close(result)
wg.Wait()
}()
return result
}
用法:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1200*time.Millisecond)
defer cancel()
input := generate(1000)
multiplied := process(ctx, 15, input, func(val int) int {
time.Sleep(time.Second)
return val * 2
})
increased := process(ctx, 15, multiplied, func(val int) int {
return val + 10
})
fmt.Println("Result: ", sum(increased)) // 360 is ok
fmt.Println("Num goroutine: ", runtime.NumGoroutine()) // 18 is too much
}
我理解发生这种情况是因为所有增加 goroutine 都结束了,而乘法 goroutine 仍在运行。
有没有什么规范的方法来解决这个问题?
您期望类似结构化并发的东西,因此所有 goroutine 都应在当前范围的末尾结束,但不要按照您的期望设计代码。
generate
当输入通道未耗尽且您的do
函数不可取消时,您将会泄漏。generate
为你的功能添加可取消性do
会有些帮助:“高级 Go 并发模式”中提到了更多内容,但作为一般建议,当您瞄准结构化并发时,我建议首先编写同步代码,然后再并发运行它们。