Sobre o programa: Estou tentando implementar o padrão pipeline por meio do meu próprio tipo de primeira classe intJob
. A função main, que agrega os pipelines, é ExecutePipeline2
e, como posso dizer, exatamente ela está causando problemas.
Por que continuo tendo deadlocks? Para mim, parece que fecho todos os canais que são usados pelos leitores após cada goroutine. Além disso, criar canais com buffer não ajuda, então fiquei completamente sem ideias e ficarei muito grato pela sua ajuda.
IMPORTANTE : Não posso alterar a main
função e implementar essa ideia somente a partir de outras funções, quando a base (função mencionada) permanece constante e inalterada.
type intJob func(a, b chan int)
func ExecutePipeline2(jobs ...intJob) {
outs := make([]chan int, len(jobs)+1)
wg := sync.WaitGroup{}
for i := 0; i < len(outs); i++ {
outs[i] = make(chan int)
}
for i, job := range jobs {
job := job
in, out := outs[i], outs[i+1]
i := i
wg.Add(1)
go func() {
job(in, out)
fmt.Printf("job %d closed\n", i)
close(out)
wg.Done()
}()
}
wg.Wait()
}
func pipe(_, b chan int) {
for i := 0; i < 5; i++ {
b <- i
}
}
func main() {
inputData := []int{0, 1, 1, 2, 3, 5, 8}
hashSignJobs := []intJob{
intJob(func(in, out chan int) {
for _, fibNum := range inputData {
out <- fibNum
}
}),
intJob(pipe),
intJob(func(in, out chan int) {
for val := range in {
fmt.Println(val)
}
}),
}
ExecutePipeline2(hashSignJobs...)
}
Acho que o problema principal está no seu segundo pipeline
pipe()
, que em vez de ler a saída do pipeline anterior, apenas começou a gerar números em um loop. Deveria ter sido escrito como abaixo para lerin
comoVocê também pode reescrever a função execute para algo como abaixo. Exemplo completo em https://go.dev/play/p/K_-UFzz0zt5
O primeiro canal "out" (1 índice) criado neste trecho de código
não usa como "produtor" e após a primeira iteração na primeira
intJob
e na segunda iteração do primeirointJob
deadlock acumulado (porquechan
está cheio):PLAYGROUND