Build a Distributed Lock System
Introduction
Distributed locks coordinate access to shared resources across multiple processes or servers. They're essential for implementing idempotency, preventing race conditions, and managing exclusive access to distributed resources.
What You'll Build
- Mutex lock implementation
- Redis-based distributed locks
- Lock acquisition with timeouts
- Automatic lock expiration
Core Concepts
Mutual Exclusion
Only one process can hold the lock at a time.
Liveness
The system must not deadlock - locks must eventually be released or expire.
Prerequisites
- Python 3.8+
- Redis
Lock Implementation
Create distlock/lock.py:
import threading
import time
import uuid
from contextlib import contextmanager
from typing import Optional
class Lock:
def __init__(self, name: str, timeout: float = 30,
retry_interval: float = 0.1, max_retries: int = 100):
self.name = name
self.timeout = timeout
self.retry_interval = retry_interval
self.max_retries = max_retries
self._lock = threading.Lock()
self._acquired = False
self._owner = None
def acquire(self, blocking: bool = True) -> bool:
retries = 0
while True:
if self._lock.acquire(blocking=False):
self._acquired = True
self._owner = uuid.uuid4().hex
return True
if not blocking or retries >= self.max_retries:
return False
time.sleep(self.retry_interval)
retries += 1
def release(self):
if self._acquired and self._lock.locked():
self._lock.release()
self._acquired = False
self._owner = None
def extend(self, additional_time: float) -> bool:
if not self._acquired:
return False
return True
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
return False
class LockTimeout(Exception):
pass
class LockManager:
def __init__(self):
self._locks = {}
self._lock = threading.Lock()
def get_lock(self, name: str, **kwargs) -> Lock:
with self._lock:
if name not in self._locks:
self._locks[name] = Lock(name, **kwargs)
return self._locks[name]
@contextmanager
def acquire(self, name: str, blocking: bool = True, timeout: float = 30):
lock = self.get_lock(name)
start_time = time.time()
while True:
if lock.acquire(blocking=False):
try:
yield lock
finally:
lock.release()
return
if not blocking:
raise LockTimeout(f"Could not acquire lock: {name}")
if time.time() - start_time > timeout:
raise LockTimeout(f"Lock acquisition timeout: {name}")
time.sleep(0.01)
Redis Lock
import redis
import time
import uuid
class RedisLock:
def __init__(self, client: redis.Redis, name: str,
timeout: int = 30, retry_times: int = 3,
retry_delay: float = 0.2):
self.client = client
self.name = f"lock:{name}"
self.timeout = timeout
self.retry_times = retry_times
self.retry_delay = retry_delay
self.token = uuid.uuid4().hex
def acquire(self) -> bool:
for _ in range(self.retry_times):
acquired = self.client.set(
self.name,
self.token,
nx=True,
ex=self.timeout
)
if acquired:
return True
time.sleep(self.retry_delay)
return False
def release(self):
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.client.eval(lua_script, 1, self.name, self.token)
def extend(self, additional_time: int):
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
"""
self.client.eval(lua_script, 1, self.name, self.token, additional_time)
def __enter__(self):
if not self.acquire():
raise LockTimeout(f"Could not acquire lock: {self.name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
return False
class RedisLockManager:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.client = redis.from_url(redis_url)
def get_lock(self, name: str, timeout: int = 30):
return RedisLock(self.client, name, timeout)
Usage
from distlock import LockManager, RedisLockManager
# Local locks
manager = LockManager()
with manager.acquire('process-order-123') as lock:
# Process the order
process_order(123)
# Distributed locks with Redis
redis_manager = RedisLockManager()
with redis_manager.get_lock('process-payment-456') as lock:
# Process payment
process_payment(456)
# Manual lock usage
lock = redis_manager.get_lock('inventory-update', timeout=60)
lock.acquire()
try:
update_inventory()
finally:
lock.release()
Testing
import threading
from distlock import LockManager
def test_concurrent_access():
results = []
manager = LockManager()
def worker(worker_id):
with manager.acquire('shared-resource', timeout=5) as lock:
results.append(f"Worker {worker_id} acquired lock")
time.sleep(0.1)
results.append(f"Worker {worker_id} released lock")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
print(results)
test_concurrent_access()
Summary
You built a distributed lock system with local and Redis-based implementations, timeout handling, and automatic expiration.
Possible Extensions
- Add lock metrics
- Implement dead lock detection
- Add read/write locks