
Durable Workflows Guide

This guide walks you through building a durable workflow with pgqrs v0.14. Durable workflows are multi-step processes that survive worker crashes, handle transient errors with retries, and can even pause for external events.

Prerequisites

  • pgqrs v0.14+ installed
  • PostgreSQL running
  • Database connection string (DSN)

What We'll Build

We'll create data processing workflows that demonstrate:

  1. Successful execution - basic workflow completion
  2. Crash recovery - workflow resumes from cached step results
  3. Transient errors - automatic retry with backoff
  4. Pausing - wait for external events (human approval, webhooks)

Setup

The snippets on this page focus on the durable workflow patterns themselves.

They assume you already have:

  • store (connected + bootstrapped)

For fully runnable end-to-end examples, use the guide tests directly:

  • Rust: crates/pgqrs/tests/concurrent_tests.rs

Workflow Patterns

Crash Recovery

When a worker crashes mid-execution, the workflow can resume from the last completed step. Step results are cached in the database to avoid repeating work.
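Conceptually, a durable step is a database-backed memo: check for a cached result, otherwise execute and record. A minimal sketch of that idea in plain Python (no pgqrs; the dict stands in for the workflow-steps table):

```python
# Sketch: a durable step is "check the cache, else run and record".
# The dict stands in for pgqrs's workflow-steps table (illustrative only).
step_cache: dict = {}

async def durable_step(run_id, name, func):
    key = (run_id, name)
    if key in step_cache:          # completed before a crash: skip the work
        return step_cache[key]
    result = await func()          # first execution of this step
    step_cache[key] = result       # record the result before moving on
    return result
```

Because the result is recorded before the workflow advances, re-running the whole workflow function after a crash replays completed steps from the cache instead of re-executing them.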

1. Define the Workflow with Durable Steps

Wrap each step in a pgqrs::workflow_step() call inside a #[pgqrs_workflow] function; the completion status and result of each step are then tracked and cached automatically.

Rust:

    #[pgqrs_workflow(name = "crash_recovery_wf")]
    async fn crash_recovery_wf(run: &Run, _input: serde_json::Value) -> Result<serde_json::Value> {
        // Step 1: This will complete and be cached
        pgqrs::workflow_step(run, "step1", || async {
            Ok(serde_json::json!({"data": "from step 1"}))
        })
        .await?;

        if !CRASH_RECOVERY_HAS_CRASHED.swap(true, std::sync::atomic::Ordering::SeqCst) {
            return Err(pgqrs::Error::TestCrash);
        }

        // Step 2
        pgqrs::workflow_step(run, "step2", || async {
            Ok::<_, pgqrs::Error>("step2_done".to_string())
        })
        .await?;

        Ok(json!({"done": true}))
    }
Python:

    @workflow_def(name="crash_recovery_wf")
    async def crash_recovery_wf(run_ctx, input_data: dict) -> dict:
        @step_def
        async def step1(step_ctx):
            return {"data": "from step 1"}

        await step1(run_ctx)

        if not crash_state["has_crashed"]:
            crash_state["has_crashed"] = True
            raise BaseException("TestCrash")

        @step_def
        async def step2(step_ctx):
            return "step2_done"

        await step2(run_ctx)

        return {"done": True}

2. Execute and Simulate Crash

When the workflow begins executing on the first consumer, it processes step1 and saves the output (e.g., {"data": "from step 1"}).

Rust:

    // Trigger workflow - consumer will pick it up and crash after step1
    let input = true;
    let msg = pgqrs::workflow()
        .name(crash_recovery_wf)
        .trigger(&input)
        .unwrap()
        .execute(&store)
        .await
        .unwrap();

    // Start consumer with workflow().poll() - this will process until crash
    let store_for_consumer = store.clone();
    let consumer_handle = consumer.clone();
    let _consumer_task = tokio::spawn(async move {
        pgqrs::workflow()
            .name(crash_recovery_wf)
            .consumer(&consumer_handle)
            .poll(&store_for_consumer)
            .await
    });
Python:

    # Trigger the workflow run
    input_data = {"test": True}
    msg_trigger = pgqrs.workflow().name("crash_recovery_wf").store(store).trigger(input_data)
    msg = await msg_trigger.execute()

    # Start processing
    consumer_task = asyncio.create_task(
        pgqrs.dequeue().worker(consumer).handle_workflow(crash_recovery_wf).poll(store)
    )

If the consumer process crashes before finishing step2, the in-memory execution halts. The run status remains active in the database.

3. Release and Recover

Usually, the orchestrator detects crashed workers via timeouts and releases the message back to the queue. For demonstration, we manually release that worker's messages.
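The timeout-based detection can be sketched as follows, in plain Python with no pgqrs: a worker that has not heartbeated within its window is presumed crashed, and its in-flight messages become eligible for release. The field names and threshold here are illustrative, not pgqrs's actual schema:

```python
from dataclasses import dataclass

@dataclass
class InFlight:
    worker_id: str
    last_heartbeat: float  # epoch seconds of the worker's last heartbeat

def messages_to_release(in_flight, now, timeout):
    # A worker that missed its heartbeat window is presumed crashed;
    # its in-flight messages go back to the queue for another consumer.
    return [m for m in in_flight if now - m.last_heartbeat > timeout]
```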

Rust:

    // Wait for consumer to process and crash (TestCrash)
    // Then interrupt the consumer to simulate real crash
    timeout(Duration::from_secs(10), async {
        loop {
            let steps = store.workflow_steps().list().await.unwrap();
            let step = steps.iter().find(|s| s.step_name == "step1");
            if let Some(s) = step {
                eprintln!("Step 1 status: {:?}", s.status);
                if s.status == pgqrs::WorkflowStatus::Success {
                    consumer.interrupt().await.unwrap();
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    break;
                }
            } else {
                eprintln!("Step 1 not found yet");
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    })
    .await
    .unwrap();

    // Now the consumer is interrupted, release the message
    pgqrs::admin(&store)
        .release_worker_messages(consumer.worker_id())
        .await
        .unwrap();
Python:

    async def wait_for_step1_success():
        while True:
            steps = await (await store.get_workflow_steps()).list()
            step = next((s for s in steps if s.step_name == "step1" and s.status == "SUCCESS"), None)
            if step:
                return step
            await asyncio.sleep(POLL_INTERVAL)

    await asyncio.wait_for(wait_for_step1_success(), timeout=10.0)

    try:
        await consumer.interrupt()
    except pgqrs.StateTransitionError:
        pass

    with pytest.raises(BaseException, match="TestCrash"):
        await asyncio.wait_for(consumer_task, timeout=5)

    await pgqrs.admin(store).release_worker_messages(consumer.worker_id)

A new consumer picks up the workflow run. When execution reaches step1 again, pgqrs sees that its result is already cached, skips the closure, and returns the cached value immediately. Execution then resumes at step2.

Rust:

    // Create new consumer to resume and complete the workflow
    let consumer2 = pgqrs::consumer("crash-worker-2", 9602, "crash_recovery_wf")
        .create(&store)
        .await
        .unwrap();

    let store_for_consumer2 = store.clone();
    let consumer2_handle = consumer2.clone();
    let _consumer_task2 = tokio::spawn(async move {
        pgqrs::workflow()
            .name(crash_recovery_wf)
            .consumer(&consumer2_handle)
            .poll(&store_for_consumer2)
            .await
    });

    // Wait for workflow to complete
    wait_for_workflow_complete(&store, msg.id, Duration::from_secs(10)).await;
Python:

    consumer_2 = await store.consumer("crash_recovery_wf")
    consumer_task_2 = asyncio.create_task(
        pgqrs.dequeue().worker(consumer_2).handle_workflow(crash_recovery_wf).poll(store)
    )

    result = await pgqrs.run().message(msg).store(store).result()

Transient Errors

When a step fails with a transient error (e.g., network timeout), the workflow will be retried automatically with backoff.

1. Define the Transient Error Step

If a closure inside pgqrs::workflow_step() returns a pgqrs::Error::Transient, it signals the orchestrator to retry the step later automatically.
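The scheduling can be pictured as: the next retry time comes either from the error's retry_after hint or from a backoff on the attempt count. The following plain-Python sketch illustrates the shape of such a policy; the actual backoff formula and cap are pgqrs internals, so the numbers here are assumptions:

```python
from typing import Optional

def next_retry_delay(attempt: int, retry_after: Optional[float] = None,
                     base: float = 1.0, cap: float = 300.0) -> float:
    """Seconds to wait before the next attempt (illustrative policy).
    An explicit retry_after hint from the error wins; otherwise fall
    back to capped exponential backoff: base * 2^attempt."""
    if retry_after is not None:
        return retry_after
    return min(cap, base * (2 ** attempt))
```

For example, the TIMEOUT error above carries retry_after = 30s, so under this sketch the step would be rescheduled 30 seconds out regardless of attempt count.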

Rust:

    #[pgqrs_workflow(name = "transient_error_wf")]
    async fn transient_error_wf(run: &Run, _input: serde_json::Value) -> Result<serde_json::Value> {
        // This step will artificially fail with a transient error
        pgqrs::workflow_step(run, "api_call", || async {
            if true {
                return Err(pgqrs::Error::Transient {
                    code: "TIMEOUT".to_string(),
                    message: "Connection timed out".to_string(),
                    retry_after: Some(std::time::Duration::from_secs(30)),
                });
            }
            Ok::<_, pgqrs::Error>(())
        })
        .await?;

        Ok(json!({"done": true}))
    }
Python:

    @workflow_def(name="transient_error_wf")
    async def transient_error_wf(run_ctx, input_data: dict) -> dict:
        @step_def
        async def api_call(step_ctx):
            if True:
                err = pgqrs.TransientStepError("Connection timed out")
                err.code = "TIMEOUT"
                err.retry_after = 30.0
                raise err

        await api_call(run_ctx)
        return {"done": True}

2. Execute the Workflow

When the worker encounters the transient error, it halts execution and automatically schedules the run for a background retry.

Rust:

    // Start consumer with workflow().poll() - it will hit the transient error
    let store_for_consumer = store.clone();
    let consumer_handle = consumer.clone();
    let consumer_task = tokio::spawn(async move {
        pgqrs::workflow()
            .name(transient_error_wf)
            .consumer(&consumer_handle)
            .poll(&store_for_consumer)
            .await
    });
Python:

    consumer_task = asyncio.create_task(
        pgqrs.dequeue().worker(consumer).handle_workflow(transient_error_wf).poll(store)
    )

3. Inspect the Paused Status

The workflow run remains in the Running state, while the failing step is marked Error and assigned a retry_at timestamp in the database.

Rust:

    // Verify that the run is still RUNNING (not ERROR) because the error was transient
    let workflow = store
        .workflows()
        .get_by_name("transient_error_wf")
        .await
        .unwrap();
    let runs = pgqrs::tables(&store).workflow_runs().list().await.unwrap();
    let run_rec = runs
        .into_iter()
        .find(|r| r.workflow_id == workflow.id)
        .unwrap();
    assert_eq!(run_rec.status, pgqrs::WorkflowStatus::Running);

    // Verify that the step is ERROR, recorded as transient, and has a retry_at scheduled
    let steps = store.workflow_steps().list().await.unwrap();
    let step_rec = steps.into_iter().find(|s| s.run_id == run_rec.id).unwrap();

    assert_eq!(step_rec.status, pgqrs::WorkflowStatus::Error);
    assert!(
        step_rec.retry_at.is_some(),
        "Step should be scheduled for retry"
    );
Python:

    workflow = await (await store.get_workflows()).get_by_name("transient_error_wf")
    runs = await (await store.get_workflow_runs()).list()
    run_rec = next((r for r in runs if r.workflow_id == workflow.id), None)

    assert run_rec.status == "RUNNING"

    steps = await (await store.get_workflow_steps()).list()
    step_rec = next((s for s in steps if s.run_id == run_rec.id), None)

    assert step_rec.status == "ERROR"
    assert step_rec.retry_at is not None

Pausing for External Events

Workflows can pause execution and wait for external events (like human approval or webhook callbacks).

1. Define a Paused Step

Returning pgqrs::Error::Paused stops workflow execution until the run is resumed externally or the resume_after timeout elapses.
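The resume condition boils down to "external resume OR timeout expiry, whichever comes first". A plain-Python sketch of that check (field names are illustrative, not pgqrs's schema):

```python
from dataclasses import dataclass

@dataclass
class PausedRun:
    paused_at: float                  # epoch seconds when the pause was recorded
    resume_after: float               # timeout in seconds (e.g. 60)
    resumed_externally: bool = False  # flipped by an approval/webhook handler

def is_runnable(run: PausedRun, now: float) -> bool:
    # Eligible again on external resume OR once resume_after has elapsed.
    return run.resumed_externally or now >= run.paused_at + run.resume_after
```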

Rust:

    #[pgqrs_workflow(name = "pause_wf")]
    async fn pause_wf(run: &Run, _input: serde_json::Value) -> Result<serde_json::Value> {
        // Step 1: Pause execution
        pgqrs::workflow_step(run, "pause_wf_step_1", || async {
            if true {
                return Err(pgqrs::Error::Paused {
                    message: "Waiting for approval".to_string(),
                    resume_after: std::time::Duration::from_secs(60),
                });
            }
            Ok::<_, pgqrs::Error>(())
        })
        .await?;

        Ok(json!({"done": true}))
    }
Python:

    @workflow_def(name="pause_wf")
    async def pause_wf(run_ctx, input_data: dict) -> dict:
        @step_def
        async def step1(step_ctx):
            # Pause via the run context, then raise to stop execution
            await run_ctx.pause("Waiting for approval", 60)
            raise pgqrs.PausedError("Waiting for approval")

        await step1(run_ctx)
        return {"done": True}

2. Process Output

Just like with transient errors, when the worker hits the pause signal it stops execution cleanly and releases the workflow run.

Rust:

    // Start consumer with workflow().poll() - it will hit the paused error
    let store_for_consumer = store.clone();
    let consumer_handle = consumer.clone();
    let consumer_task = tokio::spawn(async move {
        pgqrs::workflow()
            .name(pause_wf)
            .consumer(&consumer_handle)
            .poll(&store_for_consumer)
            .await
    });
Python:

    consumer_task = asyncio.create_task(
        pgqrs.dequeue().worker(consumer).handle_workflow(pause_wf).poll(store)
    )

3. Inspect Status

The overall workflow run transitions into a Paused state. It won't execute further until the resume_after duration expires or an admin resumes it manually.

Rust:

    // Verify that the run is PAUSED waiting for external event
    let workflow = store.workflows().get_by_name("pause_wf").await.unwrap();
    let runs = pgqrs::tables(&store).workflow_runs().list().await.unwrap();
    let run_rec = runs
        .into_iter()
        .find(|r| r.workflow_id == workflow.id)
        .unwrap();
    assert_eq!(run_rec.status, pgqrs::WorkflowStatus::Paused);

    // Verify that the step is ERROR, code PAUSED, and has a retry_at scheduled
    let steps = store.workflow_steps().list().await.unwrap();
    let step_rec = steps.into_iter().find(|s| s.run_id == run_rec.id).unwrap();

    assert_eq!(step_rec.status, pgqrs::WorkflowStatus::Error);
    assert!(
        step_rec.retry_at.is_some(),
        "Step should have resume_after scheduled"
    );
Python:

    workflow = await (await store.get_workflows()).get_by_name("pause_wf")
    runs = await (await store.get_workflow_runs()).list()
    run_rec = next((r for r in runs if r.workflow_id == workflow.id), None)

    assert run_rec.status == "PAUSED"

Step-by-Step: Basic Workflow

For a complete step-by-step guide, see Basic Workflow.

Advanced: Manual Step Control

For advanced scenarios where you need more control over step execution, you can use the manual step API:

Rust:

    // Acquire a step
    let step = pgqrs::step()
        .run(&run)
        .name("fetch_data")
        .execute()
        .await?;

    // Check if step needs execution
    if step.status() == pgqrs::WorkflowStatus::Running {
        // Do the work
        let result = do_work().await?;

        // Mark step complete
        run.complete_step("fetch_data", result).await?;
    }

Best Practices

  1. Idempotency: Ensure your step closures are idempotent, especially if they have side effects outside of pgqrs.
  2. Step Granularity: Balance between durability and performance. Too many small steps create database overhead; too few large steps mean more work is repeated on crash.
  3. Use workflow_step: Prefer pgqrs::workflow_step() for automatic step caching. Only use manual step API when you need fine-grained control.
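To make the idempotency point (item 1) concrete: a common pattern for side effects outside of pgqrs is an idempotency key derived from the run and step, so that a step replayed after a crash does not duplicate the external effect. A plain-Python sketch; charge_payment, the key scheme, and the dedup store are all illustrative:

```python
processed = set()  # stands in for the external system's dedup store
charges = []       # the external side effect we must not duplicate

def charge_payment(amount):
    charges.append(amount)

def charge_once(run_id, step_name, amount):
    key = f"{run_id}:{step_name}"  # stable across retries of this step
    if key in processed:
        return                     # crash-replay of the step: no-op
    charge_payment(amount)
    processed.add(key)
```

In practice the dedup check would live in the external service (e.g. an idempotency-key header), since the whole point is surviving a crash between the side effect and pgqrs recording the step result.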

Next Steps