
Build a Distributed Task Queue

Difficulty: Advanced · Est. Time: ~6 hours

Introduction

Distributed task queues are the backbone of modern scalable applications. They allow you to run computationally intensive or time-consuming tasks asynchronously, improving application responsiveness and enabling horizontal scaling.

In this tutorial, we'll build a complete distributed task queue system from scratch, similar to Celery or RQ, but simplified.

What You'll Build
  • A message broker for task distribution
  • Worker nodes that process tasks
  • A client API for enqueueing tasks
  • Task scheduling and delayed execution
  • Retry mechanisms for failed tasks
  • Task result storage

What You'll Learn
  • Message queue architectures
  • Distributed system design patterns
  • Worker pool management
  • Task serialization
  • At-least-once delivery semantics
  • Result backends

Core Concepts

Message Broker

The message broker is the central hub that receives tasks from producers and distributes them to workers. It maintains queues for different task types and handles message routing.

We'll implement a simple broker using Redis as the underlying storage, leveraging its atomic operations and pub/sub capabilities.
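
Before layering abstractions on top, it helps to see the primitive we'll build on: a Redis list already behaves like a FIFO queue. A minimal sketch, assuming a local Redis instance:

import redis

r = redis.Redis(decode_responses=True)

# Producer: push a payload onto the tail of the list.
r.rpush("queue:default", '{"task": "demo"}')

# Consumer: atomically pop from the head, blocking up to 5 seconds.
item = r.blpop("queue:default", timeout=5)
print(item)  # ('queue:default', '{"task": "demo"}') or None on timeout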

Workers and Worker Pools

Workers are processes that consume tasks from the queue and execute them. A worker pool manages multiple worker processes, controlling concurrency and resource usage.

Each worker:

  • Connects to the message broker
  • Polls for available tasks
  • Executes tasks asynchronously
  • Reports results or failures

Task Serialization

Tasks must be serialized into a storable format before being sent to the queue. We'll use JSON, storing the function name, positional arguments, and keyword arguments. (Pickle supports more Python types, but it executes arbitrary code on load, so avoid it with untrusted input.)
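
For example, a hypothetical call send_email("a@b.com", subject="Hi") could serialize to a payload like this (illustrative field names; the full message format is defined below):

import json

# Serialize the call as function name plus arguments.
payload = json.dumps({
    "task_name": "tasks.send_email",
    "args": ["a@b.com"],
    "kwargs": {"subject": "Hi"},
})

# The worker reverses the process: deserialize, look the function up by
# name in a registry, and call it with the stored arguments:
data = json.loads(payload)
# registry[data["task_name"]](*data["args"], **data["kwargs"])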

Result Backend

After task completion, results need to be stored somewhere for retrieval. The result backend persists task results, status, and metadata.
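
In our implementation the backend is Redis itself: each result lives under a per-task key with an expiry. A sketch of the pattern (the task ID here is made up):

import redis

r = redis.Redis(decode_responses=True)

# Store a result under "result:<task_id>" with a one-hour TTL.
r.setex("result:3f2a", 3600, '{"status": "success", "result": 8}')
print(r.get("result:3f2a"))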

Project Overview

Our distributed task queue will have these components:

Component        Description
---------------  ----------------------------
Broker           Redis-based message broker
Worker           Task execution process
Client           API for enqueueing tasks
Result Backend   Task result storage
Scheduler        Delayed task execution

Prerequisites

  • Python 3.8+ - Installed on your system
  • Redis - Running locally or accessible
  • redis-py - Install with pip install redis
  • Multiprocessing - Basic familiarity with Python's multiprocessing module

Architecture

Here's how the components interact:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────▶│   Broker    │────▶│   Worker    │
│  (Producer) │     │   (Redis)   │     │  (Consumer) │
└─────────────┘     └─────────────┘     └─────────────┘
                          │                   │
                          ▼                   ▼
                    ┌─────────────┐     ┌─────────────┐
                    │   Result    │◀────│   Result    │
                    │   Backend   │     │   Storage   │
                    └─────────────┘     └─────────────┘

We'll implement each component as a Python module in a package called taskq.
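
The finished package will look like this (the __init__.py that re-exports the public API appears in the testing section):

taskq/
├── __init__.py
├── messages.py
├── broker.py
├── worker.py
├── client.py
└── scheduler.py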

Message Format

Create taskq/messages.py to define the task message format:

import json
import uuid
from datetime import datetime
from enum import Enum


class TaskStatus(Enum):
    PENDING = "pending"
    STARTED = "started"
    SUCCESS = "success"
    FAILURE = "failure"
    RETRY = "retry"


class TaskMessage:
    def __init__(
        self,
        task_id=None,
        task_name=None,
        args=None,
        kwargs=None,
        queue="default",
        retry_count=0,
        max_retries=3,
        scheduled_at=None,
    ):
        self.task_id = task_id or str(uuid.uuid4())
        self.task_name = task_name
        self.args = args or []
        self.kwargs = kwargs or {}
        self.queue = queue
        self.retry_count = retry_count
        self.max_retries = max_retries
        self.scheduled_at = scheduled_at
        self.created_at = datetime.utcnow().isoformat()

    def to_json(self):
        return json.dumps({
            "task_id": self.task_id,
            "task_name": self.task_name,
            "args": self.args,
            "kwargs": self.kwargs,
            "queue": self.queue,
            "retry_count": self.retry_count,
            "max_retries": self.max_retries,
            "scheduled_at": self.scheduled_at,
            "created_at": self.created_at,
        })

    @classmethod
    def from_json(cls, json_str):
        data = json.loads(json_str)
        msg = cls(
            task_id=data["task_id"],
            task_name=data["task_name"],
            args=data["args"],
            kwargs=data["kwargs"],
            queue=data["queue"],
            retry_count=data["retry_count"],
            max_retries=data["max_retries"],
            scheduled_at=data.get("scheduled_at"),
        )
        msg.created_at = data.get("created_at")
        return msg


class TaskResult:
    def __init__(self, task_id, status, result=None, error=None):
        self.task_id = task_id
        self.status = status
        self.result = result
        self.error = error
        self.completed_at = datetime.utcnow().isoformat()

    def to_json(self):
        return json.dumps({
            "task_id": self.task_id,
            "status": self.status.value,
            "result": self.result,
            "error": str(self.error) if self.error else None,
            "completed_at": self.completed_at,
        })

    @classmethod
    def from_json(cls, json_str):
        data = json.loads(json_str)
        res = cls(
            task_id=data["task_id"],
            status=TaskStatus(data["status"]),
            result=data.get("result"),
            error=data.get("error"),
        )
        # Preserve the original completion time; __init__ would
        # otherwise stamp it with the deserialization time.
        res.completed_at = data.get("completed_at")
        return res
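
A quick round-trip sanity check of the message format (runnable from the project root once the module exists):

from taskq.messages import TaskMessage, TaskResult, TaskStatus

msg = TaskMessage(task_name="tasks.add", args=[2, 3])
restored = TaskMessage.from_json(msg.to_json())
assert restored.task_id == msg.task_id
assert restored.args == [2, 3]

res = TaskResult(msg.task_id, TaskStatus.SUCCESS, result=5)
assert TaskResult.from_json(res.to_json()).status is TaskStatus.SUCCESS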

Message Broker

Create taskq/broker.py to handle message distribution:

import redis
from datetime import datetime, timezone
from typing import Optional

from .messages import TaskMessage, TaskResult, TaskStatus


class Broker:
    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)

    def enqueue(self, task: TaskMessage) -> str:
        # Track known queue names in a Redis set so other processes
        # (such as the scheduler) can discover them.
        self.redis.sadd("queues", task.queue)

        if task.scheduled_at:
            # Scheduled tasks wait in a per-queue sorted set, scored by
            # their due time as a Unix timestamp. The scheduler moves
            # them onto the live queue once they are due; they must not
            # be pushed to the live queue immediately.
            due = datetime.fromisoformat(task.scheduled_at)
            if due.tzinfo is None:
                due = due.replace(tzinfo=timezone.utc)  # naive times are UTC
            scheduled_key = f"scheduled:{task.queue}"
            self.redis.zadd(scheduled_key, {task.to_json(): due.timestamp()})
            return task.task_id

        queue_key = f"queue:{task.queue}"
        self.redis.rpush(queue_key, task.to_json())
        return task.task_id

    def dequeue(self, queue="default", timeout=0):
        queue_key = f"queue:{queue}"
        result = self.redis.blpop(queue_key, timeout=timeout)
        
        if result:
            _, task_json = result
            return TaskMessage.from_json(task_json)
        return None

    def get_queue_length(self, queue="default") -> int:
        queue_key = f"queue:{queue}"
        return self.redis.llen(queue_key)

    def get_queues(self):
        # Read from the shared Redis set, so queues enqueued by other
        # processes are visible too.
        return list(self.redis.smembers("queues"))

    def get_result(self, task_id: str) -> Optional[TaskResult]:
        result_key = f"result:{task_id}"
        result_json = self.redis.get(result_key)
        
        if result_json:
            return TaskResult.from_json(result_json)
        return None

    def store_result(self, result: TaskResult, ttl=3600):
        result_key = f"result:{result.task_id}"
        self.redis.setex(result_key, ttl, result.to_json())

    def get_status(self, task_id: str) -> Optional[TaskStatus]:
        result = self.get_result(task_id)
        return result.status if result else None

    def publish(self, channel: str, message: str):
        self.redis.publish(channel, message)

    def subscribe(self, channel: str):
        pubsub = self.redis.pubsub()
        pubsub.subscribe(channel)
        return pubsub

    def check_queue(self, queue="default") -> bool:
        queue_key = f"queue:{queue}"
        return self.redis.exists(queue_key) > 0

    def clear_queue(self, queue="default"):
        queue_key = f"queue:{queue}"
        self.redis.delete(queue_key)
        self.redis.srem("queues", queue)


broker = Broker()
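
A small smoke test of the broker (assumes Redis is running locally):

from taskq.broker import Broker
from taskq.messages import TaskMessage

b = Broker()
task_id = b.enqueue(TaskMessage(task_name="tasks.add", args=[2, 3]))
print(b.get_queue_length("default"))  # 1

msg = b.dequeue("default", timeout=5)
print(msg.task_name, msg.args)        # tasks.add [2, 3]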

Worker Nodes

Create taskq/worker.py for task processing:

import multiprocessing
import signal
import traceback
from concurrent.futures import ProcessPoolExecutor
from typing import Callable, Dict, List, Optional

from .broker import Broker
from .messages import TaskMessage, TaskResult, TaskStatus


class Worker:
    def __init__(
        self,
        queues=None,
        broker=None,
        concurrency=1,
        redis_url="redis://localhost:6379",
    ):
        self.queues = queues or ["default"]
        self.broker = broker or Broker(redis_url)
        self.concurrency = concurrency
        self.running = False
        self.registered_tasks: Dict[str, Callable] = {}
        self.pool: Optional[ProcessPoolExecutor] = None

    def register(self, task_name: str):
        def decorator(func: Callable):
            self.registered_tasks[task_name] = func
            return func
        return decorator

    def start(self):
        self.running = True
        self.pool = ProcessPoolExecutor(max_workers=self.concurrency)
        
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)
        
        print(f"Worker started, listening on queues: {self.queues}")
        
        while self.running:
            for queue in self.queues:
                task = self.broker.dequeue(queue, timeout=1)
                if task:
                    self._process_task(task)

    def _process_task(self, task: TaskMessage):
        print(f"Processing task {task.task_id}: {task.task_name}")

        task_func = self.registered_tasks.get(task.task_name)

        if not task_func:
            error = f"Unknown task: {task.task_name}"
            self._store_result(task, TaskStatus.FAILURE, error=error)
            return

        self._store_result(task, TaskStatus.STARTED)

        # Run the task on the process pool so slow or CPU-bound work
        # doesn't block the polling loop. Task functions must be defined
        # at module level so they can be pickled into the pool.
        future = self.pool.submit(task_func, *task.args, **task.kwargs)
        future.add_done_callback(lambda f: self._on_task_done(task, f))

    def _on_task_done(self, task: TaskMessage, future):
        try:
            result = future.result()
            self._store_result(task, TaskStatus.SUCCESS, result=result)
            print(f"Task {task.task_id} completed successfully")
        except Exception as e:
            self._handle_failure(task, e)

    def _handle_failure(self, task: TaskMessage, error: Exception):
        print(f"Task {task.task_id} failed: {error}")
        
        if task.retry_count < task.max_retries:
            task.retry_count += 1
            print(f"Retrying task {task.task_id} (attempt {task.retry_count})")
            self.broker.enqueue(task)
            self._store_result(task, TaskStatus.RETRY)
        else:
            error_msg = f"{str(error)}\n{traceback.format_exc()}"
            self._store_result(task, TaskStatus.FAILURE, error=error_msg)

    def _store_result(self, task: TaskMessage, status: TaskStatus, result=None, error=None):
        result_obj = TaskResult(task.task_id, status, result, error)
        self.broker.store_result(result_obj)

    def _handle_shutdown(self, signum, frame):
        print("Shutting down worker...")
        self.running = False
        if self.pool:
            self.pool.shutdown(wait=True)


class WorkerPool:
    def __init__(self, num_workers, queues=None, init=None, **worker_kwargs):
        self.num_workers = num_workers
        self.queues = queues
        # Optional callback invoked with each child Worker before it
        # starts, so callers can register tasks in the child process
        # (task registries are per-process, not shared).
        self.init = init
        self.worker_kwargs = worker_kwargs
        self.workers: List[multiprocessing.Process] = []

    def start(self):
        for i in range(self.num_workers):
            worker = multiprocessing.Process(
                target=self._run_worker,
                args=(i, self.queues),
                kwargs=self.worker_kwargs,
            )
            worker.start()
            self.workers.append(worker)
        print(f"Started {self.num_workers} workers")

    def _run_worker(self, worker_id, queues, **kwargs):
        worker = Worker(queues=queues, **kwargs)
        if self.init:
            self.init(worker)
        worker.start()

    def stop(self):
        for worker in self.workers:
            worker.terminate()
            worker.join()
        print("All workers stopped")
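
Usage looks like the following sketch. The init hook runs inside each child process, because task registries are per-process; the task function itself stays at module level so the worker's process pool can pickle it:

from taskq.worker import Worker, WorkerPool

def add(a, b):
    return a + b

def register_tasks(worker: Worker):
    worker.register("tasks.add")(add)

if __name__ == "__main__":
    pool = WorkerPool(num_workers=4, queues=["default"], init=register_tasks)
    pool.start()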

Client API

Create taskq/client.py for the task queue client:

import functools
import time
from typing import Callable, Optional
from datetime import datetime

from .broker import Broker
from .messages import TaskMessage, TaskResult, TaskStatus


class TaskQueue:
    def __init__(self, redis_url="redis://localhost:6379", broker=None):
        self.broker = broker or Broker(redis_url)

    def task(self, queue="default", max_retries=3):
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            def delay(*args, **kwargs):
                task = TaskMessage(
                    task_name=func.__module__ + "." + func.__name__,
                    args=args,
                    kwargs=kwargs,
                    queue=queue,
                    max_retries=max_retries,
                )
                return self.broker.enqueue(task)
            
            @functools.wraps(func)
            def apply_async(*args, **kwargs):
                return delay(*args, **kwargs)
            
            @functools.wraps(func)
            def apply_at(dt: datetime, *args, **kwargs):
                task = TaskMessage(
                    task_name=func.__module__ + "." + func.__name__,
                    args=args,
                    kwargs=kwargs,
                    queue=queue,
                    max_retries=max_retries,
                    scheduled_at=dt.isoformat(),
                )
                return self.broker.enqueue(task)
            
            delay.apply_async = apply_async
            delay.apply_at = apply_at
            delay.apply = apply_async
            
            func.delay = delay
            func.apply_async = apply_async
            func.apply_at = apply_at
            
            return func
        return decorator

    def enqueue(
        self,
        func: Callable,
        args: tuple = None,
        kwargs: dict = None,
        queue: str = "default",
        max_retries: int = 3,
        scheduled_at: Optional[datetime] = None,
    ) -> str:
        task = TaskMessage(
            task_name=func.__module__ + "." + func.__name__,
            args=args or (),
            kwargs=kwargs or {},
            queue=queue,
            max_retries=max_retries,
            scheduled_at=scheduled_at.isoformat() if scheduled_at else None,
        )
        return self.broker.enqueue(task)

    def get_result(self, task_id: str) -> Optional[TaskResult]:
        return self.broker.get_result(task_id)

    def get_status(self, task_id: str):
        return self.broker.get_status(task_id)

    def wait_for_result(self, task_id: str, timeout: int = 60):
        # Poll until the task reaches a terminal state; the backend also
        # stores intermediate STARTED and RETRY results, which should
        # not end the wait early.
        start = time.time()

        while time.time() - start < timeout:
            result = self.get_result(task_id)
            if result and result.status in (TaskStatus.SUCCESS, TaskStatus.FAILURE):
                return result
            time.sleep(0.1)

        raise TimeoutError(f"Timeout waiting for task {task_id}")

    def is_ready(self, task_id: str) -> bool:
        status = self.get_status(task_id)
        return status in (TaskStatus.SUCCESS, TaskStatus.FAILURE)

    def is_successful(self, task_id: str) -> bool:
        return self.get_status(task_id) == TaskStatus.SUCCESS


queue = TaskQueue()
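
With a worker running, the client side looks like this sketch (the tasks module is the example defined in the testing section below):

from tasks import add
from taskq.client import queue

task_id = queue.enqueue(add, args=(5, 3))
result = queue.wait_for_result(task_id, timeout=10)
print(result.status, result.result)  # TaskStatus.SUCCESS 8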

Scheduler

Create taskq/scheduler.py for handling scheduled and delayed tasks:

import time
import threading

from .broker import Broker
from .messages import TaskMessage


class Scheduler:
    def __init__(self, redis_url="redis://localhost:6379", broker=None):
        self.broker = broker or Broker(redis_url)
        self.running = False
        self.thread = None

    def start(self):
        self.running = True
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True
        self.thread.start()
        print("Scheduler started")

    def stop(self):
        self.running = False
        if self.thread:
            self.thread.join()

    def _run(self):
        while self.running:
            self._process_scheduled()
            time.sleep(1)

    def _process_scheduled(self):
        # Due times are stored as Unix-timestamp scores, so everything
        # scored at or below "now" is ready to run.
        now = time.time()

        for queue in self.broker.get_queues():
            scheduled_key = f"scheduled:{queue}"
            tasks = self.redis.zrangebyscore(scheduled_key, 0, now)

            for task_json in tasks:
                task = TaskMessage.from_json(task_json)
                # Clear scheduled_at so enqueue() pushes the task onto
                # the live queue instead of re-scheduling it.
                task.scheduled_at = None
                self.broker.enqueue(task)

            if tasks:
                self.redis.zremrangebyscore(scheduled_key, 0, now)

    @property
    def redis(self):
        return self.broker.redis
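
A quick sketch of scheduling in action (a worker must also be running to execute the task once it is due):

from datetime import datetime, timedelta
from taskq.client import TaskQueue
from taskq.scheduler import Scheduler

queue = TaskQueue()

@queue.task()
def cleanup():
    return "done"

# The scheduler can run anywhere that reaches Redis; here we start it
# in-process for demonstration.
Scheduler().start()

# Due in roughly five seconds; the scheduler moves the task onto the
# live queue once the due time passes.
cleanup.apply_at(datetime.utcnow() + timedelta(seconds=5))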

Fault Tolerance Features

  • Automatic Retries - Failed tasks are automatically retried up to max_retries
  • Result TTL - Results expire after 1 hour by default
  • Graceful Shutdown - Workers finish current task before shutting down
  • Scheduled Tasks - Tasks can be scheduled for future execution

Testing the Task Queue
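
The test scripts below import from the package root, so expose the public API first with a minimal taskq/__init__.py:

# taskq/__init__.py
from .broker import Broker
from .client import TaskQueue, queue
from .messages import TaskMessage, TaskResult, TaskStatus
from .scheduler import Scheduler
from .worker import Worker, WorkerPool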

Create example tasks and test the system:

# tasks.py
from taskq import TaskQueue

queue = TaskQueue()

@queue.task()
def add(a, b):
    return a + b

@queue.task(max_retries=5)
def process_data(data):
    if not data:
        raise ValueError("No data provided")
    return f"Processed: {data}"

@queue.task(queue="emails")
def send_email(to, subject, body):
    print(f"Sending email to {to}: {subject}")
    return "Email sent"

# Using the tasks: delay() returns the task ID, not the result
task_id = add.delay(5, 3)
print(f"Task enqueued: {task_id}")

task_id2 = process_data.delay("Hello World")
print(f"Task enqueued: {task_id2}")

task_id3 = send_email.delay("user@example.com", "Test", "Hello!")
print(f"Task enqueued: {task_id3}")

A note on task names: they come from func.__module__, so run the snippet above by importing tasks (for example from a REPL) rather than executing tasks.py directly, which would register names like "__main__.add" instead of "tasks.add".

Run the worker:

# worker.py
from taskq import Worker

worker = Worker(queues=["default", "emails"], concurrency=2)

@worker.register("tasks.add")
def add(a, b):
    return a + b

@worker.register("tasks.process_data")
def process_data(data):
    if not data:
        raise ValueError("No data provided")
    return f"Processed: {data}"

@worker.register("tasks.send_email")
def send_email(to, subject, body):
    print(f"Sending email to {to}: {subject}")
    return "Email sent"

if __name__ == "__main__":
    # The guard matters: the worker's process pool re-imports this
    # module in child processes, which must not start workers of
    # their own.
    worker.start()

Summary

Congratulations! You've built a complete distributed task queue system. Here's what you learned:

  • Message Broker - How to distribute tasks using Redis queues
  • Worker Design - How to process tasks asynchronously
  • Task Registration - How to register and call tasks
  • Result Storage - How to store and retrieve task results
  • Retries - How to handle failed tasks with automatic retries
  • Scheduling - How to schedule tasks for future execution

Possible Extensions

  • Add task dependencies (Task A must complete before Task B)
  • Implement task priorities (high/medium/low)
  • Add monitoring and metrics
  • Implement worker heartbeats for health checks
  • Add distributed locking for critical sections
  • Implement task timeouts