Build a Task Scheduler
Introduction
Task schedulers automate the execution of jobs at specific times or intervals. They're used for background processing, maintenance tasks, report generation, and system monitoring.
In this tutorial, we'll build a complete task scheduler with multiple trigger types, job persistence, and scheduling features similar to cron or Celery Beat.
What you'll build:
- Event-driven scheduler
- Multiple trigger types
- Job persistence
- Job execution engine
- Scheduling API
What you'll learn:
- How task schedulers work
- Different scheduling strategies
- Thread pool execution
- Job state management
Core Concepts
Triggers
Triggers determine when a job should run:
- Date/Time - Run once at a specific datetime
- Interval - Run repeatedly at a fixed interval
- Cron - Run on a schedule defined by a cron expression
Jobs
Jobs are the units of work to be executed. Each job holds the function to run, its arguments, the trigger that decides when it runs, and metadata such as its name, run count, and maximum number of runs.
Execution
Jobs can be executed synchronously or in a thread pool for concurrent processing.
Project Overview
Our scheduler will support:
| Feature | Description |
|---|---|
| Date Trigger | One-time execution |
| Interval Trigger | Fixed interval execution |
| Cron Trigger | Cron expression scheduling |
| Thread Pool | Concurrent job execution |
Prerequisites
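- Python 3.8+ installed on your system
- Basic knowledge of threading
- The `croniter` package (`pip install croniter`), which `CronTrigger` uses to evaluate cron expressions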
Task Scheduler
Create scheduler/scheduler.py:
import calendar
import threading
import time
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from queue import PriorityQueue
from typing import Any, Callable, Dict, List, Optional
class Trigger:
    """Abstract base for scheduling rules.

    Subclasses decide when a job should fire next. They override
    ``get_next_fire_time``.
    """

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        """Return the next moment the job should run, or None if it never runs again.

        Args:
            last_fire_time: When the job last fired, or None if it has not fired yet.
        """
        # The base class defines only the contract; concrete triggers implement it.
        raise NotImplementedError

    def __repr__(self):
        # Show just the concrete trigger type, e.g. "IntervalTrigger".
        return type(self).__name__
class DateTrigger(Trigger):
    """One-shot trigger that fires a single time, at ``run_date``."""

    def __init__(self, run_date: datetime):
        # The only moment this trigger ever yields.
        self.run_date = run_date

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        """Return ``run_date`` before the first firing and None afterwards."""
        # Once the job has fired (last_fire_time is set), the trigger is exhausted.
        return self.run_date if last_fire_time is None else None
class IntervalTrigger(Trigger):
    """Fire repeatedly, every ``interval``, starting at ``start_date``.

    If one or more runs were missed (for example, while the scheduler was
    stopped), the next fire time skips ahead by whole intervals. The schedule
    therefore keeps its original phase instead of drifting to "now + interval".

    Raises:
        ValueError: if the combined interval is zero or negative. Such an
            interval would make a recurring job fire continuously.
    """

    def __init__(self, weeks: int = 0, days: int = 0, hours: int = 0,
                 minutes: int = 0, seconds: int = 0, start_date: Optional[datetime] = None):
        self.interval = timedelta(weeks=weeks, days=days, hours=hours,
                                  minutes=minutes, seconds=seconds)
        if self.interval <= timedelta(0):
            raise ValueError("interval must be positive")
        self.start_date = start_date or datetime.now()

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        """Return the next fire time after *last_fire_time*, never in the past.

        Args:
            last_fire_time: When the job last fired, or None for the first run.

        Returns:
            ``start_date`` or ``last_fire_time + interval``. If that moment has
            already passed, it is advanced by the smallest whole number of
            intervals that lands after the current time.
        """
        # Read the clock once so every comparison uses the same "now".
        now = datetime.now()
        if last_fire_time is None:
            next_time = self.start_date
        else:
            next_time = last_fire_time + self.interval
        if next_time < now:
            # Skip every missed slot, but stay on the start_date/last_fire grid.
            missed = (now - next_time) // self.interval + 1
            next_time += missed * self.interval
        return next_time
class CronTrigger(Trigger):
    """Fire according to a five-field cron expression.

    The fields are, in order: minute, hour, day of month, month, day of week.
    Computing fire times requires the third-party ``croniter`` package. It is
    imported lazily, so constructing the trigger only validates the field count.
    """

    def __init__(self, expression: str):
        self.expression = expression
        self._parse_expression()

    def _parse_expression(self):
        """Split the expression into its five fields and store them as attributes.

        Raises:
            ValueError: if the expression does not have exactly 5 whitespace-separated fields.
        """
        parts = self.expression.split()
        if len(parts) != 5:
            raise ValueError("Cron expression must have 5 parts")
        self.minute, self.hour, self.day, self.month, self.dow = parts

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        """Return the next cron match after the later of *last_fire_time* and now.

        Using "now" as a floor means occurrences missed while the scheduler was
        stopped are skipped rather than replayed back-to-back. This matches how
        IntervalTrigger handles missed runs.
        """
        from croniter import croniter

        now = datetime.now()
        base = now if last_fire_time is None else max(last_fire_time, now)
        return croniter(self.expression, base).get_next(datetime)
Trigger Types
Add convenience triggers for daily, weekly, and monthly schedules:
class DailyTrigger(Trigger):
    """Fire once a day at ``hour:minute`` (naive local time, seconds zeroed)."""

    def __init__(self, hour: int = 0, minute: int = 0):
        self.hour = hour
        self.minute = minute

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        """Return the first ``hour:minute`` slot strictly after both *last_fire_time* and now.

        The result is always re-anchored to ``hour:minute``, so a run that
        started late does not push later runs later. Days missed while the
        scheduler was down are skipped instead of being returned as past times.

        Raises:
            ValueError: if ``hour``/``minute`` are out of range (from ``datetime.replace``).
        """
        now = datetime.now()
        reference = now if last_fire_time is None else max(last_fire_time, now)
        candidate = reference.replace(hour=self.hour, minute=self.minute, second=0, microsecond=0)
        if candidate <= reference:
            # Today's slot is already used or gone; take tomorrow's.
            candidate += timedelta(days=1)
        return candidate
class WeeklyTrigger(Trigger):
    """Fire once a week on ``day_of_week`` at ``hour:minute`` (naive local time).

    ``day_of_week`` uses ``datetime.weekday()`` numbering: Monday=0 ... Sunday=6.

    Raises:
        ValueError: if ``day_of_week`` is outside 0..6.
    """

    def __init__(self, day_of_week: int = 0, hour: int = 0, minute: int = 0):
        if not 0 <= day_of_week <= 6:
            raise ValueError("day_of_week must be between 0 (Monday) and 6 (Sunday)")
        self.day_of_week = day_of_week
        self.hour = hour
        self.minute = minute

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        """Return the first weekly slot strictly after both *last_fire_time* and now.

        If today is the target weekday and the slot hasn't passed, today is
        returned rather than next week. Weeks missed while the scheduler was
        down are skipped, and late runs do not shift the schedule.
        """
        now = datetime.now()
        reference = now if last_fire_time is None else max(last_fire_time, now)
        # 0 when reference already falls on the target weekday.
        days_ahead = (self.day_of_week - reference.weekday()) % 7
        candidate = (reference + timedelta(days=days_ahead)).replace(
            hour=self.hour, minute=self.minute, second=0, microsecond=0)
        if candidate <= reference:
            candidate += timedelta(weeks=1)
        return candidate
class MonthlyTrigger(Trigger):
    """Fire once a month on ``day`` at ``hour:minute`` (naive local time).

    When a month has fewer than ``day`` days (e.g. day=31 in April or
    February), the job fires on that month's last day. Every month therefore
    gets exactly one run, and no month's own slot is skipped.

    Raises:
        ValueError: if ``day`` is outside 1..31.
    """

    def __init__(self, day: int = 1, hour: int = 0, minute: int = 0):
        if not 1 <= day <= 31:
            raise ValueError("day must be between 1 and 31")
        self.day = day
        self.hour = hour
        self.minute = minute

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        """Return the first monthly slot strictly after both *last_fire_time* and now.

        Months missed while the scheduler was down are skipped instead of being
        returned as past times.
        """
        now = datetime.now()
        reference = now if last_fire_time is None else max(last_fire_time, now)
        year, month = reference.year, reference.month
        # Runs at most twice: the slot in the month after reference's month is
        # always later than reference.
        while True:
            days_in_month = calendar.monthrange(year, month)[1]
            candidate = datetime(year, month, min(self.day, days_in_month),
                                 self.hour, self.minute)
            if candidate > reference:
                return candidate
            year, month = (year + 1, 1) if month == 12 else (year, month + 1)
Job Management
Create the job model and the executor that runs jobs:
@dataclass
class Job:
    """A unit of work (a callable plus its arguments) and its scheduling metadata."""

    # Unique identifier; each instance gets a fresh UUID4 string by default.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Human-readable label for the job.
    name: str = ''
    # Callable to invoke as func(*args, **kwargs); None until assigned.
    func: Optional[Callable] = None
    # Positional arguments passed to func.
    args: tuple = ()
    # Keyword arguments passed to func (a fresh dict per instance).
    kwargs: dict = field(default_factory=dict)
    # Rule that decides when the job fires next; None until assigned.
    trigger: Optional[Trigger] = None
    # NOTE(review): not read by JobExecutor in this file; presumably caps
    # concurrent runs of this job in the scheduler -- confirm.
    max_instances: int = 1
    # When the job should run next; JobStore.get_next_jobs sorts None last.
    next_run_time: Optional[datetime] = None
    # Lifecycle label: starts 'pending'; JobExecutor sets 'running',
    # 'completed', or 'failed'. JobStore.get_next_jobs only returns 'pending' jobs.
    job_state: str = 'pending'
    # Value returned by func, as recorded by JobExecutor.
    result: Any = None
    # Exception from a failed run; JobExecutor resets it to None on success.
    error: Optional[Exception] = None
    # Number of execution attempts; JobExecutor increments it after every run.
    run_count: int = 0
    # NOTE(review): not enforced in this file; presumably None means unlimited.
    max_runs: Optional[int] = None
class JobExecutor:
    """Run jobs on a bounded thread pool and record each run's outcome on the job."""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        # ThreadPoolExecutor lives in concurrent.futures; threading has no such class.
        self.pool = ThreadPoolExecutor(max_workers=max_workers)

    def execute(self, job: "Job") -> Future:
        """Submit *job* to the pool; the returned Future resolves to the run's result."""
        return self.pool.submit(self._run_job, job)

    def _run_job(self, job: "Job") -> Any:
        """Call ``job.func`` and store state, result, error, and run count on *job*.

        Any ``Exception`` raised by the job is captured in ``job.error``
        (state 'failed', result None) rather than propagated. Non-Exception
        errors such as SystemExit are no longer swallowed, because the return
        statement is not in the ``finally`` clause.

        Returns:
            The job's return value, or None if the run failed.
        """
        job.job_state = 'running'
        try:
            result = job.func(*job.args, **job.kwargs)
        except Exception as e:
            job.job_state = 'failed'
            job.error = e
            # Don't leave a previous run's value behind after a failure.
            job.result = None
        else:
            job.result = result
            job.job_state = 'completed'
            job.error = None
        finally:
            job.run_count += 1
        return job.result

    def shutdown(self, wait: bool = True):
        """Stop accepting jobs; if *wait* is true, block until running jobs finish."""
        self.pool.shutdown(wait=wait)
Persistence
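Add a job store for persistence. The store writes each job's JSON-serializable metadata to disk. A job's function and trigger are not saved, so they must be re-attached after loading: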
import json
import os
from typing import List
class JobStore:
    """Keep jobs in memory and mirror their serializable metadata to a JSON file.

    Functions and triggers cannot be represented in JSON, so jobs rebuilt by
    ``load()`` have ``func=None`` and ``trigger=None``. Callers must re-attach
    them before scheduling.
    """

    def __init__(self, storage_path: str = '.scheduler_jobs.json'):
        self.storage_path = storage_path
        self.jobs: Dict[str, Job] = {}

    def add(self, job: Job):
        """Store *job* (replacing any job with the same id) and persist."""
        self.jobs[job.id] = job
        self.save()

    def remove(self, job_id: str):
        """Delete the job with *job_id* if present, persisting only when something changed."""
        if job_id in self.jobs:
            del self.jobs[job_id]
            self.save()

    def get(self, job_id: str) -> Optional[Job]:
        """Return the job with *job_id*, or None if unknown."""
        return self.jobs.get(job_id)

    def get_all(self) -> List[Job]:
        """Return all stored jobs as a new list."""
        return list(self.jobs.values())

    def get_next_jobs(self, limit: int = 10) -> List[Job]:
        """Return up to *limit* pending jobs, earliest ``next_run_time`` first (None last)."""
        pending = [j for j in self.jobs.values() if j.job_state == 'pending']
        pending.sort(key=lambda j: j.next_run_time or datetime.max)
        return pending[:limit]

    def save(self):
        """Write every job's metadata to ``storage_path`` atomically.

        The data is serialized before any file is touched, so a value that is
        not JSON-serializable raises without clobbering the existing file. The
        new content is written to a temporary file and then swapped in with
        ``os.replace``, so a crash mid-write cannot leave a truncated store.
        """
        data = [
            {
                'id': job.id,
                'name': job.name,
                'args': list(job.args),
                'kwargs': job.kwargs,
                'max_instances': job.max_instances,
                'max_runs': job.max_runs,
                # Persisted so max_runs accounting and the schedule survive restarts.
                'run_count': job.run_count,
                'next_run_time': (job.next_run_time.isoformat()
                                  if job.next_run_time is not None else None),
            }
            for job in self.jobs.values()
        ]
        payload = json.dumps(data)
        tmp_path = self.storage_path + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(payload)
        os.replace(tmp_path, self.storage_path)

    def load(self):
        """Rebuild jobs from ``storage_path`` (if it exists) and merge them into ``self.jobs``.

        Files written by older versions, which lack ``run_count`` and
        ``next_run_time``, still load; the missing fields take Job's defaults.
        """
        if not os.path.exists(self.storage_path):
            return
        with open(self.storage_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        for job_data in data:
            # JSON has no tuple type; restore the declared Job.args type.
            job_data['args'] = tuple(job_data.get('args', ()))
            next_run = job_data.get('next_run_time')
            if next_run is not None:
                job_data['next_run_time'] = datetime.fromisoformat(next_run)
            job = Job(**job_data)
            self.jobs[job.id] = job
Testing the Scheduler
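Create a complete scheduler application. The example imports a `Scheduler` class from the `scheduler` package and uses its `add_job`, `get_jobs`, `start`, `shutdown`, and `scheduled_job` methods: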
import time
from datetime import datetime, timedelta

from scheduler import Scheduler, Job, IntervalTrigger, DateTrigger, CronTrigger

scheduler = Scheduler()


def send_email(to, subject):
    """Pretend to send an email and report success."""
    print(f"Sending email to {to}: {subject}")
    return "Email sent"


def generate_report():
    """Pretend to build the daily report."""
    print("Generating daily report...")
    return "Report generated"


def cleanup():
    """Pretend to run housekeeping."""
    print("Running cleanup task...")
    return "Cleanup complete"


# One-shot job, 10 seconds from now.
job1 = Job(
    name='send-welcome-email',
    func=send_email,
    args=('user@example.com', 'Welcome!'),
    trigger=DateTrigger(datetime.now() + timedelta(seconds=10))
)
scheduler.add_job(job1)

job2 = Job(
    name='daily-report',
    func=generate_report,
    trigger=IntervalTrigger(hours=24)
)
scheduler.add_job(job2)

job3 = Job(
    name='hourly-check',
    func=cleanup,
    trigger=IntervalTrigger(hours=1)
)
scheduler.add_job(job3)

# Every Monday at 09:00.
job4 = Job(
    name='weekly-newsletter',
    func=send_email,
    args=('subscribers@example.com', 'Weekly Newsletter'),
    trigger=CronTrigger('0 9 * * 1')
)
scheduler.add_job(job4)

print(f"Scheduled {len(scheduler.get_jobs())} jobs")
print("Starting scheduler...")
scheduler.start()
try:
    # Let the scheduler run for 30 seconds.
    time.sleep(30)
finally:
    # Shut down even if the wait is interrupted (e.g. Ctrl+C).
    scheduler.shutdown()
# Schedule using decorator
import time

from scheduler import Scheduler, IntervalTrigger, CronTrigger

scheduler = Scheduler()


@scheduler.scheduled_job(IntervalTrigger(minutes=5))
def check_health():
    """Report system health every five minutes."""
    return "System healthy"


@scheduler.scheduled_job(CronTrigger('0 * * * *'))
def hourly_task():
    """Run at the top of every hour."""
    print("Running hourly task")


try:
    scheduler.start()
    # NOTE: the previous example sleeps after start() before calling
    # shutdown(), which suggests start() returns immediately; keep the main
    # thread alive until the user interrupts it.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl+C is the intended way to stop this example.
    pass
finally:
    scheduler.shutdown()
Summary
Congratulations! You've built a complete task scheduler. Here's what you learned:
- Triggers - How to create different trigger types
- Jobs - How to define and manage jobs
- Execution - How to execute jobs in thread pools
- Persistence - How to save and load job state
Possible Extensions
- Add job dependencies
- Implement distributed scheduling
- Add job monitoring dashboard
- Implement job timeouts