Skip to content

Async Queues (producer-consumer)

Why asyncio.Queue

asyncio.Queueasyncio.Queue enables:

  • safe communication between coroutines
  • backpressure (limit queue size)

Example pipeline

async_queue.py
import asyncio
 
 
async def producer(q: asyncio.Queue):
    for i in range(10):
        await q.put(i)
        print("produced", i)
    await q.put(None)  # sentinel
 
 
async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()
        try:
            if item is None:
                break
            print("consumed", item)
            await asyncio.sleep(0.1)
        finally:
            q.task_done()
 
 
async def main():
    q = asyncio.Queue(maxsize=5)
 
    p = asyncio.create_task(producer(q))
    c = asyncio.create_task(consumer(q))
 
    await p
    await q.join()
    await c
 
 
asyncio.run(main())
async_queue.py
import asyncio
 
 
async def producer(q: asyncio.Queue):
    for i in range(10):
        await q.put(i)
        print("produced", i)
    await q.put(None)  # sentinel
 
 
async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()
        try:
            if item is None:
                break
            print("consumed", item)
            await asyncio.sleep(0.1)
        finally:
            q.task_done()
 
 
async def main():
    q = asyncio.Queue(maxsize=5)
 
    p = asyncio.create_task(producer(q))
    c = asyncio.create_task(consumer(q))
 
    await p
    await q.join()
    await c
 
 
asyncio.run(main())

Tips

  • Use a sentinel to stop consumers.
  • Use maxsizemaxsize for backpressure.

๐Ÿงช Try It Yourself

Exercise 1 โ€“ asyncio.Queue Basics

Exercise 2 โ€“ Async Producer-Consumer

Exercise 3 โ€“ Queue Size

If this helped you, consider buying me a coffee โ˜•

Buy me a coffee

Was this page helpful?

Let us know how we did