Python

Build a Caching System

Difficulty: Intermediate · Est. Time: ~3 hours

Introduction

Caching is one of the most effective performance optimizations available. By keeping frequently accessed data in fast storage (usually memory), it avoids repeating expensive work such as disk reads, database queries, or remote API calls.

In this tutorial, we'll build "CacheBox" - a versatile caching library with multiple eviction strategies, TTL support, and distributed caching capabilities.

What You'll Build
  • In-memory cache with thread safety
  • Multiple eviction policies (LRU, LFU, FIFO)
  • TTL (Time-To-Live) support
  • Cache statistics
  • Decorator for easy integration
What You'll Learn
  • Caching strategies
  • Eviction algorithms
  • Thread-safe programming
  • Cache invalidation
  • Performance optimization

Core Concepts

Let's understand caching fundamentals.

Why Cache?

  • Speed - In-memory lookups are orders of magnitude faster than disk or network reads
  • Reduced Load - Fewer database and API calls reach your backend
  • Cost - Serving repeated requests from cache needs less infrastructure
  • Scalability - The same hardware can handle more requests

Cache Strategies

  • Cache-Aside (Look-Aside) - Check the cache first; on a miss, load from the source and populate the cache (sketched below)
  • Write-Through - Write to the cache and the source in the same operation
  • Write-Behind - Write to the cache immediately, flush to the source asynchronously
  • Refresh-Ahead - Proactively refresh items that are about to expire
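
Here is a minimal sketch of the cache-aside pattern CacheBox is built around, using a plain dict and a hypothetical load_user_from_db() stand-in for the slow data source:

Python
# Cache-aside in its simplest form. load_user_from_db is a placeholder for
# any expensive lookup (database query, HTTP call, ...).
user_cache = {}

def load_user_from_db(user_id):
    return {"id": user_id, "name": f"user-{user_id}"}

def get_user(user_id):
    if user_id in user_cache:              # 1. check the cache first
        return user_cache[user_id]
    user = load_user_from_db(user_id)      # 2. miss: go to the source
    user_cache[user_id] = user             # 3. populate the cache for next time
    return user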

Project Setup

Bash
# Create project directory
mkdir cachebox
cd cachebox

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
pip install redis pytest

Project Structure

File Structure
cachebox/
├── cachebox/
│   ├── __init__.py
│   ├── cache.py
│   ├── policies.py
│   ├── decorators.py
│   ├── distributed.py
│   └── stats.py
├── main.py
└── requirements.txt

Cache Implementation

Let's create the core cache class.

Python
# cachebox/cache.py
import time
import threading
from typing import Any, Optional, Dict, Callable
from .policies import EvictionPolicy, LRUCache, LFUCache, FIFOCache
from .stats import CacheStats

class Cache:
    """Thread-safe in-memory cache with optional TTL and pluggable eviction."""

    def __init__(self,
                 max_size: int = 1000,
                 default_ttl: Optional[int] = None,
                 policy: str = 'LRU',
                 on_evict: Optional[Callable] = None):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.on_evict = on_evict

        self.cache: Dict[str, Any] = {}
        self.expiry: Dict[str, float] = {}

        self.policy = self._create_policy(policy)

        # RLock: internal helpers are called while the lock is already held
        self.lock = threading.RLock()
        self.stats = CacheStats()

        # Background thread that sweeps out expired entries once per second
        self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
        self.cleanup_thread.start()

    def _create_policy(self, policy: str) -> EvictionPolicy:
        policies = {
            'LRU': LRUCache,
            'LFU': LFUCache,
            'FIFO': FIFOCache
        }
        policy_class = policies.get(policy, LRUCache)
        return policy_class(self.max_size)

    def get(self, key: str, default: Any = None) -> Any:
        with self.lock:
            if key not in self.cache:
                self.stats.increment('misses')
                return default

            if self._is_expired(key):
                self._remove(key)
                self.stats.increment('misses')
                return default

            self.stats.increment('hits')
            self.policy.access(key)
            return self.cache[key]

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        with self.lock:
            if key in self.cache:
                if self.on_evict:
                    self.on_evict(key, self.cache[key])
                self.policy.remove(key)
            elif len(self.cache) >= self.max_size:
                evicted_key = self.policy.evict()
                if evicted_key:
                    self._remove(evicted_key)
                    self.stats.increment('evictions')

            self.cache[key] = value
            self.policy.add(key)
            self.stats.increment('sets')

            ttl_value = ttl if ttl is not None else self.default_ttl
            if ttl_value:
                self.expiry[key] = time.time() + ttl_value
            else:
                # Overwriting without a TTL clears any previous expiry
                self.expiry.pop(key, None)

    def delete(self, key: str) -> bool:
        with self.lock:
            if key in self.cache:
                self._remove(key)
                self.stats.increment('deletes')
                return True
            return False

    def clear(self):
        with self.lock:
            self.cache.clear()
            self.expiry.clear()
            self.policy.clear()
            self.stats.reset()

    def _remove(self, key: str):
        if key in self.cache:
            if self.on_evict:
                self.on_evict(key, self.cache[key])
            del self.cache[key]
            self.policy.remove(key)
            self.expiry.pop(key, None)

    def _is_expired(self, key: str) -> bool:
        if key not in self.expiry:
            return False
        return time.time() > self.expiry[key]

    def _cleanup_loop(self):
        while True:
            time.sleep(1)
            self._cleanup_expired()

    def _cleanup_expired(self):
        with self.lock:
            expired_keys = [k for k in self.cache.keys() if self._is_expired(k)]
            for key in expired_keys:
                self._remove(key)
                self.stats.increment('expired')

    def __contains__(self, key: str) -> bool:
        with self.lock:
            return key in self.cache and not self._is_expired(key)

    def __len__(self) -> int:
        with self.lock:
            return len(self.cache)
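
The class is already usable on its own. Here is a quick sanity check (assuming the cachebox package above is importable) showing the on_evict callback firing when the LRU policy pushes an entry out:

Python
# Quick sanity check for Cache with an eviction callback
from cachebox.cache import Cache

def log_eviction(key, value):
    print(f"evicted {key!r} -> {value!r}")

cache = Cache(max_size=2, policy='LRU', on_evict=log_eviction)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")       # "a" becomes the most recently used entry
cache.set("c", 3)    # over capacity: "b" is evicted and log_eviction runs
print("a" in cache, "b" in cache)  # True False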

Eviction Policies

Let's implement different eviction strategies.

Python
# cachebox/policies.py
from abc import ABC, abstractmethod
from typing import Optional
from collections import OrderedDict, Counter, deque

class EvictionPolicy(ABC):
    def __init__(self, max_size: int):
        self.max_size = max_size
    
    @abstractmethod
    def add(self, key: str):
        pass
    
    @abstractmethod
    def remove(self, key: str):
        pass
    
    @abstractmethod
    def access(self, key: str):
        pass
    
    @abstractmethod
    def evict(self) -> Optional[str]:
        pass
    
    @abstractmethod
    def clear(self):
        pass


class LRUCache(EvictionPolicy):
    """Evicts the least recently used key, tracked via an OrderedDict."""

    def __init__(self, max_size: int):
        super().__init__(max_size)
        self.cache = OrderedDict()

    def add(self, key: str):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = True

        while len(self.cache) > self.max_size:
            self.cache.popitem(last=False)

    def remove(self, key: str):
        self.cache.pop(key, None)

    def access(self, key: str):
        if key in self.cache:
            self.cache.move_to_end(key)

    def evict(self) -> Optional[str]:
        if self.cache:
            # The front of the OrderedDict is the least recently used key
            key = self.cache.popitem(last=False)[0]
            return key
        return None

    def clear(self):
        self.cache.clear()


class LFUCache(EvictionPolicy):
    """Evicts the key with the lowest access count."""

    def __init__(self, max_size: int):
        super().__init__(max_size)
        self.frequency = Counter()
        self.keys = set()

    def add(self, key: str):
        self.keys.add(key)
        self.frequency[key] = 0

        if len(self.keys) > self.max_size:
            self.evict()

    def remove(self, key: str):
        self.keys.discard(key)
        self.frequency.pop(key, None)

    def access(self, key: str):
        self.frequency[key] += 1

    def evict(self) -> Optional[str]:
        if not self.keys:
            return None

        # Find the lowest access count; ties are broken arbitrarily
        min_freq = min(self.frequency[k] for k in self.keys)
        lfu_keys = [k for k in self.keys if self.frequency[k] == min_freq]

        key = lfu_keys[0]
        self.remove(key)
        return key

    def clear(self):
        self.keys.clear()
        self.frequency.clear()


class FIFOCache(EvictionPolicy):
    """Evicts keys in insertion order, regardless of how often they are used."""

    def __init__(self, max_size: int):
        super().__init__(max_size)
        self.queue = deque()
        self.keys = set()

    def add(self, key: str):
        if key not in self.keys:
            self.queue.append(key)
            self.keys.add(key)

        while len(self.keys) > self.max_size:
            self.evict()

    def remove(self, key: str):
        # Stale entries left in the queue are skipped during evict()
        self.keys.discard(key)

    def access(self, key: str):
        pass  # FIFO ignores access order

    def evict(self) -> Optional[str]:
        while self.queue:
            key = self.queue.popleft()
            if key in self.keys:
                self.keys.discard(key)
                return key
        return None

    def clear(self):
        self.queue.clear()
        self.keys.clear()
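
To see how the policies differ, here is a small illustration (not part of the library files) comparing what LRU and FIFO choose to evict after the same access pattern:

Python
# Illustration: LRU and FIFO disagree about which key to evict
from cachebox.policies import LRUCache, FIFOCache

lru, fifo = LRUCache(max_size=3), FIFOCache(max_size=3)

for policy in (lru, fifo):
    for key in ("a", "b", "c"):
        policy.add(key)
    policy.access("a")   # touch "a"; only LRU tracks this

print(lru.evict())    # "b" - "a" was refreshed by the access
print(fifo.evict())   # "a" - first in, first out, accesses ignored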

Distributed Caching

Let's add Redis-based distributed caching.

Python
# cachebox/distributed.py
import redis
import json
from typing import Any, Optional
from .cache import Cache

class DistributedCache:
    """Two-level cache: a fast local Cache in front of a shared Redis store."""

    def __init__(self,
                 redis_host: str = 'localhost',
                 redis_port: int = 6379,
                 redis_db: int = 0,
                 max_size: int = 1000,
                 default_ttl: Optional[int] = None):
        self.redis = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            decode_responses=True
        )
        self.local_cache = Cache(max_size=max_size, default_ttl=default_ttl)
        self.prefix = "cachebox:"

    def _make_key(self, key: str) -> str:
        return f"{self.prefix}{key}"

    def get(self, key: str, default: Any = None) -> Any:
        # Check the local cache first to avoid a network round trip
        local_value = self.local_cache.get(key)
        if local_value is not None:
            return local_value

        redis_key = self._make_key(key)
        value = self.redis.get(redis_key)

        if value:
            try:
                data = json.loads(value)
                # Repopulate the local cache so later reads stay local
                self.local_cache.set(key, data['value'], data.get('ttl'))
                return data['value']
            except (json.JSONDecodeError, KeyError):
                return default

        return default

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        self.local_cache.set(key, value, ttl)

        redis_key = self._make_key(key)
        data = json.dumps({'value': value, 'ttl': ttl})

        if ttl:
            self.redis.setex(redis_key, ttl, data)
        else:
            self.redis.set(redis_key, data)

    def delete(self, key: str) -> bool:
        self.local_cache.delete(key)
        redis_key = self._make_key(key)
        return bool(self.redis.delete(redis_key))

    def clear(self):
        self.local_cache.clear()

        keys = self.redis.keys(f"{self.prefix}*")
        if keys:
            self.redis.delete(*keys)

    def increment(self, key: str, amount: int = 1) -> int:
        redis_key = self._make_key(key)
        value = self.redis.incrby(redis_key, amount)
        # Invalidate the local copy so the next get() sees the new value
        self.local_cache.delete(key)
        return value

    def get_many(self, keys: list) -> dict:
        result = {}
        for key in keys:
            value = self.get(key)
            if value is not None:
                result[key] = value
        return result
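
Usage mirrors the in-memory Cache; the example below assumes a Redis server is reachable at localhost:6379 (the defaults above):

Python
# Example usage (assumes a Redis server is running on localhost:6379)
from cachebox.distributed import DistributedCache

dist = DistributedCache(default_ttl=300)
dist.set("config:theme", "dark", ttl=600)   # written to the local cache and Redis
print(dist.get("config:theme"))             # "dark" - served from the local cache
print(dist.increment("page:views"))         # atomic counter kept in Redis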

Statistics & Monitoring

Let's add cache statistics tracking.

Python
# cachebox/stats.py
import time
from threading import Lock
from typing import Dict

class CacheStats:
    """Thread-safe counters for cache activity."""

    def __init__(self):
        self.stats: Dict[str, int] = {
            'hits': 0,
            'misses': 0,
            'sets': 0,
            'deletes': 0,
            'evictions': 0,
            'expired': 0
        }
        self.start_time = time.time()
        self.lock = Lock()

    def increment(self, key: str, amount: int = 1):
        with self.lock:
            if key not in self.stats:
                self.stats[key] = 0
            self.stats[key] += amount

    def get(self, key: str) -> int:
        with self.lock:
            return self.stats.get(key, 0)

    def get_all(self) -> Dict:
        with self.lock:
            return self.stats.copy()

    def hit_rate(self) -> float:
        with self.lock:
            total = self.stats['hits'] + self.stats['misses']
            if total == 0:
                return 0.0
            return self.stats['hits'] / total

    def reset(self):
        with self.lock:
            for key in self.stats:
                self.stats[key] = 0
            self.start_time = time.time()

    def report(self) -> str:
        with self.lock:
            uptime = time.time() - self.start_time
            # Compute the hit rate inline: calling hit_rate() here would try to
            # re-acquire the non-reentrant lock and deadlock.
            total = self.stats['hits'] + self.stats['misses']
            hit_rate = self.stats['hits'] / total if total else 0.0
            lines = [
                "Cache Statistics:",
                "-" * 30,
                f"Hits: {self.stats['hits']}",
                f"Misses: {self.stats['misses']}",
                f"Hit Rate: {hit_rate:.2%}",
                f"Sets: {self.stats['sets']}",
                f"Deletes: {self.stats['deletes']}",
                f"Evictions: {self.stats['evictions']}",
                f"Expired: {self.stats['expired']}",
                f"Uptime: {uptime:.1f}s"
            ]
            return '\n'.join(lines)
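
Every Cache owns a CacheStats instance (created in its __init__), so checking the hit rate after a few operations looks roughly like this:

Python
# Example: inspect statistics after a few operations
from cachebox.cache import Cache

cache = Cache(max_size=10)
cache.set("a", 1)
cache.get("a")          # hit
cache.get("missing")    # miss

print(cache.stats.hit_rate())   # 0.5
print(cache.stats.report())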

Integration

Let's add decorators for easy caching.

Python
# cachebox/decorators.py
import functools
import hashlib
import json
from typing import Callable, Optional
from .cache import Cache

def cached(cache: Cache, 
             ttl: Optional[int] = None,
             key_func: Optional[Callable] = None):
    """Decorator to cache function results"""
    
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if key_func:
                cache_key = key_func(*args, **kwargs)
            else:
                cache_key = f"${func.__name__}:${_make_key(args, kwargs)}"
            
            result = cache.get(cache_key)
            
            if result is not None:
                return result
            
            result = func(*args, **kwargs)
            cache.set(cache_key, result, ttl)
            
            return result
        
        return wrapper
    
    return decorator


def _make_key(args: tuple, kwargs: dict) -> str:
    key_data = {
        'args': [str(a) for a in args],
        'kwargs': {k: str(v) for k, v in kwargs.items()}
    }
    key_str = json.dumps(key_data, sort_keys=True)
    return hashlib.md5(key_str.encode()).hexdigest()


# Example usage
# cache = Cache(max_size=100, default_ttl=60)
#
# @cached(cache, ttl=300)
# def expensive_function(x, y):
#     import time
#     time.sleep(2)
#     return x + y
#
# result = expensive_function(1, 2)
# print(result)  # Waits 2 seconds
# result = expensive_function(1, 2)
# print(result)  # Returns immediately from cache
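
One caveat: the default key is built from str() of every argument, so for arguments whose string form isn't stable or meaningful you can pass your own key_func. A sketch, using a hypothetical get_user function that is not part of CacheBox:

Python
# Sketch: explicit cache keys via key_func (get_user is a hypothetical example)
from cachebox.cache import Cache
from cachebox.decorators import cached

cache = Cache(max_size=500, default_ttl=120)

@cached(cache, ttl=120,
        key_func=lambda user_id, include_profile=False: f"user:{user_id}:{include_profile}")
def get_user(user_id, include_profile=False):
    return {"id": user_id, "profile": include_profile}  # stand-in for a slow lookup

get_user(42)                          # computed, stored under "user:42:False"
get_user(42)                          # served from the cache
get_user(42, include_profile=True)    # different key, computed again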

Testing

Python
# main.py
import time

from cachebox.cache import Cache

# Create cache with LRU policy
cache = Cache(max_size=3, default_ttl=60, policy='LRU')

# Set values
cache.set("user:1", {"name": "John", "email": "john@example.com"})
cache.set("user:2", {"name": "Jane", "email": "jane@example.com"})
cache.set("user:3", {"name": "Bob", "email": "bob@example.com"})

# Get values
user1 = cache.get("user:1")
print("User 1:", user1)

# Test LRU eviction
cache.set("user:4", {"name": "Alice", "email": "alice@example.com"})

# user:2 was least recently used, should be evicted
print("User 2 after eviction:", cache.get("user:2"))

# Test TTL expiration
cache.set("temp", "value", ttl=1)
print("Temp value:", cache.get("temp"))
time.sleep(1.5)
print("Temp value after expiry:", cache.get("temp"))

# Print stats
print("\n" + cache.stats.report())

# Test contains
print("\nuser:1 in cache?", "user:1" in cache)
Testing Checklist
  • Cache stores and retrieves values
  • LRU eviction works correctly
  • TTL expiration works
  • Statistics are tracked
  • Decorator works

Summary

Congratulations! You've built a complete caching system.

What You Built

  • In-Memory Cache - Thread-safe storage
  • LRU Policy - Least Recently Used
  • LFU Policy - Least Frequently Used
  • FIFO Policy - First In First Out
  • TTL Support - Time-based expiration
  • Statistics - Hit rate and performance tracking
  • Decorators - Easy function caching

Next Steps

  • Add write-through caching (see the sketch after this list)
  • Implement cache warming
  • Add cache invalidation patterns
  • Implement cache clustering
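
As a head start on the first item, here is a hedged sketch of a write-through wrapper; save_to_db is a placeholder for whatever persistence layer you use, not part of CacheBox:

Python
# Sketch: write-through wrapper around Cache (save_to_db is a placeholder)
from cachebox.cache import Cache

class WriteThroughCache:
    def __init__(self, cache: Cache, save_to_db):
        self.cache = cache
        self.save_to_db = save_to_db

    def set(self, key, value, ttl=None):
        self.save_to_db(key, value)       # persist first, so the store stays authoritative
        self.cache.set(key, value, ttl)   # then keep the cache in sync

    def get(self, key, default=None):
        return self.cache.get(key, default)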

Continue Learning

Try these tutorials:

  • Build a Rate Limiter
  • Build a Background Job Queue
  • Build a REST API with JWT