
pgqrs

pgqrs is a durable workflow engine and job queue that works with PostgreSQL, SQLite, and Turso. It is written in Rust and has Python bindings.

Use PostgreSQL for production scale. Use SQLite or Turso for CLI tools, testing, and embedded apps.

Key Features

Multi-Backend Support

  • PostgreSQL: Production-ready with unlimited concurrent workers
  • SQLite: Zero-config embedded option for single-process applications
  • Turso: SQLite-compatible with enhanced features for local storage
  • Unified API: Switch backends by changing your connection string
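
One way to take advantage of the unified API is to keep the connection string out of the code, so that the backend can be swapped per environment. The sketch below is only an illustration: `PGQRS_DSN` and `resolve_dsn` are hypothetical names introduced here (pgqrs is not known to read that variable itself), and the default URL is the PostgreSQL one used in the examples on this page.

```python
import os

# Default taken from the PostgreSQL examples on this page.
DEFAULT_DSN = "postgresql://localhost/mydb"


def resolve_dsn(env=os.environ):
    """Return the connection string to use for this process.

    PGQRS_DSN is a hypothetical variable name chosen for this sketch;
    it is not something pgqrs is known to read on its own.
    """
    return env.get("PGQRS_DSN", DEFAULT_DSN)


# Usage with the connect call shown elsewhere on this page:
#     store = await pgqrs.connect(resolve_dsn())
print(resolve_dsn({}))
```

With this in place, a test run or CLI tool could point the same code at a different backend by setting the variable, without touching the application logic.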

Core

  • Library-only: No servers to operate. Use directly in your Rust or Python applications.
  • Connection Pooler Compatible: Works with pgBouncer and pgcat for connection scaling (PostgreSQL).

Job Queue

Workflows

  • Crash Recovery: Resume from the last completed step after failures.
  • Exactly-once Steps: Completed steps are never re-executed.
  • Persistent State: All workflow progress stored durably.

Choose Your Backend

Scenario                         | Recommended Backend | Why
Production with multiple workers | PostgreSQL          | Full concurrency, no writer conflicts
CLI tools & scripts              | SQLite / Turso      | Zero-config, embedded, portable
Testing & prototyping            | SQLite / Turso      | Fast setup, no external dependencies
Embedded applications            | SQLite / Turso      | Single-file database, no server
High write throughput            | PostgreSQL          | SQLite/Turso allow only 1 writer at a time

SQLite/Turso Concurrency Limit

SQLite and Turso use database-level locks. With many concurrent writers, you may hit lock contention. See SkyPilot's findings on SQLite concurrency. pgqrs enables WAL mode and sets a 5s busy timeout to mitigate this, but PostgreSQL is recommended for multi-worker scenarios.
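
For readers unfamiliar with these two settings, the sketch below shows what they look like when applied by hand through Python's standard `sqlite3` module. pgqrs applies its own configuration internally, so this is not its implementation; `open_with_wal` is a hypothetical helper and the temporary file path is only there to make the example self-contained.

```python
import os
import sqlite3
import tempfile


def open_with_wal(path, busy_timeout_ms=5000):
    """Open a SQLite file with WAL journaling and a busy timeout."""
    conn = sqlite3.connect(path)
    # WAL lets readers keep reading while a single writer appends to the log.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    # Wait up to busy_timeout_ms for a lock instead of failing immediately.
    conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")
    timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
    return conn, mode, timeout


# WAL needs a file-backed database; an in-memory one keeps its own journal mode.
db_path = os.path.join(tempfile.mkdtemp(), "queue.db")
conn, mode, timeout = open_with_wal(db_path)
print(mode, timeout)
```

Neither setting removes the single-writer limit. WAL keeps readers from blocking on the writer, and the busy timeout turns short lock conflicts into waits rather than errors.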

Job Queue

A simple, reliable message queue for background processing. In Rust:

use pgqrs;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to PostgreSQL
    let store = pgqrs::connect("postgresql://localhost/mydb").await?;

    // Setup (run once)
    pgqrs::admin(&store).install().await?;
    pgqrs::admin(&store).create_queue("tasks").await?;

    // Producer: enqueue a job
    let ids = pgqrs::enqueue()
        .message(&json!({"task": "send_email", "to": "[email protected]"}))
        .to("tasks")
        .execute(&store)
        .await?;

    // Consumer: process jobs
    pgqrs::dequeue()
        .from("tasks")
        .handle(|msg| async move {
            println!("Processing: {:?}", msg.payload);
            Ok(())
        })
        .execute(&store)
        .await?;

    Ok(())
}

The same flow in Python:

import pgqrs
import asyncio

async def main():
    # Connect to PostgreSQL
    store = await pgqrs.connect("postgresql://localhost/mydb")

    # Setup (run once)
    admin = pgqrs.admin(store)
    await admin.install()
    await admin.create_queue("tasks")

    # Producer: enqueue a job
    msg_id = await pgqrs.produce(store, "tasks", {
        "task": "send_email",
        "to": "[email protected]"
    })

    # Consumer: process jobs
    async def handler(msg):
        print(f"Processing: {msg.payload}")
        return True

    await pgqrs.consume(store, "tasks", handler)

asyncio.run(main())

Learn more about Producer & Consumer

Durable Workflows

Orchestrate multi-step processes that survive crashes and resume from where they left off. In Rust:

use pgqrs;
use pgqrs_macros::{pgqrs_workflow, pgqrs_step};

#[pgqrs_step]
async fn fetch_data(ctx: &pgqrs::Workflow, url: &str) -> Result<String, anyhow::Error> {
    Ok(reqwest::get(url).await?.text().await?)
}

#[pgqrs_step]
async fn process_data(ctx: &pgqrs::Workflow, data: String) -> Result<i32, anyhow::Error> {
    Ok(data.lines().count() as i32)
}

#[pgqrs_workflow]
async fn data_pipeline(ctx: &pgqrs::Workflow, url: &str) -> Result<String, anyhow::Error> {
    let data = fetch_data(ctx, url).await?;
    let count = process_data(ctx, data).await?;
    Ok(format!("Processed {} lines", count))
}

// Usage
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = pgqrs::connect("postgresql://localhost/mydb").await?;
    pgqrs::admin(&store).install().await?;

    let url = "https://example.com/data.txt";
    let workflow = pgqrs::admin(&store)
        .create_workflow("data_pipeline", &url)
        .await?;

    let result = data_pipeline(&workflow, url).await?;
    println!("Result: {}", result);
    Ok(())
}

The same workflow in Python:

import pgqrs
from pgqrs.decorators import workflow, step

@step
async def fetch_data(ctx, url: str) -> dict:
    # Fetch data from API
    return {"lines": 100, "data": "..."}

@step
async def process_data(ctx, data: dict) -> dict:
    return {"processed": True, "count": data["lines"]}

@workflow
async def data_pipeline(ctx, url: str):
    data = await fetch_data(ctx, url)
    result = await process_data(ctx, data)
    return result

# Usage
async def main():
    store = await pgqrs.connect("postgresql://localhost/mydb")
    admin = pgqrs.admin(store)
    await admin.install()

    url = "https://example.com/data"
    ctx = await admin.create_workflow("data_pipeline", url)
    result = await data_pipeline(ctx, url)
    print(f"Result: {result}")

import asyncio
asyncio.run(main())

Key benefits:

  • Crash recovery: Automatically resumes from the last completed step
  • Exactly-once semantics: Completed steps are never re-executed
  • Persistent state: All progress is stored durably in the configured backend
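
The idea behind these benefits can be sketched without pgqrs at all: record each step's result once it completes, and on a retry return the recorded result instead of running the step again. The code below is a simplified illustration using Python's standard `sqlite3` module, not pgqrs's actual implementation; `StepLog`, the `steps` table, and the simulated crash are all hypothetical.

```python
import json
import sqlite3


class StepLog:
    """Hypothetical helper: records each completed step's result by name."""

    def __init__(self, conn):
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS steps (name TEXT PRIMARY KEY, result TEXT)"
        )

    def run(self, name, fn, *args):
        row = self.conn.execute(
            "SELECT result FROM steps WHERE name = ?", (name,)
        ).fetchone()
        if row is not None:
            # Step completed in an earlier attempt: reuse the stored result.
            return json.loads(row[0])
        result = fn(*args)
        # Persist the result before moving on to the next step.
        self.conn.execute(
            "INSERT INTO steps (name, result) VALUES (?, ?)",
            (name, json.dumps(result)),
        )
        self.conn.commit()
        return result


calls = []  # records which step bodies actually executed


def fetch_data():
    calls.append("fetch_data")
    return {"lines": 100}


def process_data(data, fail):
    calls.append("process_data")
    if fail:
        raise RuntimeError("simulated crash")
    return {"processed": True, "count": data["lines"]}


def pipeline(log, fail):
    data = log.run("fetch_data", fetch_data)
    return log.run("process_data", process_data, data, fail)


log = StepLog(sqlite3.connect(":memory:"))
try:
    pipeline(log, fail=True)  # first attempt crashes in the second step
except RuntimeError:
    pass
result = pipeline(log, fail=False)  # retry reuses the stored fetch_data result
print(calls, result)
```

In this sketch a step that crashes after running but before its result is committed would run again on retry, so "never re-executed" holds only for steps whose completion was recorded. Step bodies with external side effects are therefore safest when they are idempotent.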

Learn more about Durable Workflows

Next Steps