Build a Caching System
Introduction
Caching is one of the most important optimizations in computer science. It dramatically improves performance by storing frequently accessed data in fast storage.
In this tutorial, we'll build "CacheBox" - a versatile caching library with multiple eviction strategies, TTL support, and distributed caching capabilities.
What You'll Build
- In-memory cache with thread safety
- Multiple eviction policies (LRU, LFU, FIFO)
- TTL (Time-To-Live) support
- Cache statistics
- Decorator for easy integration
What You'll Learn
- Caching strategies
- Eviction algorithms
- Thread-safe programming
- Cache invalidation
- Performance optimization
Core Concepts
Let's understand caching fundamentals.
Why Cache?
- Speed - Memory is faster than disk
- Reduced Load - Fewer database/API calls
- Cost - Reduces infrastructure costs
- Scalability - Handles more requests
Cache Strategies
- Look-Aside - Check cache first, then source
- Write-Through - Write to both cache and source
- Write-Behind - Write to cache, async to source
- Refresh-Ahead - Proactively refresh expiring items
Project Setup
Bash
# Create project directory
mkdir cachebox
cd cachebox
# Create virtual environment
python -m venv venv
source venv/bin/activate
# Install dependencies
pip install redis pytest
Project Structure
File Structure
cachebox/
├── cachebox/
│ ├── __init__.py
│ ├── cache.py
│ ├── policies.py
│ ├── decorators.py
│ └── stats.py
├── main.py
└── requirements.txt
Cache Implementation
Let's create the core cache class.
Python
# cachebox/cache.py
import time
import threading
from typing import Any, Optional, Dict, Callable
from .policies import EvictionPolicy, LRUCache, LFUCache, FIFOCache
from .stats import CacheStats
class Cache:
__init__(self,
max_size: int = 1000,
default_ttl: Optional[int] = None,
policy: str = 'LRU',
on_evict: Optional[Callable] = None):
self.max_size = max_size
self.default_ttl = default_ttl
self.on_evict = on_evict
self.cache: Dict[str, Any] = {}
self.expiry: Dict[str, float] = {}
self.policy = self._create_policy(policy)
self.lock = threading.RLock()
self.stats = CacheStats()
self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self.cleanup_thread.start()
_create_policy(self, policy: str) -> EvictionPolicy:
policies = {
'LRU': LRUCache,
'LFU': LFUCache,
'FIFO': FIFOCache
}
policy_class = policies.get(policy, LRUCache)
return policy_class(self.max_size)
get(self, key: str, default: Any = None) -> Any:
with self.lock:
if key not in self.cache:
self.stats.increment('misses')
return default
if self._is_expired(key):
self._remove(key)
self.stats.increment('misses')
return default
self.stats.increment('hits')
self.policy.access(key)
return self.cache[key]
set(self, key: str, value: Any, ttl: Optional[int] = None):
with self.lock:
if key in self.cache:
if self.on_evict:
self.on_evict(key, self.cache[key])
self.policy.remove(key)
elif len(self.cache) >= self.max_size:
evicted_key = self.policy.evict()
if evicted_key:
self._remove(evicted_key)
self.stats.increment('evictions')
self.cache[key] = value
self.policy.add(key)
ttl_value = ttl if ttl is not None else self.default_ttl
if ttl_value:
self.expiry[key] = time.time() + ttl_value
delete(self, key: str) -> bool:
with self.lock:
if key in self.cache:
self._remove(key)
return True
return False
clear(self):
with self.lock:
self.cache.clear()
self.expiry.clear()
self.policy.clear()
self.stats.reset()
_remove(self, key: str):
if key in self.cache:
if self.on_evict:
self.on_evict(key, self.cache[key])
del self.cache[key]
self.policy.remove(key)
self.expiry.pop(key, None)
_is_expired(self, key: str) -> bool:
if key not in self.expiry:
return False
return time.time() > self.expiry[key]
_cleanup_loop(self):
while True:
time.sleep(1)
self._cleanup_expired()
_cleanup_expired(self):
with self.lock:
expired_keys = [k for k in self.cache.keys() if self._is_expired(k)]
for key in expired_keys:
self._remove(key)
self.stats.increment('expired')
__contains__(self, key: str) -> bool:
with self.lock:
return key in self.cache and not self._is_expired(key)
__len__(self) -> int:
with self.lock:
return len(self.cache)
Eviction Policies
Let's implement different eviction strategies.
Python
# cachebox/policies.py
from abc import ABC, abstractmethod
from typing import Optional, OrderedDict
from collections import OrderedDict, Counter
class EvictionPolicy(ABC):
    """Interface for eviction bookkeeping (tracks keys only, never values)."""

    def __init__(self, max_size: int):
        self.max_size = max_size

    @abstractmethod
    def add(self, key: str):
        """Register a newly inserted key."""
        pass

    @abstractmethod
    def remove(self, key: str):
        """Forget a key (deleted or overwritten by the cache)."""
        pass

    @abstractmethod
    def access(self, key: str):
        """Record a read of key."""
        pass

    @abstractmethod
    def evict(self) -> Optional[str]:
        """Choose, drop, and return the next victim key (None when empty)."""
        pass

    @abstractmethod
    def clear(self):
        """Drop all tracked keys."""
        pass


class LRUCache(EvictionPolicy):
    """Least Recently Used: evicts the key untouched for the longest time."""

    def __init__(self, max_size: int):
        super().__init__(max_size)
        # OrderedDict preserves recency order; front = least recent.
        self.cache = OrderedDict()

    def add(self, key: str):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = True
        # Self-bound as a safety net even though Cache evicts before adding.
        while len(self.cache) > self.max_size:
            self.cache.popitem(last=False)

    def remove(self, key: str):
        self.cache.pop(key, None)

    def access(self, key: str):
        if key in self.cache:
            self.cache.move_to_end(key)

    def evict(self) -> Optional[str]:
        if self.cache:
            key = self.cache.popitem(last=False)[0]
            return key
        return None

    def clear(self):
        self.cache.clear()


class LFUCache(EvictionPolicy):
    """Least Frequently Used: evicts the key with the fewest recorded reads."""

    def __init__(self, max_size: int):
        super().__init__(max_size)
        self.frequency = Counter()
        self.keys = set()

    def add(self, key: str):
        self.keys.add(key)
        # New (or re-added) entries start from zero reads.
        self.frequency[key] = 0
        if len(self.keys) > self.max_size:
            self.evict()

    def remove(self, key: str):
        self.keys.discard(key)
        self.frequency.pop(key, None)

    def access(self, key: str):
        self.frequency[key] += 1

    def evict(self) -> Optional[str]:
        """Drop and return one minimum-frequency key (tie order is arbitrary)."""
        if not self.keys:
            return None
        min_freq = min(self.frequency[k] for k in self.keys)
        lfu_keys = [k for k in self.keys if self.frequency[k] == min_freq]
        key = lfu_keys[0]
        self.remove(key)
        return key

    def clear(self):
        self.keys.clear()
        self.frequency.clear()


class FIFOCache(EvictionPolicy):
    """First In First Out: evicts in insertion order, ignoring reads."""

    def __init__(self, max_size: int):
        super().__init__(max_size)
        from collections import deque
        self.queue = deque()
        self.keys = set()

    def add(self, key: str):
        if key not in self.keys:
            self.queue.append(key)
            self.keys.add(key)
        while len(self.keys) > self.max_size:
            self.evict()

    def remove(self, key: str):
        # Fixed: also purge the queue entry. Previously removed keys stayed
        # queued forever, so heavy delete/re-add churn grew the deque
        # without bound.
        if key in self.keys:
            self.keys.discard(key)
            try:
                self.queue.remove(key)
            except ValueError:
                pass

    def access(self, key: str):
        # FIFO ignores access recency by design.
        pass

    def evict(self) -> Optional[str]:
        # Defensive: skip any stale queue entries not backed by self.keys.
        while self.queue:
            key = self.queue.popleft()
            if key in self.keys:
                self.keys.discard(key)
                return key
        return None

    def clear(self):
        self.queue.clear()
        self.keys.clear()
Distributed Caching
Let's add Redis-based distributed caching.
Python
# cachebox/distributed.py
import redis
import json
import hashlib
from typing import Any, Optional
from .cache import Cache
class DistributedCache:
    """Two-tier cache: a process-local Cache in front of a shared Redis store.

    Reads check the local tier first, then Redis (warming the local tier on
    a Redis hit). Writes go to both tiers. Note: None values cannot be
    distinguished from misses by get().
    """

    def __init__(self,
                 redis_host: str = 'localhost',
                 redis_port: int = 6379,
                 redis_db: int = 0,
                 max_size: int = 1000,
                 default_ttl: Optional[int] = None):
        """Connect to Redis and build the local tier.

        max_size / default_ttl apply to the local tier only.
        """
        self.redis = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            decode_responses=True
        )
        self.local_cache = Cache(max_size=max_size, default_ttl=default_ttl)
        self.prefix = "cachebox:"

    def _make_key(self, key: str) -> str:
        """Namespace a user key for Redis.

        Fixed: the original used JS-style "${...}" placeholders, which
        Python f-strings render literally, embedding '$' in every key.
        """
        return f"{self.prefix}{key}"

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value for key, trying the local tier, then Redis."""
        local_value = self.local_cache.get(key)
        if local_value is not None:
            return local_value
        redis_key = self._make_key(key)
        value = self.redis.get(redis_key)
        if value:
            try:
                data = json.loads(value)
                # Warm the local tier so the next read skips Redis.
                self.local_cache.set(key, data['value'], data.get('ttl'))
                return data['value']
            except (json.JSONDecodeError, KeyError, TypeError):
                # TypeError added: increment() stores a bare integer, which
                # json.loads returns as an int, and int['value'] raises
                # TypeError — previously this escaped and crashed get().
                return default
        return default

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Write value to both tiers; ttl in seconds applies to both.

        value must be JSON-serializable.
        """
        self.local_cache.set(key, value, ttl)
        redis_key = self._make_key(key)
        data = json.dumps({'value': value, 'ttl': ttl})
        if ttl:
            self.redis.setex(redis_key, ttl, data)
        else:
            self.redis.set(redis_key, data)

    def delete(self, key: str) -> bool:
        """Remove key from both tiers; True if Redis held it."""
        self.local_cache.delete(key)
        redis_key = self._make_key(key)
        return bool(self.redis.delete(redis_key))

    def clear(self):
        """Drop the local tier and every Redis key under our prefix.

        KEYS scans the whole keyspace (O(n)); acceptable for a tutorial,
        prefer SCAN in production.
        """
        self.local_cache.clear()
        keys = self.redis.keys(f"{self.prefix}*")
        if keys:
            self.redis.delete(*keys)

    def increment(self, key: str, amount: int = 1) -> int:
        """Atomically increment a counter in Redis and return the new value.

        The local copy is invalidated because INCRBY stores a bare integer,
        not our JSON envelope.
        """
        redis_key = self._make_key(key)
        value = self.redis.incrby(redis_key, amount)
        self.local_cache.delete(key)
        return value

    def get_many(self, keys: list) -> dict:
        """Fetch several keys; missing (None) values are omitted."""
        result = {}
        for key in keys:
            value = self.get(key)
            if value is not None:
                result[key] = value
        return result
Statistics & Monitoring
Let's add cache statistics tracking.
Python
# cachebox/stats.py
import time
from threading import Lock
from typing import Dict
class CacheStats:
    """Thread-safe counters for cache activity plus a human-readable report."""

    def __init__(self):
        # Known counters; increment() creates new keys on demand.
        self.stats: Dict[str, int] = {
            'hits': 0,
            'misses': 0,
            'sets': 0,
            'deletes': 0,
            'evictions': 0,
            'expired': 0
        }
        self.start_time = time.time()
        self.lock = Lock()

    def increment(self, key: str, amount: int = 1):
        """Add amount to counter key, creating it if unknown."""
        with self.lock:
            if key not in self.stats:
                self.stats[key] = 0
            self.stats[key] += amount

    def get(self, key: str) -> int:
        """Return a single counter (0 if never incremented)."""
        with self.lock:
            return self.stats.get(key, 0)

    def get_all(self) -> Dict:
        """Return a snapshot copy of all counters."""
        with self.lock:
            return self.stats.copy()

    def hit_rate(self) -> float:
        """Fraction of lookups served from cache; 0.0 before any lookup."""
        with self.lock:
            total = self.stats['hits'] + self.stats['misses']
            if total == 0:
                return 0.0
            return self.stats['hits'] / total

    def reset(self):
        """Zero every counter and restart the uptime clock."""
        with self.lock:
            for key in self.stats:
                self.stats[key] = 0
            self.start_time = time.time()

    def report(self) -> str:
        """Return a multi-line summary of all counters and uptime.

        The hit rate is computed inline: calling hit_rate() here would try
        to re-acquire the non-reentrant Lock this method already holds and
        deadlock. (Also fixed: JS-style "${...}" placeholders that rendered
        literally, and a stray '}' on the hit-rate line.)
        """
        with self.lock:
            uptime = time.time() - self.start_time
            total = self.stats['hits'] + self.stats['misses']
            rate = self.stats['hits'] / total if total else 0.0
            lines = [
                "Cache Statistics:",
                "-" * 30,
                f"Hits: {self.stats['hits']}",
                f"Misses: {self.stats['misses']}",
                f"Hit Rate: {rate:.2%}",
                f"Sets: {self.stats['sets']}",
                f"Deletes: {self.stats['deletes']}",
                f"Evictions: {self.stats['evictions']}",
                f"Expired: {self.stats['expired']}",
                f"Uptime: {uptime:.1f}s"
            ]
            return '\n'.join(lines)
Integration
Let's add decorators for easy caching.
Python
# cachebox/decorators.py
import functools
import hashlib
import json
from typing import Callable, Optional
from .cache import Cache
def cached(cache: Cache,
ttl: Optional[int] = None,
key_func: Optional[Callable] = None):
"""Decorator to cache function results"""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if key_func:
cache_key = key_func(*args, **kwargs)
else:
cache_key = f"${func.__name__}:${_make_key(args, kwargs)}"
result = cache.get(cache_key)
if result is not None:
return result
result = func(*args, **kwargs)
cache.set(cache_key, result, ttl)
return result
return wrapper
return decorator
def _make_key(args: tuple, kwargs: dict) -> str:
key_data = {
'args': [str(a) for a in args],
'kwargs': {k: str(v) for k, v in kwargs.items()}
}
key_str = json.dumps(key_data, sort_keys=True)
return hashlib.md5(key_str.encode()).hexdigest()
# Example usage
# cache = Cache(max_size=100, default_ttl=60)
#
# @cached(cache, ttl=300)
# def expensive_function(x, y):
# import time
# time.sleep(2)
# return x + y
#
# result = expensive_function(1, 2)
# print(result) # Waits 2 seconds
# result = expensive_function(1, 2)
# print(result) # Returns immediately from cache
Testing
Python
# main.py
from cachebox.cache import Cache
# Exercise CacheBox end to end: LRU eviction, TTL, stats, and membership.
box = Cache(max_size=3, default_ttl=60, policy='LRU')

# Seed three users.
seed = [
    ("user:1", {"name": "John", "email": "john@example.com"}),
    ("user:2", {"name": "Jane", "email": "jane@example.com"}),
    ("user:3", {"name": "Bob", "email": "bob@example.com"}),
]
for k, v in seed:
    box.set(k, v)

# Reading user:1 marks it as recently used.
user1 = box.get("user:1")
print("User 1:", user1)

# A fourth insert at capacity pushes out the least-recently-used entry
# (user:2, since user:1 was just read and user:3 was written last).
box.set("user:4", {"name": "Alice", "email": "alice@example.com"})
print("User 2 after eviction:", box.get("user:2"))

# A per-entry TTL overrides the cache default.
box.set("temp", "value", ttl=1)
print("Temp value:", box.get("temp"))

# Aggregate statistics report.
print("\n" + box.stats.report())

# __contains__ support: respects expiry.
print("\nuser:1 in cache?", "user:1" in box)
Testing Checklist
- Cache stores and retrieves values
- LRU eviction works correctly
- TTL expiration works
- Statistics are tracked
- Decorator works
Summary
Congratulations! You've built a complete caching system.
What You Built
- In-Memory Cache - Thread-safe storage
- LRU Policy - Least Recently Used
- LFU Policy - Least Frequently Used
- FIFO Policy - First In First Out
- TTL Support - Time-based expiration
- Statistics - Hit rate and performance tracking
- Decorators - Easy function caching