Build a Log Monitoring System
Introduction
Log monitoring is critical for maintaining healthy applications. It helps detect errors, track performance, and identify security threats in real-time.
In this tutorial, we'll build "LogWatch" - a real-time log monitoring system that watches log files, parses entries, triggers alerts, and provides a dashboard.
What You'll Build
- Real-time log file watcher
- Multi-format log parser
- Pattern-based alerting
- Statistics dashboard
- Notification system
What You'll Learn
- File system monitoring
- Log parsing techniques
- Real-time alerting
- Pattern matching
- Data visualization
Core Concepts
Let's understand log monitoring fundamentals.
Log Monitoring Architecture
- Watcher - Monitors files for changes
- Parser - Extracts structured data from logs
- Analyzer - Detects patterns and anomalies
- Alerter - Sends notifications
- Dashboard - Displays metrics and alerts
Common Log Formats
- Syslog - Standard Unix logging
- Apache/Nginx - Web server logs
- JSON - Structured logging
- Custom - Application-specific formats
Project Setup
Bash
# Create project directory
mkdir logwatch
cd logwatch
# Create virtual environment
python -m venv venv
# Activate it (on Windows use: venv\Scripts\activate)
source venv/bin/activate
# Install dependencies:
#   watchdog - file-system change notifications
#   colorama - cross-platform terminal colors
#   flask    - web dashboard
pip install watchdog colorama flask
Project Structure
File Structure
logwatch/
├── logwatch/
│ ├── __init__.py
│ ├── watcher.py
│ ├── parser.py
│ ├── analyzer.py
│ ├── alerter.py
│ └── dashboard.py
├── templates/
│ └── index.html
├── logs/
└── main.py
Log Watcher
Let's create the file watcher component.
Python
# logwatch/watcher.py
import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileModifiedEvent
from typing import Callable, List
class LogFileHandler(FileSystemEventHandler):
    """Watchdog event handler that tails modified log files.

    Tracks a read offset per file so each modification event only
    delivers the lines appended since the last read.
    """

    def __init__(self, callback: Callable):
        """callback(filepath, new_lines) is invoked for each batch of new lines."""
        super().__init__()
        self.callback = callback
        # Maps filepath -> byte offset of the last read position.
        # Starts at 0, so the first event replays the whole existing file.
        self.file_positions = {}

    def on_modified(self, event):
        """Dispatch file-modification events to the incremental reader."""
        if event.is_directory:
            return
        if isinstance(event, FileModifiedEvent):
            self._read_new_lines(event.src_path)

    def _read_new_lines(self, filepath: str):
        """Read lines appended since the last recorded offset and fire the callback."""
        try:
            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                f.seek(self.file_positions.get(filepath, 0))
                new_lines = f.readlines()
                if new_lines:
                    self.callback(filepath, new_lines)
                # Remember where we stopped so the next event resumes here.
                self.file_positions[filepath] = f.tell()
        except Exception as e:
            # Broad on purpose: a bad file (or a failing callback) must not
            # kill the watchdog observer thread.
            # Fixed: was f"...${filepath}..." -- '$' is JS template syntax,
            # not Python interpolation, and printed a literal dollar sign.
            print(f"Error reading {filepath}: {e}")
class LogWatcher:
    """Coordinates a watchdog Observer over a set of log files."""

    def __init__(self):
        self.observer = Observer()
        self.handlers = []       # keep handler references alive while scheduled
        self.watched_files = []  # file paths registered via watch_file()

    def watch_file(self, filepath: str, callback: Callable):
        """Watch a single file; callback(filepath, lines) receives appended lines.

        Returns the watched filepath.
        Raises FileNotFoundError if the file does not exist yet.
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"File not found: {filepath}")
        # Watchdog watches directories, not files; fall back to '.' when the
        # path is a bare filename (dirname would be '' and schedule() fails).
        directory = os.path.dirname(filepath) or '.'
        handler = LogFileHandler(callback)
        self.handlers.append(handler)
        self.observer.schedule(handler, directory, recursive=False)
        self.watched_files.append(filepath)
        return filepath

    def watch_directory(self, directory: str, pattern: str = "*.log", callback: Callable = None):
        """Watch every file in *directory* matching *pattern* (non-recursive)."""
        import glob
        if not os.path.isdir(directory):
            raise NotADirectoryError(f"Not a directory: {directory}")
        for filepath in glob.glob(os.path.join(directory, pattern)):
            self.watch_file(filepath, callback or self._default_callback)

    def _default_callback(self, filepath: str, lines: List[str]):
        """Fallback callback: echo each new line to stdout."""
        for line in lines:
            print(f"{filepath}: {line.strip()}")

    def start(self):
        """Start the observer thread (returns immediately)."""
        self.observer.start()
        print("Log watcher started...")

    def stop(self):
        """Stop the observer thread and wait for it to terminate."""
        self.observer.stop()
        self.observer.join()
        print("Log watcher stopped")

    def run_forever(self):
        """Block the calling thread until Ctrl+C, then shut down cleanly."""
        self.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.stop()
Log Parser
Let's create a flexible log parser.
Python
# logwatch/parser.py
import re
import json
from datetime import datetime
from typing import Dict, Optional, List
from enum import Enum
class LogLevel(Enum):
    """Severity levels recognised by the parser."""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
    UNKNOWN = "UNKNOWN"  # level could not be determined from the line


class ParsedLog:
    """Structured representation of a single log line."""

    def __init__(self,
                 raw: str,
                 timestamp: Optional[datetime] = None,
                 level: LogLevel = LogLevel.UNKNOWN,
                 message: str = '',
                 source: str = '',
                 metadata: Optional[Dict] = None):
        """
        Args:
            raw: the original, unmodified log line.
            timestamp: event time; defaults to now() when the line had none.
            level: parsed severity; UNKNOWN when unrecognised.
            message: human-readable message portion of the line.
            source: file path (or other origin) the line came from.
            metadata: extra parsed fields; a fresh dict per instance.
        """
        self.raw = raw
        self.timestamp = timestamp or datetime.now()
        self.level = level
        self.message = message
        self.source = source
        self.metadata = metadata or {}

    def to_dict(self) -> Dict:
        """Serialise to a JSON-friendly dict (timestamp as ISO-8601)."""
        return {
            'timestamp': self.timestamp.isoformat(),
            'level': self.level.value,
            'message': self.message,
            'source': self.source,
            'metadata': self.metadata,
            'raw': self.raw
        }
class LogParser:
    """Parses raw log lines into ParsedLog records.

    Strategy: JSON lines first (anything starting with '{'), then the named
    regex formats below in insertion order, finally a raw fallback where the
    whole line becomes the message.
    """

    def __init__(self):
        # Named regex formats, tried in insertion order; 'generic' is last
        # because it is the most permissive.
        self.patterns = {
            'apache': re.compile(
                r'(\S+)\s+\S+\s+\S+\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+)'
            ),
            'syslog': re.compile(
                r'([A-Z][a-z]{2}\s+\d+\s+\d+:\d+:\d+)\s+(\S+)\s+(\S+):\s+(.*)'
            ),
            'python': re.compile(
                r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+)\s+-\s+(\S+)\s+-\s+(\S+)\s+-\s+(.*)'
            ),
            'generic': re.compile(
                r'\[?(\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}[^\]]*)\]?\s*\[?(DEBUG|INFO|WARNING|ERROR|CRITICAL)\]?\s*(.*)',
                re.IGNORECASE
            )
        }

    def parse(self, line: str, source: str = '') -> ParsedLog:
        """Parse one line; never raises, always returns a ParsedLog."""
        line = line.strip()
        if not line:
            # Blank line: empty message, UNKNOWN level.
            return ParsedLog(line, source=source)
        if line.startswith('{'):
            return self._parse_json(line, source)
        for name, pattern in self.patterns.items():
            match = pattern.match(line)
            if match:
                return self._parse_match(name, match, line, source)
        # Unrecognised format: keep the whole line as the message.
        return ParsedLog(line, message=line, source=source)

    def _parse_json(self, line: str, source: str) -> ParsedLog:
        """Parse a structured (JSON) log line; falls back to raw on bad JSON."""
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            return ParsedLog(line, message=line, source=source)
        timestamp = None
        if 'timestamp' in data:
            try:
                timestamp = datetime.fromisoformat(data['timestamp'])
            except (TypeError, ValueError):
                # Fixed: was a bare `except:`. Non-ISO (or non-string)
                # timestamps fall back to now() inside ParsedLog.
                pass
        level = LogLevel.UNKNOWN
        if 'level' in data:
            try:
                level = LogLevel[data['level'].upper()]
            except KeyError:
                pass  # unrecognised level name stays UNKNOWN
        message = data.get('message', '')
        return ParsedLog(
            line, timestamp, level, message, source,
            # Everything that is not a core field is preserved as metadata.
            metadata={k: v for k, v in data.items()
                      if k not in ('timestamp', 'message', 'level')}
        )

    def _parse_match(self, name: str, match, line: str, source: str) -> ParsedLog:
        """Build a ParsedLog from a regex match using per-format field mapping."""
        if name == 'apache':
            # Access logs carry no severity; the request line is the message.
            return ParsedLog(
                line,
                timestamp=datetime.now(),
                level=LogLevel.INFO,
                message=match.group(3),
                source=source,
                metadata={'ip': match.group(1), 'status': match.group(4)}
            )
        if name == 'syslog':
            return ParsedLog(
                line,
                timestamp=datetime.now(),  # syslog timestamps lack a year
                level=LogLevel.INFO,
                message=match.group(4),
                source=source,
                metadata={'host': match.group(2), 'process': match.group(3)}
            )
        if name == 'python':
            try:
                timestamp = datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S,%f')
            except ValueError:
                # Fixed: was a bare `except:`; strptime raises ValueError.
                timestamp = datetime.now()
            try:
                level = LogLevel[match.group(3).upper()]
            except KeyError:
                level = LogLevel.UNKNOWN
            return ParsedLog(line, timestamp, level, match.group(4), source)
        # 'generic'
        try:
            level = LogLevel[match.group(2).upper()]
        except KeyError:
            level = LogLevel.UNKNOWN
        return ParsedLog(line, level=level, message=match.group(3), source=source)
Alerting System
Let's create the alerting system.
Python
# logwatch/alerter.py
import re
import time
from typing import List, Dict, Callable, Optional
from datetime import datetime, timedelta
from collections import defaultdict, deque
from .parser import ParsedLog, LogLevel
class AlertRule:
    """Declarative alert condition.

    A log matches when it satisfies BOTH the regex *pattern* (if set) and
    the severity *level* (if set). The Alerter fires once *threshold*
    matches occur within *window* seconds.
    """

    def __init__(self,
                 name: str,
                 pattern: Optional[str] = None,
                 level: Optional[LogLevel] = None,
                 threshold: int = 1,
                 window: int = 60,
                 message: str = ''):
        self.name = name
        # Pre-compile once; None means "match any message text".
        self.pattern = re.compile(pattern) if pattern else None
        self.level = level
        self.threshold = threshold
        self.window = window  # sliding window, in seconds
        self.message = message  # custom alert text; '' -> auto-generated

    def matches(self, log: ParsedLog) -> bool:
        """Return True if *log* satisfies every configured criterion."""
        if self.pattern and not self.pattern.search(log.message):
            return False
        if self.level and log.level != self.level:
            return False
        return True
class Alert:
    """A triggered alert: the rule that fired plus the logs that caused it."""

    def __init__(self, rule: AlertRule, logs: List[ParsedLog]):
        self.rule = rule
        self.logs = logs
        self.timestamp = datetime.now()
        # Fall back to a generated message when the rule supplies none.
        # Fixed: was f"...${rule.name}" -- '$' is JS template syntax and
        # produced a literal "$High Error Rate" in the output.
        self.message = rule.message or f"Alert triggered: {rule.name}"

    def to_dict(self) -> Dict:
        """Serialise for the API; only the first 5 log messages are included."""
        return {
            'rule': self.rule.name,
            'message': self.message,
            'timestamp': self.timestamp.isoformat(),
            'count': len(self.logs),
            'logs': [log.message for log in self.logs[:5]]
        }
class Alerter:
    """Evaluates alert rules over a rolling buffer of parsed logs."""

    def __init__(self):
        self.rules: List[AlertRule] = []
        # Rolling window of recent logs used for threshold evaluation.
        self.log_buffer: deque = deque(maxlen=10000)
        self.alert_handlers: List[Callable] = []
        self.recent_alerts: deque = deque(maxlen=100)
        # rule name -> instant before which that rule may not fire again
        self.cooldowns: Dict[str, datetime] = {}
        self.default_cooldown = 300  # seconds

    def add_rule(self, rule: AlertRule):
        """Register a rule evaluated against every incoming log."""
        self.rules.append(rule)
        print(f"Added alert rule: {rule.name}")

    def add_handler(self, handler: Callable):
        """Register a callable invoked with each triggered Alert."""
        self.alert_handlers.append(handler)

    def process_log(self, log: ParsedLog):
        """Buffer *log* and re-evaluate every rule it matches."""
        self.log_buffer.append(log)
        for rule in self.rules:
            if rule.matches(log):
                self._check_threshold(rule)

    def _check_threshold(self, rule: AlertRule):
        """Fire an alert when enough matches fall inside the rule's window."""
        window_start = datetime.now() - timedelta(seconds=rule.window)
        matching_logs = [
            log for log in self.log_buffer
            if log.timestamp > window_start and rule.matches(log)
        ]
        if len(matching_logs) >= rule.threshold and not self._in_cooldown(rule.name):
            self._trigger_alert(Alert(rule, matching_logs))
            self._set_cooldown(rule.name)

    def _in_cooldown(self, rule_name: str) -> bool:
        """True while the rule's cooldown period has not yet elapsed."""
        deadline = self.cooldowns.get(rule_name)
        return deadline is not None and datetime.now() < deadline

    def _set_cooldown(self, rule_name: str):
        """Suppress further alerts from this rule for default_cooldown seconds."""
        self.cooldowns[rule_name] = datetime.now() + timedelta(seconds=self.default_cooldown)

    def _trigger_alert(self, alert: Alert):
        """Record the alert and fan it out to all handlers."""
        self.recent_alerts.append(alert)
        for handler in self.alert_handlers:
            try:
                handler(alert)
            except Exception as e:
                # One failing handler must not block the others.
                print(f"Alert handler error: {e}")

    def get_recent_alerts(self, limit: int = 10) -> List[Dict]:
        """Return the newest *limit* alerts as dicts (oldest first)."""
        return [alert.to_dict() for alert in list(self.recent_alerts)[-limit:]]

    def get_stats(self) -> Dict:
        """Summary counters for the dashboard."""
        level_counts = defaultdict(int)
        for log in self.log_buffer:
            level_counts[log.level.value] += 1
        return {
            'total_logs': len(self.log_buffer),
            'level_counts': dict(level_counts),
            'total_alerts': len(self.recent_alerts),
            'active_rules': len(self.rules)
        }
Dashboard
Let's create a simple dashboard.
Python
# logwatch/dashboard.py
from flask import Flask, render_template, jsonify, request
from threading import Lock
import time
from typing import List, Dict
class Dashboard:
    """Flask web UI exposing recent logs, stats and alerts as JSON."""

    def __init__(self, alerter, parser, port: int = 5000):
        """
        Args:
            alerter: Alerter instance queried for stats/alerts and fed logs.
            parser: LogParser used to structure incoming raw lines.
            port: HTTP port for the Flask server.
        """
        self.app = Flask(__name__)
        self.alerter = alerter
        self.parser = parser
        self.port = port
        self.recent_logs: List[Dict] = []
        # add_log() runs on the watcher thread while Flask request handlers
        # read recent_logs; the lock keeps both sides consistent.
        self.lock = Lock()
        self._setup_routes()

    def _setup_routes(self):
        """Register the HTML page and the three JSON API endpoints."""
        @self.app.route('/')
        def index():
            return render_template('index.html')

        @self.app.route('/api/logs')
        def get_logs():
            limit = request.args.get('limit', 100, type=int)
            with self.lock:
                logs = list(self.recent_logs)[-limit:]
            return jsonify(logs)

        @self.app.route('/api/stats')
        def get_stats():
            return jsonify(self.alerter.get_stats())

        @self.app.route('/api/alerts')
        def get_alerts():
            return jsonify(self.alerter.get_recent_alerts())

    def add_log(self, filepath: str, lines: List[str]):
        """Watcher callback: parse new lines, store them, feed the alerter."""
        with self.lock:
            for line in lines:
                parsed = self.parser.parse(line, filepath)
                self.recent_logs.append(parsed.to_dict())
                # Trim in bulk (1000 -> 500) so we don't slice on every append.
                if len(self.recent_logs) > 1000:
                    self.recent_logs = self.recent_logs[-500:]
                self.alerter.process_log(parsed)

    def run(self):
        """Blocking Flask server; threaded=True so API polls don't serialize."""
        self.app.run(port=self.port, debug=False, threaded=True)
HTML
<!-- templates/index.html -->
<!DOCTYPE html>
<html lang="en">
<head>
<title>LogWatch Dashboard</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.stats { display: flex; gap: 20px; margin-bottom: 20px; }
.stat-card {
    background: #f0f0f0; padding: 20px; border-radius: 8px;
    min-width: 150px;
}
.log-list {
    border: 1px solid #ddd; height: 400px; overflow-y: auto;
    padding: 10px;
}
.log-entry {
    padding: 5px; border-bottom: 1px solid #eee;
    font-family: monospace;
}
.log-entry.ERROR { color: red; }
.log-entry.WARNING { color: orange; }
.log-entry.INFO { color: blue; }
.alerts {
    background: #ffe6e6; padding: 10px;
    margin-top: 20px; border-radius: 8px;
}
</style>
</head>
<body>
<h1>LogWatch Dashboard</h1>
<div class="stats">
    <div class="stat-card">
        <h3>Total Logs</h3>
        <p id="totalLogs">0</p>
    </div>
    <div class="stat-card">
        <h3>Alerts</h3>
        <p id="totalAlerts">0</p>
    </div>
</div>
<h2>Recent Logs</h2>
<div class="log-list" id="logList"></div>
<div class="alerts">
    <h3>Recent Alerts</h3>
    <div id="alertList"></div>
</div>
<script>
// Log lines are untrusted input and are inserted via innerHTML, so they
// must be escaped to prevent log-injection XSS.
function escapeHtml(value) {
    return String(value).replace(/[&<>"']/g, c => ({
        '&': '&amp;', '<': '&lt;', '>': '&gt;',
        '"': '&quot;', "'": '&#39;'
    })[c]);
}

// Poll the JSON API and re-render stats, logs and alerts.
// (Fixed: template literals previously used \` and \${ escapes, which are
// invalid in a standalone HTML file.)
async function updateData() {
    const logsRes = await fetch('/api/logs?limit=50');
    const logs = await logsRes.json();
    const statsRes = await fetch('/api/stats');
    const stats = await statsRes.json();
    const alertsRes = await fetch('/api/alerts');
    const alerts = await alertsRes.json();

    document.getElementById('totalLogs').textContent = stats.total_logs;
    document.getElementById('totalAlerts').textContent = stats.total_alerts;

    // Newest entries first.
    const logList = document.getElementById('logList');
    logList.innerHTML = logs.slice().reverse().map(log =>
        `<div class="log-entry ${escapeHtml(log.level)}">
            [${escapeHtml(log.level)}] ${escapeHtml(log.message)}
        </div>`
    ).join('');

    const alertList = document.getElementById('alertList');
    alertList.innerHTML = alerts.slice().reverse().map(alert =>
        `<p><strong>${escapeHtml(alert.rule)}</strong>: ${escapeHtml(alert.message)}</p>`
    ).join('');
}

setInterval(updateData, 2000);
updateData();
</script>
</body>
</html>
Testing
Python
# main.py
"""LogWatch entry point: wires watcher -> parser -> alerter -> dashboard."""
import threading
import time

from logwatch.watcher import LogWatcher
from logwatch.parser import LogParser, LogLevel
from logwatch.alerter import Alerter, AlertRule
from logwatch.dashboard import Dashboard

parser = LogParser()
alerter = Alerter()
dashboard = Dashboard(alerter, parser)

# Fire when 3+ ERROR logs arrive inside a 60-second window.
alerter.add_rule(AlertRule(
    name="High Error Rate",
    level=LogLevel.ERROR,
    threshold=3,
    window=60,
    message="Multiple errors detected in short time"
))

# Fire on the first line matching a connection-failure pattern.
alerter.add_rule(AlertRule(
    name="Connection Failed",
    pattern=r"connection failed|connection refused",
    threshold=1,
    message="Connection failure detected"
))

def handle_alert(alert):
    """Console notification handler."""
    # Fixed: was f"...${alert.message}..." ('$' is not Python interpolation)
    # and the warning glyph was mojibake ("âš ï¸").
    print(f"\n⚠️ ALERT: {alert.message}\n")

alerter.add_handler(handle_alert)

watcher = LogWatcher()
watcher.watch_file("./logs/app.log", dashboard.add_log)
watcher.start()

# Run Flask in a daemon thread so Ctrl+C still reaches the main loop.
dashboard_thread = threading.Thread(target=dashboard.run, daemon=True)
dashboard_thread.start()

print("Dashboard available at http://localhost:5000")
print("Press Ctrl+C to stop")
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    watcher.stop()
Testing Checklist
- Log watcher detects new lines
- Parser extracts structured data
- Alerts trigger on patterns
- Dashboard displays logs
Summary
Congratulations! You've built a complete log monitoring system.
What You Built
- Log Watcher - Real-time file monitoring
- Log Parser - Multiple format support
- Alerter - Pattern-based alerting
- Dashboard - Web interface for monitoring
Next Steps
- Add email notifications
- Implement log aggregation
- Add more parsers
- Implement log retention