Tenho trabalhado em uma implementação Future em Rust, e encontrei um comportamento que não entendo completamente. Especificamente, estou usando std::sync::mpsc::Receiver
dentro do poll
método, e estou tentando decidir entre while let
e if let
para receber mensagens.
Aqui está a versão simplificada do meu método de enquete:
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// Option 1: Using `while let`
while let Ok(new_attributes) = this.receiver.try_recv() {
// Process the message
}
// Option 2: Using `if let`
if let Ok(new_attributes) = this.receiver.try_recv() {
// Process the message
}
Poll::Pending
}
O que eu observei
Quando uso while let, tudo parece funcionar como esperado: o Future é re-polled quando novas mensagens chegam. No entanto, quando uso if let, o Future parece travar e nunca mais acorda, mesmo quando novas mensagens estão disponíveis.
Meu Entendimento
Eu sei que o Waker deve ser usado para notificar o executor de que o Future deve ser re-pesquisado. Dado que minha função de pesquisa sempre retorna Poll::Pending, eu esperaria que o Future continuasse sendo pesquisado repetidamente, a menos que o próprio processo pare. No entanto, não entendo por que while let garante que o Waker seja acionado corretamente, enquanto if let não parece fazer o mesmo.
Contexto adicional
O Futuro é gerado assim:
ctx.task_executor()
.spawn_critical_blocking("transaction execution service", Box::pin(fut_struct));
A função spawn_critical_blocking funciona da seguinte maneira:
pub fn spawn_critical_blocking<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
where
F: Future<Output = ()> + Send + 'static,
{
self.spawn_critical_as(name, fut, TaskKind::Blocking)
}
E internamente:
fn spawn_critical_as<F>(
&self,
name: &'static str,
fut: F,
task_kind: TaskKind,
) -> JoinHandle<()>
where
F: Future<Output = ()> + Send + 'static,
{
// Wrapping the future and handling task errors
let task = std::panic::AssertUnwindSafe(fut)
.catch_unwind()
.map_err(...);
let task = async move {
// Handling task shutdown and execution
let task = pin!(task);
let _ = select(on_shutdown, task).await;
};
self.spawn_on_rt(task, task_kind)
}
O spawn_on_rt
método então usa spawn_blocking
.
Minha pergunta
- Por que usar while let garante que o Waker seja acionado, mas usar if let
- faz
não?
- Qual é o mecanismo subjacente que causa essa diferença de comportamento?
Quaisquer informações ou explicações serão muito apreciadas!
Antes de responder às suas perguntas, gostaria de desafiar uma de suas suposições:
Isso, embora tecnicamente possível, não é de se esperar.
async
Tempos de execução normais (comotokio
) tentam ser eficientes apenas sondando quando o progresso foi feito, conforme sinalizado usando oWaker
transmitido peloContext
.Observe que você nunca usa o
Context
argumento which, a menos que seu future sempre retornePoll::Ready
imediatamente, é um erro, pois não há razão confiável para que suapoll
função seja chamada novamente. (Tecnicamente, é possível que ela seja chamada novamente, porqueasync
os tempos de execução podem criar futures espúriospoll
, mas você nunca deve confiar nisso!)Não há nada sobre
while let
isso que seja garantido para disparar oWaker
. É por esse motivo que o código a seguir não funciona:Saída:
A única diferença entre
if let
ewhile let
é que o último continuará a ler do canal se houver mais mensagens armazenadas em buffer. Uma explicação alternativa, mais plausível, para o que você está vendo é quewhile let
permite uma única invocação depoll
para processar várias mensagens sem fazer nada para agendar uma invocação subsequente depoll
.Como consertar
O principal problema com seu código é que você está tentando usar uma implementação de canal síncrono da biblioteca padrão em um
async
contexto; Eles são incompatíveis, e misturá-los dessa forma é incorreto e não confiável. Você deve mudar para umasync
canal baseado em - comotokio::sync::mpsc
:Observe como receber no canal agora envolve uma
poll
chamada interna do tipo - que toma oContext
como argumento. Se não houver uma mensagem para receber no momento, oWaker
será associado ao canal (de uma forma específica da implementação) de modo que pode fazer com que o future seja pesquisado quando uma mensagem chegar.Saída:
Observe que há duas pesquisas. Presumivelmente, a primeira não encontrou nenhuma mensagem para receber. O uso adequado da
async
pesquisa significou que o futuro foi pesquisado produtivamente novamente.