Async Queues (producer-consumer)
Why asyncio.Queue
asyncio.Queueasyncio.Queue enables:
- safe communication between coroutines
- backpressure (limit queue size)
Example pipeline
async_queue.py
import asyncio
async def producer(q: asyncio.Queue):
for i in range(10):
await q.put(i)
print("produced", i)
await q.put(None) # sentinel
async def consumer(q: asyncio.Queue):
while True:
item = await q.get()
try:
if item is None:
break
print("consumed", item)
await asyncio.sleep(0.1)
finally:
q.task_done()
async def main():
q = asyncio.Queue(maxsize=5)
p = asyncio.create_task(producer(q))
c = asyncio.create_task(consumer(q))
await p
await q.join()
await c
asyncio.run(main())async_queue.py
import asyncio
async def producer(q: asyncio.Queue):
for i in range(10):
await q.put(i)
print("produced", i)
await q.put(None) # sentinel
async def consumer(q: asyncio.Queue):
while True:
item = await q.get()
try:
if item is None:
break
print("consumed", item)
await asyncio.sleep(0.1)
finally:
q.task_done()
async def main():
q = asyncio.Queue(maxsize=5)
p = asyncio.create_task(producer(q))
c = asyncio.create_task(consumer(q))
await p
await q.join()
await c
asyncio.run(main())Tips
- Use a sentinel to stop consumers.
- Use
maxsizemaxsizefor backpressure.
๐งช Try It Yourself
Exercise 1 โ asyncio.Queue Basics
Exercise 2 โ Async Producer-Consumer
Exercise 3 โ Queue Size
If this helped you, consider buying me a coffee โ
Buy me a coffeeWas this page helpful?
Let us know how we did
