Producer & Consumer¶
This page explains the Producer and Consumer pattern in pgqrs and how messages flow through the system.
Overview¶
pgqrs uses a classic producer-consumer pattern:
- Producers create messages and add them to queues
- Consumers fetch messages from queues and process them
Both are implemented as lightweight library components that connect directly to PostgreSQL.
Producer¶
A Producer is responsible for creating and enqueueing messages to a specific queue.
Creating a Producer¶
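The construction calls below are a sketch modeled on the delayed-message example later on this page; the worker name (`"my-app"`) and port (`8080`) arguments are placeholders, so check the pgqrs API reference for the exact signature in your version:

```python
import asyncio

import pgqrs

async def main():
    # Connect with an admin handle, then build a producer bound to
    # the "tasks" queue. Worker name and port are placeholder values.
    admin = pgqrs.admin("postgresql://localhost/mydb")
    producer = pgqrs.producer(admin, "tasks", "my-app", 8080)

    # The return value's exact shape may vary by version.
    message = await producer.enqueue({"action": "send_email"})
    print(f"Enqueued message {message}")

asyncio.run(main())
```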
Producer Operations¶
Single Message¶
use serde_json::json;

let payload = json!({
    "action": "send_email",
    "to": "[email protected]"
});

let message = producer.enqueue(&payload).await?;
println!("Message ID: {}", message.id);
import asyncio
import pgqrs

async def main():
    store = await pgqrs.connect("postgresql://localhost/mydb")
    payload = {
        "action": "send_email",
        "to": "[email protected]"
    }
    msg_id = await pgqrs.produce(store, "tasks", payload)
    print(f"Message ID: {msg_id}")

asyncio.run(main())
Batch Enqueue¶
Send multiple messages in a single transaction:
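A sketch of a batch enqueue; `produce_batch` is an assumed name for the batch counterpart of `pgqrs.produce`, so verify it against your installed version:

```python
import asyncio

import pgqrs

async def main():
    store = await pgqrs.connect("postgresql://localhost/mydb")
    payloads = [
        {"action": "send_email", "to": f"user{i}@example.com"}
        for i in range(100)
    ]
    # `produce_batch` is an assumed name; all payloads are sent
    # in a single transaction.
    msg_ids = await pgqrs.produce_batch(store, "tasks", payloads)
    print(f"Enqueued {len(msg_ids)} messages")

asyncio.run(main())
```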
Delayed Messages¶
Schedule messages for future processing:
import asyncio

import pgqrs

async def main():
    admin = pgqrs.admin("postgresql://localhost/mydb")
    producer = pgqrs.producer(admin, "tasks", "scheduler", 8080)
    payload = {"reminder": "Follow up with customer"}
    # Available after 5 minutes (300 seconds)
    message_id = await producer.enqueue_delayed(payload, delay_seconds=300)
    print(f"Scheduled message {message_id} for 5 minutes from now")

asyncio.run(main())
Extend Visibility¶
If processing takes longer than expected, extend the lock:
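A sketch of the pattern, assuming a consumer handle for the `tasks` queue and an `extend_visibility(message_id, seconds)` call mirroring the Rust example under Best Practices; `run_slow_step` is a hypothetical processing function:

```python
async def process_slowly(consumer):
    """Extend the lock before it expires when work runs long."""
    messages = await consumer.dequeue()
    for msg in messages:
        # Buy another 60 seconds on this message's lock so no other
        # consumer picks it up while the slow step runs.
        await consumer.extend_visibility(msg.id, 60)
        await run_slow_step(msg.payload)  # hypothetical helper
        await consumer.archive(msg.id)
```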
Consumer¶
A Consumer fetches and processes messages from a specific queue.
Creating a Consumer¶
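A construction sketch, symmetric with the producer setup above; `pgqrs.consumer` and its worker-name/port arguments are assumptions modeled on the `pgqrs.producer` call used elsewhere on this page:

```python
import asyncio

import pgqrs

async def main():
    admin = pgqrs.admin("postgresql://localhost/mydb")
    # Assumed constructor mirroring `pgqrs.producer`; worker name
    # and port are placeholders.
    consumer = pgqrs.consumer(admin, "tasks", "email-worker", 8081)

    messages = await consumer.dequeue()
    for msg in messages:
        print(msg.id, msg.payload)

asyncio.run(main())
```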
Consumer Operations¶
Dequeue Messages¶
import asyncio
import pgqrs

async def main():
    store = await pgqrs.connect("postgresql://localhost/mydb")
    # Dequeue using iterator pattern
    async for msg in pgqrs.consume_iter(store, "tasks"):
        print(f"ID: {msg.id}")
        print(f"Payload: {msg.payload}")
        print(f"Enqueued at: {msg.enqueued_at}")
        # Archive after processing
        await msg.archive()

asyncio.run(main())
Dequeue with Options¶
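The Rust batch example later on this page uses `dequeue_many_with_delay(batch_size, lock_seconds)`; assuming the Python side mirrors it, a dequeue with an explicit batch size and lock time looks like:

```python
async def drain(consumer):
    # Fetch up to 10 messages, each locked for 30 seconds.
    messages = await consumer.dequeue_many_with_delay(10, 30)
    for msg in messages:
        print(f"{msg.id}: dequeued {msg.read_ct} time(s)")
```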
Archive Messages¶
After successful processing, archive the message:
Archiving moves the message to the `pgqrs_archive` table for audit trails.
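For example, assuming a `consumer` handle and a `handle_payload` processing function of your own:

```python
async def work_once(consumer):
    messages = await consumer.dequeue()
    for msg in messages:
        handle_payload(msg.payload)     # your processing logic
        # Success: move the row into pgqrs_archive for auditing.
        await consumer.archive(msg.id)
```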
Delete Messages¶
If you don't need an audit trail, delete directly:
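Assuming `delete` mirrors `archive` (an assumption; check your version), the flow is the same except the row is removed outright rather than moved to `pgqrs_archive`:

```python
async def purge_once(consumer):
    messages = await consumer.dequeue()
    for msg in messages:
        handle_payload(msg.payload)   # your processing logic
        # No audit trail needed: remove the message permanently.
        await consumer.delete(msg.id)
```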
Message Structure¶
Messages have the following structure:
| Field | Type | Description |
|---|---|---|
| `id` | `i64` | Unique message identifier |
| `queue_id` | `i64` | Reference to the queue |
| `payload` | `JSON` | Your message data |
| `enqueued_at` | `DateTime` | When the message was created |
| `vt` | `DateTime` | Visibility timeout (when the lock expires) |
| `read_ct` | `i32` | Number of times the message has been dequeued |
Visibility Timeout¶
When a consumer dequeues a message, it's locked with a visibility timeout:
- Message is locked (not visible to other consumers)
- Consumer has until the timeout to process and archive/delete
- If timeout expires without completion, message becomes visible again
sequenceDiagram
    participant C1 as Consumer 1
    participant DB as PostgreSQL
    participant C2 as Consumer 2

    C1->>DB: dequeue()
    Note over DB: Message locked<br/>vt = now + 5s
    C2->>DB: dequeue()
    Note over DB: Message not returned<br/>(locked by C1)

    alt C1 processes in time
        C1->>DB: archive(id)
        Note over DB: Message archived
    else Timeout expires
        Note over DB: vt < now<br/>Message visible again
        C2->>DB: dequeue()
        Note over DB: Message returned to C2
    end
Configuring Lock Time¶
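Where the lock time is set depends on the API surface of your pgqrs version; this sketch assumes the per-call form used in the batch example below, where the second argument is the lock time in seconds:

```python
async def examples(consumer):
    # Short lock for quick jobs: a crashed consumer's messages
    # become visible again within seconds.
    quick = await consumer.dequeue_many_with_delay(10, 5)

    # Long lock for heavy jobs: no spurious redelivery mid-processing.
    heavy = await consumer.dequeue_many_with_delay(10, 300)
```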
Best Practices¶
1. Use Appropriate Lock Times¶
Set lock times based on your processing duration:
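One workable heuristic (an assumption, not a pgqrs rule) is to take your observed p99 processing time and scale it by a safety factor, with a small floor so very fast jobs still get a sane lock:

```python
import math

def pick_lock_seconds(p99_seconds: float, safety_factor: float = 2.0,
                      floor: int = 5) -> int:
    """Choose a visibility timeout: scale the p99 processing time so
    normal jitter does not trigger redelivery, never below `floor`."""
    return max(floor, math.ceil(p99_seconds * safety_factor))

print(pick_lock_seconds(12.0))  # a 12 s p99 doubled -> 24 s lock
print(pick_lock_seconds(0.5))   # very fast jobs hit the 5 s floor
```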
2. Extend Locks for Long Processing¶
If processing might exceed the lock time:
let messages = consumer.dequeue().await?;

for message in messages {
    // Start processing
    let result = start_long_processing(&message);

    // Extend lock if needed
    if result.needs_more_time() {
        // Processing is taking longer than expected
        consumer.extend_visibility(message.id, 60).await?;
    }

    // Complete and archive
    result.finish().await?;
    consumer.archive(message.id).await?;
}
3. Always Archive or Delete¶
Always explicitly handle completed messages:
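A sketch of the rule: resolve every dequeued message explicitly, archiving (or deleting) on success and leaving it locked on failure so the visibility timeout returns it to the queue for a retry; `consumer` and `handle_payload` are assumed names:

```python
async def safe_work_once(consumer):
    messages = await consumer.dequeue()
    for msg in messages:
        try:
            handle_payload(msg.payload)
        except Exception:
            # Leave the message alone; it becomes visible again when
            # the lock expires and another consumer can retry it.
            continue
        await consumer.archive(msg.id)
```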
4. Use Batch Operations¶
For high throughput, use batch operations:
// Fetch batch
let messages = consumer.dequeue_many_with_delay(100, 30).await?;

// Process all
let results = process_batch(&messages).await;

// Archive the messages whose processing succeeded; pair each
// message with its result to recover the message id.
let successful: Vec<i64> = messages
    .iter()
    .zip(results.iter())
    .filter(|(_, result)| result.is_ok())
    .map(|(message, _)| message.id)
    .collect();

consumer.archive_many(successful).await?;
What's Next?¶
- Workers - Worker lifecycle and management
- Message Lifecycle - Detailed message states
- Batch Processing Guide - Processing at scale