Esta é uma continuação desta postagem: Por que os fluxos paralelos internos são mais rápidos com novos pools do que com o commonPool para este cenário?
Desculpas antecipadas pelo muro de texto. Isto é para JDK 17.
Atualmente estou testando métodos com estes cenários:
- Um externo
parallelStream
e um interno aninhadoparallelStream
que costumaManagedBlocker
chamarThread::sleep
- Um externo
parallelStream
e um interno aninhadoparallelStream
que usa um FJPool diferente, mas também usaManagedBlocker
para chamarThread::sleep
Veja como o código se parece:
public class NestedPerf {
private static final ForkJoinPool sharedInnerPool = new ForkJoinPool();
public static void main(String[] args){
// testInnerParallelLoopWithSharedPoolAndManagedBlock(10000);
// testInnerParallelLoopWithManagedBlock(2);
}
public static void testInnerParallelLoopWithSharedPoolAndManagedBlock(int limit){
Map<Integer, Integer> threads = new ConcurrentHashMap<>();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
try {
ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})).join();
int count = sharedInnerPool.getActiveThreadCount();
threads.put(count, count);
});
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+ threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
public static void testInnerParallelLoopWithManagedBlock(int limit){
Map<Integer, Integer> threads = new ConcurrentHashMap<>();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
innerParallelLoopWithManagedBlock();
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.put(count, count);
});
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+ threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
}
Como o título diz, o testInnerParallelLoopWithManagedBlock
método falha com a seguinte exceção:
java.util.concurrent.RejectedExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
Mas o testInnerParallelLoopWithSharedPoolAndManagedBlock
método funciona bem até uma contagem de 10.000 (e provavelmente mais, mas não testei além de 10.000).
Inicialmente, pensei que o problema poderia ser devido às interações entre fluxos paralelos aninhados e ManagedBlocker
atuando no mesmo pool. Então, criei outro método de teste onde uso o mesmo FJPool personalizado compartilhado para loops internos e externos:
public static void testBothLoopsWithSharedPoolAndManagedBlock(int limit){
Map<Integer, Integer> threads = new ConcurrentHashMap<>();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
sharedInnerPool.submit(()->StateData.outerLoop.parallelStream().unordered().forEach(i -> {
sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
try {
ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})).join();
int count = sharedInnerPool.getActiveThreadCount();
threads.put(count, count);
})).join();
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+ threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
Mas este funciona muito bem. Então o problema parece ser especificamente com ForkJoinPool.commonPool()
.
Alguém tem alguma ideia do que está acontecendo nos bastidores?
O comum 1 tem um tamanho máximo de pool relativamente baixo:
ForkJoinPool
O costume
ForkJoinPool
que você está usando é criado chamando o construtor sem argumentos . Esse construtor:O construtor vinculado por essa documentação diz o seguinte sobre o tamanho máximo do pool:
E se você olhar a nota de implementação na classe Javadoc , verá:
Em resumo, seu costume
ForkJoinPool
tem um tamanho máximo de pool muito maior. Você provavelmente pode evitar issoRejectedExecutionException
com o pool comum se aumentar o valor dos spares máximos definindo ajava.util.concurrent.ForkJoinPool.common.maximumSpares
propriedade do sistema.1. Os links são para o Java 23, mas até onde sei a documentação relevante não mudou desde o Java 17.