Build a Metrics Collection System
Introduction
Metrics collection systems gather, process, and store numerical data about system behavior, application performance, and business metrics. They are the foundation of observability in modern software systems.
In this tutorial, we'll build a metrics collection system that supports different metric types, aggregation, and export in Prometheus text and JSON formats, including an HTTP endpoint that Prometheus can scrape.
What You'll Build
- Counter and gauge metrics
- Histogram support
- Time-series aggregation
- Multiple export formats
Core Concepts
Metric Types
- Counter - Monotonically increasing value
- Gauge - Current value that can go up or down
- Histogram - Distribution of values
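- Summary - Aggregated percentiles (in this tutorial the histogram also reports percentiles, rather than providing a separate summary type)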
Labels
Labels provide dimensions to metrics, allowing for flexible querying and aggregation.
Prerequisites
- Python 3.8+
- Basic statistics knowledge
Metric Types
Create metrics/types.py:
import bisect
import math
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Sequence
@dataclass
class Metric:
    """One sample: a named numeric value plus its labels and capture time.

    ``timestamp`` defaults to ``time.time()`` evaluated when the instance is
    built. ``labels`` defaults to a new empty dict for every instance.
    """

    # Metric name, e.g. ``requests_total``.
    name: str
    # The sampled numeric value.
    value: float
    # Seconds since the epoch; filled from time.time() at construction unless given.
    timestamp: float = field(default_factory=time.time)
    # Label name -> label value; never shared between instances by default.
    labels: Dict[str, str] = field(default_factory=dict)
class Counter:
    """A cumulative metric whose value only ever increases.

    All reads and writes of the running total happen under an internal lock,
    so concurrent ``inc`` calls are not lost.
    """

    def __init__(self, name: str, labels: Dict[str, str] = None):
        self.name = name
        # Falsy labels (None or {}) become a fresh empty dict.
        self.labels = labels or {}
        self._value = 0
        self._lock = threading.Lock()

    def inc(self, value: float = 1):
        """Add ``value`` (default 1) to the counter.

        Raises:
            ValueError: if ``value`` is negative or NaN. A counter must never
                decrease, and a NaN would poison the total permanently.
        """
        # ``not value >= 0`` also rejects NaN, for which every comparison is False.
        if not value >= 0:
            raise ValueError(
                f'Counter {self.name!r} increment must be a non-negative number, got {value!r}'
            )
        with self._lock:
            self._value += value

    def get(self) -> float:
        """Return the current total."""
        with self._lock:
            return self._value

    def collect(self) -> Metric:
        """Return a Metric snapshot of the current total with this counter's labels."""
        return Metric(self.name, self.get(), labels=self.labels)
class Gauge:
    """A metric holding one current value that may move up or down.

    Each mutation runs while holding an internal lock, so concurrent
    ``set``/``inc``/``dec`` calls do not lose updates.
    """

    def __init__(self, name: str, labels: Dict[str, str] = None):
        self.name = name
        # An omitted or empty labels argument becomes a fresh empty dict.
        self.labels = labels or {}
        self._value = 0
        self._lock = threading.Lock()

    def set(self, value: float):
        """Overwrite the current value with ``value``."""
        with self._lock:
            self._value = value

    def inc(self, value: float = 1):
        """Raise the current value by ``value`` (default 1)."""
        with self._lock:
            self._value = self._value + value

    def dec(self, value: float = 1):
        """Lower the current value by ``value`` (default 1)."""
        with self._lock:
            self._value = self._value - value

    def collect(self) -> Metric:
        """Return a point-in-time Metric snapshot of this gauge."""
        return Metric(self.name, self._value, labels=self.labels)
class Histogram:
    """Distribution of observed values, exported Prometheus-style.

    ``collect`` returns these samples:

    * ``<name>_count`` and ``<name>_sum`` over every observation.
    * ``<name>`` samples labelled ``quantile``, computed by the nearest-rank
      method over the most recent ``max_samples`` observations.
    * Cumulative ``<name>_bucket`` samples labelled ``le``, always ending with
      ``le="+Inf"``.

    Count, sum and per-bucket counts are maintained incrementally, and only a
    bounded window of raw values is kept, so memory does not grow with the
    number of observations.
    """

    BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    QUANTILES = [0.5, 0.9, 0.95, 0.99]

    def __init__(
        self,
        name: str,
        labels: Dict[str, str] = None,
        buckets: Optional[Sequence[float]] = None,
        max_samples: int = 10000,
    ):
        """Create an empty histogram.

        Args:
            name: Base metric name.
            labels: Labels attached to every exported sample.
            buckets: Finite upper bounds; defaults to ``BUCKETS``. Order does
                not matter, and ``+Inf`` is always added on export.
            max_samples: Number of most recent observations kept for quantile
                estimation; must be >= 1.

        Raises:
            ValueError: if ``max_samples`` is less than 1.
        """
        if max_samples < 1:
            raise ValueError(f'max_samples must be >= 1, got {max_samples}')
        self.name = name
        self.labels = labels or {}
        bounds = self.BUCKETS if buckets is None else buckets
        # +Inf is always emitted separately, so an explicit infinite bound would duplicate it.
        self.buckets = sorted(b for b in bounds if not math.isinf(b))
        # One slot per finite bound, plus a final slot for values above every bound (or NaN).
        self._bucket_counts = [0] * (len(self.buckets) + 1)
        self._count = 0
        self._sum = 0
        self._samples = deque(maxlen=max_samples)
        self._lock = threading.Lock()

    def observe(self, value: float):
        """Record one observation."""
        is_nan = math.isnan(value)
        if is_nan:
            # NaN is not <= any bound, so it only counts toward +Inf.
            slot = len(self.buckets)
        else:
            # First bound >= value, i.e. the smallest bucket with value <= le.
            slot = bisect.bisect_left(self.buckets, value)
        with self._lock:
            self._count += 1
            self._sum += value
            self._bucket_counts[slot] += 1
            if not is_nan:
                # NaN has no place in a sorted order, so keep it out of quantiles.
                self._samples.append(value)

    def collect(self) -> List[Metric]:
        """Return the histogram's samples, or ``[]`` if nothing was observed."""
        with self._lock:
            count = self._count
            total = self._sum
            slot_counts = list(self._bucket_counts)
            samples = list(self._samples)
        if count == 0:
            return []

        metrics = [
            Metric(f'{self.name}_count', count, labels=self.labels),
            Metric(f'{self.name}_sum', total, labels=self.labels),
        ]

        samples.sort()
        if samples:
            for quantile in self.QUANTILES:
                metrics.append(Metric(
                    self.name,
                    self._nearest_rank(samples, quantile),
                    labels={**self.labels, 'quantile': str(quantile)},
                ))

        # Prometheus buckets are cumulative: each counts every value <= its bound.
        cumulative = 0
        for bound, in_slot in zip(self.buckets, slot_counts):
            cumulative += in_slot
            metrics.append(Metric(
                f'{self.name}_bucket', cumulative,
                labels={**self.labels, 'le': str(bound)},
            ))
        metrics.append(Metric(
            f'{self.name}_bucket', count,
            labels={**self.labels, 'le': '+Inf'},
        ))
        return metrics

    @staticmethod
    def _nearest_rank(sorted_values: List[float], quantile: float) -> float:
        """Return the nearest-rank ``quantile`` of a non-empty ascending list."""
        # Smallest value with at least ceil(q * n) values at or below it.
        rank = math.ceil(quantile * len(sorted_values))
        return sorted_values[max(rank, 1) - 1]
Collector
Create metrics/collector.py:
import threading
from typing import Dict, Callable
from .types import Counter, Gauge, Histogram
class Registry:
    """Get-or-create store for Counter, Gauge and Histogram instances.

    Each metric type has its own table keyed by (name, sorted label items).
    Asking twice for the same name and labels returns the same object. A
    metric name may only be used with a single metric type.
    """

    def __init__(self):
        self._counters: Dict[tuple, Counter] = {}
        self._gauges: Dict[tuple, Gauge] = {}
        self._histograms: Dict[tuple, Histogram] = {}
        # Metric name -> type it was first registered as ('counter', 'gauge' or 'histogram').
        self._kinds: Dict[str, str] = {}
        self._lock = threading.Lock()

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        """Return the Counter for ``name``/``labels``, creating it on first use.

        Raises:
            ValueError: if ``name`` is already registered as another metric type.
        """
        return self._get_or_create(self._counters, Counter, 'counter', name, labels)

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        """Return the Gauge for ``name``/``labels``, creating it on first use.

        Raises:
            ValueError: if ``name`` is already registered as another metric type.
        """
        return self._get_or_create(self._gauges, Gauge, 'gauge', name, labels)

    def histogram(self, name: str, labels: Dict[str, str] = None) -> Histogram:
        """Return the Histogram for ``name``/``labels``, creating it on first use.

        Raises:
            ValueError: if ``name`` is already registered as another metric type.
        """
        return self._get_or_create(self._histograms, Histogram, 'histogram', name, labels)

    def collect(self):
        """Return a flat list of Metric samples from every registered metric.

        Counters come first, then gauges, then histograms, each in the order
        they were first registered.
        """
        # Snapshot under the lock, then collect outside it, so slow metric
        # collection (e.g. sorting histogram samples) does not block registration.
        with self._lock:
            counters = list(self._counters.values())
            gauges = list(self._gauges.values())
            histograms = list(self._histograms.values())
        metrics = [counter.collect() for counter in counters]
        metrics.extend(gauge.collect() for gauge in gauges)
        for histogram in histograms:
            metrics.extend(histogram.collect())
        return metrics

    def _get_or_create(self, store: dict, factory: Callable, kind: str,
                       name: str, labels: Dict[str, str] = None):
        """Look up ``name``/``labels`` in ``store``, creating it with ``factory`` if absent."""
        key = self._make_key(name, labels)
        with self._lock:
            registered_kind = self._kinds.setdefault(name, kind)
            if registered_kind != kind:
                raise ValueError(
                    f'metric {name!r} is already registered as a {registered_kind}; '
                    f'cannot register it as a {kind}'
                )
            metric = store.get(key)
            if metric is None:
                metric = store[key] = factory(name, labels)
            return metric

    def _make_key(self, name: str, labels: Dict[str, str] = None) -> tuple:
        """Build an order-independent, collision-free key for a name + label set."""
        # A tuple of sorted (key, value) pairs cannot collide the way a joined
        # 'k=v,k=v' string does when label values themselves contain ',' or '='.
        return (name, tuple(sorted((labels or {}).items())))
Aggregator
Create metrics/aggregator.py:
import time
from typing import Dict, List
from collections import defaultdict
from .types import Metric
class Aggregator:
    """Keep recently recorded metrics in a sliding time window and summarize them.

    Samples are grouped into series by metric name plus label set.
    ``get_stats`` and ``get_rate`` combine every series whose metric name
    equals ``name`` exactly. They only consider samples whose timestamp falls
    within the last ``window_size`` seconds.

    NOTE(review): no locking is done here; if one instance is shared across
    threads, callers must synchronize access themselves.
    """

    def __init__(self, window_size: int = 60):
        """Create an aggregator with a ``window_size``-second window.

        Raises:
            ValueError: if ``window_size`` is not positive (``get_rate`` divides by it).
        """
        if window_size <= 0:
            raise ValueError(f'window_size must be positive, got {window_size}')
        self.window_size = window_size
        # (name, sorted label items) -> samples recorded for that series.
        self._metrics: Dict[tuple, List[Metric]] = defaultdict(list)

    def record(self, metric: Metric):
        """Add ``metric`` to its series and drop that series' expired samples."""
        key = self._series_key(metric)
        self._metrics[key].append(metric)
        self._cleanup(key)

    def _cleanup(self, key: tuple):
        """Remove samples of series ``key`` that are older than the window."""
        cutoff = time.time() - self.window_size
        fresh = [m for m in self._metrics.get(key, ()) if m.timestamp > cutoff]
        if fresh:
            self._metrics[key] = fresh
        else:
            # Drop empty series so churned label sets do not accumulate forever.
            self._metrics.pop(key, None)

    def get_stats(self, name: str) -> Dict:
        """Return count/sum/min/max/avg of in-window values for ``name``, or ``{}``."""
        values = self._window_values(name)
        if not values:
            return {}
        total = sum(values)
        return {
            'count': len(values),
            'sum': total,
            'min': min(values),
            'max': max(values),
            'avg': total / len(values)
        }

    def get_rate(self, name: str) -> float:
        """Return the sum of in-window values for ``name`` divided by ``window_size``.

        NOTE(review): this treats each recorded value as an increment. Recording
        cumulative counter snapshots here would overstate the rate.
        """
        return sum(self._window_values(name)) / self.window_size

    @staticmethod
    def _series_key(metric: Metric) -> tuple:
        """Identify a series by exact name and an order-independent label tuple."""
        return (metric.name, tuple(sorted(metric.labels.items())))

    def _window_values(self, name: str) -> List[float]:
        """Collect values of every series named exactly ``name`` still inside the window."""
        # Filter at read time too: series that stopped receiving records are
        # never cleaned by record(), but their old samples must not count.
        cutoff = time.time() - self.window_size
        values = []
        for (metric_name, _labels), series in self._metrics.items():
            if metric_name == name:
                values.extend(m.value for m in series if m.timestamp > cutoff)
        return values
Export
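Create metrics/export.py with the export functions and an HTTP server. This module needs `from typing import List` and `from .types import Metric`: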
def export_prometheus(metrics: 'List[Metric]') -> str:
    """Render metrics in the Prometheus text exposition format.

    Emits one ``name{label="value",...} value`` line per metric (or ``name
    value`` when there are no labels). Every line, including the last, ends
    with a line feed, as the format requires.

    Label values are escaped (backslash, double quote, line feed). Non-finite
    values are written as ``+Inf``, ``-Inf`` and ``NaN``. An empty ``metrics``
    list yields ``''``.
    """
    # The annotation is quoted (a PEP 484 forward reference), so defining this
    # function does not require List/Metric to be importable at this point.

    def escape(label_value) -> str:
        # Backslash must be escaped first so the escapes added below are not doubled.
        return (str(label_value)
                .replace('\\', '\\\\')
                .replace('"', '\\"')
                .replace('\n', '\\n'))

    def format_value(value) -> str:
        if value != value:  # only NaN is unequal to itself
            return 'NaN'
        if value == float('inf'):
            return '+Inf'
        if value == float('-inf'):
            return '-Inf'
        return str(value)

    lines = []
    for metric in metrics:
        labels = ','.join(f'{k}="{escape(v)}"' for k, v in metric.labels.items())
        if labels:
            lines.append(f'{metric.name}{{{labels}}} {format_value(metric.value)}')
        else:
            lines.append(f'{metric.name} {format_value(metric.value)}')
    return ''.join(line + '\n' for line in lines)
def export_json(metrics: List[Metric]) -> str:
    """Serialize metrics as a JSON array pretty-printed with 2-space indentation.

    Each element is an object with the keys ``name``, ``value``,
    ``timestamp`` and ``labels``, in that order.
    """
    import json

    records = []
    for metric in metrics:
        records.append({
            'name': metric.name,
            'value': metric.value,
            'timestamp': metric.timestamp,
            'labels': metric.labels,
        })
    return json.dumps(records, indent=2)
class MetricsServer:
    """Serve a registry's metrics over HTTP in the Prometheus text format."""

    def __init__(self, registry, port: int = 8080, host: str = '0.0.0.0'):
        """Configure the server.

        Args:
            registry: Object whose ``collect()`` returns the metrics to export.
            port: TCP port to listen on.
            host: Interface address to bind; ``'0.0.0.0'`` listens on all IPv4 interfaces.
        """
        self.registry = registry
        self.port = port
        self.host = host

    def start(self):
        """Bind ``host:port`` and answer ``GET /metrics``; blocks the calling thread.

        Any other path gets a 404. Each request is handled on its own thread.
        """
        import http.server

        # The handler reaches the registry through this closure. The original
        # code read ``self.server.registry``, but the server object never had
        # that attribute, so every scrape raised AttributeError.
        registry = self.registry

        class Handler(http.server.BaseHTTPRequestHandler):
            def do_GET(self):
                # Ignore any query string, e.g. '/metrics?ts=1'.
                path = self.path.split('?', 1)[0]
                if path != '/metrics':
                    self.send_response(404)
                    self.send_header('Content-Length', '0')
                    self.end_headers()
                    return
                body = export_prometheus(registry.collect()).encode('utf-8')
                self.send_response(200)
                self.send_header('Content-Type', 'text/plain; version=0.0.4; charset=utf-8')
                self.send_header('Content-Length', str(len(body)))
                self.end_headers()
                self.wfile.write(body)

        # ThreadingHTTPServer (3.7+) keeps one slow scrape from blocking others.
        with http.server.ThreadingHTTPServer((self.host, self.port), Handler) as httpd:
            print(f"Metrics server running on port {self.port}")
            httpd.serve_forever()
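Testing
Make `Registry` and `export_prometheus` importable from the `metrics` package (for example, by re-exporting them in `metrics/__init__.py`), then run: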
from metrics import Registry, export_prometheus
# Build a registry and record a sample of each metric type.
registry = Registry()

registry.counter('requests_total', {'method': 'GET'}).inc()
registry.gauge('temperature').set(72.5)

request_duration = registry.histogram('request_duration')
for seconds in (0.125, 0.250):
    request_duration.observe(seconds)

# Print everything the registry currently holds, via export_prometheus.
print(export_prometheus(registry.collect()))
Summary
You built a complete metrics collection system with counters, gauges, histograms, aggregation, and export capabilities.
Possible Extensions
- Add pushgateway support
- Implement alerting
- Add metric derivation