Então, recentemente executei um benchmark onde comparei o desempenho de fluxos aninhados em 3 casos:
- Fluxo externo paralelo e fluxo interno sequencial
- Fluxos externos e internos paralelos (usando
parallelStream
) - isso testa efetivamente `ForkJoinPool.commonPool() - Fluxos externos e internos paralelos, mas os fluxos internos criam novos fluxos
ForkJoinPool
para cada tarefa
Aqui está o código de benchmark (usei JMH):
public class NestedPerf {
@State(Scope.Benchmark)
public static class StateData{
public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
}
private static void runInNewPool(Runnable task) {
ForkJoinPool pool = new ForkJoinPool();
try {
pool.submit(task).join();
} finally {
pool.shutdown();
}
}
private static void innerParallelLoop() {
StateData.innerLoop.parallelStream().unordered().forEach(i -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
private static void innerSequentialLoop() {
StateData.innerLoop.stream().unordered().forEach(i -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
@Benchmark
public void testingNewPool(Blackhole bh){
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
runInNewPool(ParallelPerf::innerParallelLoop);
bh.consume(i);
});
}
@Benchmark
public void testingCommonPoolWithSequentialInner(Blackhole bh){
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
innerSequentialLoop();
bh.consume(i);
});
}
@Benchmark
public void testingCommonPool(Blackhole bh){
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
innerParallelLoop();
bh.consume(i);
});
}
}
E aqui está o resultado:
Benchmark Mode Cnt Score Error Units
NestedPerf.testingCommonPool thrpt 25 1.935 ± 0.005 ops/s
NestedPerf.testingCommonPoolWithSequentialInner thrpt 25 1.744 ± 0.007 ops/s
NestedPerf.testingNewPool thrpt 25 22.648 ± 0.559 ops/s
A diferença entre o método com novos Pools e o método com commonPool é surpreendente. Alguém tem uma ideia de por que criar novos pools torna as coisas cerca de 20x mais rápidas para esse benchmark?
Se ajudar, estou executando isso em um sistema Core i7 10850H com 12 CPUs disponíveis (hexcore + hyperthreading).
A diferença de desempenho que você está observando vem de como os
ForkJoinPool.commonPool()
identificadores aninham fluxos paralelos. Quando os fluxos externo e interno são usadosparallelStream()
sem um pool de threads personalizado, eles competem pelo mesmo conjunto limitado de threads no pool comum. Isso leva à contenção de threads e à subutilização de recursos da CPU porque o pool não consegue gerenciar efetivamente o paralelismo aninhado.Ao criar um novo
ForkJoinPool
para cada fluxo interno, você fornece threads dedicados para tarefas internas, evitando contenção com os threads do fluxo externo. Isso permite que ambos os níveis de paralelismo utilizem os núcleos da CPU completamente. Obviamente, como você notou, isso resulta em um aumento substancial de desempenho, apesar da sobrecarga de criar novos pools.Possíveis melhorias:
Você pode evitar a sobrecarga de criar múltiplos pools usando um costume compartilhado
ForkJoinPool
para todos os fluxos internos. Essa abordagem elimina a sobrecarga de criação de pools enquanto ainda fornece threads separadas para paralelismo interno, levando a um desempenho ainda melhor.Alternativamente, você pode achatar o paralelismo para um único nível combinando os loops externo e interno em um fluxo paralelo. Este método pode utilizar efetivamente o fluxo paralelo sem paralelismo aninhado, frequentemente resultando no tempo de execução mais rápido porque maximiza a utilização da CPU e minimiza a sobrecarga.
Se você busca simplicidade e rapidez, o achatamento provavelmente vencerá.
Por que o rendimento aumenta
Suas tarefas são simplesmente uma chamada para
Thread::sleep
. Isso bloqueia o thread de chamada, o que significa que o SO não agendará o thread para execução até que a duração especificada passe. Isso deixa a CPU livre para executar quaisquer outros threads. Em outras palavras, suas tarefas não são limitadas pela CPU e, portanto, não sobrecarregam a CPU. O que significa que lançar mais threads em seu conjunto de tarefas aumentará o rendimento sem sobrecarregar a CPU.Ao usar múltiplos pools fork-join, você está efetivamente aumentando o número de threads disponíveis para executar suas tarefas. Não é muito diferente de simplesmente aumentar o número de threads em um único pool. Quer você tenha 1 pool com 15 threads ou 3 pools com 5 threads cada, você ainda acaba com um total de 15 threads.
Digamos que você tenha 10 tarefas que dormem por 5 milissegundos cada. Se você tiver 5 threads para executar essas tarefas, verá aproximadamente:
Mas se você tiver 10 threads, verá aproximadamente:
O primeiro leva um total de 10 milissegundos para executar cada tarefa, o segundo leva apenas 5 milissegundos. E é basicamente daí que vem o aumento de throughput nos seus testes.
Mantendo o paralelismo
Dito isso, a
ForkJoinPool
tem um nível definido de paralelismo. Uma maneira de tentar manter esse paralelismo é gerando uma nova thread (se o número máximo de threads ainda não tiver sido atingido) quando uma de suas threads for bloqueada. Da documentação :Você está chamando
Thread::sleep
de uma forma não gerenciada. Em outras palavras, você está bloqueando os threads do pool de tal forma que o pool não pode compensar. Para evitar isso, considere usar umManagedBlocker
. Aqui está um exemplo de implementação:Então você substituiria as
Thread.sleep(5)
chamadas por:Você deverá ver aumentos de rendimento semelhantes em seus testes sem precisar usar vários pools de bifurcação e junção.
Referências JMH
Aqui está um benchmark mostrando o efeito de usar
ManagedBlocker
neste caso. Ele foi compilado e executado no Java 23.Resultados (da execução do benchmark em um computador com 8 processadores):