# Batch Processing Guide
This guide covers efficient batch processing patterns for high-throughput scenarios.
## When to Use Batch Processing
- Processing thousands of messages per minute
- Reducing database round-trips
- Improving throughput with bulk operations
- ETL and data pipeline workloads
## Batch Enqueueing
Send multiple messages efficiently.
```rust
use pgqrs;
use serde_json::json;

async fn batch_enqueue_example() -> Result<(), Box<dyn std::error::Error>> {
    let store = pgqrs::connect("postgresql://localhost/mydb").await?;

    // Create managed producer
    let producer = pgqrs::producer()
        .queue("tasks")
        .hostname("batch-producer")
        .create(&store)
        .await?;

    // Prepare batch of messages
    let payloads: Vec<_> = (0..1000)
        .map(|i| json!({"task_id": i, "data": "process me"}))
        .collect();

    // Enqueue the batch (one call per message; see the concurrent
    // variant after the Python example for overlapping these calls)
    for payload in &payloads {
        producer.enqueue(payload).await?;
    }

    println!("Enqueued {} messages", payloads.len());
    Ok(())
}
```
```python
import asyncio
import pgqrs

async def batch_enqueue_example():
    store = await pgqrs.connect("postgresql://localhost/mydb")

    # Create managed producer
    producer = await store.producer("tasks")

    # Prepare batch
    payloads = [{"task_id": i, "data": "process me"} for i in range(1000)]

    # Send using the producer
    msg_ids = await producer.enqueue_batch(payloads)
    print(f"Enqueued {len(msg_ids)} messages")

asyncio.run(batch_enqueue_example())
```
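The Python binding exposes `enqueue_batch`, while the Rust example above awaits each `enqueue` call in turn. If your pgqrs version has no dedicated batch-enqueue call in Rust, you can still overlap the round-trips by issuing the individual `enqueue` calls concurrently. A minimal sketch, assuming the producer can be shared by reference across in-flight calls and that a `pgqrs::store::Producer` trait path exists alongside the `Consumer` path used later in this guide:

```rust
use futures::future::join_all;

// Sketch only: overlaps the per-message round-trips from the loop above.
// `pgqrs::store::Producer` is an assumed trait path, mirroring `Consumer`.
async fn concurrent_enqueue(
    producer: &dyn pgqrs::store::Producer,
    payloads: &[serde_json::Value],
) -> Result<(), Box<dyn std::error::Error>> {
    // Start all enqueue calls, then await them together
    let results = join_all(payloads.iter().map(|p| producer.enqueue(p))).await;
    let failed = results.iter().filter(|r| r.is_err()).count();
    println!("Enqueued {} ok, {} failed", results.len() - failed, failed);
    Ok(())
}
```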
## Batch Dequeueing
Fetch and process multiple messages in one operation.
```rust
use pgqrs;

async fn batch_dequeue_example() -> Result<(), Box<dyn std::error::Error>> {
    let store = pgqrs::connect("postgresql://localhost/mydb").await?;

    // Create managed consumer
    let consumer = pgqrs::consumer()
        .queue("tasks")
        .hostname("batch-consumer")
        .create(&store)
        .await?;

    // Fetch and process batch with handler
    consumer.dequeue()
        .batch(100)
        .handle_batch(|messages| async move {
            println!("Processing {} messages", messages.len());
            for message in &messages {
                // Process each message
                println!("Processing: {}", message.id);
            }
            Ok(())
        })
        .await?;

    Ok(())
}
```
```python
import asyncio
import pgqrs

async def batch_dequeue_example():
    store = await pgqrs.connect("postgresql://localhost/mydb")

    # Create managed consumer
    consumer = await store.consumer("tasks")

    # Process batch with handler
    async def process_batch(messages):
        print(f"Processing {len(messages)} messages")
        for msg in messages:
            print(f"Processing: {msg.id}")
        return True

    await consumer.consume_batch(batch_size=100, handler=process_batch)

asyncio.run(batch_dequeue_example())
```
## Processing Patterns

### Sequential Batch Processing
Process batches one after another with controlled throughput.
```rust
use pgqrs;
use std::time::Duration;

async fn sequential_batch_processing(consumer: &dyn pgqrs::store::Consumer) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        let result = consumer.dequeue()
            .batch(100)
            .handle_batch(|messages| async move {
                // Treat an empty batch as an error so the outer loop backs off
                if messages.is_empty() {
                    return Err("No messages".into());
                }
                // Process sequentially
                for message in &messages {
                    match process_message(message).await {
                        Ok(_) => println!("Processed {}", message.id),
                        Err(e) => {
                            tracing::warn!("Failed to process {}: {}", message.id, e);
                        }
                    }
                }
                Ok(())
            })
            .await;

        // Back off briefly when the batch was empty or an error occurred
        if result.is_err() {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}

// Send + Sync on the error type lets this helper be used from spawned
// worker tasks (see the multiple-workers example below).
async fn process_message(msg: &pgqrs::QueueMessage) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Your processing logic
    Ok(())
}
```
### Parallel Batch Processing
Process all messages in a batch concurrently.
```rust
use pgqrs;
use futures::future::join_all;

async fn parallel_batch_processing(consumer: &dyn pgqrs::store::Consumer) -> Result<(), Box<dyn std::error::Error>> {
    consumer.dequeue()
        .batch(100)
        .handle_batch(|messages| async move {
            // Process all in parallel
            let futures: Vec<_> = messages.iter()
                .map(|m| async move { process_message(m).await })
                .collect();
            let results = join_all(futures).await;

            // Log results
            let successful = results.iter().filter(|r| r.is_ok()).count();
            let failed = results.iter().filter(|r| r.is_err()).count();
            println!("Processed: {} successful, {} failed", successful, failed);
            Ok(())
        })
        .await?;
    Ok(())
}
```
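`join_all` polls every future at once, so all 100 messages are in flight simultaneously. When each message does I/O against a shared resource, you may prefer bounded concurrency. A small sketch using `for_each_concurrent` from the `futures` crate, reusing `process_message` from above (the limit of 10 is an arbitrary example value):

```rust
use futures::stream::{self, StreamExt};

// Process a batch with at most 10 messages in flight at a time.
async fn bounded_parallel_processing(messages: &[pgqrs::QueueMessage]) {
    stream::iter(messages)
        .for_each_concurrent(10, |message| async move {
            if let Err(e) = process_message(message).await {
                tracing::warn!("Failed to process {}: {}", message.id, e);
            }
        })
        .await;
}
```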
### Chunked Processing
Process large batches in smaller chunks.
```rust
use pgqrs;

async fn chunked_processing(consumer: &dyn pgqrs::store::Consumer, chunk_size: usize) -> Result<(), Box<dyn std::error::Error>> {
    consumer.dequeue()
        .batch(1000)
        .handle_batch(|messages| async move {
            // Process in chunks
            for chunk in messages.chunks(chunk_size) {
                for message in chunk {
                    process_message(message).await?;
                }
                tracing::info!("Processed chunk of {} messages", chunk.len());
            }
            Ok(())
        })
        .await?;
    Ok(())
}
```
## Throughput Optimization

### Tuning Batch Size
| Scenario | Recommended Batch Size |
|---|---|
| Quick tasks (< 10ms) | 100-500 |
| Medium tasks (10-100ms) | 50-100 |
| Slow tasks (> 100ms) | 10-50 |
| I/O bound tasks | 100-200 (with parallel) |
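These numbers are starting points, not hard rules; measure and re-tune for your workload. If you want a programmatic default, the table can be encoded as a trivial heuristic (the midpoint values below are illustrative, not part of the pgqrs API):

```rust
use std::time::Duration;

// Rough starting batch size derived from average per-message latency.
// The values are midpoints of the table above; tune against measurements.
fn suggested_batch_size(avg_task_time: Duration) -> usize {
    match avg_task_time.as_millis() {
        0..=9 => 250,   // quick tasks: 100-500
        10..=100 => 75, // medium tasks: 50-100
        _ => 25,        // slow tasks: 10-50
    }
}
```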
### Connection Pool Sizing
For batch processing, ensure adequate connection pool size:
```rust
use pgqrs::Config;

let config = Config {
    dsn: "postgresql://localhost/mydb".into(),
    max_connections: 20, // Increase for parallel batch processing
    ..Default::default()
};
let store = pgqrs::connect_with_config(&config).await?;
```
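As a rule of thumb, budget roughly one connection per concurrently executing batch handler, plus a little headroom for enqueueing and monitoring; the exact number depends on your workload and on server-side limits such as Postgres's own `max_connections`.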
## Batch Processing with Multiple Workers
Scale horizontally with multiple consumers using managed workers.
```rust
use pgqrs;
use tokio::task::JoinSet;

async fn run_batch_workers(num_workers: usize) -> Result<(), Box<dyn std::error::Error>> {
    let store = pgqrs::connect("postgresql://localhost/mydb").await?;
    let mut workers = JoinSet::new();

    for i in 0..num_workers {
        let store_clone = store.clone();
        workers.spawn(async move {
            // Create managed consumer for this worker
            // Note: In a real app, hostname/port should be unique/discoverable
            let consumer = pgqrs::consumer()
                .queue("tasks")
                .hostname(&format!("worker-{}", i))
                .port(3000 + i as i32)
                .create(&store_clone)
                .await?;
            batch_consumer_loop(&store_clone, &*consumer).await
        });
    }

    // Wait for all workers
    while let Some(result) = workers.join_next().await {
        if let Err(e) = result {
            tracing::error!("Worker error: {:?}", e);
        }
    }
    Ok(())
}

// The error type is Send + Sync so the returned future can be spawned
// onto the JoinSet above.
async fn batch_consumer_loop(
    store: &pgqrs::store::AnyStore,
    consumer: &dyn pgqrs::store::Consumer,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    loop {
        let result = pgqrs::dequeue()
            .from("tasks")
            .batch(100)
            .worker(consumer)
            .handle_batch(|messages| async move {
                if messages.is_empty() {
                    return Err("No messages".into());
                }
                for message in &messages {
                    process_message(message).await?;
                }
                Ok(())
            })
            .execute(store)
            .await;

        // Brief pause when the batch was empty or an error occurred
        if result.is_err() {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }
}
```
## Monitoring Batch Processing
Track throughput and processing times:
```rust
use std::time::Instant;
use pgqrs;

async fn monitored_batch_processing(store: &pgqrs::store::AnyStore) -> Result<(), Box<dyn std::error::Error>> {
    let mut total_processed: u64 = 0;
    let start = Instant::now();

    loop {
        let batch_start = Instant::now();
        let result = pgqrs::dequeue()
            .from("tasks")
            .batch(100)
            .handle_batch(|messages| async move {
                let batch_size = messages.len();
                for message in &messages {
                    process_message(message).await?;
                }
                Ok(batch_size)
            })
            .execute(store)
            .await;

        match result {
            Ok(batch_size) => {
                total_processed += batch_size as u64;
                let batch_time = batch_start.elapsed();
                let total_time = start.elapsed();
                let throughput = total_processed as f64 / total_time.as_secs_f64();
                tracing::info!(
                    batch_size = batch_size,
                    batch_time_ms = batch_time.as_millis(),
                    total_processed = total_processed,
                    throughput_per_sec = throughput,
                    "Batch completed"
                );
            }
            Err(_) => {
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
        }
    }
}
```
## Best Practices

- **Match batch size to processing time** - use larger batches for quick tasks
- **Use appropriate visibility timeouts** - the timeout should cover processing of the entire batch
- **Handle partial failures** - archive successes and let failures retry (see the sketch below)
- **Monitor throughput** - track messages per second
- **Scale with workers** - add consumers for more throughput
- **Use managed workers** - let pgqrs handle the worker lifecycle
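A sketch of the partial-failure pattern, assuming your pgqrs version exposes a per-message `archive` call on the consumer (the name is illustrative; check your version for the actual API). Messages that are not archived simply become visible again after the visibility timeout and are retried:

```rust
// Illustrative only: `archive` is an assumed per-message acknowledge call.
async fn handle_with_partial_failures(
    consumer: &dyn pgqrs::store::Consumer,
    messages: Vec<pgqrs::QueueMessage>,
) -> Result<(), Box<dyn std::error::Error>> {
    for message in &messages {
        match process_message(message).await {
            // Success: remove the message from the queue
            Ok(_) => consumer.archive(message).await?,
            // Failure: leave it; it becomes visible again and retries
            Err(e) => tracing::warn!("Will retry {}: {}", message.id, e),
        }
    }
    Ok(())
}
```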
## What's Next?
- Delayed Messages - Schedule future tasks
- Worker Management - Scale your workers