Build a Data Pipeline
Introduction
Data pipelines move data from a source to a destination, transforming it along the way. They are the backbone of data engineering, powering ETL jobs and real-time processing systems.
What You'll Build
- A composable pipeline framework
- Reusable data transformations
- Parallel stage execution
- Per-stage error handling
Core Concepts
ETL
Extract, Transform, Load - the fundamental pattern behind most data pipelines: pull data from a source, reshape it, and write it to a destination.
Data Flow
Data flows through a series of stages, where each stage transforms or filters the records it receives, as in the sketch below.
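At its core, ETL is just three steps composed in order. A minimal sketch, independent of the framework built below (the data here is illustrative):

def extract():
    # Pull raw records from a source (file, API, database)
    return [{'id': 1, 'value': 12}, {'id': 2, 'value': 7}]

def transform(records):
    # Keep records above a threshold and double their values
    return [{**r, 'value': r['value'] * 2} for r in records if r['value'] > 10]

def load(records):
    # Write results to a destination; here we just print them
    print(records)

load(transform(extract()))  # prints [{'id': 1, 'value': 24}]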
Prerequisites
- Python 3.8+
- Basic Python knowledge
Pipeline Framework
Create pipeline/core.py:
from typing import List, Callable, Any, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import logging

logger = logging.getLogger(__name__)


@dataclass
class Stage:
    name: str
    transform: Callable
    parallel: bool = False
    max_workers: int = 4
    error_handler: Optional[Callable] = None
    per_item: bool = True  # False: the transform receives the whole dataset


class Pipeline:
    def __init__(self, name: str = "pipeline"):
        self.name = name
        self.stages: List[Stage] = []
        self.source: Optional[Callable] = None
        self.sink: Optional[Callable] = None

    def source_from(self, source_fn: Callable) -> "Pipeline":
        self.source = source_fn
        return self

    def stage(self, name: str, transform: Callable, parallel: bool = False,
              max_workers: int = 4, error_handler: Optional[Callable] = None,
              per_item: bool = True) -> "Pipeline":
        self.stages.append(
            Stage(name, transform, parallel, max_workers, error_handler, per_item)
        )
        return self

    def sink_to(self, sink_fn: Callable) -> "Pipeline":
        self.sink = sink_fn
        return self

    def run(self, input_data: Any = None) -> Any:
        logger.info(f"Starting pipeline: {self.name}")
        data = self.source() if self.source else input_data
        for stage in self.stages:
            logger.info(f"Executing stage: {stage.name}")
            try:
                if stage.parallel and stage.per_item and isinstance(data, list):
                    data = self._parallel_transform(stage, data)
                else:
                    data = self._transform(stage, data)
            except Exception as e:
                logger.error(f"Stage {stage.name} failed: {e}")
                if stage.error_handler:
                    data = stage.error_handler(e, data)
                else:
                    raise
        if self.sink:
            self.sink(data)
        logger.info(f"Pipeline {self.name} completed")
        return data

    def _transform(self, stage: Stage, data: Any) -> Any:
        if stage.per_item and isinstance(data, list):
            # Apply the transform per record; a transform that returns
            # None drops that record, which is how filter stages work
            results = [stage.transform(item) for item in data]
            return [r for r in results if r is not None]
        return stage.transform(data)

    def _parallel_transform(self, stage: Stage, data: List) -> List:
        with ThreadPoolExecutor(max_workers=stage.max_workers) as executor:
            results = list(executor.map(stage.transform, data))
        return [r for r in results if r is not None]


class PipelineBuilder:
    def __init__(self):
        self._stages: List[Stage] = []
        self._source: Optional[Callable] = None
        self._sink: Optional[Callable] = None

    def read(self, source_fn: Callable) -> "PipelineBuilder":
        self._source = source_fn
        return self

    def transform(self, name: str, transform_fn: Callable, **kwargs) -> "PipelineBuilder":
        self._stages.append(Stage(name, transform_fn, **kwargs))
        return self

    def write(self, sink_fn: Callable) -> "PipelineBuilder":
        self._sink = sink_fn
        return self

    def build(self, name: str = "pipeline") -> Pipeline:
        pipeline = Pipeline(name)
        pipeline.source = self._source
        pipeline.stages = self._stages
        pipeline.sink = self._sink
        return pipeline
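ThreadPoolExecutor suits I/O-bound transforms (API calls, database lookups). For CPU-bound transforms you could swap in ProcessPoolExecutor instead; a sketch of that variant, where the use_processes flag is a hypothetical addition to Stage, not part of the framework above:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def _parallel_transform(self, stage: Stage, data: List) -> List:
    # Hypothetical variant: choose the executor per stage. Note that
    # process pools require the transform to be picklable (a top-level
    # function, not a lambda).
    executor_cls = (ProcessPoolExecutor if getattr(stage, 'use_processes', False)
                    else ThreadPoolExecutor)
    with executor_cls(max_workers=stage.max_workers) as executor:
        results = list(executor.map(stage.transform, data))
    return [r for r in results if r is not None]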
Transformations
Define the transformation functions. The file name pipeline/transforms.py is a suggestion; any importable module works:

from datetime import datetime


def filter_fn(data):
    # Returning None tells the pipeline to drop this record
    return data if data['value'] > 10 else None


def map_fn(data):
    return {
        'id': data['id'],
        'name': data['name'].upper(),
        'value': data['value'] * 2
    }


def enrich_fn(data):
    return {
        **data,
        'timestamp': datetime.now().isoformat(),
        'processed': True
    }


def aggregate_fn(data_list):
    total = sum(d['value'] for d in data_list)
    return {
        'count': len(data_list),
        'total': total,
        'average': total / len(data_list) if data_list else 0
    }


def validate_fn(data):
    required = ['id', 'name', 'value']
    for field in required:
        if field not in data:
            raise ValueError(f"Missing required field: {field}")
    return data


def clean_fn(data):
    return {
        k: v.strip() if isinstance(v, str) else v
        for k, v in data.items()
    }
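The transforms are plain callables, so you can sanity-check them in a REPL before wiring them into a pipeline:

>>> clean_fn({'id': 1, 'name': '  bob  '})
{'id': 1, 'name': 'bob'}
>>> filter_fn({'id': 1, 'value': 5}) is None
True
>>> aggregate_fn([{'value': 10}, {'value': 20}])
{'count': 2, 'total': 30, 'average': 15.0}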
Execution
import logging

from pipeline.core import Pipeline, PipelineBuilder
from pipeline.transforms import (filter_fn, map_fn, enrich_fn,
                                 aggregate_fn, validate_fn, clean_fn)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Method 1: Fluent API
result = (PipelineBuilder()
          .read(lambda: [
              {'id': 1, 'name': 'apple', 'value': 10},
              {'id': 2, 'name': 'banana', 'value': 20},
              {'id': 3, 'name': 'cherry', 'value': 30}
          ])
          .transform('filter', filter_fn)  # drops 'apple' (value not > 10)
          .transform('map', map_fn)
          .transform('enrich', enrich_fn)
          .build('fruit-pipeline')
          .run())

# Method 2: Direct pipeline (get_users_from_db and save_to_warehouse
# are placeholders for your own source and sink functions)
pipeline = Pipeline('user-pipeline')
pipeline.source_from(get_users_from_db)
pipeline.stage('validate', validate_fn)
pipeline.stage('clean', clean_fn)
pipeline.stage('enrich', enrich_fn, parallel=True)
pipeline.stage('aggregate', aggregate_fn, per_item=False)  # runs on the whole list
pipeline.sink_to(save_to_warehouse)
pipeline.run()

# Method 3: With error handling (risky_fn is a placeholder for any
# transform that may raise)
def handle_error(error, data):
    logger.error(f"Error: {error}")
    return {'error': str(error), 'data': data}

pipeline.stage('risky_transform', risky_fn, error_handler=handle_error)
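An error handler runs only after a stage has already failed. For transient failures (flaky network calls, rate limits) you might instead wrap the transform in a retry helper before registering it; a sketch, assuming the failure is worth retrying:

import time
from functools import wraps

def with_retries(fn, attempts=3, delay=0.5):
    # Retry a transform a few times before letting the exception
    # reach the stage's error_handler
    @wraps(fn)
    def wrapper(item):
        for attempt in range(1, attempts + 1):
            try:
                return fn(item)
            except Exception:
                if attempt == attempts:
                    raise
                time.sleep(delay)
    return wrapper

pipeline.stage('risky_transform', with_retries(risky_fn), error_handler=handle_error)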
Testing
import pytest

from pipeline.core import Pipeline
from pipeline.transforms import filter_fn, map_fn


def test_filter_stage():
    pipeline = Pipeline('test')
    pipeline.source_from(lambda: [
        {'id': 1, 'value': 5},
        {'id': 2, 'value': 15}
    ])
    pipeline.stage('filter', filter_fn)
    result = pipeline.run()
    assert len(result) == 1
    assert result[0]['id'] == 2


def test_map_stage():
    pipeline = Pipeline('test')
    pipeline.source_from(lambda: [{'id': 1, 'name': 'apple', 'value': 10}])
    pipeline.stage('map', map_fn)
    result = pipeline.run()
    assert result[0]['value'] == 20
    assert result[0]['name'] == 'APPLE'


def test_pipeline_errors():
    pipeline = Pipeline('test')
    pipeline.source_from(lambda: [{'id': 1}])
    pipeline.stage('fail', lambda x: x['missing'])
    with pytest.raises(KeyError):
        pipeline.run()
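It is also worth testing the recovery path: the handler's return value should replace the failed stage's output. A sketch:

def test_error_handler_recovers():
    pipeline = Pipeline('test')
    pipeline.source_from(lambda: [{'id': 1}])
    pipeline.stage('fail', lambda x: x['missing'],
                   error_handler=lambda err, data: {'recovered': True})
    result = pipeline.run()
    assert result == {'recovered': True}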
Summary
You built a data pipeline framework with stages, transformations, parallel execution, and error handling.
Possible Extensions
- Add windowing for streaming data
- Implement checkpointing (see the sketch after this list)
- Add monitoring and metrics
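As a starting point for checkpointing, you could persist each stage's output so a failed run can resume from the last completed stage. A minimal sketch using JSON files; the helper and file layout are assumptions, not part of the framework above:

import json
import os

def checkpointed_run(pipeline, input_data=None, checkpoint_dir='checkpoints'):
    # Hypothetical helper: run stages one by one, saving each output;
    # on restart, stages with an existing checkpoint are skipped
    os.makedirs(checkpoint_dir, exist_ok=True)
    data = pipeline.source() if pipeline.source else input_data
    for i, stage in enumerate(pipeline.stages):
        path = os.path.join(checkpoint_dir, f'{i:03d}_{stage.name}.json')
        if os.path.exists(path):
            with open(path) as f:
                data = json.load(f)
            continue
        data = pipeline._transform(stage, data)
        with open(path, 'w') as f:
            json.dump(data, f)
    return data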