Build a Distributed Task Queue
Introduction
Distributed task queues are the backbone of modern scalable applications. They allow you to run computationally intensive or time-consuming tasks asynchronously, improving application responsiveness and enabling horizontal scaling.
In this tutorial, we'll build a complete distributed task queue system from scratch, similar to Celery or RQ but simplified. Our system will include:
- A message broker for task distribution
- Worker nodes that process tasks
- A client API for enqueueing tasks
- Task scheduling and delayed execution
- Retry mechanisms for failed tasks
- Task result storage
Along the way, you'll learn about:
- Message queue architectures
- Distributed system design patterns
- Worker pool management
- Task serialization
- At-least-once delivery semantics
- Result backends
Core Concepts
Message Broker
The message broker is the central hub that receives tasks from producers and distributes them to workers. It maintains queues for different task types and handles message routing.
We'll implement a simple broker using Redis as the underlying storage, leveraging its atomic operations and pub/sub capabilities.
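To make this concrete, here is the core Redis pattern the broker builds on: RPUSH appends a task to a list, and BLPOP atomically pops one, blocking until work arrives. A minimal sketch, assuming Redis is running on the default local port:
import redis

r = redis.from_url("redis://localhost:6379", decode_responses=True)

# Producer side: append a job to the tail of the list
r.rpush("queue:default", '{"task": "demo"}')

# Consumer side: pop from the head, blocking until a job is available
_, job = r.blpop("queue:default")
print(job)  # {"task": "demo"}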
Workers and Worker Pools
Workers are processes that consume tasks from the queue and execute them. A worker pool manages multiple worker processes, controlling concurrency and resource usage.
Each worker:
- Connects to the message broker
- Polls for available tasks
- Executes tasks asynchronously
- Reports results or failures
Task Serialization
Tasks must be serialized into a storable format before being sent to the queue. We'll serialize tasks as JSON, storing the function's import path along with its positional and keyword arguments. (Systems like Celery also support pickle, which handles more Python types but is unsafe with untrusted input.)
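For example, a call such as add(2, 3) becomes a small JSON document naming the function plus its arguments; the task name below is illustrative:
import json

# The call add(2, 3) as a queue-ready payload
payload = json.dumps({
    "task_name": "tasks.add",
    "args": [2, 3],
    "kwargs": {},
})

# A worker reverses the process to reconstruct the call
restored = json.loads(payload)
print(restored["task_name"], restored["args"], restored["kwargs"])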
Result Backend
After task completion, results need to be stored somewhere for retrieval. The result backend persists task results, status, and metadata.
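The pattern we'll use is one Redis key per task with a TTL, so stored results clean themselves up. A sketch, assuming a local Redis and a hypothetical task ID:
import redis

r = redis.from_url("redis://localhost:6379", decode_responses=True)

# Store a result under a per-task key that expires after one hour
r.setex("result:1234", 3600, '{"status": "success", "result": 8}')
print(r.get("result:1234"))
print(r.ttl("result:1234"))  # seconds remaining before expiry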
Project Overview
Our distributed task queue will have these components:
| Component | Description |
|---|---|
| Broker | Redis-based message broker |
| Worker | Task execution process |
| Client | API for enqueueing tasks |
| Result Backend | Task result storage |
| Scheduler | Delayed task execution |
Prerequisites
- Python 3.8+ - Installed on your system
- Redis - Running locally or accessible over the network
- redis-py - Install with pip install redis
- Basic multiprocessing knowledge
Architecture
Here's how the components interact:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────▶│   Broker    │────▶│   Worker    │
│ (Producer)  │     │   (Redis)   │     │ (Consumer)  │
└─────────────┘     └─────────────┘     └─────────────┘
                           │                   │
                           ▼                   ▼
                    ┌─────────────┐     ┌─────────────┐
                    │   Result    │◀────│   Result    │
                    │   Backend   │     │   Storage   │
                    └─────────────┘     └─────────────┘
We'll implement each component as a Python module in a package called taskq.
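The examples later in this tutorial import directly from the package (from taskq import TaskQueue, Worker), so taskq/__init__.py should re-export the public API. A minimal version:
# taskq/__init__.py
from .broker import Broker
from .client import TaskQueue
from .worker import Worker, WorkerPool
from .scheduler import Scheduler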
Message Format
Create taskq/messages.py to define the task message format:
import json
import uuid
from datetime import datetime
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
STARTED = "started"
SUCCESS = "success"
FAILURE = "failure"
RETRY = "retry"
class TaskMessage:
def __init__(
self,
task_id=None,
task_name=None,
args=None,
kwargs=None,
queue="default",
retry_count=0,
max_retries=3,
scheduled_at=None,
):
self.task_id = task_id or str(uuid.uuid4())
self.task_name = task_name
self.args = args or []
self.kwargs = kwargs or {}
self.queue = queue
self.retry_count = retry_count
self.max_retries = max_retries
self.scheduled_at = scheduled_at
self.created_at = datetime.utcnow().isoformat()
def to_json(self):
return json.dumps({
"task_id": self.task_id,
"task_name": self.task_name,
"args": self.args,
"kwargs": self.kwargs,
"queue": self.queue,
"retry_count": self.retry_count,
"max_retries": self.max_retries,
"scheduled_at": self.scheduled_at,
"created_at": self.created_at,
})
@classmethod
def from_json(cls, json_str):
data = json.loads(json_str)
msg = cls(
task_id=data["task_id"],
task_name=data["task_name"],
args=data["args"],
kwargs=data["kwargs"],
queue=data["queue"],
retry_count=data["retry_count"],
max_retries=data["max_retries"],
scheduled_at=data.get("scheduled_at"),
)
msg.created_at = data.get("created_at")
return msg
class TaskResult:
def __init__(self, task_id, status, result=None, error=None):
self.task_id = task_id
self.status = status
self.result = result
self.error = error
self.completed_at = datetime.utcnow().isoformat()
    def to_json(self):
        # Note: the result payload must be JSON-serializable.
return json.dumps({
"task_id": self.task_id,
"status": self.status.value,
"result": self.result,
"error": str(self.error) if self.error else None,
"completed_at": self.completed_at,
})
    @classmethod
    def from_json(cls, json_str):
        data = json.loads(json_str)
        res = cls(
            task_id=data["task_id"],
            status=TaskStatus(data["status"]),
            result=data.get("result"),
            error=data.get("error"),
        )
        # Preserve the stored completion time instead of resetting it
        res.completed_at = data.get("completed_at")
        return res
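A quick sanity check of the message round trip:
from taskq.messages import TaskMessage

msg = TaskMessage(task_name="tasks.add", args=[2, 3])
restored = TaskMessage.from_json(msg.to_json())
assert restored.task_id == msg.task_id
assert restored.args == [2, 3]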
Message Broker
Create taskq/broker.py to handle message distribution:
import redis
from datetime import datetime, timezone
from typing import Optional
from .messages import TaskMessage, TaskResult, TaskStatus
class Broker:
    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)
    def enqueue(self, task: TaskMessage) -> str:
        # Track queue names in a Redis set so other processes
        # (such as the Scheduler) can discover them.
        self.redis.sadd("queues", task.queue)
        if task.scheduled_at:
            # Delayed task: park it in a per-queue sorted set, scored
            # by its due time (naive datetimes are treated as UTC).
            due = datetime.fromisoformat(task.scheduled_at)
            if due.tzinfo is None:
                due = due.replace(tzinfo=timezone.utc)
            self.redis.zadd(f"scheduled:{task.queue}", {task.to_json(): due.timestamp()})
        else:
            self.redis.rpush(f"queue:{task.queue}", task.to_json())
        return task.task_id
def dequeue(self, queue="default", timeout=0):
queue_key = f"queue:{queue}"
result = self.redis.blpop(queue_key, timeout=timeout)
if result:
_, task_json = result
return TaskMessage.from_json(task_json)
return None
def get_queue_length(self, queue="default") -> int:
queue_key = f"queue:{queue}"
return self.redis.llen(queue_key)
    def get_queues(self):
        return list(self.redis.smembers("queues"))
def get_result(self, task_id: str) -> Optional[TaskResult]:
result_key = f"result:{task_id}"
result_json = self.redis.get(result_key)
if result_json:
return TaskResult.from_json(result_json)
return None
def store_result(self, result: TaskResult, ttl=3600):
result_key = f"result:{result.task_id}"
self.redis.setex(result_key, ttl, result.to_json())
def get_status(self, task_id: str) -> Optional[TaskStatus]:
result = self.get_result(task_id)
return result.status if result else None
def publish(self, channel: str, message: str):
self.redis.publish(channel, message)
def subscribe(self, channel: str):
pubsub = self.redis.pubsub()
pubsub.subscribe(channel)
return pubsub
def check_queue(self, queue="default") -> bool:
queue_key = f"queue:{queue}"
return self.redis.exists(queue_key) > 0
    def clear_queue(self, queue="default"):
        self.redis.delete(f"queue:{queue}")
        self.redis.srem("queues", queue)
# A module-level default broker for convenience
broker = Broker()
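With the broker in place, you can smoke-test it against a running Redis:
from taskq.broker import Broker
from taskq.messages import TaskMessage

b = Broker()
task_id = b.enqueue(TaskMessage(task_name="tasks.add", args=[2, 3]))
print(b.get_queue_length("default"))  # 1
task = b.dequeue("default", timeout=1)
print(task.task_id == task_id)  # True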
Worker Nodes
Create taskq/worker.py for task processing:
import multiprocessing
import signal
import traceback
from concurrent.futures import ProcessPoolExecutor
from typing import Callable, Dict, List, Optional
from .broker import Broker
from .messages import TaskMessage, TaskResult, TaskStatus
class Worker:
def __init__(
self,
queues=None,
broker=None,
concurrency=1,
redis_url="redis://localhost:6379",
):
self.queues = queues or ["default"]
self.broker = broker or Broker(redis_url)
self.concurrency = concurrency
self.running = False
self.registered_tasks: Dict[str, Callable] = {}
self.pool: Optional[ProcessPoolExecutor] = None
def register(self, task_name: str):
def decorator(func: Callable):
self.registered_tasks[task_name] = func
return func
return decorator
def start(self):
self.running = True
        # Reserved for out-of-process execution; in this simplified
        # worker, tasks run inline in _process_task.
        self.pool = ProcessPoolExecutor(max_workers=self.concurrency)
signal.signal(signal.SIGTERM, self._handle_shutdown)
signal.signal(signal.SIGINT, self._handle_shutdown)
print(f"Worker started, listening on queues: {self.queues}")
while self.running:
for queue in self.queues:
task = self.broker.dequeue(queue, timeout=1)
if task:
self._process_task(task)
def _process_task(self, task: TaskMessage):
print(f"Processing task {task.task_id}: {task.task_name}")
task_func = self.registered_tasks.get(task.task_name)
if not task_func:
error = f"Unknown task: {task.task_name}"
self._store_result(task, TaskStatus.FAILURE, error=error)
return
try:
self._store_result(task, TaskStatus.STARTED)
result = task_func(*task.args, **task.kwargs)
self._store_result(task, TaskStatus.SUCCESS, result=result)
print(f"Task {task.task_id} completed successfully")
except Exception as e:
self._handle_failure(task, e)
def _handle_failure(self, task: TaskMessage, error: Exception):
print(f"Task {task.task_id} failed: {error}")
if task.retry_count < task.max_retries:
task.retry_count += 1
print(f"Retrying task {task.task_id} (attempt {task.retry_count})")
self.broker.enqueue(task)
self._store_result(task, TaskStatus.RETRY)
else:
error_msg = f"{str(error)}\n{traceback.format_exc()}"
self._store_result(task, TaskStatus.FAILURE, error=error_msg)
def _store_result(self, task: TaskMessage, status: TaskStatus, result=None, error=None):
result_obj = TaskResult(task.task_id, status, result, error)
self.broker.store_result(result_obj)
def _handle_shutdown(self, signum, frame):
print("Shutting down worker...")
self.running = False
if self.pool:
self.pool.shutdown(wait=True)
class WorkerPool:
    def __init__(self, num_workers, queues=None, init=None, **worker_kwargs):
        self.num_workers = num_workers
        self.queues = queues
        self.init = init  # optional callable(worker) that registers tasks
        self.worker_kwargs = worker_kwargs
        self.workers: List[multiprocessing.Process] = []
def start(self):
for i in range(self.num_workers):
worker = multiprocessing.Process(
target=self._run_worker,
args=(i, self.queues),
kwargs=self.worker_kwargs,
)
worker.start()
self.workers.append(worker)
print(f"Started {self.num_workers} workers")
    def _run_worker(self, worker_id, queues, **kwargs):
        worker = Worker(queues=queues, **kwargs)
        if self.init:
            self.init(worker)  # register this deployment's tasks
        worker.start()
def stop(self):
for worker in self.workers:
worker.terminate()
worker.join()
print("All workers stopped")
Client API
Create taskq/client.py for the task queue client:
import functools
import time
from typing import Callable, Optional
from datetime import datetime
from .broker import Broker
from .messages import TaskMessage, TaskResult, TaskStatus
class TaskQueue:
def __init__(self, redis_url="redis://localhost:6379", broker=None):
self.broker = broker or Broker(redis_url)
def task(self, queue="default", max_retries=3):
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def delay(*args, **kwargs):
task = TaskMessage(
task_name=func.__module__ + "." + func.__name__,
args=args,
kwargs=kwargs,
queue=queue,
max_retries=max_retries,
)
return self.broker.enqueue(task)
@functools.wraps(func)
def apply_async(*args, **kwargs):
return delay(*args, **kwargs)
@functools.wraps(func)
def apply_at(dt: datetime, *args, **kwargs):
task = TaskMessage(
task_name=func.__module__ + "." + func.__name__,
args=args,
kwargs=kwargs,
queue=queue,
max_retries=max_retries,
scheduled_at=dt.isoformat(),
)
return self.broker.enqueue(task)
delay.apply_async = apply_async
delay.apply_at = apply_at
delay.apply = apply_async
func.delay = delay
func.apply_async = apply_async
func.apply_at = apply_at
return func
return decorator
def enqueue(
self,
func: Callable,
args: tuple = None,
kwargs: dict = None,
queue: str = "default",
max_retries: int = 3,
scheduled_at: Optional[datetime] = None,
) -> str:
task = TaskMessage(
task_name=func.__module__ + "." + func.__name__,
args=args or (),
kwargs=kwargs or {},
queue=queue,
max_retries=max_retries,
scheduled_at=scheduled_at.isoformat() if scheduled_at else None,
)
return self.broker.enqueue(task)
def get_result(self, task_id: str) -> Optional[TaskResult]:
return self.broker.get_result(task_id)
def get_status(self, task_id: str):
return self.broker.get_status(task_id)
    def wait_for_result(self, task_id: str, timeout: int = 60):
        start = time.time()
        while time.time() - start < timeout:
            result = self.get_result(task_id)
            # Only terminal states count as done; STARTED and RETRY
            # results keep us polling.
            if result and result.status in (TaskStatus.SUCCESS, TaskStatus.FAILURE):
                return result
            time.sleep(0.1)
        raise TimeoutError(f"Timeout waiting for task {task_id}")
def is_ready(self, task_id: str) -> bool:
status = self.get_status(task_id)
return status in (TaskStatus.SUCCESS, TaskStatus.FAILURE)
def is_successful(self, task_id: str) -> bool:
return self.get_status(task_id) == TaskStatus.SUCCESS
queue = TaskQueue()
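With the client defined, a typical round trip looks like this; the sketch assumes a running worker and the tasks.py module from the Testing section below:
from tasks import add, queue

task_id = add.delay(5, 3)
result = queue.wait_for_result(task_id, timeout=10)
print(result.status, result.result)  # TaskStatus.SUCCESS 8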
Task Scheduling
Create taskq/scheduler.py for handling scheduled tasks:
import time
import threading
from datetime import datetime, timezone
from .broker import Broker
from .messages import TaskMessage
class Scheduler:
def __init__(self, redis_url="redis://localhost:6379", broker=None):
self.broker = broker or Broker(redis_url)
self.running = False
self.thread = None
def start(self):
self.running = True
self.thread = threading.Thread(target=self._run)
self.thread.daemon = True
self.thread.start()
print("Scheduler started")
def stop(self):
self.running = False
if self.thread:
self.thread.join()
def _run(self):
while self.running:
self._process_scheduled()
time.sleep(1)
    def _process_scheduled(self):
        # Scores are Unix timestamps, so everything due so far falls
        # in the range [0, now].
        now = datetime.now(timezone.utc).timestamp()
        for queue in self.broker.get_queues():
            scheduled_key = f"scheduled:{queue}"
            tasks = self.redis.zrangebyscore(scheduled_key, 0, now)
            for task_json in tasks:
                task = TaskMessage.from_json(task_json)
                # Clear the schedule so enqueue() routes the task to the
                # live queue instead of re-parking it in the sorted set.
                task.scheduled_at = None
                self.broker.enqueue(task)
                self.redis.zrem(scheduled_key, task_json)
@property
def redis(self):
return self.broker.redis
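A sketch of scheduling work for the future; it assumes a running worker and the tasks.py module from the Testing section below:
import time
from datetime import datetime, timedelta
from taskq.scheduler import Scheduler
from tasks import add

scheduler = Scheduler()
scheduler.start()

# Due one minute from now; the scheduler thread moves the task onto
# the live queue once the timestamp passes.
add.apply_at(datetime.utcnow() + timedelta(minutes=1), 5, 3)

time.sleep(90)  # keep the daemon scheduler thread alive long enough
scheduler.stop()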
Fault Tolerance Features
- Automatic Retries - Failed tasks are automatically retried up to max_retries
- Result TTL - Results expire after 1 hour by default
- Graceful Shutdown - Workers finish current task before shutting down
- Scheduled Tasks - Tasks can be scheduled for future execution
Testing the Task Queue
Create example tasks and test the system:
# tasks.py
from taskq import TaskQueue
queue = TaskQueue()
@queue.task()
def add(a, b):
return a + b
@queue.task(max_retries=5)
def process_data(data):
if not data:
raise ValueError("No data provided")
return f"Processed: {data}"
@queue.task(queue="emails")
def send_email(to, subject, body):
print(f"Sending email to {to}: {subject}")
return "Email sent"
# Using the tasks
task_id = add.delay(5, 3)
print(f"Task enqueued: {task_id}")
task_id2 = process_data.delay("Hello World")
print(f"Task enqueued: {task_id2}")
task_id3 = send_email.delay("user@example.com", "Test", "Hello!")
print(f"Task enqueued: {task_id3}")
Run the worker:
# worker.py
from taskq import Worker
worker = Worker(queues=["default", "emails"], concurrency=2)
@worker.register("tasks.add")
def add(a, b):
return a + b
@worker.register("tasks.process_data")
def process_data(data):
if not data:
raise ValueError("No data provided")
return f"Processed: {data}"
@worker.register("tasks.send_email")
def send_email(to, subject, body):
print(f"Sending email to {to}: {subject}")
return "Email sent"
worker.start()
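Start the worker first (python worker.py), then run the producer script (python tasks.py) in a second terminal. The worker picks up all three tasks, and their results stay in the result backend for an hour by default.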
Summary
Congratulations! You've built a complete distributed task queue system. Here's what you learned:
- Message Broker - How to distribute tasks using Redis queues
- Worker Design - How to process tasks asynchronously
- Task Registration - How to register and call tasks
- Result Storage - How to store and retrieve task results
- Retries - How to handle failed tasks with automatic retries
- Scheduling - How to schedule tasks for future execution
Possible Extensions
- Add task dependencies (Task A must complete before Task B)
- Implement task priorities (high/medium/low)
- Add monitoring and metrics
- Implement worker heartbeats for health checks
- Add distributed locking for critical sections
- Implement task timeouts