AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / coding / Perguntas / 79556930
Accepted
Ken Kiarie
Ken Kiarie
Asked: 2025-04-05 20:07:36 +0800 CST2025-04-05 20:07:36 +0800 CST 2025-04-05 20:07:36 +0800 CST

Como gerar novas corrotinas para o processamento de cada elemento em um fluxo frio?

  • 772

Me deparei com um exemplo que simula o acesso a um banco de dados lento para obter um fluxo de identificadores de usuários associados a um perfil que pode ser acessado por meio de um recurso de rede ainda mais lento:

fun getAllUserIds():Flow<Int>{
    return flow{
        repeat(3){
            delay(200.milliseconds)
            log("Emitting!")
            emit(it)
            }
         } 
      }

suspend fun getProfileFromNetwork(id:Int):String{
    delay(2.seconds)
    return "Profile[$id]"
  }

fun main(){
    val ids = getAllUserIds()
    runBlocking{
        ids
            .map{getProfileFromNetwork(it)}
            .collect{log("Got $it")}
        }
  }

a emissão dos IDs e a solicitação de perfis estão interligadas. Como eu otimizaria isso gerando novas coroutines para o processamento de cada elemento no fluxo?

kotlin-coroutines
  • 2 2 respostas
  • 27 Views

2 respostas

  • Voted
  1. Best Answer
    IliyaTi
    2025-04-05T21:20:55+08:002025-04-05T21:20:55+08:00

    Se quiser obter o resultado como um todo, você pode usar flatMapMerge, que foi projetado para esse cenário exato: mapear simultaneamente cada elemento no fluxo para um novo fluxo e mesclar os resultados.

    fun getAllUserIds(): Flow<Int> {
        return flow {
            repeat(3) {
                delay(200.milliseconds)
                log("Emitting!")
                emit(it)
            }
        }
    }
    
    suspend fun getProfileFromNetwork(id: Int): String {
        delay(2.seconds)
        return "Profile[$id]"
    }
    
    fun main() = runBlocking {
        getAllUserIds()
            .flatMapMerge { id ->
                flow {
                    emit(getProfileFromNetwork(id))
                }
            }
            .collect { log("Got $it") }
    }
    

    Mas eu penso em uma abordagem melhor; mostrar o máximo de dados buscados para o usuário e ter um efeito visual, indicando que algum pedaço de dados ainda está para carregar usando, por exemplo, ShimmerEffect ou qualquer outra alternativa. Tente carregá-lo preguiçosamente para cada item da lista.

    Tente disparar uma corrotina independente para buscar o Perfil quando o UserID tiver sido buscado do banco de dados dentro da lógica do item RecyclerView/List (não se esqueça de ter uma política de cache para minimizar a carga da rede).

    • 2
  2. tyg
    2025-04-06T00:05:06+08:002025-04-06T00:05:06+08:00

    Acho que você confundiu o que o fluxo do banco de dados retorna. Se você consultar todos os IDs de usuário, você obterá um List<Int>. Quando os fluxos são suportados, então você também pode ter a opção de obter um Flow<List<Int>>. O fluxo emitirá um novo valor sempre que algo mudar no banco de dados. Quando isso acontece, a consulta é executada automaticamente novamente e uma nova lista de todos os IDs de usuário é retornada. O fluxo é apenas uma maneira conveniente de observar o banco de dados ao longo do tempo para alterações. Ele não emite cada ID de usuário um de cada vez.

    Com isso em mente, sua função fictícia getAllUserIds()deve ser parecida com esta:

    fun getAllUserIds(): Flow<List<Int>> {
        return flow {
            repeat(3) {
                delay(200.milliseconds)
                log("Emitting!")
                val ids = (0..it).toList()
                emit(ids)
            }
        }
    }
    

    Este código emula que a cada 200 ms um novo usuário é inserido no banco de dados. O fluxo agora emitirá três listas diferentes de usuários:

    [0]
    [0, 1]
    [0, 1, 2]
    

    Sua função principal deve então iterar a lista para recuperar o perfil de cada id:

    fun main() {
        val flowOfIds = getAllUserIds()
        runBlocking {
            flowOfIds
                .map { listOfIds ->
                    listOfIds.map { getProfileFromNetwork(it) }
                }
                .collect { log("Got $it") }
        }
    }
    

    flowOfIds.mapsubstitui cada valor de fluxo (ou seja, o fluxo agora contém um List<String>em vez de um List<Int>). listOfIds.mapsubstitui cada entrada da lista (ou seja, a lista agora contém Strings em vez de Ints).


    Voltando à pergunta original.

    Como está agora, getProfileFromNetwork()é chamado sequentially . Se você quiser que isso aconteça concorrentemente , você precisa lançar uma nova coroutine para cada:

    flowOfIds
        .map { listOfIds ->
            listOfIds
                .map { async { getProfileFromNetwork(it) } }
                .awaitAll()
        }
    

    asynclança uma nova coroutine no escopo fornecido por runBlocking, e awaitAll()espera que todas as coroutines terminem. Ele retorna uma lista de perfis, ou seja, um List<String>.

    Isso é repetido para cada novo valor de fluxo, então o perfil para o ID do usuário 0é recuperado três vezes no total. O perfil para o ID do usuário 2é recuperado apenas uma vez porque ocorre apenas na última emissão de fluxo.

    Como getProfileFromNetworké mais lento do que novos valores de fluxo são emitidos, você pode querer alternar de flowOfIds.mappara flowOfIds.mapLatest. Este cancela a transformação de fluxo anterior quando um novo valor de fluxo chega, então você não precisa esperar a transformação anterior terminar antes de iniciar a nova.

    Além disso, você deve considerar seriamente armazenar em cache os resultados de getProfileFromNetwork(), para não fazer repetidamente as mesmas solicitações de rede (como buscar 0o perfil do id do usuário três vezes). Isso é algo que deve ser feito dentro de getProfileFromNetwork(). Você provavelmente deve mover essa função para uma classe de repositório ou algo semelhante, para poder construir seu cache lá.

    • 1

relate perguntas

  • Como reverter uma transação programaticamente usando a API não bloqueante do Jooq?

  • Condições de chamada para as três funções de kotlinx.coroutines.channels.ChannelResult

  • Lançamento de corrotinas Kotlin versus tratamento de exceções assíncronas

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    Reformatar números, inserindo separadores em posições fixas

    • 6 respostas
  • Marko Smith

    Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não?

    • 2 respostas
  • Marko Smith

    Problema com extensão desinstalada automaticamente do VScode (tema Material)

    • 2 respostas
  • Marko Smith

    Vue 3: Erro na criação "Identificador esperado, mas encontrado 'import'" [duplicado]

    • 1 respostas
  • Marko Smith

    Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores?

    • 1 respostas
  • Marko Smith

    Como faço para corrigir um erro MODULE_NOT_FOUND para um módulo que não importei manualmente?

    • 6 respostas
  • Marko Smith

    `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso?

    • 3 respostas
  • Marko Smith

    Um programa vazio que não faz nada em C++ precisa de um heap de 204 KB, mas não em C

    • 1 respostas
  • Marko Smith

    PowerBI atualmente quebrado com BigQuery: problema de driver Simba com atualização do Windows

    • 2 respostas
  • Marko Smith

    AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos

    • 1 respostas
  • Martin Hope
    Fantastic Mr Fox Somente o tipo copiável não é aceito na implementação std::vector do MSVC 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant Encontre o próximo dia da semana usando o cronógrafo 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor O inicializador de membro do construtor pode incluir a inicialização de outro membro? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul O C++20 mudou para permitir a conversão de `type(&)[N]` de matriz de limites conhecidos para `type(&)[]` de matriz de limites desconhecidos? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann Como/por que {2,3,10} e {x,3,10} com x=2 são ordenados de forma diferente? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller O ponto e vírgula agora é opcional em condicionais bash com [[ .. ]] na versão 5.2? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench Por que um traço duplo (--) faz com que esta cláusula MariaDB seja avaliada como verdadeira? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng Por que `dict(id=1, **{'id': 2})` às vezes gera `KeyError: 'id'` em vez de um TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos 2024-03-20 03:12:31 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve