Build a Distributed Logger
Introduction
Distributed logging systems collect log messages from multiple services and aggregate them into a central location. They are essential for debugging and monitoring distributed systems.
What You'll Build
- Hierarchical logger
- Multiple handlers
- Log formatting
- Network transmission
Core Concepts
Log Levels
Five levels, in increasing order of severity: DEBUG, INFO, WARNING, ERROR, and CRITICAL. Loggers and handlers each have a threshold level and ignore messages below it.
Handlers
Handlers determine where log messages go: the console, a file, an in-process queue, or a remote log server over the network.
Prerequisites
- Python 3.8+
- Socket programming basics
Logger
Create distlog/logger.py:
import threading
import time
from datetime import datetime
from enum import IntEnum
from typing import List, Optional, Dict
from dataclasses import dataclass
class Level(IntEnum):
    """Severity of a log record; a larger value means more severe.

    Because this is an IntEnum, members compare directly with each other and
    with plain ints, which is how loggers and handlers apply their thresholds.
    The numeric values are the same as those of Python's standard ``logging``
    module.
    """

    DEBUG = 10
    INFO = 20
    WARNING = 30
    ERROR = 40
    CRITICAL = 50
@dataclass
class LogRecord:
    """A single logged event, as handed from a Logger to its handlers."""

    name: str          # name of the logger that created the record
    level: Level       # severity of the event
    message: str       # message text supplied by the caller
    timestamp: float   # creation time; Logger fills this from time.time()
    # Free-form key/value context. None when the record is built without it;
    # consumers such as JsonFormatter treat None as an empty dict.
    extra: Optional[Dict] = None
class Handler:
    """Base class for log destinations.

    Subclasses implement ``emit``. A Logger only passes a record to ``emit``
    when the record's level is >= this handler's ``level``.
    """

    # Class-level defaults so a subclass that never assigns these in its own
    # __init__ still works: Logger._log reads ``handler.level`` for every
    # record, and ``format`` reads ``self.formatter``.
    level: Level = Level.DEBUG
    formatter: Optional["Formatter"] = None

    def emit(self, record: LogRecord):
        """Deliver ``record`` to the destination. Subclasses must override."""
        raise NotImplementedError

    def set_level(self, level: Level):
        """Set the minimum level this handler accepts."""
        self.level = level

    def set_formatter(self, formatter: "Formatter"):
        """Use ``formatter.format(record)`` instead of the built-in layout.

        Subclasses that override ``format`` themselves bypass this hook.
        """
        self.formatter = formatter

    def format(self, record: LogRecord) -> str:
        """Render ``record`` as a single line (without a trailing newline)."""
        if self.formatter is not None:
            return self.formatter.format(record)
        return f"{record.name} - {record.level.name} - {record.message}"
class Logger:
    """A named logger that filters by level and dispatches records to handlers.

    A record that passes this logger's ``level`` goes to this logger's own
    handlers. Then, while ``propagate`` is true, it also goes to the handlers
    of each ancestor along the ``parent`` chain. Each handler applies its own
    ``level`` threshold before ``emit`` is called.
    """

    def __init__(self, name: str, level: Level = Level.INFO):
        self.name = name
        self.level = level
        self.handlers: List[Handler] = []
        # Set by whoever builds the hierarchy; None means this is a top-level logger.
        self.parent: Optional[Logger] = None
        # When True, records are also passed to the parent's handlers.
        self.propagate = True
        self._lock = threading.Lock()

    def add_handler(self, handler: Handler):
        """Attach ``handler`` so it receives records from this logger."""
        with self._lock:
            self.handlers.append(handler)

    def set_level(self, level: Level):
        """Set the minimum level this logger will emit."""
        self.level = level

    def _log(self, level: Level, message: str, **kwargs):
        """Build a LogRecord and emit it through this logger and its ancestors.

        Keyword arguments are stored on ``record.extra``.
        """
        if level < self.level:
            return
        record = LogRecord(
            name=self.name,
            level=level,
            message=message,
            timestamp=time.time(),
            extra=kwargs,
        )
        node = self
        while node is not None:
            # Hold only one logger's lock at a time. Never nesting child and
            # parent locks rules out deadlock between threads that log
            # through different loggers in the same chain.
            with node._lock:
                for handler in node.handlers:
                    if level >= handler.level:
                        handler.emit(record)
            if not node.propagate:
                break
            node = node.parent

    def debug(self, message: str, **kwargs):
        """Log ``message`` at DEBUG; keyword arguments go to ``record.extra``."""
        self._log(Level.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs):
        """Log ``message`` at INFO; keyword arguments go to ``record.extra``."""
        self._log(Level.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs):
        """Log ``message`` at WARNING; keyword arguments go to ``record.extra``."""
        self._log(Level.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs):
        """Log ``message`` at ERROR; keyword arguments go to ``record.extra``."""
        self._log(Level.ERROR, message, **kwargs)

    def critical(self, message: str, **kwargs):
        """Log ``message`` at CRITICAL; keyword arguments go to ``record.extra``."""
        self._log(Level.CRITICAL, message, **kwargs)
# Registry of every logger created through get_logger, keyed by name.
_loggers: Dict[str, Logger] = {}
# Guards _loggers. It is re-entrant because get_logger recurses to create
# missing ancestors while already holding it.
_loggers_lock = threading.RLock()


def get_logger(name: str) -> Logger:
    """Return the logger called ``name``, creating it on first use.

    Dotted names form a hierarchy: ``get_logger('a.b')`` also ensures that
    ``'a'`` exists and sets it as the new logger's ``parent``. Repeated calls
    with the same name return the same object, including under concurrent
    use from several threads.
    """
    with _loggers_lock:
        logger = _loggers.get(name)
        if logger is None:
            logger = Logger(name)
            parent_name = name.rpartition('.')[0]
            if parent_name:
                logger.parent = get_logger(parent_name)
            _loggers[name] = logger
        return logger
Handlers
import sys
import queue
import threading
from pathlib import Path
class ConsoleHandler(Handler):
    """Write each accepted record as one line to a text stream.

    The stream defaults to ``sys.stdout`` and is flushed after every record.
    The default threshold is INFO.
    """

    def __init__(self, stream=None):
        # Any falsy argument (including the default None) selects stdout.
        self.stream = stream if stream else sys.stdout
        self.level = Level.INFO

    def emit(self, record: LogRecord):
        out = self.stream
        line = self.format(record) + '\n'
        out.write(line)
        out.flush()
class FileHandler(Handler):
    """Append records to a file, with optional size-based rotation.

    Rotation happens only when both ``max_bytes`` and ``backup_count`` are
    greater than 0. Before a write that would grow a non-empty file past
    ``max_bytes``:

    - ``filename.(N-1)`` is moved to ``filename.N``, and so on down the chain;
    - ``filename`` is moved to ``filename.1``;
    - a new ``filename`` is started.

    At most ``backup_count`` backups are kept; the oldest is overwritten.
    The default threshold is DEBUG.
    """

    def __init__(self, filename: str, max_bytes: int = 0, backup_count: int = 0):
        self.filename = filename
        self.max_bytes = max_bytes
        self.backup_count = backup_count
        self.level = Level.DEBUG
        self._lock = threading.Lock()

    def emit(self, record: LogRecord):
        """Append one formatted line; failures are reported, never raised."""
        with self._lock:
            try:
                line = self.format(record) + '\n'
                self._maybe_rotate(len(line.encode('utf-8')))
                # Explicit UTF-8 so non-ASCII messages don't depend on the
                # platform's default encoding.
                with open(self.filename, 'a', encoding='utf-8') as f:
                    f.write(line)
            except Exception as e:
                # Best effort: a logging failure must not crash the caller.
                # Report on stderr so it is not mixed into stdout log output.
                print(f"Error writing log: {e}", file=sys.stderr)

    def _maybe_rotate(self, incoming: int):
        """Rotate the file if appending ``incoming`` bytes would exceed max_bytes."""
        if self.max_bytes <= 0 or self.backup_count <= 0:
            return
        path = Path(self.filename)
        try:
            size = path.stat().st_size
        except FileNotFoundError:
            return
        # An empty file is never rotated, even for a single oversized record.
        if size == 0 or size + incoming <= self.max_bytes:
            return
        # Shift backups up by one, oldest first: .N-1 -> .N, ..., .1 -> .2.
        for i in range(self.backup_count - 1, 0, -1):
            src = Path(f"{self.filename}.{i}")
            if src.exists():
                src.replace(f"{self.filename}.{i + 1}")
        path.replace(f"{self.filename}.1")

    def format(self, record: LogRecord) -> str:
        """Render ``record`` as '<local ISO time> LEVEL name message'."""
        dt = datetime.fromtimestamp(record.timestamp)
        return f"{dt.isoformat()} {record.level.name} {record.name} {record.message}"
class QueueHandler(Handler):
    """Hand accepted records to a ``queue.Queue`` for another consumer.

    The LogRecord object itself is enqueued, unformatted, using the queue's
    default (blocking) ``put``. The default threshold is DEBUG.
    """

    def __init__(self, queue: queue.Queue):
        # NOTE: this parameter shadows the ``queue`` module inside __init__;
        # the name is kept so keyword callers (queue=...) keep working.
        self.level = Level.DEBUG
        self.queue = queue

    def emit(self, record: LogRecord):
        enqueue = self.queue.put
        enqueue(record)
Formatters
import json
from datetime import datetime
class Formatter:
    """Interface for turning a LogRecord into one output string.

    Concrete formatters override ``format``. Calling it on this base class
    raises NotImplementedError.
    """

    def format(self, record: LogRecord) -> str:
        raise NotImplementedError
class SimpleFormatter(Formatter):
    """Human-readable layout: ``YYYY-MM-DD HH:MM:SS [LEVEL] name: message``.

    The timestamp is converted with ``datetime.fromtimestamp``, so it is
    shown in the local timezone.
    """

    def format(self, record: LogRecord):
        when = datetime.fromtimestamp(record.timestamp).strftime('%Y-%m-%d %H:%M:%S')
        severity = record.level.name
        return f"{when} [{severity}] {record.name}: {record.message}"
class JsonFormatter(Formatter):
    """Render a record as a single JSON object.

    Keys:
    - ``timestamp``: epoch seconds, as a float;
    - ``level``: the level name;
    - ``logger``: the logger name;
    - ``message``: the message text;
    - ``extra``: a dict, ``{}`` when the record has none.
    """

    def format(self, record: LogRecord):
        payload = {
            'timestamp': record.timestamp,
            'level': record.level.name,
            'logger': record.name,
            'message': record.message,
            'extra': record.extra or {},
        }
        # ``extra`` carries arbitrary caller-supplied values (datetimes,
        # exceptions, custom objects). Deliberately fall back to str() for
        # anything json cannot encode, so a log call never raises TypeError.
        return json.dumps(payload, default=str)
class SyslogFormatter(Formatter):
    """Render a record in the BSD-syslog (RFC 3164) line layout, without PRI.

    Output: ``Mmm dd HH:MM:SS HOSTNAME NAME[PID]: MESSAGE``, where the
    logger name is used as the syslog tag. The record level is not included.
    """

    # RFC 3164 requires English month abbreviations. strftime('%b') follows
    # the process locale, so map the month number explicitly instead.
    _MONTHS = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
               'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')

    def __init__(self, hostname: Optional[str] = None, pid: Optional[int] = None):
        """Create a formatter.

        Args:
            hostname: value for the HOSTNAME field. Defaults to
                ``socket.gethostname()``.
            pid: fixed value for the ``[PID]`` field. None means use
                ``os.getpid()`` each time a record is formatted.
        """
        import socket
        self.hostname = hostname or socket.gethostname()
        self.pid = pid

    def format(self, record: LogRecord):
        import os
        dt = datetime.fromtimestamp(record.timestamp)
        # RFC 3164 pads single-digit days with a space ("Jan  5"), not a zero.
        timestamp = f"{self._MONTHS[dt.month - 1]} {dt.day:2d} {dt:%H:%M:%S}"
        pid = os.getpid() if self.pid is None else self.pid
        return f"{timestamp} {self.hostname} {record.name}[{pid}]: {record.message}"
Distributed Features
import socket
import pickle
class NetworkHandler(Handler):
    """Send records to a LogServer over TCP.

    Each record is sent as a frame: a 4-byte big-endian length prefix
    followed by the pickled LogRecord, which is what LogServer reads. The
    connection is opened on first use and reused. After a send failure it is
    closed and reopened on the next record.

    Delivery is best effort: a record that cannot be serialized or sent is
    dropped and the error is reported on stderr. The default threshold is
    DEBUG.

    SECURITY: the receiving side unpickles these bytes. Only send to a
    server you trust, and only let trusted clients reach that server.
    """

    def __init__(self, host: str, port: int, timeout: Optional[float] = 5.0):
        """Create a handler.

        Args:
            host: log server address.
            port: log server port.
            timeout: socket timeout in seconds for connect and send. None
                means block indefinitely.
        """
        import threading
        self.host = host
        self.port = port
        self.timeout = timeout
        self.level = Level.DEBUG
        self.socket = None
        # Serializes use of self.socket, so frames from concurrent emits
        # (e.g. one handler shared by several loggers) never interleave.
        self._lock = threading.Lock()

    def emit(self, record: LogRecord):
        import sys
        try:
            data = pickle.dumps(record)
            # byteorder must be passed explicitly on Python < 3.11, and must
            # be big-endian to match LogServer's int.from_bytes(..., 'big').
            frame = len(data).to_bytes(4, 'big') + data
        except Exception as e:
            # e.g. an unpicklable value in record.extra. The connection itself
            # is healthy, so keep it open.
            print(f"Error serializing log record: {e}", file=sys.stderr)
            return
        with self._lock:
            try:
                if self.socket is None:
                    self.socket = socket.create_connection(
                        (self.host, self.port), timeout=self.timeout)
                self.socket.sendall(frame)
            except OSError as e:
                self._close_socket()
                print(f"Error sending log record to {self.host}:{self.port}: {e}",
                      file=sys.stderr)

    def close(self):
        """Close the connection, if any. A later emit reconnects."""
        with self._lock:
            self._close_socket()

    def _close_socket(self):
        """Close and forget the current socket. Caller must hold self._lock."""
        sock, self.socket = self.socket, None
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass
class LogServer:
    """Receive LogRecords over TCP and pass them to local handlers.

    Wire format (matching NetworkHandler): repeated frames, each a 4-byte
    big-endian length followed by that many bytes of a pickled LogRecord.
    One client connection may carry any number of frames. Each connection
    is served on its own thread. A handler receives a record only if the
    record's level is >= that handler's ``level``, as Logger does.

    SECURITY: frames are decoded with ``pickle.loads``, which can execute
    arbitrary code sent by the peer. Expose this server only to trusted
    clients. The default host '0.0.0.0' listens on all interfaces.
    """

    def __init__(self, host: str = '0.0.0.0', port: int = 5000):
        self.host = host
        self.port = port
        self.handlers: List[Handler] = []
        self._server = None
        # Connections are served on separate threads. Serialize dispatch so
        # handlers that are not themselves thread-safe see one record at a time.
        self._dispatch_lock = threading.Lock()

    def add_handler(self, handler: Handler):
        """Register ``handler`` to receive every incoming record at or above its level."""
        self.handlers.append(handler)

    @staticmethod
    def _recv_exact(sock, size: int) -> Optional[bytes]:
        """Read exactly ``size`` bytes. Return None if the peer closes first."""
        buf = bytearray()
        # recv() may legally return fewer bytes than requested.
        while len(buf) < size:
            chunk = sock.recv(size - len(buf))
            if not chunk:
                return None
            buf += chunk
        return bytes(buf)

    def _dispatch(self, record: LogRecord):
        """Pass ``record`` to every registered handler whose level admits it."""
        with self._dispatch_lock:
            for handler in list(self.handlers):
                if record.level >= handler.level:
                    handler.emit(record)

    def start(self):
        """Bind and serve until stop() is called. Blocks the calling thread."""
        import socketserver

        log_server = self

        class LogHandler(socketserver.BaseRequestHandler):
            def handle(self):
                # NetworkHandler keeps one connection open and sends many
                # frames, so keep reading until the client disconnects.
                while True:
                    header = log_server._recv_exact(self.request, 4)
                    if header is None:
                        return
                    size = int.from_bytes(header, 'big')
                    data = log_server._recv_exact(self.request, size)
                    if data is None:
                        return
                    # SECURITY: untrusted pickle input; see the class docstring.
                    log_server._dispatch(pickle.loads(data))

        class _LogTCPServer(socketserver.ThreadingTCPServer):
            # Allow an immediate restart on the same port.
            allow_reuse_address = True
            # Connection threads may run as long as a client stays connected.
            # Daemon threads are not joined by server_close(), so shutdown
            # cannot hang on an idle persistent client.
            daemon_threads = True

        server = _LogTCPServer((self.host, self.port), LogHandler)
        self._server = server
        print(f"Log server running on {self.host}:{self.port}")
        try:
            server.serve_forever()
        finally:
            server.server_close()

    def stop(self):
        """Stop a running start() loop and release the listening socket.

        Call this from a thread other than the one running start(), because
        ``shutdown()`` waits for ``serve_forever()`` to return.
        """
        server = self._server
        if server:
            server.shutdown()
            self._server = None
Testing
# Smoke test: send three messages to both the console and app.log.
from distlog import get_logger, FileHandler, ConsoleHandler

logger = get_logger('myapp')

# Handlers are attached in order: console first, then the file.
for handler in (ConsoleHandler(), FileHandler('app.log')):
    logger.add_handler(handler)

for log_call, text in (
    (logger.info, 'Application started'),
    (logger.warning, 'This is a warning'),
    (logger.error, 'An error occurred'),
):
    log_call(text)
Summary
You built a distributed logging system with hierarchical loggers, multiple handlers, formatters, and network transmission capabilities.