Basic Workflow Guide¶
This guide walks you through setting up a complete producer-consumer workflow using the new Trigger/Worker architecture introduced in pgqrs v0.14.
What You'll Build¶
A simple task queue system where:
- A Trigger (Producer) submits tasks to a workflow.
- A Worker (Consumer) discovers and executes those tasks.
- You monitor the workflow status and results.
Prerequisites¶
- pgqrs v0.14+ installed
- PostgreSQL running
- Database connection string (DSN)
Setup¶
The snippets in this page focus on the workflow patterns.
They assume you already have:
store(connected + bootstrapped)
If you want fully runnable examples end-to-end, use the guide tests directly:
- Rust:
crates/pgqrs/tests/guide_tests.rs
Step 1: Define and Create the Workflow¶
In pgqrs v0.14, workflows must be defined before they can be triggered. This is an idempotent operation that sets up the necessary queues and metadata.
#[pgqrs_workflow(name = "process_task")]
async fn process_task_wf(
ctx: &Run,
_input: serde_json::Value,
) -> anyhow::Result<serde_json::Value> {
// Use workflow_step for durability - this caches results across crashes
let result: serde_json::Value = pgqrs::workflow_step(ctx, "process_item", || async {
// Simulate processing
Ok(serde_json::json!({
"processed": "task_data",
"status": "done"
}))
})
.await?;
Ok(result)
}
import asyncio
import pgqrs
async def setup():
store = await pgqrs.connect("postgresql://localhost/mydb")
# Install schema
await pgqrs.admin(store).install()
print("✓ Schema installed")
# Create workflow definition
await store.create_workflow("process_task")
print("✓ Workflow 'process_task' defined")
asyncio.run(setup())
Step 2: Trigger the Workflow¶
The trigger submits a "run" of the workflow. It doesn't execute the work itself; it enqueues the input for a worker to pick up.
import asyncio
import pgqrs
async def main():
store = await pgqrs.connect("postgresql://localhost/mydb")
params = {"id": 1, "payload": "Hello pgqrs"}
# Trigger the workflow
run_id = await pgqrs.workflow("process_task") \
.trigger(params) \
.execute(store)
print(f"✓ Triggered workflow run: {run_id}")
asyncio.run(main())
Step 3: Create a Worker to Process the Workflow¶
The worker registers for a workflow queue and polls for messages. The workflow builder handles the run lifecycle automatically.
// Create a consumer to process the workflow
let consumer = pgqrs::consumer("workflow-worker", 9501, "process_task")
.create(&store)
.await
.unwrap();
// Spawn the consumer polling task using official workflow API
let store_for_poll = store.clone();
let consumer_handle = consumer.clone();
let consumer_task = tokio::spawn(async move {
pgqrs::workflow()
.name(process_task_wf)
.consumer(&consumer_handle)
.poll(&store_for_poll)
.await
});
import asyncio
import pgqrs
async def main():
store = await pgqrs.connect("postgresql://localhost/mydb")
# Register worker for the workflow queue
consumer = await pgqrs.consumer("worker-1", 8080, "process_task").create(store)
print("Worker started. Waiting for runs...")
while True:
messages = await consumer.dequeue(batch_size=1)
if not messages:
continue
msg = messages[0]
run = await pgqrs.run().message(msg).store(store).execute()
step = await run.acquire_step("process_task", current_time=run.current_time)
if step.status == "EXECUTE":
await process_task(msg.payload)
await step.guard.success({"status": "success"})
await run.complete({"status": "success"})
await consumer.archive(msg.id)
asyncio.run(main())
Step 4: Check Results¶
You can retrieve the status and output of a workflow run at any time using its message ID.
Key Concepts in v0.14¶
- Workflow Definition: A named template (e.g., "process_task") that maps to a queue.
- Workflow Run: A specific execution instance with its own
run_id, input, and output. - Trigger: The client that submits a run (noun-verb API:
workflow().trigger()). - Worker: The process that executes the handler (fluent API:
consumer().handler().create()). - workflow_step: Wraps step execution for durability (crash recovery).
Next Steps¶
- Durable Workflows - Learn how to build multi-step, crash-resistant workflows using
pgqrs::workflow_step(). - Workflow API Reference - Detailed API documentation.