Preventing Duplicate Jobs with Redis Locks
Why SET NX PX is your friend when building distributed task queues. A simple pattern that prevents race conditions and duplicate processing.
The Problem
You’re running multiple workers processing jobs from a queue. Two workers grab the same job. Now you’ve processed the same task twice—sent duplicate emails, charged a customer twice, or worse.
Redis to the Rescue
Redis provides atomic operations that solve this elegantly. The key is SET with the NX (Not eXists) and PX (milliseconds) flags.
import redis
import time
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def acquire_lock(job_id, timeout_ms=30000):
"""
Try to acquire a lock for a job.
Returns True if lock acquired, False otherwise.
"""
lock_key = f"lock:job:{job_id}"
# SET NX PX - Set if Not eXists, with millisecond expiration
acquired = redis_client.set(
lock_key,
"1",
nx=True, # Only set if key doesn't exist
px=timeout_ms # Expire after 30 seconds
)
return acquired is not None
def release_lock(job_id):
"""
Release the lock when job completes.
"""
lock_key = f"lock:job:{job_id}"
redis_client.delete(lock_key)
Complete Worker Pattern
Here’s how to build a worker that safely processes jobs:
def process_job_safely(job_id, job_data):
# Try to acquire lock
if not acquire_lock(job_id):
print(f"Job {job_id} already being processed, skipping")
return False
try:
# Do the actual work
result = do_expensive_work(job_data)
# Mark job as complete
mark_job_complete(job_id, result)
return True
except Exception as e:
print(f"Error processing job {job_id}: {e}")
# Lock will auto-expire, allowing retry
raise
finally:
# Always release the lock
release_lock(job_id)
def do_expensive_work(job_data):
# Simulate work
time.sleep(5)
return {"status": "success", "data": job_data}
Why This Works
1. Atomic Operation
SET NX PX is a single atomic command. No race condition between check and set.
❌ Bad (Race Condition):
# DON'T DO THIS
if not redis_client.exists(lock_key): # Check
redis_client.set(lock_key, "1") # Set (race here!)
✅ Good (Atomic):
# DO THIS
redis_client.set(lock_key, "1", nx=True, px=30000)
2. Auto-Expiration
The PX flag means locks auto-expire if a worker crashes. No manual cleanup needed.
# Lock expires automatically after 30 seconds
# Even if worker dies mid-job
acquired = redis_client.set(lock_key, "1", nx=True, px=30000)
3. Idempotent Operations
Combine with idempotency keys for bulletproof duplicate prevention:
def process_job_with_idempotency(job_id, job_data, idempotency_key):
# Check if already processed
result_key = f"result:{idempotency_key}"
cached_result = redis_client.get(result_key)
if cached_result:
return json.loads(cached_result)
# Acquire lock
if not acquire_lock(job_id):
# Wait and check for result
time.sleep(1)
return redis_client.get(result_key)
try:
result = do_expensive_work(job_data)
# Cache result for 24 hours
redis_client.setex(
result_key,
86400, # 24 hours
json.dumps(result)
)
return result
finally:
release_lock(job_id)
Advanced: Extending Lock Time
For long-running jobs, extend the lock periodically:
import threading
def extend_lock_periodically(job_id, interval_sec=10):
"""
Background thread that extends lock every N seconds.
"""
lock_key = f"lock:job:{job_id}"
def extend():
while redis_client.exists(lock_key):
redis_client.expire(lock_key, 30)
time.sleep(interval_sec)
thread = threading.Thread(target=extend, daemon=True)
thread.start()
return thread
def process_long_job(job_id, job_data):
if not acquire_lock(job_id, timeout_ms=30000):
return False
# Start background lock extension
extender = extend_lock_periodically(job_id)
try:
# Long-running work here
result = process_for_5_minutes(job_data)
return result
finally:
release_lock(job_id)
Production Checklist
- ✅ Use
SET NX PXfor atomic lock acquisition - ✅ Set reasonable timeout (job duration + buffer)
- ✅ Always use try/finally to release locks
- ✅ Log lock acquisition failures
- ✅ Monitor lock timeout metrics
- ✅ For long jobs, extend locks periodically
- ✅ Combine with idempotency for extra safety
The Takeaway
Redis locks are simple, fast, and reliable. SET NX PX gives you atomic lock acquisition with auto-expiration in one command.
Pattern: Acquire lock → Process job → Release lock. Let expiration handle crashed workers.