Build a Background Job Queue
Introduction
Background job queues are essential for processing time-consuming tasks asynchronously. They help improve application responsiveness and handle heavy workloads efficiently.
In this tutorial, we'll build "JobQ" - a background job queue system with worker pools, retries, scheduling, and persistence.
What You'll Build
- A threaded job queue
- Worker pool for parallel processing
- Job persistence
- Scheduled jobs
- Retry mechanism
What You'll Learn
- Queue-based architectures
- Thread pool patterns
- Job scheduling
- Error handling
- Message persistence
Core Concepts
Let's understand job queue architecture.
Queue Architecture
- Producer - Enqueues jobs
- Queue - Stores pending jobs
- Worker - Processes jobs
- Consumer - Retrieves results
Job Lifecycle
- Pending - Waiting in queue
- Processing - Being executed
- Completed - Successfully finished
- Failed - Error occurred
- Retry - Scheduled for retry
Project Setup
Bash
# Create project directory
mkdir jobq
cd jobq

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
# (pickle is part of the Python 3 standard library; the old pickle5
#  backport is only needed on Python <= 3.7 and should not be installed)
pip install redis
Project Structure
File Structure
jobq/
├── jobq/
│ ├── __init__.py
│ ├── job.py
│ ├── queue.py
│ ├── worker.py
│ ├── scheduler.py
│ └── store.py
├── main.py
└── requirements.txt
Job Queue
Let's create the job and queue classes.
Python
# jobq/job.py
import uuid
import time
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Optional, Dict
import pickle
class JobStatus(Enum):
    """Lifecycle states a job moves through while in the queue."""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"


class Job:
    """A unit of work: a callable plus arguments, with retry and timeout metadata.

    Instances are enqueued on a JobQueue and executed by workers via execute().
    """

    def __init__(self,
                 func: Callable,
                 args: tuple = (),
                 kwargs: Dict = None,
                 id: str = None,
                 priority: int = 0,
                 max_retries: int = 3,
                 timeout: int = 300):
        """Create a job.

        Args:
            func: Callable to run.
            args: Positional arguments for func.
            kwargs: Keyword arguments for func (None means empty).
            id: Explicit job id; a UUID4 string is generated when omitted.
            priority: Lower values are dequeued first (PriorityQueue order).
            max_retries: How many times a failing job may be retried.
            timeout: Seconds before the job is aborted (main thread, Unix only).
        """
        self.id = id or str(uuid.uuid4())
        self.func = func
        self.args = args
        # `kwargs or {}` avoids the shared-mutable-default pitfall.
        self.kwargs = kwargs or {}
        self.priority = priority
        self.max_retries = max_retries
        self.timeout = timeout
        self.status = JobStatus.PENDING
        self.created_at = datetime.now()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None
        self.result: Any = None
        self.error: Optional[str] = None
        self.retry_count = 0

    def execute(self) -> Any:
        """Run the job's callable, enforcing the timeout where possible.

        Returns the callable's result on success. On any exception the
        error is recorded, status becomes RETRY (while retries remain)
        or FAILED, and the original exception is re-raised for the caller.
        """
        self.status = JobStatus.PROCESSING
        self.started_at = datetime.now()
        try:
            import signal
            import threading

            class TimeoutException(Exception):
                pass

            def timeout_handler(signum, frame):
                raise TimeoutException("Job timed out")

            # SIGALRM does not exist on Windows, and signal handlers may
            # only be installed from the main thread -- worker threads
            # would get a ValueError. Skip the alarm in those cases
            # instead of crashing every job.
            use_alarm = (hasattr(signal, "SIGALRM")
                         and threading.current_thread() is threading.main_thread())
            if use_alarm:
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(self.timeout)
            try:
                self.result = self.func(*self.args, **self.kwargs)
            finally:
                # Disarm even when func raises; the original code left the
                # alarm armed on failure, which could kill a later job.
                if use_alarm:
                    signal.alarm(0)
            self.status = JobStatus.COMPLETED
            self.completed_at = datetime.now()
            return self.result
        except Exception as e:
            self.error = str(e)
            if self.retry_count < self.max_retries:
                self.status = JobStatus.RETRY
                self.retry_count += 1
            else:
                self.status = JobStatus.FAILED
                self.completed_at = datetime.now()
            raise

    def to_dict(self) -> Dict:
        """Return a JSON-friendly snapshot of the job's metadata."""
        return {
            'id': self.id,
            'status': self.status.value,
            'priority': self.priority,
            'created_at': self.created_at.isoformat(),
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
            # Truncate large results; keep None as None instead of "None".
            'result': str(self.result)[:100] if self.result is not None else None,
            'error': self.error,
            'retry_count': self.retry_count
        }

    def serialize(self) -> bytes:
        """Pickle the job's definition (function referenced by name only)."""
        func_name = self.func.__name__ if callable(self.func) else self.func
        return pickle.dumps({
            'func': func_name,
            'args': self.args,
            'kwargs': self.kwargs,
            'id': self.id,
            'priority': self.priority,
            'max_retries': self.max_retries,
            'timeout': self.timeout
        })
Python
# jobq/queue.py
import queue
import threading
from typing import Optional, List
from .job import Job
class JobQueue:
    """Thread-safe priority queue of jobs with per-job completion signalling.

    Lower `priority` values are dequeued first. The (priority, job_id, job)
    tuple uses the unique job id as a tie-breaker so Job objects themselves
    are never compared.
    """

    def __init__(self, maxsize: int = 0):
        # maxsize=0 means unbounded.
        self.queue = queue.PriorityQueue(maxsize=maxsize)
        self.lock = threading.Lock()
        self.jobs: dict = {}             # job_id -> Job, for status lookups
        self.job_conditions: dict = {}   # job_id -> Condition for waiters

    def enqueue(self, job: "Job") -> str:
        """Register and enqueue a job; returns its id."""
        with self.lock:
            self.jobs[job.id] = job
            if job.id not in self.job_conditions:
                self.job_conditions[job.id] = threading.Condition()
        # put() is outside the lock: on a bounded queue it can block, and
        # blocking while holding self.lock would stall every other method.
        self.queue.put((job.priority, job.id, job))
        return job.id

    def dequeue(self, block: bool = True, timeout: Optional[float] = None) -> Optional["Job"]:
        """Pop the highest-priority job, or None when the queue is empty."""
        try:
            _priority, _job_id, job = self.queue.get(block=block, timeout=timeout)
            return job
        except queue.Empty:
            return None

    def get_job(self, job_id: str) -> Optional["Job"]:
        """Look up a job by id (None when unknown)."""
        with self.lock:
            return self.jobs.get(job_id)

    def complete_job(self, job_id: str):
        """Wake any threads blocked in wait_for_completion for this job."""
        cond = self.job_conditions.get(job_id)
        if cond is not None:
            with cond:
                cond.notify_all()

    def get_status(self, job_id: str) -> Optional[str]:
        """Return the job's status string, or None for an unknown id."""
        job = self.get_job(job_id)
        return job.status.value if job else None

    def wait_for_completion(self, job_id: str, timeout: Optional[float] = None) -> bool:
        """Block until complete_job() fires for job_id (or timeout elapses).

        Returns True immediately if the job has already finished -- this
        closes the race where the worker notified before the caller
        started waiting, which previously hung until the timeout.
        """
        cond = self.job_conditions.get(job_id)
        if cond is None:
            return False
        job = self.get_job(job_id)
        if job is not None and getattr(job.status, "value", None) in ("completed", "failed"):
            return True
        with cond:
            return cond.wait(timeout=timeout)

    def get_stats(self) -> dict:
        """Count known jobs by status value."""
        stats = {
            'pending': 0,
            'processing': 0,
            'completed': 0,
            'failed': 0,
            'retry': 0
        }
        with self.lock:
            for job in self.jobs.values():
                stats[job.status.value] = stats.get(job.status.value, 0) + 1
        return stats

    def size(self) -> int:
        """Number of jobs currently waiting in the queue."""
        return self.queue.qsize()
Worker Pool
Let's create the worker pool.
Python
# jobq/worker.py
import threading
import time
from typing import Optional, Callable
from .queue import JobQueue
from .job import Job, JobStatus
class Worker:
    """Background thread that pulls jobs off a JobQueue and executes them."""

    def __init__(self, queue: "JobQueue", worker_id: int = 0):
        self.queue = queue
        self.worker_id = worker_id
        self.running = False
        self.thread: Optional[threading.Thread] = None

    def start(self):
        """Launch the worker loop in a daemon thread."""
        self.running = True
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def stop(self):
        """Signal the loop to exit and wait for the thread to finish."""
        self.running = False
        if self.thread:
            self.thread.join()

    def _run(self):
        # The 1s dequeue timeout lets the loop re-check self.running
        # regularly so stop() is honoured promptly.
        while self.running:
            job = self.queue.dequeue(timeout=1)
            if job is None:
                continue
            self._process_job(job)

    def _process_job(self, job: "Job"):
        """Execute one job and route it to completed / retry / failed handling."""
        try:
            print(f"Worker {self.worker_id} processing job {job.id}")
            job.execute()
            if job.status == JobStatus.COMPLETED:
                print(f"Job {job.id} completed successfully")
            elif job.status == JobStatus.RETRY:
                print(f"Job {job.id} scheduled for retry ({job.retry_count}/{job.max_retries})")
                self.queue.enqueue(job)
        except Exception as e:
            # Job.execute re-raises on failure even when status is RETRY,
            # so the retry re-enqueue must also happen here -- otherwise
            # retries would never run.
            if job.status == JobStatus.RETRY:
                print(f"Job {job.id} scheduled for retry ({job.retry_count}/{job.max_retries})")
                self.queue.enqueue(job)
            else:
                print(f"Job {job.id} failed: {e}")
        finally:
            # NOTE(review): this also notifies waiters for jobs that were
            # re-enqueued for retry; waiters should check job status.
            self.queue.complete_job(job.id)
class WorkerPool:
    """Manages a fixed-size group of Worker threads over one shared queue."""

    def __init__(self, queue: "JobQueue", num_workers: int = 4):
        self.queue = queue
        self.num_workers = num_workers
        self.workers: list = []

    def start(self):
        """Create and start num_workers workers."""
        for i in range(self.num_workers):
            worker = Worker(self.queue, worker_id=i)
            worker.start()
            self.workers.append(worker)
        print(f"Started {self.num_workers} workers")

    def stop(self):
        """Stop every worker (joins each thread) and forget them."""
        for worker in self.workers:
            worker.stop()
        self.workers.clear()
        print("All workers stopped")
Persistence
Let's add job persistence with Redis.
Python
# jobq/store.py
import json
import redis
from typing import Optional, List, Dict
from datetime import datetime
from .job import Job, JobStatus
class JobStore:
    """Persist job metadata as JSON blobs in Redis under `jobq:job:<id>` keys.

    Only metadata is stored (the callable is recorded by name), so stored
    jobs are inspectable but not directly re-executable from the store.
    """

    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
        self.redis = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=0,
            decode_responses=False
        )
        self.prefix = "jobq:"

    def _make_key(self, key: str) -> str:
        """Namespace a key with the store prefix."""
        return f"{self.prefix}{key}"

    def save_job(self, job: "Job"):
        """Serialize a Job's metadata to Redis (overwrites any previous entry)."""
        key = self._make_key(f"job:{job.id}")
        data = {
            'id': job.id,
            'func': job.func.__name__ if callable(job.func) else str(job.func),
            'args': str(job.args),
            'kwargs': str(job.kwargs),
            'priority': job.priority,
            'max_retries': job.max_retries,
            'timeout': job.timeout,
            'status': job.status.value,
            'created_at': job.created_at.isoformat(),
            'retry_count': job.retry_count,
            # `is not None` so falsy results like 0 or "" are still stored.
            'result': str(job.result) if job.result is not None else None,
            'error': job.error
        }
        self.redis.set(key, json.dumps(data))

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Fetch one stored job as a dict, or None when absent."""
        key = self._make_key(f"job:{job_id}")
        data = self.redis.get(key)
        return json.loads(data) if data else None

    def get_all_jobs(self, limit: int = 100) -> List[Dict]:
        """Return up to `limit` stored jobs.

        Uses SCAN (scan_iter) rather than KEYS so a large keyspace does
        not block the Redis server with one O(n) command.
        """
        jobs = []
        for key in self.redis.scan_iter(match=self._make_key("job:*")):
            if len(jobs) >= limit:
                break
            data = self.redis.get(key)
            if data:
                jobs.append(json.loads(data))
        return jobs

    def delete_job(self, job_id: str):
        """Remove a stored job (no-op when absent)."""
        self.redis.delete(self._make_key(f"job:{job_id}"))

    def get_stats(self) -> Dict:
        """Count stored jobs, total and per status."""
        stats = {
            'total': 0,
            'completed': 0,
            'failed': 0,
            'pending': 0,
            'processing': 0
        }
        for key in self.redis.scan_iter(match=self._make_key("job:*")):
            data = self.redis.get(key)
            if data:
                job = json.loads(data)
                stats['total'] += 1
                status = job.get('status', 'pending')
                stats[status] = stats.get(status, 0) + 1
        return stats
Job Scheduling
Let's add scheduled job support.
Python
# jobq/scheduler.py
import threading
import time
from datetime import datetime, timedelta
from typing import Callable, Optional, Dict, List
from enum import Enum
class ScheduleType(Enum):
    """How a scheduled job repeats."""
    ONCE = "once"
    INTERVAL = "interval"
    CRON = "cron"


class ScheduledJob:
    """A callable registered with the Scheduler plus its timing rules.

    Tracks when it last ran and when it should run next; a ONCE job
    disables itself after its first run.
    """

    def __init__(self,
                 job_id: str,
                 func: Callable,
                 args: tuple = (),
                 kwargs: Dict = None,
                 schedule_type: "ScheduleType" = None,
                 interval: int = None,
                 run_at: datetime = None):
        self.job_id = job_id
        self.func = func
        self.args = args
        self.kwargs = {} if kwargs is None else kwargs
        self.schedule_type = ScheduleType.ONCE if schedule_type is None else schedule_type
        self.interval = interval
        self.run_at = run_at
        self.enabled = True
        self.last_run = None
        # The first due time is simply the requested start time.
        self.next_run = run_at

    def should_run(self) -> bool:
        """True when the job is enabled and its next_run time has arrived."""
        if not self.enabled:
            return False
        due = self.next_run is not None and datetime.now() >= self.next_run
        return due

    def mark_run(self):
        """Record a run and compute the next one (or disable a ONCE job)."""
        self.last_run = datetime.now()
        if self.schedule_type == ScheduleType.INTERVAL and self.interval:
            self.next_run = datetime.now() + timedelta(seconds=self.interval)
        elif self.schedule_type == ScheduleType.ONCE:
            self.enabled = False
class Scheduler:
    """Polls registered ScheduledJobs once per second and enqueues any due ones."""

    def __init__(self, queue):
        self.queue = queue
        self.scheduled_jobs: Dict[str, "ScheduledJob"] = {}
        self.running = False
        self.thread: Optional[threading.Thread] = None

    def schedule_once(self, job_id: str, func: Callable, run_at: datetime,
                      args: tuple = (), kwargs: Dict = None) -> str:
        """Register func to run a single time at run_at; returns job_id."""
        self.scheduled_jobs[job_id] = ScheduledJob(
            job_id, func, args, kwargs,
            schedule_type=ScheduleType.ONCE,
            run_at=run_at,
        )
        return job_id

    def schedule_interval(self, job_id: str, func: Callable, interval: int,
                          args: tuple = (), kwargs: Dict = None) -> str:
        """Register func to run every `interval` seconds, starting now."""
        self.scheduled_jobs[job_id] = ScheduledJob(
            job_id, func, args, kwargs,
            schedule_type=ScheduleType.INTERVAL,
            interval=interval,
            run_at=datetime.now(),
        )
        return job_id

    def cancel(self, job_id: str) -> bool:
        """Remove a scheduled job; returns False when the id is unknown."""
        entry = self.scheduled_jobs.get(job_id)
        if entry is None:
            return False
        entry.enabled = False
        del self.scheduled_jobs[job_id]
        return True

    def start(self):
        """Launch the polling loop in a daemon thread."""
        self.running = True
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        print("Scheduler started")

    def stop(self):
        """Signal the loop to exit and wait for it."""
        self.running = False
        if self.thread:
            self.thread.join()
        print("Scheduler stopped")

    def _run(self):
        while self.running:
            # Snapshot the items so cancel()/schedule_*() during iteration
            # cannot raise "dict changed size during iteration".
            for job_id, scheduled in list(self.scheduled_jobs.items()):
                if not scheduled.should_run():
                    continue
                from .job import Job
                self.queue.enqueue(
                    Job(scheduled.func, scheduled.args, scheduled.kwargs, id=job_id)
                )
                scheduled.mark_run()
            time.sleep(1)
Testing
Python
# main.py
from jobq.queue import JobQueue
from jobq.worker import WorkerPool
from jobq.job import Job
from jobq.scheduler import Scheduler
from datetime import datetime, timedelta
import time


# --- Example tasks -------------------------------------------------------

def send_email(to, subject):
    """Simulate sending an email (2 seconds of work)."""
    print(f"Sending email to {to}: {subject}")
    time.sleep(2)
    return "Email sent"


def process_data(data):
    """Simulate processing a file (1 second of work)."""
    print(f"Processing data: {data}")
    time.sleep(1)
    return f"Processed {data} successfully"


# Create queue and workers
queue = JobQueue()
pool = WorkerPool(queue, num_workers=2)
pool.start()

# Enqueue some jobs (lower priority number = dequeued first)
job1 = Job(send_email, ("user@example.com", "Hello"), priority=1)
job2 = Job(process_data, ("report.csv",), priority=2)
job3 = Job(process_data, ("image.png",), priority=3)
queue.enqueue(job1)
queue.enqueue(job2)
queue.enqueue(job3)

# Schedule a recurring job every 10 seconds
scheduler = Scheduler(queue)
scheduler.schedule_interval("periodic_task", process_data, interval=10, args=("scheduled",))
scheduler.start()

# Give the workers time to finish the queued jobs
time.sleep(5)

# Print stats
print("\nQueue stats:", queue.get_stats())

# Stop everything
pool.stop()
scheduler.stop()
Testing Checklist
- Jobs are queued correctly
- Workers process jobs
- Retry mechanism works
- Scheduled jobs run
- Stats are tracked
Summary
Congratulations! You've built a complete background job queue system.
What You Built
- Job Queue - Priority-based queue
- Workers - Thread pool for processing
- Persistence - Redis-based storage
- Scheduler - Job scheduling
- Retry Logic - Automatic retries
Next Steps
- Add distributed workers
- Add priority aging so old low-priority jobs eventually run
- Add job dependencies
- Implement dead letter queue