Workflows API¶
pgqrs provides durable workflow capabilities, allowing you to define multi-step processes that persist state and recover automatically from failures.
Defining Workflows¶
Both Rust and Python APIs use declarative attributes/decorators to define workflows and steps.
use pgqrs::workflow::Workflow;
use pgqrs_macros::{pgqrs_workflow, pgqrs_step};
// Define a step
#[pgqrs_step]
async fn fetch_data(ctx: &Workflow, url: &str) -> Result<String, anyhow::Error> {
// ... logic ...
Ok("data".to_string())
}
// Define the workflow
#[pgqrs_workflow]
async fn data_pipeline(ctx: &Workflow, url: &str) -> Result<String, anyhow::Error> {
let data = fetch_data(ctx, url).await?;
Ok(data)
}
from pgqrs import PyWorkflow
from pgqrs.decorators import workflow, step
# Define a step
@step
async def fetch_data(ctx: PyWorkflow, url: str) -> str:
# ... logic ...
return "data"
# Define the workflow
@workflow
async def data_pipeline(ctx: PyWorkflow, url: str) -> str:
data = await fetch_data(ctx, url)
return data
Core Concepts¶
Steps¶
Steps are the fundamental unit of durability.
- Identity: Steps are identified by their function name (Rust) or function name (Python).
- Idempotency: If a step completes successfully, its result is saved. Subsequent calls (e.g., after a crash) return the saved result immediately without re-executing limits.
- Serialization: Inputs and outputs must be JSON-serializable (Python) or implement
Serialize/Deserialize(Rust).
Workflow Context¶
The context object (&Workflow in Rust, PyWorkflow in Python) provides access to the workflow's state and is required as the first argument to all step and workflow functions.
Execution¶
Starting a Workflow¶
Error Handling¶
Exceptions/Errors within steps are automatically captured and persisted.
Low-Level API¶
For advanced users requiring dynamic step definitions or manual control.
Use StepGuard directly.