
Preventing Duplicate Jobs with Redis Locks

Why SET NX PX is your friend when building distributed task queues. A simple pattern that prevents race conditions and duplicate processing.

Oct 25, 2025 · 3 min

The Problem

You’re running multiple workers processing jobs from a queue. Two workers grab the same job. Now you’ve processed the same task twice—sent duplicate emails, charged a customer twice, or worse.

Redis to the Rescue

Redis provides atomic operations that solve this elegantly. The key is SET with the NX (only set if the key does Not eXist) and PX (expire after N milliseconds) flags.

import json
import redis
import time

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def acquire_lock(job_id, timeout_ms=30000):
    """
    Try to acquire a lock for a job.
    Returns True if lock acquired, False otherwise.
    """
    lock_key = f"lock:job:{job_id}"
    
    # SET NX PX - Set if Not eXists, with millisecond expiration
    acquired = redis_client.set(
        lock_key,
        "1",
        nx=True,  # Only set if key doesn't exist
        px=timeout_ms  # Expire after timeout_ms milliseconds (30 s default)
    )
    
    return acquired is not None

def release_lock(job_id):
    """
    Release the lock when job completes.
    """
    lock_key = f"lock:job:{job_id}"
    redis_client.delete(lock_key)
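One caveat: release_lock above deletes the key unconditionally. If a job runs past its timeout, the lock expires, another worker acquires it, and the slow worker's release_lock then deletes a lock it no longer owns. A common refinement is to store a unique token per worker and only delete on a match; on a real server that check-and-delete must run as a Lua script to stay atomic. A minimal sketch of the idea (the FakeRedis stand-in and function names here are illustrative, not part of redis-py):

```python
import uuid

# On real Redis, run this via redis_client.eval(RELEASE_SCRIPT, 1, lock_key, token)
# so the compare-and-delete happens atomically on the server.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""

class FakeRedis:
    """Tiny in-memory stand-in for redis-py (no expiry; illustration only)."""
    def __init__(self):
        self.store = {}

    def set(self, key, value, nx=False, px=None):
        if nx and key in self.store:
            return None  # mirrors redis-py: None when NX fails
        self.store[key] = value
        return True

    def get(self, key):
        return self.store.get(key)

    def delete(self, key):
        self.store.pop(key, None)

def acquire_lock_token(client, job_id, timeout_ms=30000):
    """Acquire the lock with a unique ownership token; returns it, or None."""
    token = uuid.uuid4().hex
    if client.set(f"lock:job:{job_id}", token, nx=True, px=timeout_ms):
        return token
    return None

def release_lock_safe(client, job_id, token):
    """Delete the lock only if we still own it (emulates RELEASE_SCRIPT)."""
    key = f"lock:job:{job_id}"
    if client.get(key) == token:
        client.delete(key)
        return True
    return False
```

With real redis-py, remember that get returns bytes by default, so either compare against the encoded token or create the client with decode_responses=True.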

Complete Worker Pattern

Here’s how to build a worker that safely processes jobs:

def process_job_safely(job_id, job_data):
    # Try to acquire lock
    if not acquire_lock(job_id):
        print(f"Job {job_id} already being processed, skipping")
        return False
    
    try:
        # Do the actual work
        result = do_expensive_work(job_data)
        
        # Mark job as complete
        mark_job_complete(job_id, result)
        
        return True
    except Exception as e:
        print(f"Error processing job {job_id}: {e}")
        # The finally block releases the lock immediately, so the job
        # can be retried right away instead of waiting for expiry
        raise
    finally:
        # Always release the lock
        release_lock(job_id)

def do_expensive_work(job_data):
    # Simulate work
    time.sleep(5)
    return {"status": "success", "data": job_data}
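To see the lock doing its job, you can race several workers at the same job id. The sketch below swaps the real client for an in-memory stand-in whose set is guarded by a single mutex, mimicking the atomicity Redis gets from its single-threaded command loop; FakeRedis and run_race are illustrative names, not part of redis-py:

```python
import threading

class FakeRedis:
    """In-memory stand-in whose SET NX is atomic, like the real command."""
    def __init__(self):
        self._store = {}
        self._mutex = threading.Lock()

    def set(self, key, value, nx=False, px=None):
        with self._mutex:  # check-and-set under one mutex = no race window
            if nx and key in self._store:
                return None
            self._store[key] = value
            return True

def run_race(n_workers=10):
    """Have n workers race for the same job lock; return the winners."""
    client = FakeRedis()
    winners = []

    def worker(worker_id):
        if client.set("lock:job:42", worker_id, nx=True, px=30000):
            winners.append(worker_id)  # this worker would process the job

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return winners

print(len(run_race()))  # exactly one worker wins the lock
```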

Why This Works

1. Atomic Operation

SET NX PX is a single atomic command. No race condition between check and set.

Bad (Race Condition):

# DON'T DO THIS
if not redis_client.exists(lock_key):  # Check
    redis_client.set(lock_key, "1")     # Set (race here!)

Good (Atomic):

# DO THIS
redis_client.set(lock_key, "1", nx=True, px=30000)

2. Auto-Expiration

The PX flag means locks auto-expire if a worker crashes. No manual cleanup needed.

# Lock expires automatically after 30 seconds
# Even if worker dies mid-job
acquired = redis_client.set(lock_key, "1", nx=True, px=30000)
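The crash-recovery story is easy to demonstrate without a server. The sketch below uses an in-memory stand-in that honours the px argument (FakeRedis is illustrative, not part of redis-py): worker A takes the lock and "crashes", and once the TTL passes, worker B's SET NX succeeds.

```python
import time

class FakeRedis:
    """In-memory stand-in that honours px expiry (illustration only)."""
    def __init__(self):
        self._store = {}  # key -> (value, expires_at)

    def set(self, key, value, nx=False, px=None):
        now = time.monotonic()
        entry = self._store.get(key)
        if nx and entry is not None and entry[1] > now:
            return None  # key still live: NX acquisition fails
        ttl = px / 1000 if px else float("inf")
        self._store[key] = (value, now + ttl)
        return True

client = FakeRedis()
lock_key = "lock:job:7"

assert client.set(lock_key, "worker-a", nx=True, px=100)          # A acquires
assert client.set(lock_key, "worker-b", nx=True, px=100) is None  # B is blocked

time.sleep(0.15)  # worker A "crashes" and never releases; the TTL elapses

assert client.set(lock_key, "worker-b", nx=True, px=100)          # B recovers the lock
print("lock recovered after expiry")
```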

3. Idempotent Operations

Combine with idempotency keys for bulletproof duplicate prevention:

def process_job_with_idempotency(job_id, job_data, idempotency_key):
    # Check if already processed
    result_key = f"result:{idempotency_key}"
    cached_result = redis_client.get(result_key)
    
    if cached_result:
        return json.loads(cached_result)
    
    # Acquire lock
    if not acquire_lock(job_id):
        # Another worker holds the lock; wait briefly and check for its result
        time.sleep(1)
        cached_result = redis_client.get(result_key)
        return json.loads(cached_result) if cached_result else None
    
    try:
        result = do_expensive_work(job_data)
        
        # Cache result for 24 hours
        redis_client.setex(
            result_key,
            86400,  # 24 hours
            json.dumps(result)
        )
        
        return result
    finally:
        release_lock(job_id)
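A single one-second sleep in the locked-out branch is fragile: the other worker may need longer, and you would hand back None. Where it matters, a small polling loop is sturdier. A sketch, assuming the same result-key convention as above (wait_for_result and the dict-backed stand-in are illustrative names):

```python
import json
import time

def wait_for_result(client, result_key, timeout_sec=30, poll_sec=0.5):
    """Poll for the cached result until it appears or the deadline passes."""
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        cached = client.get(result_key)
        if cached is not None:
            return json.loads(cached)
        time.sleep(poll_sec)
    return None  # caller can retry, re-enqueue, or surface an error

class FakeRedis:
    """Dict-backed stand-in for redis-py's get (illustration only)."""
    def __init__(self, store=None):
        self.store = store or {}

    def get(self, key):
        return self.store.get(key)

client = FakeRedis({"result:abc": json.dumps({"status": "success"})})
print(wait_for_result(client, "result:abc"))  # {'status': 'success'}
```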

Advanced: Extending Lock Time

For long-running jobs, extend the lock periodically:

import threading

def extend_lock_periodically(job_id, interval_sec=10):
    """
    Background thread that extends lock every N seconds.
    """
    lock_key = f"lock:job:{job_id}"
    
    def extend():
        while redis_client.exists(lock_key):
            redis_client.expire(lock_key, 30)
            time.sleep(interval_sec)
    
    thread = threading.Thread(target=extend, daemon=True)
    thread.start()
    return thread

def process_long_job(job_id, job_data):
    if not acquire_lock(job_id, timeout_ms=30000):
        return False
    
    # Start background lock extension
    extender = extend_lock_periodically(job_id)
    
    try:
        # Long-running work here
        result = process_for_5_minutes(job_data)
        return result
    finally:
        release_lock(job_id)

Production Checklist

  • ✅ Use SET NX PX for atomic lock acquisition
  • ✅ Set a reasonable timeout (expected job duration + a buffer)
  • ✅ Always use try/finally to release locks
  • ✅ Log lock acquisition failures
  • ✅ Monitor lock timeout metrics
  • ✅ For long jobs, extend locks periodically
  • ✅ Combine with idempotency for extra safety

The Takeaway

Redis locks are simple, fast, and reliable. SET NX PX gives you atomic lock acquisition with auto-expiration in one command.

Pattern: Acquire lock → Process job → Release lock. Let expiration handle crashed workers.
