← Back to Tutorials
Python

Build a Log Monitoring System

Difficulty: Intermediate Est. Time: ~3 hours

Introduction

Log monitoring is critical for maintaining healthy applications. It helps detect errors, track performance, and identify security threats in real-time.

In this tutorial, we'll build "LogWatch" - a real-time log monitoring system that watches log files, parses entries, triggers alerts, and provides a dashboard.

What You'll Build
  • Real-time log file watcher
  • Multi-format log parser
  • Pattern-based alerting
  • Statistics dashboard
  • Notification system
What You'll Learn
  • File system monitoring
  • Log parsing techniques
  • Real-time alerting
  • Pattern matching
  • Data visualization

Core Concepts

Let's understand log monitoring fundamentals.

Log Monitoring Architecture

  • Watcher - Monitors files for changes
  • Parser - Extracts structured data from logs
  • Analyzer - Detects patterns and anomalies
  • Alerter - Sends notifications
  • Dashboard - Displays metrics and alerts

Common Log Formats

  • Syslog - Standard Unix logging
  • Apache/Nginx - Web server logs
  • JSON - Structured logging
  • Custom - Application-specific formats

Project Setup

Bash
# Create the project directory
mkdir logwatch
cd logwatch

# Create and activate a virtual environment (POSIX shells)
python -m venv venv
source venv/bin/activate

# Install dependencies: file watching, terminal colors, web dashboard
pip install watchdog colorama flask

Project Structure

File Structure
logwatch/
├── logwatch/
│   ├── __init__.py
│   ├── watcher.py
│   ├── parser.py
│   ├── analyzer.py
│   ├── alerter.py
│   └── dashboard.py
├── templates/
│   └── index.html
├── logs/
└── main.py

Log Watcher

Let's create the file watcher component.

Python
# logwatch/watcher.py
import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileModifiedEvent
from typing import Callable, List

class LogFileHandler(FileSystemEventHandler):
    """Watchdog handler that tails modified files and reports appended lines.

    Keeps a per-file byte offset so each modification event only delivers
    the lines added since the previous read.
    """

    def __init__(self, callback: Callable):
        super().__init__()
        # callback(filepath, new_lines) is invoked for every batch of new lines.
        self.callback = callback
        # Byte offset already consumed, keyed by file path.
        self.file_positions = {}

    def on_modified(self, event):
        """Dispatch file-modified events to the incremental reader."""
        if event.is_directory:
            return

        if isinstance(event, FileModifiedEvent):
            self._read_new_lines(event.src_path)

    def _read_new_lines(self, filepath: str):
        """Read from the last stored offset to EOF and invoke the callback."""
        try:
            position = self.file_positions.get(filepath, 0)
            # If the file shrank (log rotation/truncation), start from the top.
            if position > os.path.getsize(filepath):
                position = 0

            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                f.seek(position)

                new_lines = f.readlines()

                if new_lines:
                    self.callback(filepath, new_lines)

                self.file_positions[filepath] = f.tell()
        except Exception as e:
            # Never let a read failure kill the observer thread.
            print(f"Error reading {filepath}: {e}")


class LogWatcher:
    """Coordinates a watchdog Observer over a set of watched log files."""

    def __init__(self):
        self.observer = Observer()
        self.handlers = []        # keep handlers alive for the observer's lifetime
        self.watched_files = []   # paths registered via watch_file()

    def watch_file(self, filepath: str, callback: Callable):
        """Watch one file; callback(filepath, new_lines) fires on appends.

        Raises:
            FileNotFoundError: if filepath does not exist.
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"File not found: {filepath}")

        # watchdog schedules on directories; '' means the file is in the CWD.
        directory = os.path.dirname(filepath) or '.'

        handler = LogFileHandler(callback)
        self.handlers.append(handler)

        self.observer.schedule(handler, directory, recursive=False)
        self.watched_files.append(filepath)

        return filepath

    def watch_directory(self, directory: str, pattern: str = "*.log", callback: Callable = None):
        """Watch every file in `directory` matching glob `pattern`.

        Raises:
            NotADirectoryError: if `directory` is not a directory.
        """
        import glob

        if not os.path.isdir(directory):
            raise NotADirectoryError(f"Not a directory: {directory}")

        for filepath in glob.glob(os.path.join(directory, pattern)):
            self.watch_file(filepath, callback or self._default_callback)

    def _default_callback(self, filepath: str, lines: List[str]):
        """Fallback callback: echo each new line to stdout."""
        for line in lines:
            print(f"{filepath}: {line.strip()}")

    def start(self):
        """Start the background observer thread."""
        self.observer.start()
        print("Log watcher started...")

    def stop(self):
        """Stop the observer and wait for its thread to exit."""
        self.observer.stop()
        self.observer.join()
        print("Log watcher stopped")

    def run_forever(self):
        """Start watching and block until interrupted with Ctrl+C."""
        self.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.stop()

Log Parser

Let's create a flexible log parser.

Python
# logwatch/parser.py
import re
import json
from datetime import datetime
from typing import Dict, Optional, List
from enum import Enum

class LogLevel(Enum):
    """Severity levels recognized by the parser."""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
    UNKNOWN = "UNKNOWN"


class ParsedLog:
    """A single structured log entry extracted from one raw line."""

    def __init__(self,
                 raw: str,
                 timestamp: Optional[datetime] = None,
                 level: LogLevel = LogLevel.UNKNOWN,
                 message: str = '',
                 source: str = '',
                 metadata: Optional[Dict] = None):
        self.raw = raw
        # Fall back to "now" so every entry is sortable even without a parsed time.
        self.timestamp = timestamp or datetime.now()
        self.level = level
        self.message = message
        self.source = source
        self.metadata = metadata or {}

    def to_dict(self) -> Dict:
        """Serialize to JSON-friendly primitives for the dashboard API."""
        return {
            'timestamp': self.timestamp.isoformat(),
            'level': self.level.value,
            'message': self.message,
            'source': self.source,
            'metadata': self.metadata,
            'raw': self.raw
        }


class LogParser:
    """Best-effort parser for several common log formats.

    Formats are tried in order: JSON (line starts with '{'), then each
    compiled regex pattern; unmatched lines become UNKNOWN-level entries
    whose message is the raw text.
    """

    def __init__(self):
        # Compiled once up front; parse() is called per log line.
        self.patterns = {
            'apache': re.compile(
                r'(\S+)\s+\S+\s+\S+\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+)'
            ),
            'syslog': re.compile(
                r'([A-Z][a-z]{2}\s+\d+\s+\d+:\d+:\d+)\s+(\S+)\s+(\S+):\s+(.*)'
            ),
            'python': re.compile(
                r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+)\s+-\s+(\S+)\s+-\s+(\S+)\s+-\s+(.*)'
            ),
            'generic': re.compile(
                r'\[?(\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}[^\]]*)\]?\s*\[?(DEBUG|INFO|WARNING|ERROR|CRITICAL)\]?\s*(.*)',
                re.IGNORECASE
            )
        }

    def parse(self, line: str, source: str = '') -> ParsedLog:
        """Parse one raw line into a ParsedLog; never raises on bad input."""
        line = line.strip()

        if not line:
            return ParsedLog(line, source=source)

        if line.startswith('{'):
            return self._parse_json(line, source)

        for name, pattern in self.patterns.items():
            match = pattern.match(line)
            if match:
                return self._parse_match(name, match, line, source)

        # No pattern matched: keep the raw text as the message.
        return ParsedLog(line, message=line, source=source)

    def _parse_json(self, line: str, source: str) -> ParsedLog:
        """Parse a JSON-object line; fall back to a raw entry on bad JSON."""
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            return ParsedLog(line, message=line, source=source)

        timestamp = None
        if 'timestamp' in data:
            try:
                timestamp = datetime.fromisoformat(data['timestamp'])
            except (TypeError, ValueError):
                pass  # unparseable timestamp: ParsedLog substitutes "now"

        level = LogLevel.UNKNOWN
        if 'level' in data:
            try:
                # str() guards against non-string level values in the JSON.
                level = LogLevel[str(data['level']).upper()]
            except KeyError:
                pass

        message = data.get('message', '')

        # Everything that isn't a recognized field travels in metadata.
        return ParsedLog(
            line, timestamp, level, message, source,
            metadata={k: v for k, v in data.items()
                      if k not in ['timestamp', 'message', 'level']}
        )

    def _parse_match(self, name: str, match, line: str, source: str) -> ParsedLog:
        """Turn a regex match into a ParsedLog according to the format name."""
        if name == 'apache':
            return ParsedLog(
                line,
                timestamp=datetime.now(),
                level=LogLevel.INFO,
                message=match.group(3),
                source=source,
                metadata={'ip': match.group(1), 'status': match.group(4)}
            )

        if name == 'syslog':
            return ParsedLog(
                line,
                timestamp=datetime.now(),
                level=LogLevel.INFO,
                message=match.group(4),
                source=source,
                metadata={'host': match.group(2), 'process': match.group(3)}
            )

        if name == 'python':
            try:
                timestamp = datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S,%f')
            except ValueError:
                timestamp = datetime.now()

            try:
                level = LogLevel[match.group(3).upper()]
            except KeyError:
                level = LogLevel.UNKNOWN

            return ParsedLog(line, timestamp, level, match.group(4), source)

        # 'generic' format
        try:
            level = LogLevel[match.group(2).upper()]
        except KeyError:
            level = LogLevel.UNKNOWN

        return ParsedLog(line, level=level, message=match.group(3), source=source)

Alerting System

Let's create the alerting system.

Python
# logwatch/alerter.py
import re
import time
from typing import List, Dict, Callable, Optional
from datetime import datetime, timedelta
from collections import defaultdict, deque
from .parser import ParsedLog, LogLevel

class AlertRule:
    """Declarative alert rule: match logs by regex and/or level.

    `threshold` matches within a sliding `window` (seconds) trigger an
    alert; both criteria are optional and combined with AND.
    """

    def __init__(self,
                 name: str,
                 pattern: str = None,
                 level: "LogLevel" = None,
                 threshold: int = 1,
                 window: int = 60,
                 message: str = ''):
        self.name = name
        # Compile once; None means "match any message".
        self.pattern = re.compile(pattern) if pattern else None
        self.level = level
        self.threshold = threshold
        self.window = window
        self.message = message

    def matches(self, log: "ParsedLog") -> bool:
        """Return True when the log satisfies every configured criterion."""
        if self.pattern and not self.pattern.search(log.message):
            return False

        if self.level and log.level != self.level:
            return False

        return True


class Alert:
    """A fired alert: the triggering rule plus the logs that matched."""

    def __init__(self, rule: "AlertRule", logs: "List[ParsedLog]"):
        self.rule = rule
        self.logs = logs
        self.timestamp = datetime.now()
        # Prefer the rule's custom message; otherwise synthesize one.
        self.message = rule.message or f"Alert triggered: {rule.name}"

    def to_dict(self) -> Dict:
        """Serialize for the dashboard API (at most 5 sample log messages)."""
        return {
            'rule': self.rule.name,
            'message': self.message,
            'timestamp': self.timestamp.isoformat(),
            'count': len(self.logs),
            'logs': [log.message for log in self.logs[:5]]
        }


class Alerter:
    """Buffers parsed logs, evaluates alert rules, and dispatches alerts."""

    def __init__(self):
        self.rules: List["AlertRule"] = []
        # Bounded buffer: threshold checks only look back `rule.window` seconds.
        self.log_buffer: deque = deque(maxlen=10000)
        self.alert_handlers: List[Callable] = []
        self.recent_alerts: deque = deque(maxlen=100)
        # rule name -> time until which that rule may not fire again
        self.cooldowns: Dict[str, datetime] = {}
        self.default_cooldown = 300  # seconds between repeat alerts per rule

    def add_rule(self, rule: "AlertRule"):
        """Register a rule to evaluate against every incoming log."""
        self.rules.append(rule)
        print(f"Added alert rule: {rule.name}")

    def add_handler(self, handler: Callable):
        """Register a callable invoked with each triggered Alert."""
        self.alert_handlers.append(handler)

    def process_log(self, log: "ParsedLog"):
        """Buffer the log and re-evaluate every rule it matches."""
        self.log_buffer.append(log)

        for rule in self.rules:
            if rule.matches(log):
                self._check_threshold(rule)

    def _check_threshold(self, rule: "AlertRule"):
        """Fire the rule if enough matches landed inside its time window."""
        window_start = datetime.now() - timedelta(seconds=rule.window)

        matching_logs = [
            log for log in self.log_buffer
            if log.timestamp > window_start and rule.matches(log)
        ]

        if len(matching_logs) >= rule.threshold and not self._in_cooldown(rule.name):
            self._trigger_alert(Alert(rule, matching_logs))
            self._set_cooldown(rule.name)

    def _in_cooldown(self, rule_name: str) -> bool:
        """True while the rule's cooldown period has not yet elapsed."""
        if rule_name in self.cooldowns:
            return datetime.now() < self.cooldowns[rule_name]
        return False

    def _set_cooldown(self, rule_name: str):
        """Suppress repeat alerts for this rule for default_cooldown seconds."""
        self.cooldowns[rule_name] = datetime.now() + timedelta(seconds=self.default_cooldown)

    def _trigger_alert(self, alert: "Alert"):
        """Record the alert and fan it out; one bad handler doesn't stop the rest."""
        self.recent_alerts.append(alert)

        for handler in self.alert_handlers:
            try:
                handler(alert)
            except Exception as e:
                print(f"Alert handler error: {e}")

    def get_recent_alerts(self, limit: int = 10) -> List[Dict]:
        """Return up to `limit` most recent alerts as dicts (oldest first)."""
        return [alert.to_dict() for alert in list(self.recent_alerts)[-limit:]]

    def get_stats(self) -> Dict:
        """Aggregate buffer/alert/rule counts for the dashboard."""
        level_counts = defaultdict(int)

        for log in self.log_buffer:
            level_counts[log.level.value] += 1

        return {
            'total_logs': len(self.log_buffer),
            'level_counts': dict(level_counts),
            'total_alerts': len(self.recent_alerts),
            'active_rules': len(self.rules)
        }

Dashboard

Let's create a simple dashboard.

Python
# logwatch/dashboard.py
from flask import Flask, render_template, jsonify, request
from threading import Lock
import time
from typing import List, Dict

class Dashboard:
    """Flask web UI exposing recent logs, stats, and alerts as JSON APIs."""

    def __init__(self, alerter, parser, port: int = 5000):
        self.app = Flask(__name__)
        self.alerter = alerter
        self.parser = parser
        self.port = port

        # Bounded in add_log to keep memory flat; guarded by self.lock
        # because Flask serves requests on worker threads.
        self.recent_logs: List[Dict] = []
        self.lock = Lock()

        self._setup_routes()

    def _setup_routes(self):
        """Register the index page and the three JSON endpoints."""
        @self.app.route('/')
        def index():
            return render_template('index.html')

        @self.app.route('/api/logs')
        def get_logs():
            # ?limit=N caps how many recent entries are returned (default 100).
            limit = request.args.get('limit', 100, type=int)
            with self.lock:
                logs = list(self.recent_logs)[-limit:]
            return jsonify(logs)

        @self.app.route('/api/stats')
        def get_stats():
            return jsonify(self.alerter.get_stats())

        @self.app.route('/api/alerts')
        def get_alerts():
            return jsonify(self.alerter.get_recent_alerts())

    def add_log(self, filepath: str, lines: List[str]):
        """Watcher callback: parse new lines, store them, feed the alerter."""
        with self.lock:
            for line in lines:
                parsed = self.parser.parse(line, filepath)
                self.recent_logs.append(parsed.to_dict())

                # Trim in bulk so the cost is amortized, not paid per append.
                if len(self.recent_logs) > 1000:
                    self.recent_logs = self.recent_logs[-500:]

                self.alerter.process_log(parsed)

    def run(self):
        """Blocking: start the Flask dev server (run on a daemon thread to embed)."""
        self.app.run(port=self.port, debug=False, threaded=True)
HTML
<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>
    <title>LogWatch Dashboard</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .stats { display: flex; gap: 20px; margin-bottom: 20px; }
        .stat-card { 
            background: #f0f0f0; padding: 20px; border-radius: 8px;
            min-width: 150px;
        }
        .log-list { 
            border: 1px solid #ddd; height: 400px; overflow-y: auto;
            padding: 10px;
        }
        .log-entry { 
            padding: 5px; border-bottom: 1px solid #eee;
            font-family: monospace;
        }
        .log-entry.ERROR { color: red; }
        .log-entry.WARNING { color: orange; }
        .log-entry.INFO { color: blue; }
        .alerts { 
            background: #ffe6e6; padding: 10px; 
            margin-top: 20px; border-radius: 8px;
        }
    </style>
</head>
<body>
    <h1>LogWatch Dashboard</h1>
    
    <div class="stats">
        <div class="stat-card">
            <h3>Total Logs</h3>
            <p id="totalLogs">0</p>
        </div>
        <div class="stat-card">
            <h3>Alerts</h3>
            <p id="totalAlerts">0</p>
        </div>
    </div>
    
    <h2>Recent Logs</h2>
    <div class="log-list" id="logList"></div>
    
    <div class="alerts">
        <h3>Recent Alerts</h3>
        <div id="alertList"></div>
    </div>

    <script>
        // Poll the JSON APIs and re-render the three panels.
        async function updateData() {
            const logsRes = await fetch('/api/logs?limit=50');
            const logs = await logsRes.json();
            
            const statsRes = await fetch('/api/stats');
            const stats = await statsRes.json();
            
            const alertsRes = await fetch('/api/alerts');
            const alerts = await alertsRes.json();

            document.getElementById('totalLogs').textContent = stats.total_logs;
            document.getElementById('totalAlerts').textContent = stats.total_alerts;

            // Newest entries first.
            const logList = document.getElementById('logList');
            logList.innerHTML = logs.slice().reverse().map(log => 
                `<div class="log-entry ${log.level}">
                    [${log.level}] ${log.message}
                </div>`
            ).join('');

            const alertList = document.getElementById('alertList');
            alertList.innerHTML = alerts.slice().reverse().map(alert => 
                `<p><strong>${alert.rule}</strong>: ${alert.message}</p>`
            ).join('');
        }

        setInterval(updateData, 2000);
        updateData();
    </script>
</body>
</html>

Testing

Python
# main.py
import threading
import time

from logwatch.watcher import LogWatcher
from logwatch.parser import LogParser, LogLevel
from logwatch.alerter import Alerter, AlertRule
from logwatch.dashboard import Dashboard

parser = LogParser()
alerter = Alerter()
dashboard = Dashboard(alerter, parser)

# Rule 1: three or more ERROR-level entries within 60 seconds.
alerter.add_rule(AlertRule(
    name="High Error Rate",
    level=LogLevel.ERROR,
    threshold=3,
    window=60,
    message="Multiple errors detected in short time"
))

# Rule 2: any single connection failure.
alerter.add_rule(AlertRule(
    name="Connection Failed",
    pattern=r"connection failed|connection refused",
    threshold=1,
    message="Connection failure detected"
))

def handle_alert(alert):
    """Console notification handler."""
    print(f"\n⚠️  ALERT: {alert.message}\n")

alerter.add_handler(handle_alert)

watcher = LogWatcher()
# New lines flow: watcher -> dashboard.add_log -> parser -> alerter.
watcher.watch_file("./logs/app.log", dashboard.add_log)

watcher.start()

# Run Flask on a daemon thread so Ctrl+C in the main loop still exits.
dashboard_thread = threading.Thread(target=dashboard.run, daemon=True)
dashboard_thread.start()

print("Dashboard available at http://localhost:5000")
print("Press Ctrl+C to stop")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    watcher.stop()
Testing Checklist
  • Log watcher detects new lines
  • Parser extracts structured data
  • Alerts trigger on patterns
  • Dashboard displays logs

Summary

Congratulations! You've built a complete log monitoring system.

What You Built

  • Log Watcher - Real-time file monitoring
  • Log Parser - Multiple format support
  • Alerter - Pattern-based alerting
  • Dashboard - Web interface for monitoring

Next Steps

  • Add email notifications
  • Implement log aggregation
  • Add more parsers
  • Implement log retention

Continue Learning

Try these tutorials:

  • Build a Web Crawler
  • Build a Caching System
  • Build a Background Job Queue