← Back to Tutorials
Python

Build a Service Health Monitoring Tool

Difficulty: Intermediate · Est. Time: ~3 hours

Introduction

Service health monitoring tools track the availability and performance of services. They detect failures early, enable quick response to issues, and provide visibility into system health.

What You'll Build
  • Health check framework
  • Multiple check types
  • Alerting system
  • Status dashboard

Core Concepts

Health Checks

Periodic tests that verify a service is functioning correctly.

Uptime

Percentage of time a service is available over a period.

Prerequisites

  • Python 3.8+
  • requests library (the optional database and Redis checks additionally import psycopg2 and redis)

Health Monitor

Create healthmon/monitor.py:

import logging
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Callable, Optional


class Status(Enum):
    """Health state attached to a HealthResult."""
    # Assigned when a check function returns exactly True.
    HEALTHY = "healthy"
    # Assigned when a check function returns anything else or raises.
    UNHEALTHY = "unhealthy"
    # Not assigned by the check logic visible in this section.
    DEGRADED = "degraded"
    # Not assigned by the check logic visible in this section.
    UNKNOWN = "unknown"


@dataclass
class HealthResult:
    """Outcome of one health check run against one service."""
    # Name of the service that was checked.
    service: str
    # Health state derived from the check's return value or exception.
    status: Status
    # "OK" on success; otherwise text describing the failure.
    message: str
    # Local time at which the result was created (datetime.now(), naive).
    timestamp: datetime
    # Seconds spent running the check function.
    response_time: float
    # Optional extra data; defaults to None and is not filled in by Service.check().
    details: Optional[dict] = None


class Service:
    """A named health check together with its recent results.

    Args:
        name: Identifier used in results and status reports.
        check_fn: Zero-argument callable. Returning exactly ``True`` means
            healthy; any other return value, or a raised exception, means
            unhealthy.
        interval: Desired number of seconds between checks. It is only
            stored here; ``check()`` itself never waits.
        max_history: Maximum number of results retained in ``history``.
    """

    def __init__(self, name: str, check_fn: Callable, interval: int = 60,
                 max_history: int = 1000):
        self.name = name
        self.check_fn = check_fn
        self.interval = interval
        self.max_history = max_history
        self.last_result: Optional[HealthResult] = None
        self.history: List[HealthResult] = []
        self.consecutive_failures = 0

    def check(self) -> HealthResult:
        """Run the check function once and record the outcome.

        Exceptions raised by the check function are caught and reported as an
        unhealthy result, so this method does not propagate them.

        Returns:
            The HealthResult that was stored as ``last_result`` and appended
            to ``history``.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted (or go negative) when the system clock is adjusted.
        started = time.perf_counter()

        try:
            outcome = self.check_fn()
            if outcome is True:
                status = Status.HEALTHY
                message = "OK"
            else:
                # A truthy non-True value (e.g. an error string) becomes the
                # failure message; falsy values get a generic one.
                status = Status.UNHEALTHY
                message = str(outcome) if outcome else "Check failed"
        except Exception as exc:
            status = Status.UNHEALTHY
            # Some exceptions stringify to "", which would leave the alert
            # without any explanation; fall back to the exception type name.
            message = str(exc) or type(exc).__name__
        response_time = time.perf_counter() - started

        if status is Status.HEALTHY:
            self.consecutive_failures = 0
        else:
            self.consecutive_failures += 1

        health_result = HealthResult(
            service=self.name,
            status=status,
            message=message,
            timestamp=datetime.now(),
            response_time=response_time
        )

        self.last_result = health_result
        self.history.append(health_result)

        # Rebind to a trimmed copy rather than deleting in place, so a reader
        # iterating the previous list object still sees a consistent snapshot.
        overflow = len(self.history) - self.max_history
        if overflow > 0:
            self.history = self.history[overflow:]

        return health_result

    def get_uptime(self, window: timedelta = timedelta(hours=1)) -> float:
        """Return the percentage of healthy results within ``window``.

        The figure is based on the number of recorded checks, not on elapsed
        time.

        Args:
            window: How far back from now to look.

        Returns:
            A value from 0.0 to 100.0; 0.0 when no result falls inside the
            window.
        """
        cutoff = datetime.now() - window
        recent = [r for r in self.history if r.timestamp > cutoff]

        if not recent:
            return 0.0

        healthy = sum(1 for r in recent if r.status == Status.HEALTHY)
        return (healthy / len(recent)) * 100


class HealthMonitor:
    """Registry of services with on-demand and background health checks.

    Services are checked either explicitly (``check_service`` / ``check_all``)
    or by a daemon thread started with ``start()``. The background thread
    checks a service once that service's own ``interval`` has elapsed since
    the last check made through this monitor.

    Args:
        poll_interval: Seconds the background thread waits between scans for
            due services. Must be positive; it sets the scheduling
            granularity.
    """

    def __init__(self, poll_interval: float = 1.0):
        self.services: Dict[str, Service] = {}
        self.alert_handlers: List[Callable] = []
        self._running = False
        self._thread = None
        self._poll_interval = poll_interval
        # Lets stop() wake the background thread immediately instead of
        # waiting for a sleep to finish.
        self._stop_event = threading.Event()
        # Monotonic start time of each service's last check via this monitor.
        self._last_checked: Dict[str, float] = {}

    def register_service(self, name: str, check_fn: Callable, interval: int = 60):
        """Create a Service, store it under ``name`` and return it.

        Registering an existing name replaces the previous service.
        """
        service = Service(name, check_fn, interval)
        self.services[name] = service
        # A newly (re)registered service is due on the next scan.
        self._last_checked.pop(name, None)
        return service

    def unregister_service(self, name: str):
        """Remove the service called ``name``; unknown names are ignored."""
        self.services.pop(name, None)
        self._last_checked.pop(name, None)

    def check_service(self, name: str) -> Optional[HealthResult]:
        """Check one service now, without triggering alerts.

        Returns:
            The result, or None when no service is registered under ``name``.
        """
        service = self.services.get(name)
        if service is None:
            return None
        return self._run_check(name, service, alert=False)

    def check_all(self) -> List[HealthResult]:
        """Check every registered service now, regardless of its interval.

        Alert handlers are called for each result that is not healthy.
        """
        # Iterate over a snapshot so that registering or unregistering from
        # another thread cannot raise "dictionary changed size" mid-loop.
        return [
            self._run_check(name, service, alert=True)
            for name, service in list(self.services.items())
        ]

    def get_status(self) -> Dict:
        """Return a per-service summary of the last result and uptime.

        Services that have never been checked report status 'unknown', an
        empty message and a response time of 0.
        """
        report = {}
        for name, service in list(self.services.items()):
            # Read last_result once so all fields come from the same result
            # even if the background thread stores a new one meanwhile.
            last = service.last_result
            report[name] = {
                'status': last.status.value if last else Status.UNKNOWN.value,
                'message': last.message if last else '',
                'response_time': last.response_time if last else 0,
                'uptime_1h': service.get_uptime(timedelta(hours=1)),
                'uptime_24h': service.get_uptime(timedelta(hours=24))
            }
        return report

    def on_alert(self, handler: Callable):
        """Register ``handler(service, result)`` to be called on alerts."""
        self.alert_handlers.append(handler)

    def _trigger_alerts(self, service: Service, result: HealthResult):
        """Call every alert handler; a failing handler never stops the rest."""
        for handler in list(self.alert_handlers):
            try:
                handler(service, result)
            except Exception:
                # Keep alerting best-effort, but leave a trace instead of
                # hiding a broken notification channel.
                logging.getLogger(__name__).exception(
                    "Alert handler %r failed for service %r",
                    handler, service.name)

    def start(self):
        """Start the background checking thread if it is not already running."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._running = True
        self._thread = threading.Thread(target=self._run_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """Ask the background thread to stop and wait for it to exit.

        The wait lasts at most as long as a check that is currently running.
        """
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join()

    def _run_check(self, name: str, service: Service, alert: bool) -> HealthResult:
        """Run one check, remember when it started and optionally alert."""
        self._last_checked[name] = time.monotonic()
        result = service.check()
        if alert and result.status != Status.HEALTHY:
            self._trigger_alerts(service, result)
        return result

    def _check_due(self):
        """Check each service whose interval has elapsed since its last check."""
        for name, service in list(self.services.items()):
            last = self._last_checked.get(name)
            if last is None or time.monotonic() - last >= service.interval:
                self._run_check(name, service, alert=True)

    def _run_loop(self):
        """Body of the background thread: scan for due services until stopped."""
        while not self._stop_event.is_set():
            self._check_due()
            # Event.wait returns as soon as stop() sets the event.
            self._stop_event.wait(self._poll_interval)

Health Checks

import requests


def http_check(url: str, expected_status: int = 200, timeout: int = 5):
    """Issue a GET request and compare the response status code.

    Args:
        url: Address to request.
        expected_status: Status code that counts as a pass.
        timeout: Value passed to ``requests.get`` as its ``timeout``.

    Returns:
        True when the response status code equals ``expected_status``,
        otherwise False. Errors raised by ``requests.get`` propagate to the
        caller.
    """
    status_code = requests.get(url, timeout=timeout).status_code
    return status_code == expected_status


def tcp_check(host: str, port: int, timeout: int = 5):
    """Report whether a TCP connection to ``host:port`` can be opened.

    Only IPv4 (``AF_INET``) is attempted.

    Args:
        host: Host name or IPv4 address to connect to.
        port: TCP port number.
        timeout: Socket timeout in seconds.

    Returns:
        True when the connection attempt succeeds, otherwise False.

    Raises:
        OSError: For failures that ``connect_ex`` does not turn into an error
            code, such as a host name that cannot be resolved.
    """
    import socket
    # The context manager closes the socket even when connect_ex raises,
    # which the manual close() after the call did not.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return sock.connect_ex((host, port)) == 0


def ping_check(host: str, timeout: int = 5):
    """Send a single ping to ``host`` and report whether it succeeded.

    Args:
        host: Host name or address handed to the ``ping`` command.
        timeout: Passed to ping as ``-W`` and, plus one second, used as the
            limit for the whole subprocess.

    Returns:
        True only when ``ping`` ran and exited with status 0. False when it
        exited non-zero, timed out, or could not be started.
    """
    import subprocess
    # NOTE(review): '-c' / '-W' are used as on Linux ping; other platforms may
    # interpret these flags differently — confirm for the target OS.
    command = ['ping', '-c', '1', '-W', str(timeout), host]
    try:
        completed = subprocess.run(command, capture_output=True,
                                   timeout=timeout + 1)
    except (OSError, ValueError, subprocess.SubprocessError):
        # ping missing or not executable, invalid argument, or timed out.
        return False
    # subprocess.run does not raise on a non-zero exit unless check=True, so
    # the exit status has to be inspected explicitly.
    return completed.returncode == 0


def database_check(connection_string: str):
    """Open and immediately close a psycopg2 connection.

    Args:
        connection_string: Passed unchanged to ``psycopg2.connect``.

    Returns:
        True when the connection could be opened and closed; otherwise the
        text of the exception that occurred (including a failure to import
        psycopg2).
    """
    try:
        import psycopg2
        psycopg2.connect(connection_string).close()
    except Exception as err:
        return str(err)
    return True


def redis_check(host: str = 'localhost', port: int = 6379, timeout: int = 5):
    """Ping a Redis server.

    Args:
        host: Redis server host.
        port: Redis server port.
        timeout: Seconds allowed for connecting and for the reply. Without a
            limit an unresponsive server would block the caller indefinitely.

    Returns:
        The result of the client's ``ping()`` call. Connection and timeout
        errors raised by the redis client propagate to the caller.
    """
    import redis
    client = redis.Redis(host=host, port=port,
                         socket_connect_timeout=timeout,
                         socket_timeout=timeout)
    return client.ping()


def custom_check():
    """Template for a user-defined check; this placeholder always passes."""
    # Your custom health logic
    return True

Alerts

def console_alert(service, result):
    """Print a one-line alert with the service name, status and message."""
    line = f"[ALERT] {service.name}: {result.status.value} - {result.message}"
    print(line)


def email_alert(service, result):
    """Placeholder email notifier: prints what would be sent, sends nothing."""
    notice = f"Would send email: {service.name} is {result.status.value}"
    print(notice)


def slack_alert(service, result):
    """Placeholder Slack notifier: prints what would be posted, posts nothing."""
    notice = f"Would send Slack: {service.name} is {result.status.value}"
    print(notice)


def pagerduty_alert(service, result):
    """Placeholder PagerDuty notifier: prints what would be triggered only."""
    notice = f"Would trigger PagerDuty: {service.name} is {result.status.value}"
    print(notice)


# Example wiring: one monitor, two services and three alert handlers.
monitor = HealthMonitor()

# Checks are wrapped in zero-argument lambdas because Service calls check_fn()
# without arguments.
monitor.register_service('api', lambda: http_check('http://localhost:8080/health'))
# NOTE(review): this service is named 'database' but runs redis_check, not
# database_check — confirm which one was intended.
monitor.register_service('database', lambda: redis_check())

# Handlers are called in registration order whenever check_all() sees a
# non-healthy result. pagerduty_alert is defined above but not registered here.
monitor.on_alert(console_alert)
monitor.on_alert(email_alert)
monitor.on_alert(slack_alert)

# Start the background checking thread.
monitor.start()

# Keep the main thread alive for a minute, then shut the monitor down.
import time
time.sleep(60)
monitor.stop()

Testing

from healthmon import HealthMonitor, http_check

# Build a monitor and register two HTTP health endpoints.
monitor = HealthMonitor()

monitor.register_service('web', lambda: http_check('http://localhost:3000/health'))
monitor.register_service('api', lambda: http_check('http://localhost:8080/health'))

# Run every registered check once, synchronously, and collect the results.
results = monitor.check_all()

# One line per service: name, status and response time in seconds.
for result in results:
    print(f"{result.service}: {result.status.value} ({result.response_time:.3f}s)")

# Summary of each service's last result plus 1-hour and 24-hour uptime.
status = monitor.get_status()
print(f"\nStatus: {status}")

Summary

You built a service health monitoring tool with health checks, alerting, uptime tracking, and status reporting.