← Back to Tutorials
Python

Build a Job Retry System

Difficulty: Intermediate · Est. Time: ~3 hours

Introduction

Job retry systems handle transient failures by automatically retrying failed operations. They are essential for building resilient distributed systems where temporary failures are common.

What You'll Build
  • Configurable retry policies
  • Exponential backoff
  • Circuit breaker pattern
  • Retry decorators

Core Concepts

Retry Policy

A retry policy defines how many times to retry and when to give up.

Backoff

Backoff strategies increase the delay between retries to avoid overwhelming the system.

Retry Policy

Create retry/policy.py:

import time
from dataclasses import dataclass
from typing import Callable, Any, Optional
from enum import Enum


class BackoffStrategy(Enum):
    """Selects how the wait between consecutive retry attempts is computed."""

    # The same delay is used before every retry.
    FIXED = "fixed"
    # The delay is scaled by a multiplier raised to the attempt number.
    EXPONENTIAL = "exponential"
    # The delay grows in proportion to the attempt number.
    LINEAR = "linear"


@dataclass
class RetryPolicy:
    """Describe how many times an operation is attempted and how long to wait.

    The policy is pure configuration: it computes delays and answers
    "should this failure be retried?", but never sleeps or calls anything.
    """

    # Total number of tries, counting the very first call.
    max_attempts: int = 3
    # Delay used for the first retry (and for every retry when FIXED).
    initial_delay: float = 1.0
    # Upper bound applied to every computed delay.
    max_delay: float = 60.0
    # Growth factor per attempt; only used by the EXPONENTIAL strategy.
    multiplier: float = 2.0
    strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL
    # Exception types worth retrying; an empty tuple means "retry anything".
    retriable_exceptions: tuple = (Exception,)

    def get_delay(self, attempt: int) -> float:
        """Return the wait before the retry that follows ``attempt``.

        Args:
            attempt: 1-based number of the attempt that just failed.

        Returns:
            The delay for the configured strategy, never above ``max_delay``.

        Raises:
            ValueError: If ``strategy`` is not a known ``BackoffStrategy``.
        """
        if self.strategy == BackoffStrategy.FIXED:
            delay = self.initial_delay
        elif self.strategy == BackoffStrategy.EXPONENTIAL:
            try:
                delay = self.initial_delay * (self.multiplier ** (attempt - 1))
            except OverflowError:
                # The growth factor no longer fits in a float; the cap
                # would have applied anyway.
                delay = self.max_delay
        elif self.strategy == BackoffStrategy.LINEAR:
            delay = self.initial_delay * attempt
        else:
            # Fail loudly instead of hitting an unbound local below.
            raise ValueError(f"Unsupported backoff strategy: {self.strategy!r}")

        # The cap applies uniformly, including to the FIXED strategy.
        return min(delay, self.max_delay)

    def should_retry(self, attempt: int, exception: Exception) -> bool:
        """Decide whether a failed attempt should be followed by another try.

        Args:
            attempt: 1-based number of the attempt that just failed.
            exception: The exception raised by that attempt.

        Returns:
            ``False`` once ``max_attempts`` has been reached or when the
            exception is not an instance of ``retriable_exceptions``;
            ``True`` otherwise.
        """
        if attempt >= self.max_attempts:
            return False

        if self.retriable_exceptions:
            return isinstance(exception, self.retriable_exceptions)

        # An empty filter places no restriction on the exception type.
        return True

Executor

Create retry/executor.py:

import functools
import threading
import time
from typing import Any, Callable, Optional

from .policy import RetryPolicy


class RetryExecutor:
    """Call a function repeatedly until it succeeds or the policy gives up.

    The policy object supplies ``max_attempts``, ``should_retry(attempt, exc)``
    and ``get_delay(attempt)``; the executor owns the loop and the sleeping.
    """

    def __init__(self, policy: Optional["RetryPolicy"] = None):
        """Store ``policy``, falling back to a default ``RetryPolicy``."""
        self.policy = policy or RetryPolicy()

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Run ``func(*args, **kwargs)`` with retries and return its result.

        Raises:
            ValueError: If the policy allows fewer than one attempt.
            Exception: Whatever ``func`` raised on the attempt after which
                the policy declined to retry.
        """
        return self._run(func, None, args, kwargs)

    def execute_with_callback(
        self,
        func: Callable,
        on_retry: Optional[Callable] = None,
        *args,
        **kwargs,
    ) -> Any:
        """Like ``execute``, but notify ``on_retry`` before each wait.

        Args:
            func: The operation to run.
            on_retry: Called as ``on_retry(attempt, delay, exception)`` just
                before sleeping ahead of the next attempt. Because it is the
                second positional parameter, positional arguments meant for
                ``func`` must come after it.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Raises:
            ValueError: If the policy allows fewer than one attempt.
            Exception: Whatever ``func`` raised on the attempt after which
                the policy declined to retry.
        """
        return self._run(func, on_retry, args, kwargs)

    def _run(self, func, on_retry, args, kwargs):
        """Shared retry loop behind both public entry points."""
        policy = self.policy
        max_attempts = policy.max_attempts

        if max_attempts < 1:
            # Without this guard the loop never runs and the trailing
            # ``raise`` would try to raise ``None``.
            raise ValueError(
                f"max_attempts must be at least 1, got {max_attempts!r}"
            )

        last_exception = None

        for attempt in range(1, max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                last_exception = exc

                if not policy.should_retry(attempt, exc):
                    raise

                # No point in waiting after the final attempt.
                if attempt < max_attempts:
                    delay = policy.get_delay(attempt)

                    if on_retry:
                        on_retry(attempt, delay, exc)

                    time.sleep(delay)

        # Reached only if the policy kept approving retries on the last attempt.
        raise last_exception


def retry(policy: RetryPolicy = None):
    """Build a decorator that runs the wrapped function through a RetryExecutor.

    Args:
        policy: Passed unchanged to ``RetryExecutor`` on every call of the
            decorated function.

    Returns:
        A decorator; the function it produces forwards its arguments to the
        original and returns whatever ``RetryExecutor.execute`` returns.
    """
    def wrap(target):
        @functools.wraps(target)
        def retrying_call(*call_args, **call_kwargs):
            # A new executor is created for each invocation of the function.
            return RetryExecutor(policy).execute(target, *call_args, **call_kwargs)

        return retrying_call

    return wrap

Backoff Strategies

import random


class JitteredBackoff:
    """Exponential backoff (doubling per attempt) with symmetric random jitter.

    Args:
        base_delay: Delay for the first attempt before jitter is applied.
        max_delay: Upper bound for every returned delay.
        jitter: Fraction of the delay used as the +/- jitter range
            (``0.1`` means up to 10% shorter or longer).
    """

    def __init__(self, base_delay: float, max_delay: float = 60.0, jitter: float = 0.1):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Return the jittered delay for the 1-based ``attempt``.

        The result always lies in ``[0, max_delay]``.
        """
        delay = min(self.base_delay * (2 ** (attempt - 1)), self.max_delay)

        spread = delay * self.jitter
        delay += random.uniform(-spread, spread)

        # Clamp on both sides: upward jitter must not push past the cap,
        # and a jitter fraction above 1 must not yield a negative wait.
        return min(max(0, delay), self.max_delay)


class DecorrelatedJitterBackoff:
    """Backoff where each delay is drawn at random based on the previous one.

    Each delay is ``min(max_delay, uniform(base_delay, previous_delay * 3))``,
    so consecutive waits grow on average while staying randomised. The
    instance is stateful: it remembers the last delay it handed out.

    Args:
        base_delay: Smallest delay that can be drawn.
        max_delay: Upper bound for every returned delay.
    """

    def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.previous_delay = base_delay

    def get_delay(self, attempt: int) -> float:
        """Return the next delay and remember it for the following call.

        Args:
            attempt: 1-based attempt number. A value of 1 (or lower) starts a
                new retry sequence, so the remembered delay is reset first.
        """
        if attempt <= 1:
            # Allows one instance to be reused across independent sequences.
            self.previous_delay = self.base_delay

        upper = self.previous_delay * 3
        delay = min(self.max_delay, random.uniform(self.base_delay, upper))

        self.previous_delay = delay
        return delay

Circuit Breaker

import time
from enum import Enum


class CircuitState(Enum):
    """The three states a circuit breaker moves between."""

    # Normal operation: calls are passed through.
    CLOSED = "closed"
    # Tripped: calls are rejected until the timeout has elapsed.
    OPEN = "open"
    # Timeout elapsed: the next call is let through as a trial.
    HALF_OPEN = "half_open"


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit breaker is open."""


class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    The breaker starts CLOSED. After ``failure_threshold`` failures without
    an intervening success it becomes OPEN and rejects calls. Once more than
    ``timeout`` seconds have passed since the last failure, the next call is
    let through (HALF_OPEN): success closes the circuit, failure re-opens it.

    Args:
        failure_threshold: Number of failures that trips the breaker.
        timeout: Seconds to stay open before allowing a trial call.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the timeout
                has not elapsed yet; ``func`` is not called in that case.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                # A dedicated subclass lets callers tell "rejected by the
                # breaker" apart from a genuine failure of ``func``.
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        else:
            self._on_success()
            return result

    def _on_success(self):
        """Reset the failure counter and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Record a failure and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        # A failed trial call re-opens the circuit immediately, without
        # relying on the counter still being at the threshold.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN

Testing

from retry import RetryPolicy, RetryExecutor, retry, BackoffStrategy

# Up to three tries in total. With exponential backoff and the default
# multiplier of 2.0 the waits are 1 second, then 2 seconds.
policy = RetryPolicy(
    max_attempts=3,
    initial_delay=1.0,
    strategy=BackoffStrategy.EXPONENTIAL
)

executor = RetryExecutor(policy)

# Module-level counter so the demo function can remember how often it ran.
call_count = 0

def unreliable_function():
    """Fail on the first two calls, then succeed on the third."""
    global call_count
    call_count += 1
    if call_count < 3:
        raise ConnectionError("Temporary failure")
    return "Success!"

# Two failures are retried; the third call returns, so this prints "Success!".
result = executor.execute(unreliable_function)
print(result)

# Calling retry() without arguments applies the default RetryPolicy.
@retry()
def decorated_function():
    pass

Summary

You built a job retry system with configurable policies, multiple backoff strategies, and the circuit breaker pattern.