← Back to Tutorials
Python

Build a Distributed Logger

Difficulty: Advanced Est. Time: ~5 hours

Introduction

Distributed logging systems collect log messages from multiple services and aggregate them into a central location. They are essential for debugging and monitoring distributed systems.

What You'll Build
  • Hierarchical logger
  • Multiple handlers
  • Log formatting
  • Network transmission

Core Concepts

Log Levels

Five levels, in increasing order of severity: DEBUG, INFO, WARNING, ERROR, and CRITICAL. A logger drops any message below its configured level.

Handlers

Handlers decide where log messages go: the console, a file, or a remote log server over the network.

Prerequisites

  • Python 3.8+
  • Socket programming basics

Logger

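Create distlog/logger.py. Every snippet in the sections below goes into this same module, because the later snippets use Level, LogRecord, and Handler without importing them: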

import threading
import time
from datetime import datetime
from enum import IntEnum
from typing import List, Optional, Dict
from dataclasses import dataclass


class Level(IntEnum):
    """Severity of a log message; a larger value means a more severe message.

    Because this is an ``IntEnum``, members compare directly with ``<`` and
    ``>=``, which is how loggers and handlers apply their thresholds. The
    numeric values are the same as the stdlib ``logging`` constants.
    """

    DEBUG = 10
    INFO = 20
    WARNING = 30
    ERROR = 40
    CRITICAL = 50


@dataclass
class LogRecord:
    """A single log event, handed from a Logger to each of its handlers."""

    name: str  # name of the logger that produced the record
    level: Level  # severity of the event
    message: str  # message text exactly as passed to the logger
    timestamp: float  # seconds since the epoch (Logger._log uses time.time())
    # Extra key/value context supplied with the log call; None when absent.
    extra: Optional[Dict] = None


class Handler:
    """Base class for log destinations (console, file, queue, network, ...).

    Subclasses must implement :meth:`emit`. Loggers compare every record
    against ``handler.level``, so a class-level default is provided here: a
    subclass that never sets ``self.level`` accepts every record instead of
    raising ``AttributeError`` on the first log call.
    """

    # Minimum severity this handler accepts; concrete handlers typically
    # override it in __init__.
    level: Level = Level.DEBUG
    # Optional object with a ``format(record) -> str`` method (for example one
    # of the Formatter classes). None means "use the built-in layout below".
    formatter = None

    def emit(self, record: LogRecord):
        """Deliver *record* to the destination. Subclasses must override this."""
        raise NotImplementedError

    def set_level(self, level: Level):
        """Set the minimum severity this handler accepts."""
        self.level = level

    def set_formatter(self, formatter):
        """Render records with *formatter* (anything with ``format(record) -> str``)."""
        self.formatter = formatter

    def format(self, record: LogRecord) -> str:
        """Render *record* with the configured formatter, else as ``name - LEVEL - message``."""
        if self.formatter is not None:
            return self.formatter.format(record)
        return f"{record.name} - {record.level.name} - {record.message}"


class Logger:
    """Named logger that filters records by level and dispatches them to handlers.

    A record that passes this logger's ``level`` is offered to this logger's
    handlers. While ``propagate`` is true, it is then offered to the handlers
    of ``parent`` and that logger's ancestors. Each handler applies its own
    ``level`` threshold. Ancestor *logger* levels are not consulted, which is
    the same rule the stdlib ``logging`` module uses.

    Keyword arguments passed to :meth:`debug` ... :meth:`critical` are stored
    on the record as ``extra``.
    """

    def __init__(self, name: str, level: Level = Level.INFO):
        self.name = name
        self.level = level
        self.handlers: List[Handler] = []
        # Logger whose handlers also receive this logger's records; None if unset.
        self.parent: Optional[Logger] = None
        # When False, records stop at this logger instead of reaching `parent`.
        self.propagate = True
        # Guards self.handlers against concurrent add_handler() / dispatch.
        self._lock = threading.Lock()

    def add_handler(self, handler: Handler):
        """Attach *handler*; it receives records at or above its own level."""
        with self._lock:
            self.handlers.append(handler)

    def set_level(self, level: Level):
        """Discard records below *level* at this logger."""
        self.level = level

    def _log(self, level: Level, message: str, **kwargs):
        """Build a LogRecord and hand it to this logger's (and ancestors') handlers."""
        if level < self.level:
            return

        record = LogRecord(
            name=self.name,
            level=level,
            message=message,
            timestamp=time.time(),
            extra=kwargs,
        )

        logger: Optional[Logger] = self
        while logger is not None:
            # Snapshot the handler list under the lock but emit outside it.
            # A slow handler (file, network) then doesn't serialize every
            # logging thread, and a handler that itself logs cannot deadlock
            # on this non-reentrant lock.
            with logger._lock:
                handlers = list(logger.handlers)
            for handler in handlers:
                # A handler that never set `level` accepts every record.
                if level >= getattr(handler, 'level', level):
                    handler.emit(record)
            if not logger.propagate:
                break
            logger = logger.parent

    def debug(self, message: str, **kwargs):
        """Log *message* at DEBUG; keyword arguments become ``record.extra``."""
        self._log(Level.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs):
        """Log *message* at INFO; keyword arguments become ``record.extra``."""
        self._log(Level.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs):
        """Log *message* at WARNING; keyword arguments become ``record.extra``."""
        self._log(Level.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs):
        """Log *message* at ERROR; keyword arguments become ``record.extra``."""
        self._log(Level.ERROR, message, **kwargs)

    def critical(self, message: str, **kwargs):
        """Log *message* at CRITICAL; keyword arguments become ``record.extra``."""
        self._log(Level.CRITICAL, message, **kwargs)


_loggers: Dict[str, Logger] = {}
# Reentrant because get_logger() recurses to create ancestors while holding it.
_loggers_lock = threading.RLock()


def get_logger(name: str) -> Logger:
    """Return the logger called *name*, creating it (and its ancestors) on first use.

    Dotted names form a hierarchy: the first ``get_logger('app.db')`` call
    also creates ``get_logger('app')`` and sets it as the new logger's
    ``parent``. Repeated calls with the same name return the same object.
    The registry is protected by a lock, so concurrent first calls cannot
    create two loggers for one name.
    """
    with _loggers_lock:
        logger = _loggers.get(name)
        if logger is None:
            logger = Logger(name)
            parent_name, _, _ = name.rpartition('.')
            if parent_name:
                logger.parent = get_logger(parent_name)
            _loggers[name] = logger
        return logger

Handlers

import sys
import queue
import threading
from pathlib import Path


class ConsoleHandler(Handler):
    """Handler that writes one formatted line per record to a text stream.

    Args:
        stream: Destination stream; ``sys.stdout`` is used when it is falsy.

    The handler's threshold starts at INFO, and the stream is flushed after
    every record.
    """

    def __init__(self, stream=None):
        self.level = Level.INFO
        self.stream = stream if stream else sys.stdout

    def emit(self, record: LogRecord):
        line = self.format(record) + '\n'
        out = self.stream
        out.write(line)
        out.flush()


class FileHandler(Handler):
    """Append formatted records to *filename*, with optional size-based rotation.

    Rotation follows the stdlib ``RotatingFileHandler`` rule: it happens only
    when both *max_bytes* and *backup_count* are positive. Before a write
    that would push the file past *max_bytes*:

    - ``filename.N-1`` becomes ``filename.N``, down to ``filename.1``;
    - the live file becomes ``filename.1``;
    - whatever was already at ``filename.<backup_count>`` is overwritten.

    Args:
        filename: Path of the log file; it is opened in append mode per record.
        max_bytes: Size in bytes that triggers rotation; 0 disables rotation.
        backup_count: Number of rotated files to keep; 0 disables rotation.
    """

    def __init__(self, filename: str, max_bytes: int = 0, backup_count: int = 0):
        self.filename = filename
        self.max_bytes = max_bytes
        self.backup_count = backup_count
        self.level = Level.DEBUG
        # Serializes rollover + write so concurrent emits can't interleave.
        self._lock = threading.Lock()

    def emit(self, record: LogRecord):
        """Write *record* as one UTF-8 line; failures go to stderr and are not raised."""
        with self._lock:
            try:
                line = self.format(record) + '\n'
                if self._should_rollover(line):
                    self._do_rollover()
                with open(self.filename, 'a', encoding='utf-8') as f:
                    f.write(line)
            except Exception as e:
                # Best effort: a logging failure must not crash the caller,
                # but report it on stderr rather than mixing it into stdout.
                print(f"Error writing log: {e}", file=sys.stderr)

    def _should_rollover(self, line: str) -> bool:
        """Return True if appending *line* would push the file past max_bytes."""
        if self.max_bytes <= 0 or self.backup_count <= 0:
            return False
        try:
            size = Path(self.filename).stat().st_size
        except FileNotFoundError:
            return False
        # An empty file is never rotated, so a single oversized record can't
        # produce an endless chain of empty backups.
        return size > 0 and size + len(line.encode('utf-8')) > self.max_bytes

    def _do_rollover(self):
        """Shift existing backups up by one and move the live file to ``.1``."""
        for index in range(self.backup_count - 1, 0, -1):
            src = Path(f"{self.filename}.{index}")
            if src.exists():
                src.replace(f"{self.filename}.{index + 1}")
        Path(self.filename).replace(f"{self.filename}.1")

    def format(self, record: LogRecord):
        """Render *record* with a configured formatter, else as ``ISO-time LEVEL name message``."""
        formatter = getattr(self, 'formatter', None)
        if formatter is not None:
            return formatter.format(record)
        dt = datetime.fromtimestamp(record.timestamp)
        return f"{dt.isoformat()} {record.level.name} {record.name} {record.message}"


class QueueHandler(Handler):
    """Handler that puts raw (unformatted) records onto a queue for a consumer.

    Args:
        queue: Destination queue. ``put`` is called with its defaults, so
            a bounded queue that is full blocks the logging call.
    """

    def __init__(self, queue: queue.Queue):
        self.level = Level.DEBUG
        self.queue = queue

    def emit(self, record: LogRecord):
        target = self.queue
        target.put(record)

Formatters

import json
from datetime import datetime


class Formatter:
    """Interface for objects that render a LogRecord as a string.

    Concrete formatters override :meth:`format`; the base implementation
    raises ``NotImplementedError``.
    """

    def format(self, record: LogRecord) -> str:
        """Return the text form of *record*."""
        raise NotImplementedError


class SimpleFormatter(Formatter):
    """Render records as ``YYYY-MM-DD HH:MM:SS [LEVEL] name: message`` in local time."""

    def format(self, record: LogRecord):
        local_time = datetime.fromtimestamp(record.timestamp)
        stamp = local_time.strftime('%Y-%m-%d %H:%M:%S')
        severity = record.level.name
        return f"{stamp} [{severity}] {record.name}: {record.message}"


class JsonFormatter(Formatter):
    """Render each record as a single-line JSON object.

    Keys: ``timestamp`` (epoch seconds), ``level`` (level name), ``logger``,
    ``message`` and ``extra`` (``{}`` when the record has none).
    """

    def format(self, record: LogRecord):
        payload = {
            'timestamp': record.timestamp,
            'level': record.level.name,
            'logger': record.name,
            'message': record.message,
            'extra': record.extra or {},
        }
        # `extra` carries arbitrary caller-supplied values (datetimes, UUIDs,
        # exceptions, ...). Falling back to str() for anything json cannot
        # encode keeps a log call from raising TypeError inside the formatter.
        return json.dumps(payload, default=str)


class SyslogFormatter(Formatter):
    """Render records in the BSD-syslog (RFC 3164) ``TIMESTAMP HOST TAG[PID]: MSG`` layout.

    The logger name is used as TAG. The ``<PRI>`` prefix is not emitted.

    Args:
        hostname: Host field; defaults to ``socket.gethostname()``
            (``'localhost'`` if that is empty).
        pid: PID field; defaults to ``os.getpid()``, read when the formatter
            is constructed.
    """

    # RFC 3164 requires English month abbreviations; strftime('%b') follows
    # the process locale, so the names are spelled out here.
    _MONTHS = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
               'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')

    def __init__(self, hostname: Optional[str] = None, pid: Optional[int] = None):
        import os
        import socket

        self.hostname = hostname or socket.gethostname() or 'localhost'
        self.pid = pid if pid is not None else os.getpid()

    def format(self, record: LogRecord):
        dt = datetime.fromtimestamp(record.timestamp)
        # RFC 3164 space-pads single-digit days ("Oct  1"), not zero-pads.
        timestamp = f"{self._MONTHS[dt.month - 1]} {dt.day:2d} {dt:%H:%M:%S}"
        return f"{timestamp} {self.hostname} {record.name}[{self.pid}]: {record.message}"

Distributed Features

import socket
import pickle


class NetworkHandler(Handler):
    """Send records to a LogServer over a persistent TCP connection.

    Each record is framed as a 4-byte big-endian length followed by the
    pickled LogRecord. Delivery is best effort: if serialization or sending
    fails, the record is dropped and the problem is reported on stderr. After
    a send failure, the next emit() reconnects.

    Args:
        host: Server hostname or address.
        port: Server TCP port.
        timeout: Seconds to wait for connect/send; None (the default) blocks.
    """

    def __init__(self, host: str, port: int, timeout=None):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.level = Level.DEBUG
        self.socket = None
        # One handler may be shared by several loggers/threads; the lock keeps
        # concurrent sendall() calls from interleaving frames on the wire.
        self._lock = threading.Lock()

    def emit(self, record: LogRecord):
        """Serialize *record* and send it, (re)connecting if necessary."""
        try:
            data = pickle.dumps(record)
            # int.to_bytes() requires an explicit byteorder before Python 3.11;
            # 'big' matches the server's int.from_bytes(..., 'big').
            frame = len(data).to_bytes(4, 'big') + data
        except Exception as e:  # e.g. an unpicklable value in record.extra
            self._report(f"could not serialize record: {e}")
            return

        with self._lock:
            try:
                if self.socket is None:
                    self.socket = socket.create_connection(
                        (self.host, self.port), timeout=self.timeout
                    )
                self.socket.sendall(frame)
            except OSError as e:
                # Close (rather than leak) the broken connection so the next
                # record triggers a fresh connect.
                self._close_socket()
                self._report(f"dropped record for {self.host}:{self.port}: {e}")

    def close(self):
        """Close the connection to the server, if one is open."""
        with self._lock:
            self._close_socket()

    def _close_socket(self):
        """Close and forget the current socket; the caller must hold self._lock."""
        if self.socket is not None:
            try:
                self.socket.close()
            except OSError:
                pass
            self.socket = None

    @staticmethod
    def _report(message: str):
        """Report a delivery problem on stderr without raising into the caller."""
        import sys

        print(f"NetworkHandler: {message}", file=sys.stderr)


class LogServer:
    """TCP server that receives records from NetworkHandler clients and re-emits them.

    Wire format: a 4-byte big-endian length followed by that many bytes of
    pickled LogRecord. Frames repeat for as long as the client keeps the
    connection open.

    SECURITY: frames are decoded with ``pickle.loads``, which can execute
    arbitrary code embedded in a crafted payload. Only expose this server to
    trusted clients (the default host ``0.0.0.0`` listens on every
    interface), or switch both ends to a data-only encoding such as JSON.
    """

    def __init__(self, host: str = '0.0.0.0', port: int = 5000):
        self.host = host
        self.port = port
        self.handlers: List[Handler] = []
        self._server = None

    def add_handler(self, handler: Handler):
        """Register *handler* to receive incoming records at or above its level."""
        self.handlers.append(handler)

    def _dispatch(self, record):
        """Emit *record* to each registered handler whose level admits it."""
        for handler in list(self.handlers):
            if record.level >= getattr(handler, 'level', record.level):
                handler.emit(record)

    @staticmethod
    def _recv_exact(sock, size: int):
        """Return exactly *size* bytes from *sock*, or None if the peer closes first.

        A single ``recv`` may return fewer bytes than requested, so it is
        called repeatedly until the frame is complete.
        """
        buf = bytearray()
        while len(buf) < size:
            chunk = sock.recv(size - len(buf))
            if not chunk:
                return None
            buf += chunk
        return bytes(buf)

    def start(self):
        """Bind, announce the address, and serve until stop() is called.

        Blocks the calling thread; call stop() from a different thread.
        """
        import socketserver

        log_server = self

        class LogHandler(socketserver.BaseRequestHandler):
            def handle(self):
                # NetworkHandler sends many records over one connection, so
                # keep reading frames until the client disconnects.
                while True:
                    header = log_server._recv_exact(self.request, 4)
                    if header is None:
                        return
                    size = int.from_bytes(header, 'big')
                    data = log_server._recv_exact(self.request, size)
                    if data is None:
                        return
                    # Unsafe on untrusted input -- see the class docstring.
                    record = pickle.loads(data)
                    log_server._dispatch(record)

        class _Server(socketserver.ThreadingTCPServer):
            # One thread per connection so several clients can log at once.
            daemon_threads = True
            # Allow an immediate restart on the same port.
            allow_reuse_address = True

        self._server = _Server((self.host, self.port), LogHandler)
        print(f"Log server running on {self.host}:{self.port}")
        self._server.serve_forever()

    def stop(self):
        """Stop serve_forever() and release the listening socket.

        Must be called from a thread other than the one running start().
        """
        server = self._server
        if server:
            server.shutdown()
            server.server_close()
            self._server = None

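Testing

The script below imports from the distlog package, so also create distlog/__init__.py that re-exports the names it uses, for example: from .logger import get_logger, ConsoleHandler, FileHandler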

from distlog import ConsoleHandler, FileHandler, get_logger

# 'myapp' starts at the default INFO level, so all three messages below pass.
logger = get_logger('myapp')
for handler in (ConsoleHandler(), FileHandler('app.log')):
    logger.add_handler(handler)  # each accepted record goes to stdout and app.log

logger.info('Application started')
logger.warning('This is a warning')
logger.error('An error occurred')

Summary

You built a distributed logging system with hierarchical loggers, multiple handlers, formatters, and network transmission capabilities.