AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / user-10560184

Andorrax's questions

Martin Hope
Andorrax
Asked: 2024-12-05 11:30:57 +0800 CST

RejectedExecutionException ao usar ManagedBlocker em parallelStreams aninhados

  • 5

Esta é uma continuação desta postagem: Por que os fluxos paralelos internos são mais rápidos com novos pools do que com o commonPool para este cenário?

Desculpas antecipadas pelo muro de texto. Isto é para JDK 17.

Atualmente estou testando métodos com estes cenários:

  • Um externo parallelStreame um interno aninhado parallelStreamque costuma ManagedBlockerchamarThread::sleep
  • Um externo parallelStreame um interno aninhado parallelStreamque usa um FJPool diferente, mas também usa ManagedBlockerpara chamarThread::sleep

Veja como o código se parece:

public class NestedPerf {
  private static final ForkJoinPool sharedInnerPool = new ForkJoinPool();

  public static void main(String[] args){
//    testInnerParallelLoopWithSharedPoolAndManagedBlock(10000);
//    testInnerParallelLoopWithManagedBlock(2);
  }

  public static void testInnerParallelLoopWithSharedPoolAndManagedBlock(int limit){
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for(int c = 0; c < limit; c++) {
      try {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
            try {
              ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          })).join();
          int count = sharedInnerPool.getActiveThreadCount();
          threads.put(count, count);
        });
      } catch (Exception e) {
        System.out.println(e.getMessage());
        threwException = true;
        int count = ForkJoinPool.commonPool().getActiveThreadCount();
        threads.clear();
        threads.put(count, count);
        break;
      }
    }
    if(threwException){
      System.out.println("Exception thrown. Max Thread Count = "+  threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    } else {
      System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    }
  }

  public static void testInnerParallelLoopWithManagedBlock(int limit){
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for(int c = 0; c < limit; c++) {
      try {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          innerParallelLoopWithManagedBlock();
          int count = ForkJoinPool.commonPool().getActiveThreadCount();
          threads.put(count, count);
        });
      } catch (Exception e) {
        System.out.println(e.getMessage());
        threwException = true;
        int count = ForkJoinPool.commonPool().getActiveThreadCount();
        threads.clear();
        threads.put(count, count);
        break;
      }
    }
    if(threwException){
      System.out.println("Exception thrown. Max Thread Count = "+  threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    } else {
      System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    }
  }
}

Como o título diz, o testInnerParallelLoopWithManagedBlockmétodo falha com a seguinte exceção:

java.util.concurrent.RejectedExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker

Mas o testInnerParallelLoopWithSharedPoolAndManagedBlockmétodo funciona bem até uma contagem de 10.000 (e provavelmente mais, mas não testei além de 10.000).

Inicialmente, pensei que o problema poderia ser devido às interações entre fluxos paralelos aninhados e ManagedBlockeratuando no mesmo pool. Então, criei outro método de teste onde uso o mesmo FJPool personalizado compartilhado para loops internos e externos:

public static void testBothLoopsWithSharedPoolAndManagedBlock(int limit){
    Map<Integer, Integer> threads = new ConcurrentHashMap<>();
    boolean threwException = false;
    for(int c = 0; c < limit; c++) {
      try {
        sharedInnerPool.submit(()->StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
            try {
              ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          })).join();
          int count = sharedInnerPool.getActiveThreadCount();
          threads.put(count, count);
        })).join();
      } catch (Exception e) {
        System.out.println(e.getMessage());
        threwException = true;
        int count = ForkJoinPool.commonPool().getActiveThreadCount();
        threads.clear();
        threads.put(count, count);
        break;
      }
    }
    if(threwException){
      System.out.println("Exception thrown. Max Thread Count = "+  threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    } else {
      System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
    }
  }

Mas este funciona muito bem. Então o problema parece ser especificamente com ForkJoinPool.commonPool().

Alguém tem alguma ideia do que está acontecendo nos bastidores?

java
  • 1 respostas
  • 42 Views
Martin Hope
Andorrax
Asked: 2024-12-04 13:10:10 +0800 CST

Por que os fluxos paralelos internos são mais rápidos com novos pools do que com o commonPool para este cenário?

  • 10

Então, recentemente executei um benchmark onde comparei o desempenho de fluxos aninhados em 3 casos:

  • Fluxo externo paralelo e fluxo interno sequencial
  • Fluxos externos e internos paralelos (usando parallelStream) - isso testa efetivamente `ForkJoinPool.commonPool()
  • Fluxos externos e internos paralelos, mas os fluxos internos criam novos fluxos ForkJoinPoolpara cada tarefa

Aqui está o código de benchmark (usei JMH):

public class NestedPerf {
  @State(Scope.Benchmark)
  public static class StateData{
    public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
    public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
  }
  private static void runInNewPool(Runnable task) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.submit(task).join();
    } finally {
      pool.shutdown();
    }
  }
  private static void innerParallelLoop() {
    StateData.innerLoop.parallelStream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  private static void innerSequentialLoop() {
    StateData.innerLoop.stream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  @Benchmark
  public void testingNewPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      runInNewPool(ParallelPerf::innerParallelLoop);
      bh.consume(i);
    });
  }

  @Benchmark
  public void testingCommonPoolWithSequentialInner(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerSequentialLoop();
      bh.consume(i);
    });
  }
  @Benchmark
  public void testingCommonPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerParallelLoop();
      bh.consume(i);
    });
  }
}

E aqui está o resultado:

Benchmark                                         Mode  Cnt   Score   Error  Units
NestedPerf.testingCommonPool                     thrpt   25   1.935 ± 0.005  ops/s
NestedPerf.testingCommonPoolWithSequentialInner  thrpt   25   1.744 ± 0.007  ops/s
NestedPerf.testingNewPool                        thrpt   25  22.648 ± 0.559  ops/s

A diferença entre o método com novos Pools e o método com commonPool é surpreendente. Alguém tem uma ideia de por que criar novos pools torna as coisas cerca de 20x mais rápidas para esse benchmark?

Se ajudar, estou executando isso em um sistema Core i7 10850H com 12 CPUs disponíveis (hexcore + hyperthreading).

java
  • 2 respostas
  • 88 Views

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    Reformatar números, inserindo separadores em posições fixas

    • 6 respostas
  • Marko Smith

    Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não?

    • 2 respostas
  • Marko Smith

    Problema com extensão desinstalada automaticamente do VScode (tema Material)

    • 2 respostas
  • Marko Smith

    Vue 3: Erro na criação "Identificador esperado, mas encontrado 'import'" [duplicado]

    • 1 respostas
  • Marko Smith

    Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores?

    • 1 respostas
  • Marko Smith

    Como faço para corrigir um erro MODULE_NOT_FOUND para um módulo que não importei manualmente?

    • 6 respostas
  • Marko Smith

    `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso?

    • 3 respostas
  • Marko Smith

    Um programa vazio que não faz nada em C++ precisa de um heap de 204 KB, mas não em C

    • 1 respostas
  • Marko Smith

    PowerBI atualmente quebrado com BigQuery: problema de driver Simba com atualização do Windows

    • 2 respostas
  • Marko Smith

    AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos

    • 1 respostas
  • Martin Hope
    Fantastic Mr Fox Somente o tipo copiável não é aceito na implementação std::vector do MSVC 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant Encontre o próximo dia da semana usando o cronógrafo 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor O inicializador de membro do construtor pode incluir a inicialização de outro membro? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul O C++20 mudou para permitir a conversão de `type(&)[N]` de matriz de limites conhecidos para `type(&)[]` de matriz de limites desconhecidos? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann Como/por que {2,3,10} e {x,3,10} com x=2 são ordenados de forma diferente? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller O ponto e vírgula agora é opcional em condicionais bash com [[ .. ]] na versão 5.2? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench Por que um traço duplo (--) faz com que esta cláusula MariaDB seja avaliada como verdadeira? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng Por que `dict(id=1, **{'id': 2})` às vezes gera `KeyError: 'id'` em vez de um TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos 2024-03-20 03:12:31 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve