
Build a Task Scheduler

Difficulty: Intermediate
Est. Time: ~4 hours

Introduction

Task schedulers automate the execution of jobs at specific times or intervals. They're used for background processing, maintenance tasks, report generation, and system monitoring.

In this tutorial, we'll build a complete task scheduler with multiple trigger types, job persistence, and scheduling features similar to cron or Celery Beat.

What You'll Build
  • Event-driven scheduler
  • Multiple trigger types
  • Job persistence
  • Job execution engine
  • Scheduling API
What You'll Learn
  • How task schedulers work
  • Different scheduling strategies
  • Thread pool execution
  • Job state management

Core Concepts

Triggers

Triggers determine when a job should run:

  • Date/Time - Run once at a specific datetime
  • Interval - Run repeatedly at a fixed interval
  • Cron - Run on a schedule defined by a cron expression
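All three trigger types share one contract: given the last fire time (None before the first run), return the next fire time, or None once the trigger is finished. The Trigger classes below implement this as get_next_fire_time. To see the contract in isolation, here is a sketch that uses plain functions and fixed datetimes so the result doesn't depend on the clock; run_once, every, and fire_times are illustrative names, not part of the scheduler.

```python
from datetime import datetime, timedelta
from typing import Callable, List, Optional

# A trigger reduces to one function: given the last fire time (None before
# the first run), return the next fire time, or None once it is finished.
NextFireTime = Callable[[Optional[datetime]], Optional[datetime]]


def run_once(at: datetime) -> NextFireTime:
    # Date-style trigger: fires a single time, then reports it is finished
    return lambda last: at if last is None else None


def every(start: datetime, interval: timedelta) -> NextFireTime:
    # Interval-style trigger: fires at `start`, then every `interval` after that
    return lambda last: start if last is None else last + interval


def fire_times(next_fire_time: NextFireTime, limit: int) -> List[datetime]:
    """Collect up to `limit` fire times by feeding each result back in."""
    times: List[datetime] = []
    last: Optional[datetime] = None
    while len(times) < limit:
        nxt = next_fire_time(last)
        if nxt is None:  # trigger is exhausted
            break
        times.append(nxt)
        last = nxt
    return times


start = datetime(2024, 1, 1, 9, 0)
print(fire_times(run_once(start), limit=3))
print(fire_times(every(start, timedelta(minutes=30)), limit=3))
```

The date-style trigger stops after one fire time, while the interval-style trigger keeps producing times until the caller's limit is reached.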

Jobs

Jobs are the units of work to be executed. Each job bundles the function to run and its arguments with metadata such as a name, a trigger, and run limits.
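The Job class built later in this tutorial has no retry field. One lightweight way to add a retry policy is to wrap the job's function before scheduling it. A sketch, where with_retries and its parameters (max_attempts, delay, backoff, retry_on) are hypothetical names:

```python
import functools
import time
from typing import Any, Callable, Tuple, Type


def with_retries(func: Callable[..., Any], max_attempts: int = 3,
                 delay: float = 1.0, backoff: float = 2.0,
                 retry_on: Tuple[Type[BaseException], ...] = (Exception,)) -> Callable[..., Any]:
    """Return a wrapper that re-runs `func` on failure with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        wait = delay
        for attempt in range(1, max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except retry_on:
                if attempt == max_attempts:
                    raise  # out of attempts: let the executor record the failure
                time.sleep(wait)
                wait *= backoff

    return wrapper


# Usage: a function that fails twice before succeeding
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(with_retries(flaky, max_attempts=3, delay=0.01)())
```

You would then schedule the wrapped function, for example Job(name='sync', func=with_retries(sync_data, max_attempts=5), trigger=IntervalTrigger(minutes=10)), where sync_data stands in for your own function. The retries sleep on the worker thread, so a job that keeps failing occupies that thread for the whole backoff period.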

Execution

Jobs can run synchronously, where each job blocks until it finishes, or in a thread pool, where several jobs run concurrently.
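The difference between the two modes is easiest to see with a job that spends its time waiting on I/O. This sketch uses the standard library's concurrent.futures.ThreadPoolExecutor; slow_task is a stand-in for real work such as a network call.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def slow_task(n: int) -> int:
    time.sleep(0.2)  # stand-in for I/O such as a network call
    return n * n


# Synchronous: each call blocks until the previous one finishes (~0.6 s total)
start = time.perf_counter()
sync_results = [slow_task(n) for n in range(3)]
sync_elapsed = time.perf_counter() - start

# Thread pool: the three calls overlap, so this takes roughly 0.2 s
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_task, n) for n in range(3)]
    pooled_results: List[int] = [f.result() for f in futures]
pooled_elapsed = time.perf_counter() - start

print(sync_results, round(sync_elapsed, 1))
print(pooled_results, round(pooled_elapsed, 1))
```

In CPython the GIL lets only one thread execute Python bytecode at a time, so a thread pool speeds up I/O-bound jobs like this one but not CPU-bound ones; those need processes (for example concurrent.futures.ProcessPoolExecutor).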

Project Overview

Our scheduler will support:

Feature              Description
Date Trigger         One-time execution
Interval Trigger     Fixed-interval execution
Cron Trigger         Cron-expression scheduling
Calendar Triggers    Daily, weekly, and monthly schedules
Thread Pool          Concurrent job execution
Job Store            JSON persistence of job metadata

Prerequisites

  • Python 3.8+ - Installed on your system
  • Basic threading knowledge
  • croniter - Third-party package used by the cron trigger (pip install croniter)

Task Scheduler

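Create scheduler/scheduler.py; the code in the following sections goes into the same module. Also add a scheduler/__init__.py with the single line from .scheduler import * so the later examples can import directly from the scheduler package: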

import threading
import time
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional, Any
from dataclasses import dataclass, field
from queue import PriorityQueue
import uuid


class Trigger:
    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        raise NotImplementedError

    def __repr__(self):
        return self.__class__.__name__


class DateTrigger(Trigger):
    def __init__(self, run_date: datetime):
        self.run_date = run_date

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        if last_fire_time is None:
            return self.run_date
        return None


class IntervalTrigger(Trigger):
    def __init__(self, weeks: int = 0, days: int = 0, hours: int = 0,
                 minutes: int = 0, seconds: int = 0,
                 start_date: Optional[datetime] = None):
        self.interval = timedelta(weeks=weeks, days=days, hours=hours,
                                  minutes=minutes, seconds=seconds)
        # A zero or negative interval would make the job fire continuously
        if self.interval <= timedelta(0):
            raise ValueError("Interval must be positive")
        self.start_date = start_date or datetime.now()

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        now = datetime.now()
        if last_fire_time is None:
            next_time = self.start_date
        else:
            next_time = last_fire_time + self.interval

        # Skip runs that were missed (e.g. while the scheduler was stopped)
        if next_time < now:
            next_time = now + self.interval

        return next_time


class CronTrigger(Trigger):
    def __init__(self, expression: str):
        self.expression = expression
        self._parse_expression()

    def _parse_expression(self):
        parts = self.expression.split()
        if len(parts) != 5:
            raise ValueError("Cron expression must have 5 parts")
        
        self.minute, self.hour, self.day, self.month, self.dow = parts

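    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        # croniter is a third-party package (pip install croniter); importing it
        # here keeps the rest of the module usable without it
        from croniter import croniter

        base = last_fire_time or datetime.now()
        cron = croniter(self.expression, base)
        return cron.get_next(datetime)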
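When a run is missed, IntervalTrigger schedules the next one at now + interval, so the job's phase shifts to whenever the scheduler noticed. If you would rather keep runs aligned to start_date (say, always on the quarter hour), you can jump forward a whole number of intervals instead. A sketch, with next_aligned_fire_time as a hypothetical helper that takes the current time as a parameter:

```python
from datetime import datetime, timedelta


def next_aligned_fire_time(start: datetime, interval: timedelta, now: datetime) -> datetime:
    """Return the first time start + k * interval (k >= 0) that is not before `now`."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if now <= start:
        return start
    # timedelta // timedelta gives the whole number of intervals already elapsed
    elapsed = (now - start) // interval
    candidate = start + elapsed * interval
    return candidate if candidate == now else candidate + interval


start = datetime(2024, 1, 1, 9, 0)
print(next_aligned_fire_time(start, timedelta(minutes=15), datetime(2024, 1, 1, 10, 7)))
```

Here 67 minutes have elapsed since 09:00, which is 4 whole 15-minute intervals (10:00), so the next aligned run is 10:15 rather than 10:22.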

Trigger Types

Add daily, weekly, and monthly triggers to the same module:

import calendar


class DailyTrigger(Trigger):
    def __init__(self, hour: int = 0, minute: int = 0):
        self.hour = hour
        self.minute = minute

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        now = datetime.now()

        if last_fire_time:
            next_time = last_fire_time + timedelta(days=1)
        else:
            next_time = now.replace(hour=self.hour, minute=self.minute, second=0, microsecond=0)

        # Skip occurrences that are already in the past
        while next_time <= now:
            next_time += timedelta(days=1)

        return next_time


class WeeklyTrigger(Trigger):
    def __init__(self, day_of_week: int = 0, hour: int = 0, minute: int = 0):
        # day_of_week follows datetime.weekday(): 0 = Monday ... 6 = Sunday
        self.day_of_week = day_of_week
        self.hour = hour
        self.minute = minute

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        now = datetime.now()

        if last_fire_time:
            next_time = last_fire_time + timedelta(weeks=1)
        else:
            # Days until the target weekday (0 if it is today)
            days_ahead = (self.day_of_week - now.weekday()) % 7
            next_time = (now + timedelta(days=days_ahead)).replace(
                hour=self.hour, minute=self.minute, second=0, microsecond=0)

        # If this week's slot has already passed, move to the following week
        while next_time <= now:
            next_time += timedelta(weeks=1)

        return next_time


class MonthlyTrigger(Trigger):
    def __init__(self, day: int = 1, hour: int = 0, minute: int = 0):
        if not 1 <= day <= 31:
            raise ValueError("day must be between 1 and 31")
        self.day = day
        self.hour = hour
        self.minute = minute

    def _fire_time_in(self, year: int, month: int) -> datetime:
        # Clamp to the month's last day, so day=31 fires on Feb 28/29, Apr 30, etc.
        last_day = calendar.monthrange(year, month)[1]
        return datetime(year, month, min(self.day, last_day), self.hour, self.minute)

    def get_next_fire_time(self, last_fire_time: Optional[datetime] = None) -> Optional[datetime]:
        now = datetime.now()
        # The result must be later than both the current time and the previous run
        after = max(now, last_fire_time) if last_fire_time else now

        ref = last_fire_time or now
        year, month = ref.year, ref.month
        next_time = self._fire_time_in(year, month)

        # Step forward one month at a time until we are past `after`
        while next_time <= after:
            month += 1
            if month > 12:
                month, year = 1, year + 1
            next_time = self._fire_time_in(year, month)

        return next_time
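Because the calendar triggers read datetime.now() internally, their results change from run to run, which makes them awkward to check by hand. One way to reason about (or unit-test) the weekday arithmetic is to pull it into a pure function that takes the current time as a parameter. A sketch; next_weekly is a hypothetical helper, not part of the scheduler:

```python
from datetime import datetime, timedelta


def next_weekly(now: datetime, day_of_week: int, hour: int, minute: int) -> datetime:
    """Next occurrence of the given weekday and time strictly after `now`.

    day_of_week uses datetime.weekday() numbering: 0 = Monday ... 6 = Sunday.
    """
    days_ahead = (day_of_week - now.weekday()) % 7
    candidate = (now + timedelta(days=days_ahead)).replace(
        hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(weeks=1)
    return candidate


monday_8am = datetime(2024, 1, 1, 8, 0)  # 2024-01-01 was a Monday
print(next_weekly(monday_8am, 0, 9, 0))    # later the same day
print(next_weekly(monday_8am, 0, 7, 0))    # today's slot has passed
print(next_weekly(monday_8am, 4, 17, 30))  # the coming Friday
```

The second call is the edge case worth testing: the target weekday is today, but the time has already passed, so the result must be a week later.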

Job Management

Define the Job data class and a JobExecutor that runs jobs on a thread pool:

from concurrent.futures import Future, ThreadPoolExecutor


@dataclass
class Job:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ''
    func: Optional[Callable] = None
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    trigger: Optional[Trigger] = None
    max_instances: int = 1                   # maximum concurrent runs of this job
    next_run_time: Optional[datetime] = None
    job_state: str = 'pending'               # pending, running, completed or failed
    result: Any = None
    error: Optional[Exception] = None
    run_count: int = 0
    max_runs: Optional[int] = None           # None means no limit


class JobExecutor:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        # Worker threads come from concurrent.futures
        self.pool = ThreadPoolExecutor(max_workers=max_workers)

    def execute(self, job: Job) -> Future:
        # Returns immediately; the job runs on a worker thread
        return self.pool.submit(self._run_job, job)

    def _run_job(self, job: Job):
        try:
            job.job_state = 'running'
            job.result = job.func(*job.args, **job.kwargs)
            job.job_state = 'completed'
            job.error = None
        except Exception as e:
            # Record the failure on the job instead of leaving it in the Future
            job.job_state = 'failed'
            job.error = e
        finally:
            job.run_count += 1

        return job.result

    def shutdown(self, wait: bool = True):
        self.pool.shutdown(wait=wait)
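The Testing the Scheduler section imports a Scheduler class (with add_job, get_jobs, scheduled_job, start, and shutdown) that the tutorial never shows. Here is one minimal way to write it, under a few assumptions: jobs look like the Job class above (the loop only touches id, func, args, kwargs, trigger, run_count, max_runs, max_instances, next_run_time, result, and error); it runs jobs on its own ThreadPoolExecutor rather than through JobExecutor; and it neither persists jobs nor supports removing them. Pending runs sit in a heap ordered by next fire time, and a threading.Condition lets the loop sleep until the earliest job is due or until add_job or shutdown wakes it, so the loop is event-driven rather than polling.

```python
import heapq
import itertools
import threading
import types
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple


class Scheduler:
    """Runs jobs when their triggers fire, using a heap and a Condition."""

    def __init__(self, max_workers: int = 4):
        self._jobs: Dict[str, Any] = {}
        # Heap entries are (run_time, tie_breaker, job_id); earliest run on top
        self._heap: List[Tuple[datetime, int, str]] = []
        self._counter = itertools.count()
        self._active: Dict[str, int] = {}  # running instances per job id
        self._cond = threading.Condition()
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._running = False
        self._thread: Optional[threading.Thread] = None

    def add_job(self, job: Any) -> Any:
        with self._cond:
            job.next_run_time = job.trigger.get_next_fire_time(None)
            self._jobs[job.id] = job
            self._push(job)
            self._cond.notify()  # the new job may be due before the current head
        return job

    def get_jobs(self) -> List[Any]:
        with self._cond:
            return list(self._jobs.values())

    def scheduled_job(self, trigger: Any) -> Callable[[Callable], Callable]:
        def decorator(func: Callable) -> Callable:
            # Stand-in job record so this block runs on its own; in scheduler.py
            # build Job(name=func.__name__, func=func, trigger=trigger) instead
            self.add_job(types.SimpleNamespace(
                id=str(uuid.uuid4()), name=func.__name__, func=func, args=(),
                kwargs={}, trigger=trigger, next_run_time=None, run_count=0,
                max_runs=None, max_instances=1))
            return func
        return decorator

    def start(self) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def shutdown(self, wait: bool = True) -> None:
        with self._cond:
            self._running = False
            self._cond.notify()
        if self._thread is not None:
            self._thread.join()
        self._pool.shutdown(wait=wait)

    def _push(self, job: Any) -> None:
        if job.next_run_time is not None:
            heapq.heappush(self._heap, (job.next_run_time, next(self._counter), job.id))

    def _loop(self) -> None:
        with self._cond:
            while self._running:
                if not self._heap:
                    self._cond.wait()  # nothing scheduled: sleep until notified
                    continue
                delay = (self._heap[0][0] - datetime.now()).total_seconds()
                if delay > 0:
                    self._cond.wait(timeout=delay)  # add_job/shutdown may wake it early
                    continue
                run_time, _, job_id = heapq.heappop(self._heap)
                job = self._jobs[job_id]
                # Skip this run if max_instances copies are still running
                if self._active.get(job_id, 0) < job.max_instances:
                    self._active[job_id] = self._active.get(job_id, 0) + 1
                    self._pool.submit(self._run, job)
                    job.run_count += 1  # counts dispatched runs
                if job.max_runs is not None and job.run_count >= job.max_runs:
                    job.next_run_time = None  # run limit reached
                else:
                    job.next_run_time = job.trigger.get_next_fire_time(run_time)
                self._push(job)

    def _run(self, job: Any) -> None:
        try:
            job.result = job.func(*job.args, **job.kwargs)
            job.error = None
        except Exception as exc:  # keep the error on the job, not in an unread Future
            job.error = exc
        finally:
            with self._cond:
                self._active[job.id] -= 1
```

When pasting this into scheduler.py, drop the imports the module already has and build the decorator's job with the Job class. Runs that come due while max_instances copies are still running are skipped rather than queued.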

Persistence

Add a JSON-backed job store for persistence:

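import json
import os


class JobStore:
    """Persists job metadata to a JSON file.

    Functions and triggers can't be represented in JSON, so they are not
    saved; attach them to loaded jobs again before scheduling.
    """

    def __init__(self, storage_path: str = '.scheduler_jobs.json'):
        self.storage_path = storage_path
        self.jobs: Dict[str, Job] = {}

    def add(self, job: Job):
        self.jobs[job.id] = job
        self.save()

    def remove(self, job_id: str):
        if job_id in self.jobs:
            del self.jobs[job_id]
            self.save()

    def get(self, job_id: str) -> Optional[Job]:
        return self.jobs.get(job_id)

    def get_all(self) -> List[Job]:
        return list(self.jobs.values())

    def get_next_jobs(self, limit: int = 10) -> List[Job]:
        # Pending jobs, soonest first; jobs without a run time sort last
        pending = [j for j in self.jobs.values() if j.job_state == 'pending']
        pending.sort(key=lambda j: j.next_run_time or datetime.max)
        return pending[:limit]

    def save(self):
        data = []
        for job in self.jobs.values():
            data.append({
                'id': job.id,
                'name': job.name,
                'args': job.args,          # written as a JSON list
                'kwargs': job.kwargs,      # values must be JSON-serializable
                'max_instances': job.max_instances,
                'max_runs': job.max_runs,
                'run_count': job.run_count,
                'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None,
            })

        # Write to a temporary file and swap it in, so a crash mid-write
        # can't leave a truncated JSON file behind
        tmp_path = self.storage_path + '.tmp'
        with open(tmp_path, 'w') as f:
            json.dump(data, f)
        os.replace(tmp_path, self.storage_path)

    def load(self):
        if not os.path.exists(self.storage_path):
            return

        with open(self.storage_path, 'r') as f:
            data = json.load(f)

        for job_data in data:
            # Restore the types JSON couldn't keep
            job_data['args'] = tuple(job_data.get('args', ()))
            if job_data.get('next_run_time'):
                job_data['next_run_time'] = datetime.fromisoformat(job_data['next_run_time'])
            job = Job(**job_data)  # func and trigger stay None until reattached
            self.jobs[job.id] = job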
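Since JobStore doesn't save func, jobs come back from load() with func=None. A common workaround is to store a textual reference of the form "module:qualname" and import it again on load. A sketch, with callable_to_ref and ref_to_callable as hypothetical helper names:

```python
import importlib
import json
from typing import Any, Callable


def callable_to_ref(func: Callable[..., Any]) -> str:
    """Turn a module-level function into a 'module:qualname' string."""
    qualname = func.__qualname__
    if '<lambda>' in qualname or '<locals>' in qualname:
        raise ValueError(f"{qualname} cannot be referenced by name; use a module-level function")
    return f"{func.__module__}:{qualname}"


def ref_to_callable(ref: str) -> Callable[..., Any]:
    """Import the module named in `ref` and walk the qualname to the object."""
    module_name, _, qualname = ref.partition(':')
    obj: Any = importlib.import_module(module_name)
    for attr in qualname.split('.'):
        obj = getattr(obj, attr)
    return obj


ref = callable_to_ref(json.dumps)
print(ref)
print(ref_to_callable(ref) is json.dumps)
```

In save() you might add 'func': callable_to_ref(job.func), and in load() set job.func = ref_to_callable(...). This only works for functions importable by name: lambdas and nested functions are rejected, and functions defined in a script run as __main__ won't resolve from a different program. Because importing a module executes its code, only load references from files you trust. Triggers need their own encoding, for instance the trigger's class name plus its constructor arguments.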

Testing the Scheduler

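Create a script that schedules several jobs with different triggers and runs the scheduler for 30 seconds:

import time
from datetime import datetime, timedelta

from scheduler import Scheduler, Job, IntervalTrigger, DateTrigger, CronTrigger

scheduler = Scheduler()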

def send_email(to, subject):
    print(f"Sending email to {to}: {subject}")
    return "Email sent"

def generate_report():
    print("Generating daily report...")
    return "Report generated"

def cleanup():
    print("Running cleanup task...")
    return "Cleanup complete"

job1 = Job(
    name='send-welcome-email',
    func=send_email,
    args=('user@example.com', 'Welcome!'),
    trigger=DateTrigger(datetime.now() + timedelta(seconds=10))
)
scheduler.add_job(job1)

job2 = Job(
    name='daily-report',
    func=generate_report,
    trigger=IntervalTrigger(hours=24)
)
scheduler.add_job(job2)

job3 = Job(
    name='hourly-check',
    func=cleanup,
    trigger=IntervalTrigger(hours=1)
)
scheduler.add_job(job3)

job4 = Job(
    name='weekly-newsletter',
    func=send_email,
    args=('subscribers@example.com', 'Weekly Newsletter'),
    trigger=CronTrigger('0 9 * * 1')
)
scheduler.add_job(job4)

print(f"Scheduled {len(scheduler.get_jobs())} jobs")
print("Starting scheduler...")

scheduler.start()

time.sleep(30)

scheduler.shutdown()
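
You can also register jobs with the scheduled_job decorator:

import time

from scheduler import Scheduler, IntervalTrigger, CronTrigger

scheduler = Scheduler()

@scheduler.scheduled_job(IntervalTrigger(minutes=5))
def check_health():
    return "System healthy"

@scheduler.scheduled_job(CronTrigger('0 * * * *'))
def hourly_task():
    print("Running hourly task")

scheduler.start()

# start() returns immediately (as in the previous example), so keep the
# main thread alive until Ctrl+C, then shut down cleanly
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()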

Summary

Congratulations! You've built a complete task scheduler. Here's what you learned:

  • Triggers - How to create different trigger types
  • Jobs - How to define and manage jobs
  • Execution - How to execute jobs in thread pools
  • Persistence - How to save and load job state

Possible Extensions

  • Add job dependencies
  • Implement distributed scheduling
  • Add job monitoring dashboard
  • Implement job timeouts
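For the job-timeouts extension, one Python-specific constraint shapes the design: threads can't be forcibly stopped, so a timeout on a thread-pool job can only stop waiting for it. A minimal sketch, with run_with_timeout as a hypothetical helper built on Future.result(timeout=...):

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Any, Callable


def run_with_timeout(pool: ThreadPoolExecutor, func: Callable[..., Any],
                     timeout: float, *args: Any, **kwargs: Any) -> Any:
    """Wait at most `timeout` seconds for func's result.

    Python threads cannot be killed, so on timeout the function keeps running
    in its worker thread; we only stop waiting for it.
    """
    future = pool.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        future.cancel()  # only succeeds if the job has not started yet
        raise


with ThreadPoolExecutor(max_workers=2) as pool:
    print(run_with_timeout(pool, sum, 1.0, [1, 2, 3]))
    try:
        run_with_timeout(pool, time.sleep, 0.1, 0.5)
    except FutureTimeout:
        print("timed out")
```

If a job must actually be stopped, run it in a separate multiprocessing.Process, which can be terminated, at the cost of process startup time and pickling its arguments.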