← Back to Tutorials
Python

Build a Metrics Collection System

Difficulty: Intermediate | Est. Time: ~4 hours

Introduction

Metrics collection systems gather, process, and store numerical data about system behavior, application performance, and business metrics. They are the foundation of observability in modern software systems.

In this tutorial, we'll build a metrics collection system that supports different metric types, aggregation, and export to various backends.

What You'll Build
  • Counter and gauge metrics
  • Histogram support
  • Time-series aggregation
  • Multiple export formats

Core Concepts

Metric Types

  • Counter - Monotonically increasing value
  • Gauge - Current value that can go up or down
  • Histogram - Distribution of values
  • Summary - Aggregated percentiles

Labels

Labels provide dimensions to metrics, allowing for flexible querying and aggregation.

Prerequisites

  • Python 3.8+
  • Basic statistics knowledge

Metric Types

Create metrics/types.py:

import time
import threading
from typing import Dict, List, Any
from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class Metric:
    """A single collected sample: a named value with a timestamp and labels."""

    # Metric name as it is written out by the exporters.
    name: str
    # Sampled numeric value.
    value: float
    # Creation time from time.time() (seconds since the epoch) unless given explicitly.
    timestamp: float = field(default_factory=time.time)
    # Label key/value pairs; each instance gets its own empty dict by default.
    labels: Dict[str, str] = field(default_factory=dict)


class Counter:
    """Thread-safe counter whose value only ever increases.

    Args:
        name: Metric name reported by ``collect``.
        labels: Optional label key/value pairs attached to the collected sample.
    """

    def __init__(self, name: str, labels: Dict[str, str] = None):
        self.name = name
        self.labels = labels or {}
        self._value = 0
        self._lock = threading.Lock()

    def inc(self, value: float = 1):
        """Add ``value`` (default 1) to the counter.

        Raises:
            ValueError: If ``value`` is negative; a counter must never decrease.
        """
        if value < 0:
            raise ValueError('Counters can only be incremented by non-negative amounts.')
        with self._lock:
            self._value += value

    def get(self) -> float:
        """Return the current total."""
        # Read under the same lock that guards writes so readers and writers agree.
        with self._lock:
            return self._value

    def collect(self) -> Metric:
        """Return the current total as a ``Metric`` carrying this counter's labels."""
        return Metric(self.name, self.get(), labels=self.labels)


class Gauge:
    """Thread-safe metric holding one value that can be set, raised or lowered."""

    def __init__(self, name: str, labels: Dict[str, str] = None):
        self._lock = threading.Lock()
        self._value = 0
        self.labels = labels if labels else {}
        self.name = name

    def set(self, value: float):
        """Replace the current value with ``value``."""
        with self._lock:
            self._value = value

    def inc(self, value: float = 1):
        """Raise the current value by ``value`` (default 1)."""
        with self._lock:
            self._value += value

    def dec(self, value: float = 1):
        """Lower the current value by ``value`` (default 1)."""
        with self._lock:
            self._value -= value

    def collect(self) -> Metric:
        """Return the current value as a ``Metric`` carrying this gauge's labels."""
        sample = Metric(self.name, self._value, labels=self.labels)
        return sample


class Histogram:
    """Thread-safe recorder of observations, summarized when collected.

    NOTE: every observation is kept in ``_values`` and never discarded, so
    memory grows with the number of ``observe`` calls.

    Args:
        name: Base metric name; suffixes are added for derived series.
        labels: Optional label key/value pairs attached to every sample.
    """

    # Upper bounds of the cumulative buckets reported by ``collect``.
    BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]

    def __init__(self, name: str, labels: Dict[str, str] = None):
        self.name = name
        self.labels = labels or {}
        self._values = []
        self._lock = threading.Lock()

    def observe(self, value: float):
        """Record one observation."""
        with self._lock:
            self._values.append(value)

    def collect(self) -> List[Metric]:
        """Summarize all observations recorded so far.

        Returns:
            An empty list when nothing has been observed. Otherwise, in order:
            ``<name>_count``, ``<name>_sum``, one ``<name>`` sample per
            quantile (0.5, 0.9, 0.95, 0.99) labelled ``quantile``, and one
            cumulative ``<name>_bucket`` sample per entry in ``BUCKETS``
            labelled ``le``, followed by the ``le="+Inf"`` bucket.
        """
        # Snapshot under the lock, then compute without blocking observers.
        with self._lock:
            values = self._values[:]

        if not values:
            return []

        total = len(values)
        metrics = [
            Metric(f'{self.name}_count', total, labels=self.labels),
            Metric(f'{self.name}_sum', sum(values), labels=self.labels),
        ]

        sorted_values = sorted(values)
        for quantile in [0.5, 0.9, 0.95, 0.99]:
            # quantile < 1, so the index is always within range.
            idx = int(total * quantile)
            metrics.append(Metric(self.name, sorted_values[idx], labels={**self.labels, 'quantile': str(quantile)}))

        for bucket in self.BUCKETS:
            count = sum(1 for v in values if v <= bucket)
            metrics.append(Metric(f'{self.name}_bucket', count, labels={**self.labels, 'le': str(bucket)}))

        # The Prometheus histogram format requires a final catch-all bucket
        # whose count equals <name>_count; without it, observations above the
        # largest bound are not represented in any bucket.
        metrics.append(Metric(f'{self.name}_bucket', total, labels={**self.labels, 'le': '+Inf'}))

        return metrics

Collector

Create metrics/collector.py:

import threading
from typing import Dict, Callable
from .types import Counter, Gauge, Histogram


class Registry:
    """Creates metric objects on demand and hands back the same one per name/labels."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counters: Dict[str, Counter] = {}
        self._gauges: Dict[str, Gauge] = {}
        self._histograms: Dict[str, Histogram] = {}

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        """Return the counter registered for ``name``/``labels``, creating it if needed."""
        return self._get_or_create(self._counters, Counter, name, labels)

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        """Return the gauge registered for ``name``/``labels``, creating it if needed."""
        return self._get_or_create(self._gauges, Gauge, name, labels)

    def histogram(self, name: str, labels: Dict[str, str] = None) -> Histogram:
        """Return the histogram registered for ``name``/``labels``, creating it if needed."""
        return self._get_or_create(self._histograms, Histogram, name, labels)

    def collect(self):
        """Return samples from all counters, then gauges, then histograms."""
        with self._lock:
            collected = [c.collect() for c in self._counters.values()]
            collected += [g.collect() for g in self._gauges.values()]
            for hist in self._histograms.values():
                # A histogram yields a list of samples rather than one.
                collected.extend(hist.collect())
        return collected

    def _get_or_create(self, store, factory, name, labels):
        """Look up ``store`` by the name/labels key, building the metric on a miss."""
        key = self._make_key(name, labels)
        with self._lock:
            try:
                return store[key]
            except KeyError:
                created = store[key] = factory(name, labels)
                return created

    def _make_key(self, name: str, labels: Dict[str, str] = None) -> str:
        """Build the lookup key: ``name`` alone, or ``name{k=v,...}`` with sorted labels."""
        if labels:
            pairs = [f'{k}={v}' for k, v in sorted(labels.items())]
            return name + '{' + ','.join(pairs) + '}'
        return name

Aggregator

Create metrics/aggregator.py:

import time
from typing import Dict, List
from collections import defaultdict
from .types import Metric


class Aggregator:
    """Keeps recent samples per series and summarizes them over a sliding window.

    Args:
        window_size: Window length in seconds, measured back from ``time.time()``.
    """

    def __init__(self, window_size: int = 60):
        self.window_size = window_size
        self._metrics: Dict[str, List[Metric]] = defaultdict(list)

    def record(self, metric: Metric):
        """Store ``metric`` under its name/labels series and prune that series."""
        key = f"{metric.name}{{{','.join(f'{k}={v}' for k, v in sorted(metric.labels.items()))}}}"
        self._metrics[key].append(metric)
        self._cleanup(key)

    def _cleanup(self, key: str):
        """Drop samples of one series that have fallen out of the window."""
        cutoff = time.time() - self.window_size
        self._metrics[key] = [m for m in self._metrics[key] if m.timestamp > cutoff]

    def _window_values(self, name: str) -> List[float]:
        """Return in-window values of every series whose metric name is exactly ``name``."""
        cutoff = time.time() - self.window_size
        values = []
        for key, metrics in self._metrics.items():
            # Keys are always "<name>{<labels>}", so compare the part before
            # the brace; a bare prefix test would also match e.g.
            # "requests_failed" when asked for "requests".
            if key.partition('{')[0] != name:
                continue
            # Series are only pruned when recorded to, so filter stale samples here too.
            values.extend(m.value for m in metrics if m.timestamp > cutoff)
        return values

    def get_stats(self, name: str) -> Dict:
        """Return count/sum/min/max/avg of in-window values for ``name``.

        Returns:
            A dict with keys ``count``, ``sum``, ``min``, ``max`` and ``avg``,
            or an empty dict when there are no in-window samples.
        """
        values = self._window_values(name)

        if not values:
            return {}

        total = sum(values)
        return {
            'count': len(values),
            'sum': total,
            'min': min(values),
            'max': max(values),
            'avg': total / len(values)
        }

    def get_rate(self, name: str) -> float:
        """Return the sum of in-window values for ``name`` divided by the window size."""
        if self.window_size <= 0:
            # An empty window holds no samples; avoid dividing by zero.
            return 0.0
        return sum(self._window_values(name)) / self.window_size

Export

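Create a module for the export formats (for example metrics/export.py). The functions below use List and Metric, so begin the file with `from typing import List` and `from .types import Metric`: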

def _escape_label_value(value) -> str:
    """Escape backslash, double quote and newline for a quoted label value."""
    # Backslash must be handled first so later escapes are not doubled.
    return str(value).replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n')


def export_prometheus(metrics: List[Metric]) -> str:
    """Render metrics as Prometheus-style text, one sample per line.

    Args:
        metrics: Samples exposing ``name``, ``value`` and ``labels``.

    Returns:
        Lines of ``name{key="value",...} value`` (or ``name value`` when a
        sample has no labels) joined by newlines; an empty string for no
        metrics. Label values are escaped so quotes, backslashes and newlines
        cannot break the line format.
    """
    lines = []

    for metric in metrics:
        labels = ','.join(
            f'{k}="{_escape_label_value(v)}"' for k, v in metric.labels.items()
        )

        if labels:
            lines.append(f'{metric.name}{{{labels}}} {metric.value}')
        else:
            lines.append(f'{metric.name} {metric.value}')

    return '\n'.join(lines)


def export_json(metrics: List[Metric]) -> str:
    """Serialize metrics as a JSON array of objects, indented by two spaces.

    Each object carries the sample's ``name``, ``value``, ``timestamp`` and
    ``labels``, in that order.
    """
    import json

    def as_record(metric):
        # Key order here is the key order in the output.
        return {
            'name': metric.name,
            'value': metric.value,
            'timestamp': metric.timestamp,
            'labels': metric.labels
        }

    records = list(map(as_record, metrics))
    return json.dumps(records, indent=2)


class MetricsServer:
    """Serves a registry's metrics in Prometheus text form over HTTP.

    Args:
        registry: Object whose ``collect()`` returns the metrics to expose.
        port: TCP port to listen on.
    """

    def __init__(self, registry, port: int = 8080):
        self.registry = registry
        self.port = port

    def start(self):
        """Listen on all interfaces at ``self.port`` and block in ``serve_forever()``.

        ``GET /metrics`` answers 200 with the exported metrics; any other path
        answers 404.
        """
        import http.server
        import socketserver

        # The handler runs on a plain TCPServer, which has no ``registry``
        # attribute, so reading ``self.server.registry`` would raise
        # AttributeError on every request. Capture the registry in the
        # closure instead.
        registry = self.registry

        class Handler(http.server.BaseHTTPRequestHandler):
            def do_GET(self):
                if self.path == '/metrics':
                    payload = export_prometheus(registry.collect()).encode()

                    self.send_response(200)
                    self.send_header('Content-Type', 'text/plain')
                    # Tell the client how many bytes to expect.
                    self.send_header('Content-Length', str(len(payload)))
                    self.end_headers()
                    self.wfile.write(payload)
                else:
                    self.send_response(404)
                    self.end_headers()

        with socketserver.TCPServer(('0.0.0.0', self.port), Handler) as httpd:
            print(f"Metrics server running on port {self.port}")
            httpd.serve_forever()

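Testing

The example below imports from the metrics package directly, so it assumes metrics/__init__.py re-exports Registry and export_prometheus from the modules where you defined them.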

from metrics import Registry, export_prometheus

registry = Registry()

# One GET request counted.
registry.counter('requests_total', {'method': 'GET'}).inc()

# A gauge simply holds the last value it was given.
registry.gauge('temperature').set(72.5)

# Two request durations recorded into the same histogram.
durations = registry.histogram('request_duration')
for seconds in (0.125, 0.250):
    durations.observe(seconds)

# Collect everything and print it in Prometheus text form.
print(export_prometheus(registry.collect()))

Summary

You built a complete metrics collection system with counters, gauges, histograms, aggregation, and export capabilities.

Possible Extensions

  • Add pushgateway support
  • Implement alerting
  • Add metric derivation