Use queues to limit concurrent workflow executions.

Define queues

from polos import queue

# Limit concurrent API calls
api_queue = queue("api-calls", concurrency_limit=5)

# Limit database connections
db_queue = queue("database-ops", concurrency_limit=10)

# Limit CPU-intensive work
heavy_queue = queue("heavy-processing", concurrency_limit=2)
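To build intuition for what a concurrency limit does, here is a minimal sketch that uses a plain `asyncio.Semaphore` as a stand-in for a queue. It is not how Polos implements queues, and `run_with_limit` is a hypothetical helper written only for this illustration. It starts more tasks than the limit allows and records the peak number running at once.

```python
import asyncio


async def run_with_limit(concurrency_limit: int, num_tasks: int) -> int:
    """Run num_tasks dummy tasks and return the peak number running at once."""
    # Stand-in for a queue's concurrency_limit (illustration only, not Polos).
    slots = asyncio.Semaphore(concurrency_limit)
    running = 0
    peak = 0

    async def task() -> None:
        nonlocal running, peak
        async with slots:  # wait here until a slot is free
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated work
            running -= 1

    await asyncio.gather(*(task() for _ in range(num_tasks)))
    return peak


# Six tasks are submitted, but at most two hold a slot at any moment.
peak = asyncio.run(run_with_limit(concurrency_limit=2, num_tasks=6))
print(peak)
```

The remaining tasks wait for a free slot rather than failing, which is the behavior you would generally expect from a concurrency-limited queue.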

Assign workflows to queues

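from polos import workflow, WorkflowContext

# api_queue and db_queue are the queues defined above.
# make_api_request, execute_query, and insert_data are your own functions (not shown).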

@workflow(id="api_call", queue=api_queue)
async def api_call(ctx: WorkflowContext, payload):
    return await ctx.step.run("request", make_api_request, payload)

@workflow(id="db_read", queue=db_queue)
async def db_read(ctx: WorkflowContext, payload):
    return await ctx.step.run("query", execute_query, payload)

# Multiple workflows can share the same queue
@workflow(id="db_write", queue=db_queue)
async def db_write(ctx: WorkflowContext, payload):
    return await ctx.step.run("insert", insert_data, payload)
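When two workflows share a queue, the natural reading is that the limit applies to their combined executions rather than to each workflow separately. The sketch below illustrates that idea with a single `asyncio.Semaphore` shared by two kinds of job. It assumes this combined-limit behavior and does not use Polos itself; `demo_shared_limit`, `fake_read`, and `fake_write` are hypothetical names.

```python
import asyncio


async def demo_shared_limit(limit: int) -> int:
    """Run reads and writes against one shared limit; return the combined peak."""
    shared = asyncio.Semaphore(limit)  # one budget for both job kinds
    running = 0
    peak = 0

    async def guarded(duration: float) -> None:
        nonlocal running, peak
        async with shared:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(duration)  # simulated database work
            running -= 1

    async def fake_read() -> None:
        await guarded(0.01)

    async def fake_write() -> None:
        await guarded(0.02)

    # Four reads and four writes compete for the same three slots.
    jobs = [fake_read() for _ in range(4)] + [fake_write() for _ in range(4)]
    await asyncio.gather(*jobs)
    return peak


shared_peak = asyncio.run(demo_shared_limit(limit=3))
print(shared_peak)
```

Under this assumption, a busy `db_write` can delay `db_read` executions, so group workflows into one queue only when they really do contend for the same resource.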

Inline queue config

# Configure the queue inline by passing a dict
@workflow(id="inline_queue", queue={"concurrency_limit": 3})
async def inline_queue(ctx: WorkflowContext, payload):
    return {"message": "Processed"}

# Or refer to a queue by name
@workflow(id="named_queue", queue="my-queue")
async def named_queue(ctx: WorkflowContext, payload):
    return {"message": "Processed"}

Run it

git clone https://github.com/polos-dev/polos.git
cd polos/python-examples/12-shared-queues
cp .env.example .env
uv sync
python worker.py      # Terminal 1
python main.py        # Terminal 2

Open http://localhost:5173 to view your agents and workflows, run them from the UI, and see execution traces.