← Back to Tutorials
Python

Build a Distributed Lock System

Difficulty: Advanced · Est. Time: ~3 hours

Introduction

Distributed locks coordinate access to shared resources across multiple processes or servers. They're essential for implementing idempotency, preventing race conditions, and managing exclusive access to distributed resources.

What You'll Build
  • Mutex lock implementation
  • Redis-based distributed locks
  • Lock acquisition with timeouts
  • Automatic lock expiration

Core Concepts

Mutual Exclusion

Only one process can hold the lock at a time.

Liveness

The system must not deadlock - locks must eventually be released or expire.

Prerequisites

  • Python 3.8+
  • Redis

Lock Implementation

Create distlock/lock.py:

import threading
import time
import uuid
from contextlib import contextmanager
from typing import Optional


class Lock:
    """Named in-process mutex built on ``threading.Lock`` with polling acquire.

    The lock can be used directly via ``acquire()``/``release()`` or as a
    context manager. ``timeout`` is stored on the instance but is not read
    by any method of this class.
    """

    def __init__(self, name: str, timeout: float = 30,
                 retry_interval: float = 0.1, max_retries: int = 100):
        """Create an unlocked lock.

        Args:
            name: Identifier used in error messages.
            timeout: Stored as ``self.timeout``; not enforced by this class.
            retry_interval: Seconds to sleep between blocking attempts.
            max_retries: Number of sleeps allowed before a blocking
                ``acquire()`` gives up.
        """
        self.name = name
        self.timeout = timeout
        self.retry_interval = retry_interval
        self.max_retries = max_retries
        self._lock = threading.Lock()
        self._acquired = False
        # Fresh random token per successful acquire; informational only,
        # release() does not check it.
        self._owner = None

    def acquire(self, blocking: bool = True) -> bool:
        """Try to take the lock.

        Args:
            blocking: When False, make a single attempt. When True, poll
                every ``retry_interval`` seconds, up to ``max_retries``
                sleeps.

        Returns:
            True if the lock was taken, False otherwise.
        """
        retries = 0

        while True:
            if self._lock.acquire(blocking=False):
                self._acquired = True
                self._owner = uuid.uuid4().hex
                return True

            if not blocking or retries >= self.max_retries:
                return False

            time.sleep(self.retry_interval)
            retries += 1

    def release(self):
        """Release the lock if it is currently marked as acquired."""
        if self._acquired and self._lock.locked():
            # Reset bookkeeping *before* unlocking. Once the underlying
            # lock is free another thread may acquire it and set these
            # fields; clearing them afterwards would erase that thread's
            # state and turn its later release() into a no-op.
            self._acquired = False
            self._owner = None
            self._lock.release()

    def extend(self, additional_time: float) -> bool:
        """Report whether the lock is held.

        ``additional_time`` is accepted but unused: this class never expires
        a held lock, so there is nothing to extend.

        Returns:
            True if the lock is currently marked as acquired, else False.
        """
        if not self._acquired:
            return False
        return True

    def __enter__(self):
        """Acquire the lock or raise.

        Raises:
            LockTimeout: If the lock could not be acquired within
                ``max_retries`` attempts.
        """
        # Entering the body without the lock would break mutual exclusion,
        # and __exit__ would then release a lock held by someone else.
        if not self.acquire():
            raise LockTimeout(f"Could not acquire lock: {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock; exceptions from the body are not suppressed."""
        self.release()
        return False


class LockTimeout(Exception):
    """Raised when a lock cannot be acquired, immediately or before a timeout."""


class LockManager:
    """Registry that hands out one shared ``Lock`` per name."""

    def __init__(self):
        """Create an empty registry."""
        self._locks = {}
        # Serializes registry access so two threads asking for a new name
        # get the same Lock object.
        self._lock = threading.Lock()

    def get_lock(self, name: str, **kwargs) -> "Lock":
        """Return the lock registered under *name*, creating it if needed.

        ``kwargs`` are passed to ``Lock`` only when the lock is first
        created; they are ignored for a name that already exists.
        """
        with self._lock:
            if name not in self._locks:
                self._locks[name] = Lock(name, **kwargs)
            return self._locks[name]

    @contextmanager
    def acquire(self, name: str, blocking: bool = True, timeout: float = 30,
                poll_interval: float = 0.01):
        """Context manager that holds the lock named *name* for its body.

        Args:
            name: Lock name, looked up through ``get_lock``.
            blocking: When False, fail immediately if the lock is taken.
            timeout: Maximum seconds to keep polling when blocking.
            poll_interval: Seconds to sleep between attempts when blocking.

        Yields:
            The acquired lock object.

        Raises:
            LockTimeout: If the lock is taken and ``blocking`` is False, or
                if it is still taken once ``timeout`` seconds have elapsed.
        """
        lock = self.get_lock(name)

        # Monotonic clock: elapsed-time checks must not be affected by
        # wall-clock adjustments.
        deadline = time.monotonic() + timeout

        while not lock.acquire(blocking=False):
            if not blocking:
                raise LockTimeout(f"Could not acquire lock: {name}")

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise LockTimeout(f"Lock acquisition timeout: {name}")

            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))

        try:
            yield lock
        finally:
            lock.release()

Redis Lock

import redis
import time
import uuid


class RedisLock:
    """Lock stored as a Redis key holding a per-instance random token.

    The key is created with ``SET ... NX EX`` so it expires on its own after
    ``timeout`` seconds. Release and extend run as Lua scripts that first
    check the stored value against this instance's token, so an instance
    never deletes or extends a key that now belongs to another holder.
    """

    # Delete the key only when it still holds our token.
    _RELEASE_SCRIPT = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

    # Add ARGV[2] milliseconds to the key's remaining TTL when it still
    # holds our token. A negative PTTL (no expiry set) is treated as 0.
    _EXTEND_SCRIPT = """
        if redis.call("get", KEYS[1]) ~= ARGV[1] then
            return 0
        end
        local remaining = redis.call("pttl", KEYS[1])
        if remaining < 0 then
            remaining = 0
        end
        return redis.call("pexpire", KEYS[1], remaining + tonumber(ARGV[2]))
        """

    def __init__(self, client: redis.Redis, name: str,
                 timeout: int = 30, retry_times: int = 3,
                 retry_delay: float = 0.2):
        """Prepare a lock handle; nothing is written to Redis yet.

        Args:
            client: Redis client used for all commands.
            name: Logical lock name; the Redis key is ``lock:<name>``.
            timeout: Key expiry in seconds, set when the lock is acquired.
            retry_times: Total number of acquisition attempts.
            retry_delay: Seconds to sleep between attempts.
        """
        self.client = client
        self.name = f"lock:{name}"
        self.timeout = timeout
        self.retry_times = retry_times
        self.retry_delay = retry_delay
        self.token = uuid.uuid4().hex

    def acquire(self) -> bool:
        """Try to create the lock key, making up to ``retry_times`` attempts.

        Returns:
            True if the key was created by this instance, False otherwise.
        """
        for attempt in range(self.retry_times):
            acquired = self.client.set(
                self.name,
                self.token,
                nx=True,
                ex=self.timeout,
            )

            if acquired:
                return True

            # No point sleeping after the final attempt.
            if attempt < self.retry_times - 1:
                time.sleep(self.retry_delay)

        return False

    def release(self):
        """Delete the lock key if it still holds this instance's token."""
        self.client.eval(self._RELEASE_SCRIPT, 1, self.name, self.token)

    def extend(self, additional_time: int) -> bool:
        """Add *additional_time* seconds to the lock's remaining lifetime.

        The time is added to the key's current TTL rather than replacing
        it, so extending can never shorten the lock.

        Returns:
            True if the key still held this instance's token and its expiry
            was updated, False otherwise.
        """
        result = self.client.eval(
            self._EXTEND_SCRIPT,
            1,
            self.name,
            self.token,
            int(additional_time * 1000),
        )
        return bool(result)

    def __enter__(self):
        """Acquire the lock or raise ``LockTimeout``."""
        # NOTE(review): LockTimeout is not defined or imported next to this
        # class in the snippet; make sure it is in scope in this module.
        if not self.acquire():
            raise LockTimeout(f"Could not acquire lock: {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock; exceptions from the body are not suppressed."""
        self.release()
        return False


class RedisLockManager:
    """Factory producing ``RedisLock`` objects that share one Redis client."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Build the shared client from *redis_url*."""
        connection = redis.from_url(redis_url)
        self.client = connection

    def get_lock(self, name: str, timeout: int = 30):
        """Return a new ``RedisLock`` for *name* with the given expiry."""
        return RedisLock(self.client, name, timeout=timeout)

Usage

from distlock import LockManager, RedisLockManager

# Local locks
manager = LockManager()

# Raises LockTimeout if the lock is still taken after the default 30 s.
with manager.acquire('process-order-123') as lock:
    # Process the order
    process_order(123)

# Distributed locks with Redis
redis_manager = RedisLockManager()

# Raises LockTimeout if the lock cannot be acquired.
with redis_manager.get_lock('process-payment-456') as lock:
    # Process payment
    process_payment(456)

# Manual lock usage
lock = redis_manager.get_lock('inventory-update', timeout=60)

# acquire() signals failure by returning False, not by raising; check it
# so the protected work never runs without the lock.
if not lock.acquire():
    raise RuntimeError("Could not acquire lock: inventory-update")

try:
    update_inventory()
finally:
    lock.release()

Testing

import threading
import time

from distlock import LockManager

def test_concurrent_access():
    """Run three workers against one named lock and check they never overlap.

    Each worker records an "acquired" and a "released" message while holding
    the lock. If the lock provides mutual exclusion, the messages of one
    worker are always adjacent in ``results``.
    """
    results = []
    manager = LockManager()

    def worker(worker_id):
        with manager.acquire('shared-resource', timeout=5) as lock:
            results.append(f"Worker {worker_id} acquired lock")
            # Hold the lock long enough for the other workers to contend.
            time.sleep(0.1)
            results.append(f"Worker {worker_id} released lock")

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]

    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(results)

    # Every worker must have completed: a timeout inside a thread would
    # leave its messages missing.
    assert len(results) == 2 * len(threads)

    # Messages must come in acquired/released pairs from the same worker.
    for acquired, released in zip(results[::2], results[1::2]):
        assert acquired.endswith("acquired lock")
        assert released == acquired.replace("acquired", "released")


test_concurrent_access()

Summary

You built a distributed lock system with local and Redis-based implementations, timeout handling, and automatic expiration.

Possible Extensions

  • Add lock metrics
  • Implement deadlock detection
  • Add read/write locks