← Back to Tutorials
Python

Build a Service Discovery System

Difficulty: Advanced · Est. Time: ~4 hours

Introduction

Service discovery allows services to find each other without hardcoded addresses. In microservices architectures, services need to communicate and dynamically discover their dependencies.

What You'll Build
  • Service registry
  • Health checking
  • Load balancing
  • Client-side discovery

Core Concepts

Registry

The registry maintains a list of available services and their network locations.

Health Checks

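Services periodically send heartbeats to the registry, and the registry can also poll each instance's health-check URL. Instances that stop reporting or fail the check are excluded from discovery, which enables load balancing across healthy instances and failure detection.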

Service Registry

Create discovery/registry.py:

import time
import threading
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime


@dataclass
class ServiceInstance:
    """A single running instance of a named service.

    Instances are keyed in the registry by ``name`` (the service) and
    ``id`` (unique within that service).
    """

    # Identifier of this instance, unique within its service.
    id: str
    # Logical service name shared by all instances of the service.
    name: str
    # Network location used by clients to reach the instance.
    host: str
    port: int
    # Free-form attributes; the least-connections strategy reads the
    # 'connections' key from here.
    metadata: Dict = field(default_factory=dict)
    # URL polled by the health checker; None disables active checks.
    health_check_url: Optional[str] = None
    # Unix timestamp (time.time()) of the most recent heartbeat; defaults
    # to the moment the instance object is created.
    last_heartbeat: float = field(default_factory=time.time)
    # Either 'healthy' or 'unhealthy'.
    status: str = 'healthy'


class ServiceRegistry:
    """Thread-safe, in-memory registry of service instances.

    Instances are stored per service name and keyed by instance id. All
    access to the internal mapping is serialized with a lock.

    Args:
        heartbeat_timeout: Seconds after the last heartbeat during which an
            instance is still considered alive by
            ``get_healthy_instances``. Defaults to 30.
    """

    def __init__(self, heartbeat_timeout: float = 30):
        self._services: Dict[str, Dict[str, ServiceInstance]] = {}
        self._lock = threading.Lock()
        self.heartbeat_timeout = heartbeat_timeout

    def register(self, instance: ServiceInstance):
        """Add ``instance`` to the registry, replacing any entry with the same id."""
        with self._lock:
            self._services.setdefault(instance.name, {})[instance.id] = instance

    def deregister(self, service_name: str, instance_id: str):
        """Remove an instance; unknown services or ids are ignored."""
        with self._lock:
            instances = self._services.get(service_name)
            if instances is None:
                return
            instances.pop(instance_id, None)
            # Drop the bucket once it is empty so get_all_services() does not
            # keep reporting services that no longer have any instance.
            if not instances:
                del self._services[service_name]

    def heartbeat(self, service_name: str, instance_id: str) -> bool:
        """Record a heartbeat and mark the instance healthy.

        Returns:
            True if the instance is registered and was refreshed, False if
            the service or instance id is unknown (the caller may want to
            register again).
        """
        with self._lock:
            instance = self._services.get(service_name, {}).get(instance_id)
            if instance is None:
                return False
            instance.last_heartbeat = time.time()
            instance.status = 'healthy'
            return True

    def get_instances(self, service_name: str) -> List[ServiceInstance]:
        """Return a snapshot list of every instance of ``service_name``.

        The list is a copy; the instance objects themselves are shared with
        the registry.
        """
        with self._lock:
            return list(self._services.get(service_name, {}).values())

    def get_healthy_instances(self, service_name: str) -> List[ServiceInstance]:
        """Return instances whose status is 'healthy' and whose last
        heartbeat is newer than ``heartbeat_timeout`` seconds ago."""
        instances = self.get_instances(service_name)
        cutoff = time.time() - self.heartbeat_timeout

        return [i for i in instances
                if i.status == 'healthy' and i.last_heartbeat > cutoff]

    def get_all_services(self) -> List[str]:
        """Return the names of all services that currently have instances."""
        with self._lock:
            return list(self._services)

Discovery

import random


class ServiceDiscovery:
    """Client-side instance selection on top of a service registry.

    Every strategy only considers the instances returned by the registry's
    ``get_healthy_instances`` and returns ``None`` when there are none.
    """

    def __init__(self, registry):
        self.registry = registry
        # Number of round-robin picks handed out so far, per service name.
        self._rr_counters: Dict[str, int] = {}
        # Guards _rr_counters so concurrent callers get distinct turns.
        self._rr_lock = threading.Lock()

    def discover(self, service_name: str) -> Optional[ServiceInstance]:
        """Return a randomly chosen healthy instance, or None."""
        instances = self.registry.get_healthy_instances(service_name)

        if not instances:
            return None

        return random.choice(instances)

    def discover_all(self, service_name: str) -> List[ServiceInstance]:
        """Return every healthy instance of ``service_name``."""
        return self.registry.get_healthy_instances(service_name)

    def discover_round_robin(self, service_name: str) -> Optional[ServiceInstance]:
        """Return healthy instances in rotation, one per call, or None.

        A per-service counter decides whose turn it is. Indexing by wall
        clock time instead would hand the same instance to every call made
        within the same second.
        """
        instances = self.registry.get_healthy_instances(service_name)

        if not instances:
            return None

        with self._rr_lock:
            turn = self._rr_counters.get(service_name, 0)
            self._rr_counters[service_name] = turn + 1

        # Modulo the current list length: the healthy set may change size
        # between calls.
        return instances[turn % len(instances)]

    def discover_least_connections(self, service_name: str) -> Optional[ServiceInstance]:
        """Return the healthy instance with the smallest
        ``metadata['connections']`` value (missing counts as 0), or None."""
        instances = self.registry.get_healthy_instances(service_name)

        if not instances:
            return None

        return min(instances, key=lambda i: i.metadata.get('connections', 0))

Health Checks

import threading
import time
import requests


class HealthChecker:
    """Background poller that probes each instance's health-check URL.

    Args:
        registry: Object providing ``get_all_services()`` and
            ``get_instances(name)``.
        interval: Seconds to wait between two polling rounds.
        timeout: Seconds to wait for each health-check HTTP response.
    """

    def __init__(self, registry, interval: int = 10, timeout: float = 5):
        self.registry = registry
        self.interval = interval
        self.timeout = timeout
        self._running = False
        self._thread = None
        # Set by stop(); lets the loop wake up immediately instead of
        # sleeping out the rest of the interval.
        self._stop_event = threading.Event()

    def start(self):
        """Start the polling thread; does nothing if it is already running."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._running = True
        self._thread = threading.Thread(target=self._check_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """Signal the polling thread to exit and wait for it to finish."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join()
            self._thread = None

    def _check_loop(self):
        """Run polling rounds until stop() is called."""
        while not self._stop_event.is_set():
            self._check_all_services()
            # Interruptible sleep: returns early as soon as stop() sets the event.
            self._stop_event.wait(self.interval)

    def _check_all_services(self):
        """Probe every instance of every service known to the registry."""
        for service_name in self.registry.get_all_services():
            for instance in self.registry.get_instances(service_name):
                self._check_instance(instance)

    def _check_instance(self, instance: ServiceInstance):
        """Set ``instance.status`` from an HTTP GET on its health-check URL.

        Instances without a health-check URL are left untouched. A 200
        response means 'healthy'; any other status code or any failure to
        complete the request means 'unhealthy'.
        """
        if not instance.health_check_url:
            return

        try:
            response = requests.get(instance.health_check_url, timeout=self.timeout)
        except Exception:
            # Best effort: a probe that cannot complete counts as unhealthy.
            # Unlike a bare except, this lets KeyboardInterrupt and
            # SystemExit propagate.
            instance.status = 'unhealthy'
        else:
            if response.status_code == 200:
                instance.status = 'healthy'
            else:
                instance.status = 'unhealthy'

Client

import requests
from typing import Optional


class ServiceClient:
    """HTTP client that resolves a service name to an instance before each call."""

    def __init__(self, discovery: ServiceDiscovery):
        self.discovery = discovery

    def call(self, service_name: str, path: str, method: str = 'GET', **kwargs):
        """Send an HTTP request to one discovered instance of ``service_name``.

        Args:
            service_name: Name of the service to look up via ``discovery.discover``.
            path: Request path appended directly after ``host:port``
                (so it should start with ``/``).
            method: HTTP method name.
            **kwargs: Passed through unchanged to ``requests.request``.

        Returns:
            Whatever ``requests.request`` returns.

        Raises:
            Exception: If discovery yields no healthy instance.
        """
        instance = self.discovery.discover(service_name)

        if not instance:
            raise Exception(f"No healthy instance found for {service_name}")

        url = f"http://{instance.host}:{instance.port}{path}"

        return requests.request(method, url, **kwargs)

    def get(self, service_name: str, path: str, **kwargs):
        """Shortcut for ``call`` with the GET method."""
        # method is passed by keyword so it cannot be confused with path.
        return self.call(service_name, path, method='GET', **kwargs)

    def post(self, service_name: str, path: str, **kwargs):
        """Shortcut for ``call`` with the POST method."""
        return self.call(service_name, path, method='POST', **kwargs)

Testing

from discovery import ServiceRegistry, ServiceDiscovery, ServiceInstance

# Register one instance, then look it up through the discovery layer.
registry = ServiceRegistry()

instance = ServiceInstance(
    id='instance-1',
    name='userservice',
    host='localhost',
    port=8080,
    health_check_url='http://localhost:8080/health'
)
registry.register(instance)

discovery = ServiceDiscovery(registry)
service = discovery.discover('userservice')

# discover() returns None when no instance is currently healthy, so guard
# before reading host/port.
if service is None:
    print("No healthy instance found for userservice")
else:
    print(f"Found: {service.host}:{service.port}")

Summary

You built a service discovery system with registration, health checking, and multiple discovery strategies.