AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / coding / Perguntas / 79249956
Accepted
Andorrax
Andorrax
Asked: 2024-12-04 13:10:10 +0800 CST2024-12-04 13:10:10 +0800 CST 2024-12-04 13:10:10 +0800 CST

Por que os fluxos paralelos internos são mais rápidos com novos pools do que com o commonPool para este cenário?

  • 772

Então, recentemente executei um benchmark onde comparei o desempenho de fluxos aninhados em 3 casos:

  • Fluxo externo paralelo e fluxo interno sequencial
  • Fluxos externos e internos paralelos (usando parallelStream) - isso testa efetivamente `ForkJoinPool.commonPool()
  • Fluxos externos e internos paralelos, mas os fluxos internos criam novos fluxos ForkJoinPoolpara cada tarefa

Aqui está o código de benchmark (usei JMH):

public class NestedPerf {
  @State(Scope.Benchmark)
  public static class StateData{
    public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
    public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
  }
  private static void runInNewPool(Runnable task) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.submit(task).join();
    } finally {
      pool.shutdown();
    }
  }
  private static void innerParallelLoop() {
    StateData.innerLoop.parallelStream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  private static void innerSequentialLoop() {
    StateData.innerLoop.stream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  @Benchmark
  public void testingNewPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      runInNewPool(ParallelPerf::innerParallelLoop);
      bh.consume(i);
    });
  }

  @Benchmark
  public void testingCommonPoolWithSequentialInner(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerSequentialLoop();
      bh.consume(i);
    });
  }
  @Benchmark
  public void testingCommonPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerParallelLoop();
      bh.consume(i);
    });
  }
}

E aqui está o resultado:

Benchmark                                         Mode  Cnt   Score   Error  Units
NestedPerf.testingCommonPool                     thrpt   25   1.935 ± 0.005  ops/s
NestedPerf.testingCommonPoolWithSequentialInner  thrpt   25   1.744 ± 0.007  ops/s
NestedPerf.testingNewPool                        thrpt   25  22.648 ± 0.559  ops/s

A diferença entre o método com novos Pools e o método com commonPool é surpreendente. Alguém tem uma ideia de por que criar novos pools torna as coisas cerca de 20x mais rápidas para esse benchmark?

Se ajudar, estou executando isso em um sistema Core i7 10850H com 12 CPUs disponíveis (hexcore + hyperthreading).

java
  • 2 2 respostas
  • 88 Views

2 respostas

  • Voted
  1. l'L'l
    2024-12-04T13:40:20+08:002024-12-04T13:40:20+08:00

    A diferença de desempenho que você está observando vem de como os ForkJoinPool.commonPool()identificadores aninham fluxos paralelos. Quando os fluxos externo e interno são usados parallelStream()​​sem um pool de threads personalizado, eles competem pelo mesmo conjunto limitado de threads no pool comum. Isso leva à contenção de threads e à subutilização de recursos da CPU porque o pool não consegue gerenciar efetivamente o paralelismo aninhado.

    Ao criar um novo ForkJoinPoolpara cada fluxo interno, você fornece threads dedicados para tarefas internas, evitando contenção com os threads do fluxo externo. Isso permite que ambos os níveis de paralelismo utilizem os núcleos da CPU completamente. Obviamente, como você notou, isso resulta em um aumento substancial de desempenho, apesar da sobrecarga de criar novos pools.

    Possíveis melhorias:

    Você pode evitar a sobrecarga de criar múltiplos pools usando um costume compartilhado ForkJoinPoolpara todos os fluxos internos. Essa abordagem elimina a sobrecarga de criação de pools enquanto ainda fornece threads separadas para paralelismo interno, levando a um desempenho ainda melhor.

    public class NestedPerf {
      private static final ForkJoinPool innerPool = new ForkJoinPool();
    
      @Benchmark
      public void testingSharedInnerPool(Blackhole bh){
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
          innerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
            try {
              Thread.sleep(5);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          })).join();
          bh.consume(i);
        });
      }
    }
    

    Alternativamente, você pode achatar o paralelismo para um único nível combinando os loops externo e interno em um fluxo paralelo. Este método pode utilizar efetivamente o fluxo paralelo sem paralelismo aninhado, frequentemente resultando no tempo de execução mais rápido porque maximiza a utilização da CPU e minimiza a sobrecarga.

    @Benchmark
    public void testingFlattenedStream(Blackhole bh){
      StateData.outerLoop.parallelStream()
        .flatMap(i -> StateData.innerLoop.stream())
        .unordered()
        .forEach(j -> {
          try {
            Thread.sleep(5);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          bh.consume(j);
        });
    }
    

    Se você busca simplicidade e rapidez, o achatamento provavelmente vencerá.

    • 5
  2. Best Answer
    Slaw
    2024-12-04T16:11:02+08:002024-12-04T16:11:02+08:00

    Por que o rendimento aumenta

    Suas tarefas são simplesmente uma chamada para Thread::sleep. Isso bloqueia o thread de chamada, o que significa que o SO não agendará o thread para execução até que a duração especificada passe. Isso deixa a CPU livre para executar quaisquer outros threads. Em outras palavras, suas tarefas não são limitadas pela CPU e, portanto, não sobrecarregam a CPU. O que significa que lançar mais threads em seu conjunto de tarefas aumentará o rendimento sem sobrecarregar a CPU.

    Ao usar múltiplos pools fork-join, você está efetivamente aumentando o número de threads disponíveis para executar suas tarefas. Não é muito diferente de simplesmente aumentar o número de threads em um único pool. Quer você tenha 1 pool com 15 threads ou 3 pools com 5 threads cada, você ainda acaba com um total de 15 threads.

    Digamos que você tenha 10 tarefas que dormem por 5 milissegundos cada. Se você tiver 5 threads para executar essas tarefas, verá aproximadamente:

    Start 5 tasks => Wait 5 ms => Start 5 tasks => Wait 5 ms => Done!
    

    Mas se você tiver 10 threads, verá aproximadamente:

    Start 10 tasks => Wait 5 ms => Done!
    

    O primeiro leva um total de 10 milissegundos para executar cada tarefa, o segundo leva apenas 5 milissegundos. E é basicamente daí que vem o aumento de throughput nos seus testes.


    Mantendo o paralelismo

    Dito isso, a ForkJoinPooltem um nível definido de paralelismo. Uma maneira de tentar manter esse paralelismo é gerando uma nova thread (se o número máximo de threads ainda não tiver sido atingido) quando uma de suas threads for bloqueada. Da documentação :

    [A ForkJoinPool] tenta manter threads ativas (ou disponíveis) suficientes adicionando, suspendendo ou retomando dinamicamente threads de trabalho internas, mesmo se algumas tarefas estiverem paralisadas esperando para se juntar a outras. No entanto, nenhum ajuste desse tipo é garantido em face de E/S bloqueada ou outra sincronização não gerenciada. A ForkJoinPool.ManagedBlockerinterface aninhada permite a extensão dos tipos de sincronização acomodados.

    Você está chamando Thread::sleepde uma forma não gerenciada. Em outras palavras, você está bloqueando os threads do pool de tal forma que o pool não pode compensar. Para evitar isso, considere usar um ManagedBlocker. Aqui está um exemplo de implementação:

    import java.time.Duration;
    import java.util.concurrent.ForkJoinPool;
    
    public class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {
    
      private final Duration sleepDuration;
      private boolean slept; // Does this need to be volatile?
    
      public SleepManagedBlocker(Duration slepDuration) {
        this.sleepDuration = slepDuration;
      }
    
      @Override
      public boolean block() throws InterruptedException {
        if (!slept) {
          slept = true;
          Thread.sleep(sleepDuration);
        }
        return slept;
      }
    
      @Override
      public boolean isReleasable() {
        return slept;
      }
    }
    

    Então você substituiria as Thread.sleep(5)chamadas por:

    ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)))
    

    Você deverá ver aumentos de rendimento semelhantes em seus testes sem precisar usar vários pools de bifurcação e junção.


    Referências JMH

    Aqui está um benchmark mostrando o efeito de usar ManagedBlockerneste caso. Ele foi compilado e executado no Java 23.

    import java.time.Duration;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Fork;
    import org.openjdk.jmh.annotations.Measurement;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Param;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;
    import org.openjdk.jmh.annotations.Warmup;
    import org.openjdk.jmh.infra.Blackhole;
    
    @Fork(value = 1, jvmArgsAppend = {"-Djava.util.concurrent.ForkJoinPool.common.maximumSpares=1024"})
    @Warmup(iterations = 5)
    @Measurement(iterations = 5)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.SECONDS)
    public class FJPBenchmarks {
    
      @Benchmark
      public void runTest(TestState state, Blackhole bh) {
        state.executeOuterLoop(bh);
      }
    
      @State(Scope.Benchmark)
      public static class TestState {
    
        private static final Duration SLEEP_DURATION = Duration.ofMillis(5);
        private static final int OUTER_LOOP_COUNT = 32;
        private static final int INNER_LOOP_COUNT = 32;
    
        @Param({"sequential", "parallel"})
        private String sequentialMode;
    
        @Param({"common", "separate"})
        private String poolMode;
    
        @Param({"raw", "managed"})
        private String sleepMode;
    
        void executeOuterLoop(Blackhole bh) {
          IntStream.range(0, OUTER_LOOP_COUNT)
              .unordered()
              .parallel()
              .forEach(i -> {
                executeInnerLoop(createInnerLoop());
                bh.consume(i);
              });
        }
    
        IntStream createInnerLoop() {
          var stream = IntStream.range(0, INNER_LOOP_COUNT).unordered();
          return switch (sequentialMode) {
            case "sequential" -> stream.sequential();
            case "parallel" -> stream.parallel();
            default -> throw new IllegalStateException("bad sequentialMode: " + sequentialMode);
          };
        }
    
        void executeInnerLoop(IntStream loop) {
          var sleeper = getSleeper();
          switch (poolMode) {
            case "common" -> loop.forEach(_ -> sleeper.sleepUnchecked());
            case "separate" -> {
              try (var pool = new ForkJoinPool()) {
                loop.forEach(_ -> pool.submit(sleeper::sleepUnchecked).join());
              }
            }
            default -> throw new IllegalStateException("bad poolMode: " + poolMode);
          }
        }
    
        Sleeper getSleeper() {
          return switch (sleepMode) {
            case "raw" -> () -> Thread.sleep(SLEEP_DURATION);
            case "managed" -> () -> ForkJoinPool.managedBlock(new SleepManagedBlocker());
            default -> throw new IllegalStateException("bad sleepMode: " + sleepMode);
          };
        }
    
        @FunctionalInterface
        interface Sleeper {
      
          void sleep() throws InterruptedException;
    
          default Void sleepUnchecked() {
            try {
              sleep();
            } catch (InterruptedException ex) {
              throw new RuntimeException(ex);
            }
            return null;
          }
        }
    
        static class SleepManagedBlocker implements ForkJoinPool.ManagedBlocker {
    
          private boolean slept;
    
          @Override
          public boolean block() throws InterruptedException {
            if (!slept) {
              slept = true;
              Thread.sleep(SLEEP_DURATION);
            }
            return true;
          }
    
          @Override
          public boolean isReleasable() {
            return slept;
          }
        }
      }
    }
    

    Resultados (da execução do benchmark em um computador com 8 processadores):

    Benchmark              (poolMode)  (sequentialMode)  (sleepMode)   Mode  Cnt   Score   Error  Units
    FJPBenchmarks.runTest      common        sequential          raw  thrpt    5   1.463 � 0.022  ops/s
    FJPBenchmarks.runTest      common        sequential      managed  thrpt    5   5.858 � 0.026  ops/s
    FJPBenchmarks.runTest      common          parallel          raw  thrpt    5   1.454 � 0.044  ops/s
    FJPBenchmarks.runTest      common          parallel      managed  thrpt    5  35.997 � 0.234  ops/s
    FJPBenchmarks.runTest    separate        sequential          raw  thrpt    5   1.426 � 0.325  ops/s
    FJPBenchmarks.runTest    separate        sequential      managed  thrpt    5   1.348 � 0.157  ops/s
    FJPBenchmarks.runTest    separate          parallel          raw  thrpt    5  13.505 � 1.175  ops/s
    FJPBenchmarks.runTest    separate          parallel      managed  thrpt    5  16.864 � 0.186  ops/s
    
    • 4

relate perguntas

  • Lock Condition.notify está lançando java.lang.IllegalMonitorStateException

  • Resposta de microsserviço Muitos para Um não aparece no carteiro

  • Validação personalizada do SpringBoot Bean

  • Os soquetes Java são FIFO?

  • Por que não é possível / desencorajado definir um lado do servidor de tempo limite de solicitação?

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    Vue 3: Erro na criação "Identificador esperado, mas encontrado 'import'" [duplicado]

    • 1 respostas
  • Marko Smith

    Por que esse código Java simples e pequeno roda 30x mais rápido em todas as JVMs Graal, mas não em nenhuma JVM Oracle?

    • 1 respostas
  • Marko Smith

    Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores?

    • 1 respostas
  • Marko Smith

    Como faço para corrigir um erro MODULE_NOT_FOUND para um módulo que não importei manualmente?

    • 6 respostas
  • Marko Smith

    `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso?

    • 3 respostas
  • Marko Smith

    Quando devo usar um std::inplace_vector em vez de um std::vector?

    • 3 respostas
  • Marko Smith

    Um programa vazio que não faz nada em C++ precisa de um heap de 204 KB, mas não em C

    • 1 respostas
  • Marko Smith

    PowerBI atualmente quebrado com BigQuery: problema de driver Simba com atualização do Windows

    • 2 respostas
  • Marko Smith

    AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos

    • 1 respostas
  • Marko Smith

    Estou tentando fazer o jogo pacman usando apenas o módulo Turtle Random e Math

    • 1 respostas
  • Martin Hope
    Aleksandr Dubinsky Por que a correspondência de padrões com o switch no InetAddress falha com 'não cobre todos os valores de entrada possíveis'? 2024-12-23 06:56:21 +0800 CST
  • Martin Hope
    Phillip Borge Por que esse código Java simples e pequeno roda 30x mais rápido em todas as JVMs Graal, mas não em nenhuma JVM Oracle? 2024-12-12 20:46:46 +0800 CST
  • Martin Hope
    Oodini Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores? 2024-12-12 06:27:11 +0800 CST
  • Martin Hope
    sleeptightAnsiC `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso? 2024-11-09 07:18:53 +0800 CST
  • Martin Hope
    The Mad Gamer Quando devo usar um std::inplace_vector em vez de um std::vector? 2024-10-29 23:01:00 +0800 CST
  • Martin Hope
    Chad Feller O ponto e vírgula agora é opcional em condicionais bash com [[ .. ]] na versão 5.2? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench Por que um traço duplo (--) faz com que esta cláusula MariaDB seja avaliada como verdadeira? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng Por que `dict(id=1, **{'id': 2})` às vezes gera `KeyError: 'id'` em vez de um TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos 2024-03-20 03:12:31 +0800 CST
  • Martin Hope
    MarkB Por que o GCC gera código que executa condicionalmente uma implementação SIMD? 2024-02-17 06:17:14 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve