← Back to Tutorials
Python

Build a Background Job Queue

Difficulty: Intermediate | Est. Time: ~4 hours

Introduction

Background job queues are essential for processing time-consuming tasks asynchronously. They help improve application responsiveness and handle heavy workloads efficiently.

In this tutorial, we'll build "JobQ" - a background job queue system with worker pools, retries, scheduling, and persistence.

What You'll Build
  • A threaded job queue
  • Worker pool for parallel processing
  • Job persistence
  • Scheduled jobs
  • Retry mechanism
What You'll Learn
  • Queue-based architectures
  • Thread pool patterns
  • Job scheduling
  • Error handling
  • Message persistence

Core Concepts

Let's understand job queue architecture.

Queue Architecture

  • Producer - Enqueues jobs
  • Queue - Stores pending jobs
  • Worker - Processes jobs
  • Consumer - Retrieves results

Job Lifecycle

  • Pending - Waiting in queue
  • Processing - Being executed
  • Completed - Successfully finished
  • Failed - Error occurred
  • Retry - Scheduled for retry

Project Setup

Bash
# Create project directory
mkdir jobq
cd jobq

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
# (pickle ships with the standard library; the pickle5 backport only targets
# Python 3.5-3.7 and is not needed by the code in this tutorial)
pip install redis

Project Structure

File Structure
jobq/
├── jobq/
│   ├── __init__.py
│   ├── job.py
│   ├── queue.py
│   ├── worker.py
│   ├── scheduler.py
│   └── store.py
├── main.py
└── requirements.txt

Job Queue

Let's create the job and queue classes.

Python
# jobq/job.py
import uuid
import time
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Optional, Dict
import pickle

class JobStatus(Enum):
    """Lifecycle states of a job."""

    PENDING = "pending"        # created, not yet executed
    PROCESSING = "processing"  # execute() is currently running
    COMPLETED = "completed"    # the callable returned without raising
    FAILED = "failed"          # the callable raised and no retries remain
    RETRY = "retry"            # the callable raised and retries remain


class Job:
    """A unit of work: a callable plus its arguments and execution metadata.

    Args:
        func: Callable to run.
        args: Positional arguments passed to ``func``.
        kwargs: Keyword arguments passed to ``func`` (``None`` means none).
        id: Explicit job id; a random UUID4 string is generated when omitted.
        priority: Stored on the job for the queue to order by.
        max_retries: Number of failed attempts that are marked ``RETRY``
            before a failure is marked ``FAILED``.
        timeout: Seconds the callable may run; a falsy or non-positive value
            disables the limit.
    """

    def __init__(self,
                 func: Callable,
                 args: tuple = (),
                 kwargs: Optional[Dict] = None,
                 id: Optional[str] = None,
                 priority: int = 0,
                 max_retries: int = 3,
                 timeout: int = 300):

        self.id = id or str(uuid.uuid4())
        self.func = func
        self.args = args
        self.kwargs = kwargs or {}
        self.priority = priority
        self.max_retries = max_retries
        self.timeout = timeout

        self.status = JobStatus.PENDING
        self.created_at = datetime.now()
        self.started_at = None
        self.completed_at = None
        self.result = None
        self.error = None
        self.retry_count = 0

    def execute(self) -> Any:
        """Run the callable and record the outcome on the job.

        Returns:
            Whatever the callable returned.

        Raises:
            TimeoutError: The callable exceeded ``timeout`` seconds.
            Exception: Any exception raised by the callable is re-raised
                after the status (``RETRY`` or ``FAILED``), ``error`` and
                ``retry_count`` have been updated.
        """
        self.status = JobStatus.PROCESSING
        self.started_at = datetime.now()

        try:
            self.result = self._call_with_timeout()
        except Exception as e:
            self.error = str(e)

            if self.retry_count < self.max_retries:
                self.status = JobStatus.RETRY
                self.retry_count += 1
            else:
                self.status = JobStatus.FAILED

            self.completed_at = datetime.now()
            raise

        self.status = JobStatus.COMPLETED
        self.completed_at = datetime.now()
        return self.result

    def _call_with_timeout(self) -> Any:
        """Invoke the callable, enforcing ``self.timeout`` where possible."""
        import signal
        import threading

        if not self.timeout or self.timeout <= 0:
            return self.func(*self.args, **self.kwargs)

        # Signal handlers can only be installed from the main thread, and
        # SIGALRM does not exist on every platform.
        on_main_thread = threading.current_thread() is threading.main_thread()
        can_use_alarm = hasattr(signal, "SIGALRM") and hasattr(signal, "setitimer")

        if on_main_thread and can_use_alarm:
            def timeout_handler(signum, frame):
                raise TimeoutError("Job timed out")

            previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
            signal.setitimer(signal.ITIMER_REAL, float(self.timeout))
            try:
                return self.func(*self.args, **self.kwargs)
            finally:
                # Always disarm the timer so a failed job cannot leave an
                # alarm pending that would fire in unrelated code later.
                signal.setitimer(signal.ITIMER_REAL, 0)
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        # Fallback for worker threads: run the callable in a helper thread
        # and stop waiting after the timeout. Python threads cannot be
        # killed, so a timed-out callable keeps running in the background.
        outcome: Dict[str, Any] = {}

        def runner():
            try:
                outcome["value"] = self.func(*self.args, **self.kwargs)
            except BaseException as exc:  # handed back to the caller below
                outcome["error"] = exc

        helper = threading.Thread(target=runner, daemon=True)
        helper.start()
        helper.join(self.timeout)

        if helper.is_alive():
            raise TimeoutError("Job timed out")
        if "error" in outcome:
            raise outcome["error"]
        return outcome["value"]

    def to_dict(self) -> Dict:
        """Return a summary of the job's state.

        Timestamps are ISO-8601 strings (or ``None``); ``result`` is the
        string form of the result truncated to 100 characters.
        """
        return {
            'id': self.id,
            'status': self.status.value,
            'priority': self.priority,
            'created_at': self.created_at.isoformat(),
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
            'result': str(self.result)[:100],
            'error': self.error,
            'retry_count': self.retry_count
        }

    def serialize(self) -> bytes:
        """Pickle the job's definition (not its runtime state).

        Only the callable's name is stored, not the callable itself.
        """
        if callable(self.func):
            # Callables such as functools.partial have no __name__.
            func_name = getattr(self.func, '__name__', repr(self.func))
        else:
            func_name = self.func
        return pickle.dumps({
            'func': func_name,
            'args': self.args,
            'kwargs': self.kwargs,
            'id': self.id,
            'priority': self.priority,
            'max_retries': self.max_retries,
            'timeout': self.timeout
        })
Python
# jobq/queue.py
import queue
import threading
from typing import Optional, List
from .job import Job

class JobQueue:
    """In-memory priority queue of jobs with per-job completion signalling.

    Jobs with a lower ``priority`` number are dequeued first; jobs with equal
    priority are dequeued in the order they were enqueued.

    Args:
        maxsize: Upper bound on queued entries; ``0`` means unbounded.
    """

    def __init__(self, maxsize: int = 0):
        self.queue = queue.PriorityQueue(maxsize=maxsize)
        self.lock = threading.Lock()
        self.jobs: dict = {}
        self.job_conditions: dict = {}
        # Monotonic tie-breaker so heap entries never compare Job objects.
        self._sequence = 0

    def enqueue(self, job: "Job") -> str:
        """Register ``job`` and put it on the queue; return its id."""
        with self.lock:
            self.jobs[job.id] = job
            if job.id not in self.job_conditions:
                self.job_conditions[job.id] = threading.Condition()
            self._sequence += 1
            sequence = self._sequence

        # The put happens outside the lock so a full bounded queue cannot
        # block readers of self.jobs.
        self.queue.put((job.priority, sequence, job))

        return job.id

    def dequeue(self, block: bool = True, timeout: Optional[float] = None) -> Optional["Job"]:
        """Return the next job, or ``None`` if none became available."""
        try:
            _priority, _sequence, job = self.queue.get(block=block, timeout=timeout)
            return job
        except queue.Empty:
            return None

    def get_job(self, job_id: str) -> Optional["Job"]:
        """Look up a registered job by id."""
        with self.lock:
            return self.jobs.get(job_id)

    def complete_job(self, job_id: str):
        """Wake every thread waiting on ``job_id``."""
        with self.lock:
            condition = self.job_conditions.get(job_id)

        if condition is None:
            return

        with condition:
            condition.notify_all()

    def get_status(self, job_id: str) -> Optional[str]:
        """Return the job's status value, or ``None`` for an unknown id."""
        job = self.get_job(job_id)
        return job.status.value if job else None

    def wait_for_completion(self, job_id: str, timeout: Optional[float] = None) -> bool:
        """Block until the job reaches ``completed`` or ``failed``.

        Returns:
            ``True`` if the job is in a terminal state, ``False`` if the id
            is unknown or the timeout elapsed first.
        """
        with self.lock:
            condition = self.job_conditions.get(job_id)

        if condition is None:
            return False

        def finished() -> bool:
            job = self.get_job(job_id)
            return job is not None and job.status.value in ('completed', 'failed')

        # Waiting on a predicate returns at once for a job that already
        # finished and keeps waiting through notifications sent for retries.
        with condition:
            return condition.wait_for(finished, timeout=timeout)

    def get_stats(self) -> dict:
        """Count registered jobs by status value."""
        stats = {
            'pending': 0,
            'processing': 0,
            'completed': 0,
            'failed': 0,
            'retry': 0
        }

        with self.lock:
            for job in self.jobs.values():
                stats[job.status.value] = stats.get(job.status.value, 0) + 1

        return stats

    def size(self) -> int:
        """Return the approximate number of queued entries."""
        return self.queue.qsize()

Worker Pool

Let's create the worker pool.

Python
# jobq/worker.py
import threading
import time
from typing import Optional, Callable
from .queue import JobQueue
from .job import Job, JobStatus

class Worker:
    """Pulls jobs from a queue and executes them on a background thread.

    Args:
        queue: Queue to take jobs from and to re-enqueue retries on.
        worker_id: Number used to identify this worker in log output.
    """

    def __init__(self, queue: JobQueue, worker_id: int = 0):
        self.queue = queue
        self.worker_id = worker_id
        self.running = False
        self.thread: Optional[threading.Thread] = None

    def start(self):
        """Start the worker loop on a daemon thread."""
        self.running = True
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def stop(self):
        """Ask the loop to exit and wait for the thread to finish."""
        self.running = False
        if self.thread:
            self.thread.join()

    def _run(self):
        """Process jobs until ``running`` is cleared."""
        while self.running:
            # The 1-second timeout lets the loop re-check ``running``.
            job = self.queue.dequeue(timeout=1)

            if job is None:
                continue

            self._process_job(job)

    def _process_job(self, job: Job):
        """Execute one job, re-enqueueing it when it is marked for retry."""
        print(f"Worker {self.worker_id} processing job {job.id}")

        try:
            job.execute()
        except Exception as e:
            # execute() re-raises after updating the status, so the retry
            # decision has to be made here rather than after the call.
            print(f"Job {job.id} failed: {e}")
            if job.status == JobStatus.RETRY:
                print(f"Job {job.id} scheduled for retry ({job.retry_count}/{job.max_retries})")
                self.queue.enqueue(job)
        else:
            print(f"Job {job.id} completed successfully")
        finally:
            self.queue.complete_job(job.id)


class WorkerPool:
    """Starts and stops a fixed number of workers sharing one queue.

    Args:
        queue: Queue handed to every worker.
        num_workers: Number of workers created by ``start``.
    """

    def __init__(self, queue: JobQueue, num_workers: int = 4):
        self.queue = queue
        self.num_workers = num_workers
        self.workers: list = []

    def start(self):
        """Create and start ``num_workers`` workers."""
        for i in range(self.num_workers):
            worker = Worker(self.queue, worker_id=i)
            worker.start()
            self.workers.append(worker)

        print(f"Started {self.num_workers} workers")

    def stop(self):
        """Stop every worker and forget them."""
        # Signal all workers first so they wind down in parallel instead of
        # each join waiting out the previous worker's dequeue timeout.
        for worker in self.workers:
            worker.running = False

        for worker in self.workers:
            worker.stop()

        self.workers.clear()
        print("All workers stopped")

Persistence

Let's add job persistence with Redis.

Python
# jobq/store.py
import json
import redis
from typing import Optional, List, Dict
from datetime import datetime
from .job import Job, JobStatus

class JobStore:
    """Persists job snapshots as JSON strings in Redis.

    Each job is stored under the key ``jobq:job:<id>``. ``args`` and
    ``kwargs`` are stored via ``str()`` and only the callable's name is
    kept, so a stored record is a snapshot of the job, not the job itself.

    Args:
        redis_host: Redis server host name.
        redis_port: Redis server port.
    """

    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
        self.redis = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=0,
            decode_responses=False
        )
        self.prefix = "jobq:"

    def _make_key(self, key: str) -> str:
        """Return ``key`` with the store's namespace prefix."""
        return f"{self.prefix}{key}"

    def _iter_job_keys(self):
        """Yield each stored job key once.

        Uses incremental SCAN rather than KEYS so a large keyspace does not
        block the server; SCAN may repeat a key, hence the de-duplication.
        """
        seen = set()
        for key in self.redis.scan_iter(match=self._make_key("job:*")):
            if key not in seen:
                seen.add(key)
                yield key

    def save_job(self, job: "Job"):
        """Write (or overwrite) the snapshot for ``job``."""
        key = self._make_key(f"job:{job.id}")

        data = {
            'id': job.id,
            'func': job.func.__name__ if callable(job.func) else str(job.func),
            'args': str(job.args),
            'kwargs': str(job.kwargs),
            'priority': job.priority,
            'max_retries': job.max_retries,
            'timeout': job.timeout,
            'status': job.status.value,
            'created_at': job.created_at.isoformat(),
            'retry_count': job.retry_count,
            # Compare against None so falsy results (0, "", False) survive.
            'result': str(job.result) if job.result is not None else None,
            'error': job.error
        }

        self.redis.set(key, json.dumps(data))

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Return the stored snapshot for ``job_id``, or ``None``."""
        key = self._make_key(f"job:{job_id}")

        data = self.redis.get(key)

        if data:
            return json.loads(data)
        return None

    def get_all_jobs(self, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` stored snapshots, in no particular order."""
        jobs: List[Dict] = []
        for key in self._iter_job_keys():
            if len(jobs) >= limit:
                break
            data = self.redis.get(key)
            if data:  # the key may have been deleted since it was listed
                jobs.append(json.loads(data))

        return jobs

    def delete_job(self, job_id: str):
        """Remove the snapshot for ``job_id`` if it exists."""
        key = self._make_key(f"job:{job_id}")
        self.redis.delete(key)

    def get_stats(self) -> Dict:
        """Count stored snapshots in total and per status value."""
        stats = {
            'total': 0,
            'completed': 0,
            'failed': 0,
            'pending': 0,
            'processing': 0,
            'retry': 0
        }

        for key in self._iter_job_keys():
            data = self.redis.get(key)
            if data:
                job = json.loads(data)
                stats['total'] += 1
                status = job.get('status', 'pending')
                stats[status] = stats.get(status, 0) + 1

        return stats

Job Scheduling

Let's add scheduled job support.

Python
# jobq/scheduler.py
import threading
import time
from datetime import datetime, timedelta
from typing import Callable, Optional, Dict, List
from enum import Enum

class ScheduleType(Enum):
    """Kinds of schedule a ScheduledJob can have."""

    ONCE = "once"          # run a single time, then disable
    INTERVAL = "interval"  # run repeatedly, ``interval`` seconds apart
    CRON = "cron"          # declared only; ScheduledJob computes no next run for it


class ScheduledJob:
    """Describes when a callable should be handed to the queue.

    Args:
        job_id: Identifier for this schedule entry.
        func: Callable to run.
        args: Positional arguments for ``func``.
        kwargs: Keyword arguments for ``func`` (``None`` means none).
        schedule_type: How the entry repeats.
        interval: Seconds between runs for ``INTERVAL`` entries.
        run_at: First time the entry is due; ``None`` means never due.
    """

    def __init__(self,
                 job_id: str,
                 func: Callable,
                 args: tuple = (),
                 kwargs: Optional[Dict] = None,
                 schedule_type: ScheduleType = ScheduleType.ONCE,
                 interval: Optional[int] = None,
                 run_at: Optional[datetime] = None):

        self.job_id = job_id
        self.func = func
        self.args = args
        self.kwargs = kwargs or {}
        self.schedule_type = schedule_type
        self.interval = interval
        self.run_at = run_at
        self.enabled = True
        self.last_run = None
        self.next_run = run_at

    def should_run(self) -> bool:
        """Return ``True`` when the entry is enabled and its next run is due."""
        if not self.enabled:
            return False

        return self.next_run is not None and datetime.now() >= self.next_run

    def mark_run(self):
        """Record a run and advance (or disable) the schedule."""
        # Read the clock once so last_run and next_run are exactly
        # ``interval`` seconds apart.
        now = datetime.now()
        self.last_run = now

        if self.schedule_type == ScheduleType.INTERVAL and self.interval:
            self.next_run = now + timedelta(seconds=self.interval)
        elif self.schedule_type == ScheduleType.ONCE:
            self.enabled = False


class Scheduler:
    """Polls scheduled entries once a second and enqueues the ones that are due.

    Args:
        queue: Object with an ``enqueue(job)`` method that receives due jobs.
    """

    def __init__(self, queue):
        self.queue = queue
        self.scheduled_jobs: Dict[str, ScheduledJob] = {}
        self.running = False
        self.thread: Optional[threading.Thread] = None

    def schedule_once(self, job_id: str, func: Callable, run_at: datetime,
                      args: tuple = (), kwargs: Optional[Dict] = None) -> str:
        """Schedule ``func`` to be enqueued a single time at ``run_at``.

        An existing entry with the same ``job_id`` is replaced.
        """
        job = ScheduledJob(
            job_id, func, args, kwargs,
            schedule_type=ScheduleType.ONCE,
            run_at=run_at
        )
        self.scheduled_jobs[job_id] = job
        return job_id

    def schedule_interval(self, job_id: str, func: Callable, interval: int,
                          args: tuple = (), kwargs: Optional[Dict] = None) -> str:
        """Schedule ``func`` to be enqueued now and every ``interval`` seconds.

        An existing entry with the same ``job_id`` is replaced.
        """
        job = ScheduledJob(
            job_id, func, args, kwargs,
            schedule_type=ScheduleType.INTERVAL,
            interval=interval,
            run_at=datetime.now()
        )
        self.scheduled_jobs[job_id] = job
        return job_id

    def cancel(self, job_id: str) -> bool:
        """Remove a scheduled entry; return whether it existed."""
        job = self.scheduled_jobs.pop(job_id, None)
        if job is None:
            return False

        # Disable as well, so a snapshot already taken by the polling loop
        # skips this entry.
        job.enabled = False
        return True

    def start(self):
        """Start the polling loop on a daemon thread."""
        self.running = True
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        print("Scheduler started")

    def stop(self):
        """Ask the polling loop to exit and wait for it."""
        self.running = False
        if self.thread:
            self.thread.join()
        print("Scheduler stopped")

    def _run(self):
        """Enqueue due entries once a second until ``running`` is cleared."""
        # Imported here rather than at module level, as in the original
        # (presumably to avoid an import cycle - TODO confirm); hoisted out
        # of the loop so it runs once.
        from .job import Job

        while self.running:
            # Iterate over a snapshot so schedule/cancel calls from other
            # threads cannot change the dict during iteration.
            for job_id, job in list(self.scheduled_jobs.items()):
                if job.should_run():
                    # Copy kwargs so successive runs do not share one dict.
                    new_job = Job(job.func, job.args, dict(job.kwargs), id=job_id)
                    self.queue.enqueue(new_job)
                    job.mark_run()

            time.sleep(1)

Testing

Python
# main.py
from jobq.queue import JobQueue
from jobq.worker import WorkerPool
from jobq.job import Job
from jobq.scheduler import Scheduler
from datetime import datetime, timedelta
import time

# Define some tasks
def send_email(to, subject):
    """Pretend to send an email; the sleep stands in for network latency."""
    print(f"Sending email to {to}: {subject}")
    time.sleep(2)
    return "Email sent"


def process_data(data):
    """Pretend to process ``data``; the sleep stands in for real work."""
    print(f"Processing data: {data}")
    time.sleep(1)
    return f"Processed {data} successfully"


def main():
    """Run the demo: start workers, enqueue jobs, schedule one, print stats."""
    # Create queue and workers
    job_queue = JobQueue()
    pool = WorkerPool(job_queue, num_workers=2)
    scheduler = Scheduler(job_queue)
    pool.start()

    try:
        # Enqueue some jobs (the queue hands out the lowest priority number first)
        job1 = Job(send_email, ("user@example.com", "Hello"), priority=1)
        job2 = Job(process_data, ("report.csv",), priority=2)
        job3 = Job(process_data, ("image.png",), priority=3)

        job_queue.enqueue(job1)
        job_queue.enqueue(job2)
        job_queue.enqueue(job3)

        # Schedule a job
        scheduler.schedule_interval("periodic_task", process_data, interval=10, args=("scheduled",))
        scheduler.start()

        # Wait for jobs to complete
        time.sleep(5)

        # Print stats
        print("\nQueue stats:", job_queue.get_stats())
    finally:
        # Stop everything; the scheduler goes first so nothing new is
        # enqueued while the workers shut down.
        scheduler.stop()
        pool.stop()


if __name__ == "__main__":
    main()
Testing Checklist
  • Jobs are queued correctly
  • Workers process jobs
  • Retry mechanism works
  • Scheduled jobs run
  • Stats are tracked

Summary

Congratulations! You've built a complete background job queue system.

What You Built

  • Job Queue - Priority-based queue
  • Workers - Thread pool for processing
  • Persistence - Redis-based storage
  • Scheduler - Job scheduling
  • Retry Logic - Automatic retries

Next Steps

  • Add distributed workers
  • Implement job priorities
  • Add job dependencies
  • Implement dead letter queue

Continue Learning

Try these tutorials:

  • Build a REST API with JWT
  • Build a Caching System
  • Build a Real-Time Notification System