Sobre o programa: Estou tentando implementar o padrão pipeline por meio do meu próprio tipo de primeira classe intJob
. A função main, que agrega os pipelines, é ExecutePipeline2
e, como posso dizer, exatamente ela está causando problemas.
Por que continuo tendo deadlocks? Para mim, parece que fecho todos os canais que são usados pelos leitores após cada goroutine. Além disso, criar canais com buffer não ajuda, então fiquei completamente sem ideias e ficarei muito grato pela sua ajuda.
IMPORTANTE : Não posso alterar a main
função e implementar essa ideia somente a partir de outras funções, quando a base (função mencionada) permanece constante e inalterada.
type intJob func(a, b chan int)
func ExecutePipeline2(jobs ...intJob) {
outs := make([]chan int, len(jobs)+1)
wg := sync.WaitGroup{}
for i := 0; i < len(outs); i++ {
outs[i] = make(chan int)
}
for i, job := range jobs {
job := job
in, out := outs[i], outs[i+1]
i := i
wg.Add(1)
go func() {
job(in, out)
fmt.Printf("job %d closed\n", i)
close(out)
wg.Done()
}()
}
wg.Wait()
}
func pipe(_, b chan int) {
for i := 0; i < 5; i++ {
b <- i
}
}
func main() {
inputData := []int{0, 1, 1, 2, 3, 5, 8}
hashSignJobs := []intJob{
intJob(func(in, out chan int) {
for _, fibNum := range inputData {
out <- fibNum
}
}),
intJob(pipe),
intJob(func(in, out chan int) {
for val := range in {
fmt.Println(val)
}
}),
}
ExecutePipeline2(hashSignJobs...)
}