Build a Job Retry System
Introduction
Job retry systems handle transient failures by automatically retrying failed operations. They are essential for building resilient distributed systems where temporary failures are common.
What You'll Build
- Configurable retry policies
- Exponential backoff
- Circuit breaker pattern
- Retry decorators
Core Concepts
Retry Policy
A retry policy defines how many attempts to make, which exceptions are worth retrying, and how long to wait between attempts.
Backoff
Backoff strategies control the delay between retries, usually increasing it after each failure, to avoid overwhelming a system that is already struggling.
Retry Policy
Create retry/policy.py:
import time
from dataclasses import dataclass
from typing import Callable, Any, Optional
from enum import Enum
class BackoffStrategy(Enum):
    """Names the rule used to space out successive retry attempts."""

    # The same delay is used before every retry.
    FIXED = "fixed"
    # The delay is multiplied by a constant factor from one attempt to the next.
    EXPONENTIAL = "exponential"
    # The delay grows in proportion to the attempt number.
    LINEAR = "linear"
@dataclass
class RetryPolicy:
    """Describe how many times an operation is attempted and how long to wait.

    The policy is pure configuration: it computes delays and answers
    "should this failure be retried?", but never sleeps or calls anything.
    """

    # Total number of attempts allowed, counting the very first call.
    max_attempts: int = 3
    # Delay in seconds used for the first retry (and for every retry when FIXED).
    initial_delay: float = 1.0
    # Upper bound in seconds applied to every computed delay.
    max_delay: float = 60.0
    # Growth factor between consecutive delays for the EXPONENTIAL strategy.
    multiplier: float = 2.0
    # Rule used to derive the delay from the attempt number.
    strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL
    # Exception types that may be retried; an empty tuple retries everything.
    retriable_exceptions: tuple = (Exception,)

    def get_delay(self, attempt: int) -> float:
        """Return the delay in seconds to wait after failed attempt *attempt*.

        Args:
            attempt: 1-based number of the attempt that just failed.

        Returns:
            The delay for the configured strategy, never above ``max_delay``.

        Raises:
            ValueError: If ``strategy`` is not a known ``BackoffStrategy``.
        """
        if self.strategy == BackoffStrategy.FIXED:
            delay = self.initial_delay
        elif self.strategy == BackoffStrategy.EXPONENTIAL:
            try:
                delay = self.initial_delay * (self.multiplier ** (attempt - 1))
            except OverflowError:
                # The power no longer fits in a float; the cap applies anyway.
                delay = self.max_delay
        elif self.strategy == BackoffStrategy.LINEAR:
            delay = self.initial_delay * attempt
        else:
            raise ValueError(f"Unsupported backoff strategy: {self.strategy!r}")
        # Every strategy, FIXED included, honours the configured ceiling.
        return min(delay, self.max_delay)

    def should_retry(self, attempt: int, exception: Exception) -> bool:
        """Tell whether another attempt should follow failed attempt *attempt*.

        Args:
            attempt: 1-based number of the attempt that just failed.
            exception: The exception raised by that attempt.

        Returns:
            ``False`` once ``max_attempts`` has been reached. Otherwise
            ``True`` when *exception* is an instance of one of
            ``retriable_exceptions``, or when that tuple is empty.
        """
        if attempt >= self.max_attempts:
            return False
        if not self.retriable_exceptions:
            # No filter configured: every exception is considered retriable.
            return True
        return isinstance(exception, self.retriable_exceptions)
Executor
Create retry/executor.py:
import functools
import threading
import time
from typing import Any, Callable, Optional

from .policy import RetryPolicy
class RetryExecutor:
    """Call functions under a retry policy, sleeping between failed attempts."""

    def __init__(self, policy: Optional["RetryPolicy"] = None):
        """Store *policy*; when it is ``None`` a default ``RetryPolicy()`` is used."""
        self.policy = policy if policy is not None else RetryPolicy()

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Call ``func(*args, **kwargs)``, retrying as the policy allows.

        Returns:
            Whatever *func* returns on its first successful attempt.

        Raises:
            ValueError: If the policy's ``max_attempts`` is less than 1.
            Exception: The exception raised by *func* once the policy refuses
                another retry or the attempts are exhausted.
        """
        return self._run(func, None, args, kwargs)

    def execute_with_callback(self, func: Callable, on_retry: Callable = None, *args, **kwargs) -> Any:
        """Like ``execute``, but report each upcoming retry to *on_retry*.

        ``on_retry(attempt, delay, exception)`` is invoked before each sleep.
        Because ``on_retry`` is the second positional parameter, positional
        arguments meant for *func* must come after an explicit ``on_retry``
        value (pass ``None`` for no callback).

        Returns:
            Whatever *func* returns on its first successful attempt.

        Raises:
            ValueError: If the policy's ``max_attempts`` is less than 1.
            Exception: The exception raised by *func* once the policy refuses
                another retry or the attempts are exhausted.
        """
        return self._run(func, on_retry, args, kwargs)

    def _run(self, func: Callable, on_retry: Optional[Callable], args: tuple, kwargs: dict) -> Any:
        """Shared retry loop behind ``execute`` and ``execute_with_callback``."""
        max_attempts = self.policy.max_attempts
        if max_attempts < 1:
            # Without this guard the loop would never run and nothing
            # meaningful could be returned or raised.
            raise ValueError("max_attempts must be at least 1")

        for attempt in range(1, max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                # The policy is always consulted first; the attempt limit is
                # enforced here as well in case the policy itself does not.
                if not self.policy.should_retry(attempt, exc) or attempt >= max_attempts:
                    raise
                delay = self.policy.get_delay(attempt)
                if on_retry is not None:
                    on_retry(attempt, delay, exc)
                time.sleep(delay)
def retry(policy: RetryPolicy = None):
    """Build a decorator that routes calls through a ``RetryExecutor``.

    Args:
        policy: Policy handed to the executor; ``None`` lets the executor
            fall back to its default.

    Returns:
        A decorator. The decorated function keeps its metadata and, when
        called, is executed via ``RetryExecutor.execute``.
    """

    def apply_retry(func):
        @functools.wraps(func)
        def retrying_call(*call_args, **call_kwargs):
            # A new executor is built for every invocation of the wrapped function.
            return RetryExecutor(policy).execute(func, *call_args, **call_kwargs)

        return retrying_call

    return apply_retry
Backoff Strategies
import random
class JitteredBackoff:
    """Exponential backoff that doubles per attempt, with symmetric random jitter."""

    def __init__(self, base_delay: float, max_delay: float = 60.0, jitter: float = 0.1):
        """Configure the backoff.

        Args:
            base_delay: Delay in seconds for the first attempt, before jitter.
            max_delay: Upper bound in seconds for any returned delay.
            jitter: Fraction of the delay used as the +/- jitter range.
        """
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Return a jittered delay in seconds for 1-based attempt *attempt*.

        The un-jittered delay is ``base_delay * 2 ** (attempt - 1)`` capped at
        ``max_delay``; a random offset of up to ``jitter`` times that value is
        then added or subtracted. The result always lies in ``[0, max_delay]``.
        """
        try:
            delay = self.base_delay * (2 ** (attempt - 1))
        except OverflowError:
            # The power is too large to convert to float; the cap applies anyway.
            delay = self.max_delay
        delay = min(delay, self.max_delay)
        jitter_range = delay * self.jitter
        delay += random.uniform(-jitter_range, jitter_range)
        # Clamp again so upward jitter cannot push the delay past the cap.
        return min(max(0, delay), self.max_delay)
class DecorrelatedJitterBackoff:
    """Backoff where each delay is drawn at random relative to the previous one.

    Each delay is ``min(max_delay, uniform(base_delay, previous_delay * 3))``.
    The instance is stateful: it remembers the last delay it handed out.
    """

    def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0):
        """Configure the backoff.

        Args:
            base_delay: Lower bound in seconds for every delay.
            max_delay: Upper bound in seconds for every delay.
        """
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.previous_delay = base_delay

    def get_delay(self, attempt: int) -> float:
        """Return the next delay in seconds and remember it for the following call.

        Args:
            attempt: 1-based attempt number. A value of 1 or less starts a new
                retry sequence, discarding the remembered delay.
        """
        if attempt <= 1:
            # A new sequence must not inherit the delay of an earlier one.
            self.previous_delay = self.base_delay
        upper_bound = self.previous_delay * 3
        delay = min(self.max_delay, random.uniform(self.base_delay, upper_bound))
        self.previous_delay = delay
        return delay
Circuit Breaker
import time
from enum import Enum
class CircuitState(Enum):
    """The three states a circuit breaker moves between."""

    # Calls are passed straight through to the wrapped function.
    CLOSED = "closed"
    # Calls are rejected until the breaker's timeout has elapsed.
    OPEN = "open"
    # The timeout has elapsed and a call is let through as a trial.
    HALF_OPEN = "half_open"
class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing function for a while after repeated failures.

    The breaker starts CLOSED. Once ``failure_threshold`` failures have
    accumulated without an intervening success it becomes OPEN and rejects
    calls. After ``timeout`` seconds the next call is let through in
    HALF_OPEN state; a success closes the circuit again.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        """Configure the breaker.

        Args:
            failure_threshold: Failure count at which the circuit opens.
            timeout: Seconds to stay open before a trial call is allowed.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        # Wall-clock timestamp (time.time()) of the most recent failure.
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` unless the circuit is open.

        Returns:
            Whatever *func* returns.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the timeout
                has not elapsed. It subclasses ``Exception`` and carries the
                message "Circuit breaker is open".
            Exception: Any exception raised by *func*, re-raised after the
                failure has been recorded.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time <= self.timeout:
                raise CircuitBreakerOpenError("Circuit breaker is open")
            # Timeout elapsed: let this call through as a trial.
            self.state = CircuitState.HALF_OPEN

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Reset the failure count and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Record a failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
Testing
from retry import RetryPolicy, RetryExecutor, retry, BackoffStrategy
# Up to three attempts; with the default multiplier of 2.0 the waits are 1s, then 2s.
policy = RetryPolicy(
    max_attempts=3,
    initial_delay=1.0,
    strategy=BackoffStrategy.EXPONENTIAL,
)
executor = RetryExecutor(policy)

# Module-level counter so the demo function can fail a fixed number of times.
call_count = 0


def unreliable_function():
    """Raise ``ConnectionError`` on the first two calls, then return a value."""
    global call_count
    call_count += 1
    if call_count >= 3:
        return "Success!"
    raise ConnectionError("Temporary failure")


result = executor.execute(unreliable_function)
print(result)


@retry()
def decorated_function():
    """Placeholder showing the decorator form of the retry API."""
Summary
You built a job retry system with configurable policies, multiple backoff strategies, and a circuit breaker.