Quickstart¶
This guide will walk you through creating your first queue, sending messages, and processing them.
Prerequisites¶
- pgqrs installed (Configuration)
- A running PostgreSQL database
- pgqrs schema installed (
pgqrs install)
Step 1: Create a Queue¶
use pgqrs;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let store = pgqrs::connect("postgresql://localhost/mydb").await?;
// Create a queue named "tasks"
let queue = pgqrs::admin(&store).create_queue("tasks").await?;
println!("Created queue: {} (id: {})", queue.queue_name, queue.id);
Ok(())
}
Step 2: Send a Message¶
use pgqrs;
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let store = pgqrs::connect("postgresql://localhost/mydb").await?;
// Send a message using the high-level API
let payload = json!({
"task_type": "send_email",
"to": "[email protected]",
"subject": "Welcome!"
});
let ids = pgqrs::enqueue()
.message(&payload)
.to("tasks")
.execute(&store)
.await?;
println!("Sent message with ID: {:?}", ids);
Ok(())
}
import asyncio
import pgqrs
async def main():
store = await pgqrs.connect("postgresql://localhost/mydb")
# Send a message using the high-level API
payload = {
"task_type": "send_email",
"to": "[email protected]",
"subject": "Welcome!"
}
message_id = await pgqrs.produce(store, "tasks", payload)
print(f"Sent message with ID: {message_id}")
asyncio.run(main())
Step 3: Consume Messages¶
use pgqrs;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let store = pgqrs::connect("postgresql://localhost/mydb").await?;
// Process messages with automatic lifecycle management
loop {
let result = pgqrs::dequeue()
.from("tasks")
.handle(|msg| async move {
println!("Processing message {}: {:?}", msg.id, msg.payload);
// Your processing logic here
Ok(())
})
.execute(&store)
.await;
if result.is_err() {
println!("No messages, waiting...");
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
import asyncio
import pgqrs
async def main():
store = await pgqrs.connect("postgresql://localhost/mydb")
# Process messages with automatic lifecycle management
async def handler(msg):
print(f"Processing message {msg.id}: {msg.payload}")
# Your processing logic here
return True
while True:
try:
await pgqrs.consume(store, "tasks", handler)
except:
print("No messages, waiting...")
await asyncio.sleep(2)
asyncio.run(main())
Step 4: Monitor Your Queue¶
Complete Example¶
Here's a complete example showing a producer and consumer working together:
use pgqrs;
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let store = pgqrs::connect("postgresql://localhost/mydb").await?;
// Setup
pgqrs::admin(&store).install().await?;
pgqrs::admin(&store).create_queue("demo").await?;
// Producer - send messages
for i in 0..5 {
pgqrs::enqueue()
.message(&json!({"task": i}))
.to("demo")
.execute(&store)
.await?;
println!("Sent task {}", i);
}
// Consumer - process messages
loop {
let result = pgqrs::dequeue()
.from("demo")
.handle(|msg| async move {
println!("Processing: {:?}", msg.payload);
Ok(())
})
.execute(&store)
.await;
if result.is_err() {
break;
}
}
// Show metrics
let metrics = pgqrs::admin(&store).all_queues_metrics().await?;
for m in metrics {
if m.name == "demo" {
println!("Archived: {}", m.archived_messages);
}
}
Ok(())
}
import asyncio
import pgqrs
async def main():
store = await pgqrs.connect("postgresql://localhost/mydb")
admin = pgqrs.admin(store)
# Setup
await admin.install()
await admin.create_queue("demo")
# Producer - send messages
for i in range(5):
await pgqrs.produce(store, "demo", {"task": i})
print(f"Sent task {i}")
# Consumer - process messages
processed = 0
while processed < 5:
try:
await pgqrs.consume(store, "demo", lambda msg: print(f"Processing: {msg.payload}") or True)
processed += 1
except:
break
# Show metrics
queues = await admin.get_queues()
metrics = await queues.list_metrics()
for m in metrics:
if m["name"] == "demo":
print(f"Archived: {m['archived_messages']}")
asyncio.run(main())
What's Next?¶
- Workflow API: Detailed API reference
- Producer API - Learn about batch operations and delayed messages
- Consumer API - Learn about batch processing and visibility timeouts
- Worker Management - Scale your workers