Admin API

The Admin API provides queue management, schema administration, and monitoring capabilities.

Usage

Rust:
use pgqrs;

let store = pgqrs::connect("postgresql://localhost/mydb").await?;

// Use admin builder for operations
pgqrs::admin(&store).install().await?;
pgqrs::admin(&store).create_queue("tasks").await?;

Python:
import pgqrs

store = await pgqrs.connect("postgresql://localhost/mydb")
admin = pgqrs.admin(store)

# Use admin methods
await admin.install()
await admin.create_queue("tasks")

Schema Management

install

Install the pgqrs schema (tables, indexes, constraints). It is idempotent and safe to call multiple times.

Rust:
pgqrs::admin(&store).install().await?;
println!("Schema installed successfully");

Python:
await admin.install()
print("Schema installed successfully")

verify

Verify that the pgqrs schema is correctly installed. Returns an error (Rust) or raises an exception (Python) if the schema is missing.

Rust:
admin.verify().await?;
println!("Schema verification passed");

Python:
await admin.verify()
print("Schema verification passed")
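
One possible startup pattern combines the two calls above: check the schema first and install it only when the check fails. The sketch below is illustrative rather than part of pgqrs. `ensure_schema` is a hypothetical helper name, `admin` is assumed to be the object returned by `pgqrs.admin(store)`, and any exception from `verify()` is assumed to mean the schema is missing.

```python
async def ensure_schema(admin) -> bool:
    """Make sure the pgqrs schema exists.

    Returns True if install() had to be run, False if verify() passed
    on the first attempt. `admin` is assumed to expose the async
    verify() and install() methods shown above.
    """
    try:
        await admin.verify()
        return False  # schema was already in place
    except Exception:
        # Assumption: any verify() failure is treated as "schema missing".
        await admin.install()
        # Verify again so a failed install surfaces here, not later.
        await admin.verify()
        return True
```

Because `install()` is idempotent, calling it unconditionally is also an option. Verifying first only avoids issuing schema statements on every start.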

uninstall

Remove the pgqrs schema (tables and data).

Danger

This permanently deletes all queues, messages, workers, and archives!

Rust:
admin.uninstall().await?;
println!("Schema removed");

Python:
Not Supported

uninstall() is not exposed in Python to prevent accidental data loss. Use the CLI or SQL for schema removal.

Queue Management

create_queue

Create a new queue.

Rust:
let queue = admin.create_queue("email-notifications").await?;
println!("Created queue: {} (ID: {})", queue.queue_name, queue.id);

Python:
queue = await admin.create_queue("email-notifications")
print(f"Created queue: {queue.queue_name} (ID: {queue.id})")

Returns QueueInfo:

| Field | Type (Rust / Python) | Description |
|---|---|---|
| id | i64 / int | Unique queue ID |
| queue_name | String / str | Queue name |
| created_at | DateTime / str | Creation timestamp |
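
As an illustration of working with the returned QueueInfo, the sketch below creates several queues in one pass and records the ID of each. `create_queues` is a hypothetical helper, not a pgqrs function. It relies only on `create_queue()` and the `queue_name` and `id` fields listed above. What happens when a queue already exists is not covered here, so any error is simply left to propagate.

```python
async def create_queues(admin, names):
    """Create each named queue and return a {queue_name: id} mapping.

    `admin` is assumed to be the object returned by pgqrs.admin(store).
    Errors from create_queue() are not caught and propagate to the caller.
    """
    created = {}
    for name in names:
        queue = await admin.create_queue(name)
        created[queue.queue_name] = queue.id
    return created
```

A call such as `await create_queues(admin, ["email-notifications", "reports"])` would then yield a dictionary keyed by queue name.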

get_queue

Get a queue by name.

Rust:
let queue = admin.get_queue("email-notifications").await?;

Python:
Not Supported

get_queue() is not exposed on the Admin object in Python. Queues are typically looked up through the table APIs (admin.queues.get_by_name()) or resolved implicitly during worker creation.

delete_queue

Delete a queue and all its messages.

Rust:
admin.delete_queue("old-queue").await?;

Python:
Not Supported

delete_queue() is not directly exposed on the Admin object. Use the CLI for destructive queue operations.

Metrics & Monitoring

queue_metrics

Get metrics for a specific queue.

Rust:
let metrics = admin.queue_metrics("email-notifications").await?;
println!("Pending: {}", metrics.pending_messages);

Python:
Not Supported

Metrics objects are not exposed directly in Python. Use the table APIs instead, for example by awaiting admin.get_messages() and calling count() on the result.

Table APIs

Admin provides direct access to table operations.

Queues Table

Rust:
// List all queues
let queues = admin.queues.list().await?;

// Get queue by ID
let queue = admin.queues.get(queue_id).await?;

// Count queues
let count = admin.queues.count().await?;

Python:
# List not fully exposed, but count is available
queues = await admin.get_queues()
count = await queues.count()

Workers Table

Rust:
// List all workers
let workers = admin.workers.list().await?;

Python:
workers = await admin.get_workers()
all_workers = await workers.list()
count = await workers.count()

Messages Table

Rust:
use pgqrs::tables::Messages;
let messages = Messages::new(admin.pool.clone());
let pending = messages.count_pending(queue_id).await?;

Python:
messages = await admin.get_messages()
count = await messages.count()

Archive Table

Rust:
use pgqrs::Archive;
let archive = Archive::new(admin.pool.clone());
// ... extensive archive filtering options ...

Python:
archive = await admin.get_archive()
count = await archive.count()
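
Since Python has no metrics object, a rough system-wide snapshot can be assembled from the four table accessors shown in this section. The following is a minimal sketch under that assumption. `table_counts` is a hypothetical helper name, and it reports totals per table only, not per-queue figures such as pending messages.

```python
async def table_counts(admin):
    """Return row counts for the queues, workers, messages and archive tables.

    `admin` is assumed to be the object returned by pgqrs.admin(store) and
    to expose the get_*() accessors used in the Python examples above.
    """
    queues = await admin.get_queues()
    workers = await admin.get_workers()
    messages = await admin.get_messages()
    archive = await admin.get_archive()
    return {
        "queues": await queues.count(),
        "workers": await workers.count(),
        "messages": await messages.count(),
        "archive": await archive.count(),
    }
```

The resulting dictionary can be logged periodically or exported to whatever monitoring system is in use.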

Example: Health Check

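Rust:
async fn health_check(admin: &Admin) -> bool {
    admin.verify().await.is_ok()
}

Python:
async def health_check(admin):
    try:
        await admin.verify()
        return True
    except Exception:
        # Catch Exception rather than using a bare except, so task
        # cancellation and KeyboardInterrupt are not swallowed.
        return False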
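
A health endpoint usually should not hang when the database is unreachable, so one possible variation bounds the check with a timeout. This sketch uses the standard library's `asyncio.wait_for`. The helper name and the five-second default are illustrative assumptions, not pgqrs settings.

```python
import asyncio


async def health_check_with_timeout(admin, timeout_seconds: float = 5.0) -> bool:
    """Return True if admin.verify() succeeds within timeout_seconds.

    `admin` is assumed to be the object returned by pgqrs.admin(store).
    """
    try:
        await asyncio.wait_for(admin.verify(), timeout=timeout_seconds)
        return True
    except Exception:
        # Covers both verify() failures and the timeout error raised by
        # asyncio.wait_for; task cancellation is not swallowed.
        return False
```

Treating a timeout as unhealthy is a design choice: it keeps the check simple, at the cost of not distinguishing a slow database from a missing schema.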