Considere este exemplo em que tenho 3 tarefas de trabalho que enviam resultados para uma fila e uma tarefa que lida com os dados enviados.
async def worker1(queue: asyncio.Queue):
while True:
res = await do_some_work(param=1)
await queue.put(res)
async def worker2(queue: asyncio.Queue):
while True:
res = await do_some_work(param=2)
await queue.put(res)
async def worker3(queue: asyncio.Queue):
while True:
res = await do_some_work(param=3)
await queue.put(res)
async def handle_results(queue: asyncio.Queue):
while True:
res = await queue.get()
await handle_result(res)
queue.task_done()
async def main():
queue = asyncio.Queue()
t1 = asyncio.create_task(worker1(queue))
t2 = asyncio.create_task(worker2(queue))
t3 = asyncio.create_task(worker3(queue))
handler = asyncio.create_task(handle_result(queue))
while True:
# do some other stuff
....
asyncio.run(main())
A documentação diz que asyncio.Queue
não é thread-safe, mas isso não deveria se aplicar aqui, pois todas as tarefas estão sendo executadas na mesma thread. Mas preciso de um asyncio.Lock
para proteger a fila quando tenho 3 tarefas que enviam para a mesma fila? Observando a implementação em Python 3.12 (que cria um putter
future e aguarda por ele antes de enviar para a fila), eu diria que não , mas não tenho certeza, e a documentação não menciona o que aconteceria neste caso. Então, o asyncio.Lock
neste caso é necessário?
Não - não há necessidade de bloqueios para colocar ou ler itens de filas assíncronas.
Tenha em mente que o código multithread em Python já exigirá muito menos bloqueios do que a maioria dos códigos em outras linguagens, já que as próprias estruturas de dados são thread-safe. Portanto, mesmo com uma compilação de thread livre (sem o GIL), se você tiver várias threads anexando valores a uma lista, por exemplo, a lista sempre estará em um estado consistente. É claro que o código que modificaria ou criaria novas chaves em um dicionário compartilhado precisará de bloqueios adequados, mesmo que o dicionário em si nunca "quebre".
Quando paramos de fazer programação assíncrona, outras tarefas simultâneas só serão executadas quando nosso código atingir uma
await
expressão (async for
ouasync with
instrução) - então a necessidade de bloqueios é reduzida ainda mais.Em outras palavras, se não houver código em execução em outras threads, mesmo com muitas tarefas simultâneas, coisas como esta:
são seguros em termos de simultaneidade em código assíncrono.
Além disso, as filas assíncronas são construídas para consistência em contextos assíncronos. Elas nunca quebrariam ou entrariam em um estado inconsistente se duas tarefas simultâneas tentassem
put
,get
ou usarno_wait
variantes daquelas no código em execução na mesma thread. (Embora se você precisar colocar dados em outra thread para serem consumidos em uma tarefa assíncrona, isso é outra história e exigirá um padrão cuidadosamente desenvolvido para funcionar)Vá em frente - e tenha em mente que, a menos que você queira ceder ao loop assíncrono no momento em que estiver fazendo um
put
, ou esteja realmente preocupado em restringir o tamanho da fila, você pode simplesmente usar toput_nowait
(sem oawait
) - isso até impedirá que outras tarefas sejam executadas "perto" do seu put.