
Build a Data Pipeline

Difficulty: Intermediate · Est. Time: ~4 hours

Introduction

Data pipelines extract, transform, and move data from a source to a destination. They are the backbone of data engineering, ETL workloads, and real-time processing systems.

What You'll Build
  • Pipeline framework
  • Data transformations
  • Parallel execution
  • Error handling

Core Concepts

ETL

Extract, Transform, Load - the fundamental pattern of data pipelines.
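
Here is a minimal sketch of the pattern outside any framework; the file names and record fields are placeholders for your own source and destination:

import csv
import json


def extract(path):
    # Extract: pull raw records from a source system (a CSV file here).
    with open(path, newline='') as f:
        return list(csv.DictReader(f))


def transform(records):
    # Transform: clean and convert the raw string values.
    return [{'name': r['name'].strip().lower(), 'value': int(r['value'])}
            for r in records]


def load(records, path):
    # Load: write the result to a destination (a JSON file here).
    with open(path, 'w') as f:
        json.dump(records, f)


load(transform(extract('input.csv')), 'output.json')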

Data Flow

Data flows through a series of stages, each of which transforms, filters, or aggregates it before passing the result to the next stage.
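
For instance, a three-stage flow can be written as plain function calls before any framework is involved (parse, keep_large, and summarize are illustrative stage functions, not part of the framework you will build):

def parse(record):
    # Stage 1: convert raw string values to integers.
    return {**record, 'value': int(record['value'])}


def keep_large(records):
    # Stage 2: filter out small values.
    return [r for r in records if r['value'] > 10]


def summarize(records):
    # Stage 3: reduce the remaining records to a single total.
    return sum(r['value'] for r in records)


raw = [{'value': '5'}, {'value': '42'}]
total = summarize(keep_large([parse(r) for r in raw]))  # data flows stage by stage
print(total)  # 42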

Prerequisites

  • Python 3.8+
  • Basic Python knowledge

Pipeline Framework

Create pipeline/core.py:

from typing import List, Callable, Any, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import logging


logger = logging.getLogger(__name__)


@dataclass
class Stage:
    name: str
    transform: Callable
    mode: str = "map"  # "map" applies per item, "filter" keeps items where the
                       # callable returns True, "aggregate" receives the whole batch
    parallel: bool = False
    max_workers: int = 4
    error_handler: Optional[Callable] = None


class Pipeline:
    def __init__(self, name: str = "pipeline"):
        self.name = name
        self.stages: List[Stage] = []
        self.source: Optional[Callable] = None
        self.sink: Optional[Callable] = None
    
    def source_from(self, source_fn: Callable):
        self.source = source_fn
        return self
    
    def stage(self, name: str, transform: Callable, mode: str = "map",
              parallel: bool = False, max_workers: int = 4,
              error_handler: Optional[Callable] = None):
        stage = Stage(name, transform, mode, parallel, max_workers, error_handler)
        self.stages.append(stage)
        return self
    
    def sink_to(self, sink_fn: Callable):
        self.sink = sink_fn
        return self
    
    def run(self, input_data: Any = None) -> Any:
        logger.info(f"Starting pipeline: {self.name}")
        
        if self.source:
            data = self.source()
        else:
            data = input_data
        
        for stage in self.stages:
            logger.info(f"Executing stage: {stage.name}")
            
            try:
                data = self._execute_stage(stage, data)
            except Exception as e:
                logger.error(f"Stage {stage.name} failed: {e}")
                
                if stage.error_handler:
                    data = stage.error_handler(e, data)
                else:
                    raise
        
        if self.sink:
            self.sink(data)
        
        logger.info(f"Pipeline {self.name} completed")
        return data
    
    def _execute_stage(self, stage: Stage, data: Any) -> Any:
        # Non-list data and aggregate stages receive the data as a whole.
        if not isinstance(data, list) or stage.mode == "aggregate":
            return stage.transform(data)
        if stage.mode == "filter":
            return [item for item in data if stage.transform(item)]
        if stage.parallel:
            return self._parallel_transform(stage, data)
        return [stage.transform(item) for item in data]
    
    def _parallel_transform(self, stage: Stage, data: List) -> List:
        # Threads suit I/O-bound transforms; use ProcessPoolExecutor for CPU-bound work.
        with ThreadPoolExecutor(max_workers=stage.max_workers) as executor:
            return list(executor.map(stage.transform, data))


class PipelineBuilder:
    def __init__(self):
        self._stages = []
        self._source = None
        self._sink = None
    
    def read(self, source_fn: Callable):
        self._source = source_fn
        return self
    
    def transform(self, name: str, transform_fn: Callable, **kwargs):
        self._stages.append(Stage(name, transform_fn, **kwargs))
        return self
    
    def write(self, sink_fn: Callable):
        self._sink = sink_fn
        return self
    
    def build(self, name: str = "pipeline") -> Pipeline:
        pipeline = Pipeline(name)
        pipeline.source = self._source
        pipeline.stages = self._stages
        pipeline.sink = self._sink
        return pipeline

Transformations

Add reusable transform functions to the pipeline package; the execution and test examples below import them from pipeline:

from datetime import datetime


def filter_fn(data):
    # Predicate for a mode="filter" stage: keep records with value > 10.
    return data['value'] > 10


def map_fn(data):
    return {
        'id': data['id'],
        'name': data['name'].upper(),
        'value': data['value'] * 2
    }


def enrich_fn(data):
    return {
        **data,
        'timestamp': datetime.now().isoformat(),
        'processed': True
    }


def aggregate_fn(data_list):
    # Whole-batch reduction for a mode="aggregate" stage.
    total = sum(d['value'] for d in data_list)
    return {
        'count': len(data_list),
        'total': total,
        'average': total / len(data_list) if data_list else 0
    }


def validate_fn(data):
    required = ['id', 'name', 'value']
    for field in required:
        if field not in data:
            raise ValueError(f"Missing required field: {field}")
    return data


def clean_fn(data):
    return {
        k: v.strip() if isinstance(v, str) else v 
        for k, v in data.items()
    }
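
Each of these is a plain callable, so you can sanity-check a single record before wiring the functions into a pipeline:

record = {'id': 1, 'name': '  apple  ', 'value': 10}
record = validate_fn(record)   # raises if a required field is missing
record = clean_fn(record)      # strips whitespace from string values
record = map_fn(record)        # uppercases the name and doubles the value
record = enrich_fn(record)     # adds a timestamp and a processed flag
print(record)  # {'id': 1, 'name': 'APPLE', 'value': 20, 'timestamp': '...', 'processed': True}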

Execution

import logging

from pipeline import (Pipeline, PipelineBuilder, filter_fn, map_fn, enrich_fn,
                      validate_fn, clean_fn, aggregate_fn)

logger = logging.getLogger(__name__)

# Method 1: Fluent API
result = (PipelineBuilder()
    .read(lambda: [
        {'id': 1, 'name': 'apple', 'value': 10},
        {'id': 2, 'name': 'banana', 'value': 20},
        {'id': 3, 'name': 'cherry', 'value': 30}
    ])
    .transform('filter', filter_fn, mode='filter')
    .transform('map', map_fn)
    .transform('enrich', enrich_fn)
    .build('fruit-pipeline')
    .run())


# Method 2: Direct pipeline (get_users_from_db, save_to_warehouse, and risky_fn
# are placeholders for your own source, sink, and transform functions)
pipeline = Pipeline('user-pipeline')

pipeline.source_from(get_users_from_db)
pipeline.stage('validate', validate_fn)
pipeline.stage('clean', clean_fn)
pipeline.stage('enrich', enrich_fn, parallel=True)
pipeline.stage('aggregate', aggregate_fn, mode='aggregate')
pipeline.sink_to(save_to_warehouse)
pipeline.run()


# Method 3: With error handling
def handle_error(error, data):
    logger.error(f"Error: {error}")
    return {'error': str(error), 'data': data}

pipeline.stage('risky_transform', risky_fn, error_handler=handle_error)

Testing

import pytest
from pipeline import Pipeline, filter_fn, map_fn

def test_filter_stage():
    pipeline = Pipeline('test')
    pipeline.source_from(lambda: [
        {'id': 1, 'value': 5},
        {'id': 2, 'value': 15}
    ])
    pipeline.stage('filter', filter_fn, mode='filter')
    
    result = pipeline.run()
    
    assert len(result) == 1
    assert result[0]['id'] == 2

def test_map_stage():
    pipeline = Pipeline('test')
    pipeline.source_from(lambda: [{'id': 1, 'name': 'apple', 'value': 10}])
    pipeline.stage('map', map_fn)
    
    result = pipeline.run()
    
    assert result[0]['value'] == 20
    assert result[0]['name'] == 'APPLE'

def test_pipeline_errors():
    pipeline = Pipeline('test')
    pipeline.source_from(lambda: [{'id': 1}])
    pipeline.stage('fail', lambda x: x['missing'])
    
    with pytest.raises(KeyError):
        pipeline.run()
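
One more test worth adding (a suggestion, assuming aggregate_fn is exported from the pipeline package like the other transforms) covers the whole-batch aggregate mode:

from pipeline import aggregate_fn


def test_aggregate_stage():
    pipeline = Pipeline('test')
    pipeline.source_from(lambda: [
        {'id': 1, 'value': 10},
        {'id': 2, 'value': 30}
    ])
    pipeline.stage('aggregate', aggregate_fn, mode='aggregate')
    
    result = pipeline.run()
    
    assert result['count'] == 2
    assert result['average'] == 20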

Summary

You built a data pipeline framework with composable stages, reusable transformations, map/filter/aggregate modes, parallel execution, and error handling.

Possible Extensions

  • Add windowing for streaming data
  • Implement checkpointing
  • Add monitoring and metrics