Build a Service Discovery System
Introduction
Service discovery allows services to find each other without hardcoded addresses. In microservices architectures, services need to communicate and dynamically discover their dependencies.
What You'll Build
- Service registry
- Health checking
- Load balancing
- Client-side discovery
Core Concepts
Registry
The registry maintains a list of available services and their network locations.
Health Checks
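Each service instance periodically sends a heartbeat to the registry, and the registry can also actively probe an instance's health-check URL. Instances that stop reporting or fail their checks are excluded from discovery, which provides failure detection and ensures that load balancing only considers healthy instances.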
Service Registry
Create discovery/registry.py:
import time
import threading
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
@dataclass
class ServiceInstance:
    """One running instance of a named service and where to reach it."""

    # Identifier of this instance; the registry stores instances per
    # service name under this id.
    id: str
    # Logical service name that callers look up.
    name: str
    # Network location of the instance.
    host: str
    port: int
    # Free-form extra data; each instance gets its own dict.
    metadata: Dict = field(default_factory=dict)
    # URL probed by the health checker; None means "no active health check".
    health_check_url: Optional[str] = None
    # Wall-clock timestamp (time.time()) of the most recent heartbeat;
    # defaults to the moment the instance object is created.
    last_heartbeat: float = field(default_factory=time.time)
    # Either 'healthy' or 'unhealthy'.
    status: str = 'healthy'
class ServiceRegistry:
    """In-memory registry of service instances.

    Instances are stored per service name and keyed by instance id. All
    access to the internal mapping is guarded by a lock.
    """

    def __init__(self, heartbeat_timeout: float = 30.0):
        """Create an empty registry.

        Args:
            heartbeat_timeout: Seconds after the last heartbeat during which
                an instance is still reported by ``get_healthy_instances``.
        """
        self._services: Dict[str, Dict[str, ServiceInstance]] = {}
        self._lock = threading.Lock()
        self.heartbeat_timeout = heartbeat_timeout

    def register(self, instance: ServiceInstance) -> None:
        """Add ``instance``, replacing any instance with the same name and id."""
        with self._lock:
            self._services.setdefault(instance.name, {})[instance.id] = instance

    def deregister(self, service_name: str, instance_id: str) -> None:
        """Remove an instance; unknown names or ids are ignored."""
        with self._lock:
            instances = self._services.get(service_name)
            if instances is None:
                return
            instances.pop(instance_id, None)
            if not instances:
                # Drop the now-empty entry so get_all_services() does not
                # keep listing a service that has no instances left.
                del self._services[service_name]

    def heartbeat(self, service_name: str, instance_id: str) -> bool:
        """Record a heartbeat for an instance and mark it healthy.

        Returns:
            True if the instance is registered and was updated, False
            otherwise (the caller may want to register again).
        """
        with self._lock:
            instance = self._services.get(service_name, {}).get(instance_id)
            if instance is None:
                return False
            instance.last_heartbeat = time.time()
            instance.status = 'healthy'
            return True

    def get_instances(self, service_name: str) -> List[ServiceInstance]:
        """Return a new list of every instance registered under ``service_name``."""
        with self._lock:
            return list(self._services.get(service_name, {}).values())

    def get_healthy_instances(self, service_name: str) -> List[ServiceInstance]:
        """Return instances that are marked healthy and have a recent heartbeat."""
        # Heartbeats older than this wall-clock timestamp are considered stale.
        cutoff = time.time() - self.heartbeat_timeout
        return [
            instance
            for instance in self.get_instances(service_name)
            if instance.status == 'healthy' and instance.last_heartbeat > cutoff
        ]

    def get_all_services(self) -> List[str]:
        """Return the names of all services that currently have an entry."""
        with self._lock:
            return list(self._services)
Discovery
import random
class ServiceDiscovery:
    """Client-side selection of a healthy instance from a registry.

    ``registry`` must provide ``get_healthy_instances(service_name)``.
    """

    def __init__(self, registry):
        self.registry = registry
        # Per-service call counter used by discover_round_robin; the lock
        # keeps the read-and-increment atomic across threads.
        self._rr_positions: Dict[str, int] = {}
        self._rr_lock = threading.Lock()

    def discover(self, service_name: str) -> Optional[ServiceInstance]:
        """Return a randomly chosen healthy instance, or None if there is none."""
        instances = self.registry.get_healthy_instances(service_name)
        if not instances:
            return None
        return random.choice(instances)

    def discover_all(self, service_name: str) -> List[ServiceInstance]:
        """Return every healthy instance of ``service_name``."""
        return self.registry.get_healthy_instances(service_name)

    def discover_round_robin(self, service_name: str) -> Optional[ServiceInstance]:
        """Return healthy instances in rotation, one per call.

        Each call for a given service advances that service's counter, so
        consecutive calls walk through the healthy instances in the order
        the registry returns them. Returns None if there is no healthy
        instance.
        """
        instances = self.registry.get_healthy_instances(service_name)
        if not instances:
            return None
        with self._rr_lock:
            position = self._rr_positions.get(service_name, 0)
            self._rr_positions[service_name] = position + 1
        # The modulo is applied at read time because the number of healthy
        # instances can change between calls.
        return instances[position % len(instances)]

    def discover_least_connections(self, service_name: str) -> Optional[ServiceInstance]:
        """Return the healthy instance with the fewest reported connections.

        The count is read from ``metadata['connections']``; instances that
        do not report one are treated as having 0. Returns None if there is
        no healthy instance.
        """
        instances = self.registry.get_healthy_instances(service_name)
        if not instances:
            return None
        return min(instances, key=lambda i: i.metadata.get('connections', 0))
Health Checks
import threading
import time
import requests
class HealthChecker:
    """Background thread that probes instances' health-check URLs.

    ``registry`` must provide ``get_all_services()`` and
    ``get_instances(service_name)``.
    """

    def __init__(self, registry, interval: int = 10, timeout: float = 5):
        """Create a checker; call ``start()`` to begin probing.

        Args:
            registry: Source of the services and instances to probe.
            interval: Seconds to wait between two rounds of checks.
            timeout: Seconds to wait for each health-check HTTP request.
        """
        self.registry = registry
        self.interval = interval
        self.timeout = timeout
        self._running = False
        self._thread = None
        # Set by stop() so the loop wakes up immediately instead of
        # sleeping out the rest of the interval.
        self._stop_event = threading.Event()

    def start(self):
        """Start the background check loop; a no-op if it is already running."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._running = True
        self._thread = threading.Thread(target=self._check_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """Ask the check loop to finish and wait for its thread to exit."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join()

    def _check_loop(self):
        """Run rounds of checks until ``stop()`` is called."""
        while not self._stop_event.is_set():
            self._check_all_services()
            # Interruptible sleep: returns as soon as stop() sets the event.
            self._stop_event.wait(self.interval)

    def _check_all_services(self):
        """Probe every instance of every service known to the registry."""
        for service_name in self.registry.get_all_services():
            for instance in self.registry.get_instances(service_name):
                self._check_instance(instance)

    def _check_instance(self, instance: ServiceInstance):
        """Probe one instance and update its ``status`` in place.

        Instances without a ``health_check_url`` are left untouched. An HTTP
        200 response marks the instance 'healthy'; any other response, or a
        failure to get a response, marks it 'unhealthy'.
        """
        if not instance.health_check_url:
            return
        try:
            response = requests.get(instance.health_check_url, timeout=self.timeout)
        except Exception:
            # Any failure to obtain a response counts as unhealthy and must
            # not kill the checker thread. Unlike a bare ``except:``, this
            # lets KeyboardInterrupt and SystemExit propagate.
            instance.status = 'unhealthy'
            return
        instance.status = 'healthy' if response.status_code == 200 else 'unhealthy'
Client
import requests
from typing import Optional
class ServiceClient:
    """HTTP client that resolves a service name to an instance before calling it."""

    def __init__(self, discovery: ServiceDiscovery):
        self.discovery = discovery

    def call(self, service_name: str, path: str, method: str = 'GET', **kwargs):
        """Send an HTTP request to a discovered instance of ``service_name``.

        Args:
            service_name: Name of the service to look up.
            path: Request path appended directly after ``host:port``.
            method: HTTP method name.
            **kwargs: Passed through unchanged to ``requests.request``.

        Returns:
            Whatever ``requests.request`` returns.

        Raises:
            RuntimeError: If discovery finds no healthy instance.
        """
        instance = self.discovery.discover(service_name)
        if instance is None:
            raise RuntimeError(f"No healthy instance found for {service_name}")
        url = f"http://{instance.host}:{instance.port}{path}"
        return requests.request(method, url, **kwargs)

    def get(self, service_name: str, path: str, **kwargs):
        """Shortcut for ``call`` with method 'GET'."""
        # The method is passed by keyword: call() takes path before method.
        return self.call(service_name, path, method='GET', **kwargs)

    def post(self, service_name: str, path: str, **kwargs):
        """Shortcut for ``call`` with method 'POST'."""
        return self.call(service_name, path, method='POST', **kwargs)
Testing
from discovery import ServiceRegistry, ServiceDiscovery, ServiceInstance
# Register one instance, then look it up through the discovery layer.
registry = ServiceRegistry()
instance = ServiceInstance(
    id='instance-1',
    name='userservice',
    host='localhost',
    port=8080,
    health_check_url='http://localhost:8080/health'
)
registry.register(instance)

discovery = ServiceDiscovery(registry)
# A new ServiceInstance defaults to status 'healthy' with a current
# heartbeat timestamp, so it is discoverable right after registration.
service = discovery.discover('userservice')
# discover() returns None when no healthy instance exists, so check before
# reading attributes from the result.
if service is None:
    print("No healthy instance found for userservice")
else:
    print(f"Found: {service.host}:{service.port}")
Summary
You built a service discovery system with registration, health checking, and multiple discovery strategies.