Skip to content

Basic Queue (Producer + Consumer)

This guide shows the smallest end-to-end setup: enqueue JSON work from a producer, process it with a consumer, and shut down cleanly.

It is intentionally "low level" (queue primitives), and complements the workflow-focused guide.

Prerequisites

  • pgqrs installed
  • A database backend selected (examples use SQLite for simplicity)
  • Schema installed (admin.install())

Setup

The snippets in this page focus on the consumer patterns (polling + interrupt).

They assume you already have:

  • store (connected + bootstrapped)
  • producer and consumer (or consumer_a / consumer_b)

If you want fully runnable examples end-to-end, use the guide tests directly:

  • Rust: crates/pgqrs/tests/guide_tests.rs
  • Python: py-pgqrs/tests/test_guides.py

Step 3: Create a Consumer and Poll

The consumer runs a poll loop that:

  • dequeues up to batch_size messages
  • calls your handler
  • archives messages on success
  • releases messages back to the queue on handler error
    // Assumes `store` and `consumer` already exist.
    let store_task = store.clone();
    let consumer_task_handle = consumer.clone();

    let consumer_task = tokio::spawn(async move {
        pgqrs::dequeue()
            .worker(&consumer_task_handle)
            .batch(1)
            .handle(|_msg| Box::pin(async { Ok(()) }))
            .poll(&store_task)
            .await
    });
    consumer.interrupt().await.unwrap();

    let res = timeout(Duration::from_secs(15), consumer_task)
        .await
        .unwrap()
        .unwrap();
    assert!(matches!(res, Err(pgqrs::error::Error::Suspended { .. })));
    assert_eq!(
        consumer.status().await.unwrap(),
        pgqrs::types::WorkerStatus::Suspended
    );
# Assumes `store` and `consumer` already exist.
async def basic_queue_py_poll_and_interrupt(store, consumer, handler):
    task = asyncio.create_task(
        pgqrs.dequeue().worker(consumer).batch(1).handle(handler).poll(store)
    )
    await consumer.interrupt()
    return task

More Patterns

Two common variations you can build on top of the basic consumer loop.

Handoff Between Consumers

Start consumer A, process one message, interrupt it, then start consumer B and confirm the next message is claimed by B.

    // Assumes `store` and `consumer_a` already exist.
    let store_a = store.clone();
    let consumer_a_task_handle = consumer_a.clone();
    let task_a = tokio::spawn(async move {
        pgqrs::dequeue()
            .worker(&consumer_a_task_handle)
            .batch(5)
            .handle_batch(|_msgs| Box::pin(async { Ok(()) }))
            .poll(&store_a)
            .await
    });
    // Assumes `consumer_a` and `task_a` exist.
    consumer_a.interrupt().await.unwrap();
    let res_a = timeout(Duration::from_secs(5), task_a)
        .await
        .unwrap()
        .unwrap();
    assert!(matches!(res_a, Err(pgqrs::error::Error::Suspended { .. })));
    assert_eq!(
        consumer_a.status().await.unwrap(),
        pgqrs::types::WorkerStatus::Suspended
    );
    // Assumes `store` and `consumer_b` already exist.
    let store_b = store.clone();
    let consumer_b_task_handle = consumer_b.clone();
    let task_b = tokio::spawn(async move {
        pgqrs::dequeue()
            .worker(&consumer_b_task_handle)
            .batch(5)
            .handle_batch(|_msgs| Box::pin(async { Ok(()) }))
            .poll(&store_b)
            .await
    });
    // Assumes `consumer_b` and `task_b` exist.
    consumer_b.interrupt().await.unwrap();
    let res_b = timeout(Duration::from_secs(5), task_b)
        .await
        .unwrap()
        .unwrap();
    assert!(matches!(res_b, Err(pgqrs::error::Error::Suspended { .. })));
    assert_eq!(
        consumer_b.status().await.unwrap(),
        pgqrs::types::WorkerStatus::Suspended
    );
    task_a = asyncio.create_task(
        pgqrs.dequeue()
        .worker(consumer_a)
        .batch(5)
        .handle_batch(handle_batch)
        .poll(store)
    )
    await consumer_a.interrupt()
    with pytest.raises(Exception):
        await asyncio.wait_for(task_a, timeout=5)
    task_b = asyncio.create_task(
        pgqrs.dequeue()
        .worker(consumer_b)
        .batch(5)
        .handle_batch(handle_batch)
        .poll(store)
    )
    await consumer_b.interrupt()
    with pytest.raises(Exception):
        await asyncio.wait_for(task_b, timeout=5)

Two Consumers Processing Continuously

Run two consumers in parallel and enqueue a small batch; both consumers should drain the queue until interrupted.

    // Assumes `store`, `consumer_a`, and `consumer_b` already exist.
    let store_a = store.clone();
    let consumer_a_task_handle = consumer_a.clone();
    let task_a = tokio::spawn(async move {
        pgqrs::dequeue()
            .worker(&consumer_a_task_handle)
            .batch(10)
            .handle_batch(|_msgs| Box::pin(async { Ok(()) }))
            .poll(&store_a)
            .await
    });

    let store_b = store.clone();
    let consumer_b_task_handle = consumer_b.clone();
    let task_b = tokio::spawn(async move {
        pgqrs::dequeue()
            .worker(&consumer_b_task_handle)
            .batch(10)
            .handle_batch(|_msgs| Box::pin(async { Ok(()) }))
            .poll(&store_b)
            .await
    });
    // Assumes `consumer_a`, `consumer_b`, `task_a`, and `task_b` exist.
    consumer_a.interrupt().await.unwrap();
    consumer_b.interrupt().await.unwrap();

    let res_a = timeout(Duration::from_secs(5), task_a)
        .await
        .unwrap()
        .unwrap();
    let res_b = timeout(Duration::from_secs(5), task_b)
        .await
        .unwrap()
        .unwrap();

    assert!(matches!(res_a, Err(pgqrs::error::Error::Suspended { .. })));
    assert!(matches!(res_b, Err(pgqrs::error::Error::Suspended { .. })));

    assert_eq!(
        consumer_a.status().await.unwrap(),
        pgqrs::types::WorkerStatus::Suspended
    );
    assert_eq!(
        consumer_b.status().await.unwrap(),
        pgqrs::types::WorkerStatus::Suspended
    );
    task_a = asyncio.create_task(
        pgqrs.dequeue()
        .worker(consumer_a)
        .batch(10)
        .handle_batch(handle_batch)
        .poll(store)
    )
    task_b = asyncio.create_task(
        pgqrs.dequeue()
        .worker(consumer_b)
        .batch(10)
        .handle_batch(handle_batch)
        .poll(store)
    )
    try:
        await consumer_a.interrupt()
    except pgqrs.StateTransitionError:
        pass

    try:
        await consumer_b.interrupt()
    except pgqrs.StateTransitionError:
        pass

    with pytest.raises(Exception):
        await asyncio.wait_for(task_a, timeout=5)
    with pytest.raises(Exception):
        await asyncio.wait_for(task_b, timeout=5)

    assert await consumer_a.status() == "SUSPENDED"
    assert await consumer_b.status() == "SUSPENDED"

Next Steps

  • If you need retries/backoff, see docs/user-guide/guides/durable-workflows.md
  • If you need visibility timeouts and scheduling, see docs/user-guide/guides/delayed-messages.md
  • For production worker patterns, see docs/user-guide/guides/worker-management.md