
Build a Web Crawler from Scratch

Difficulty: Intermediate · Est. Time: ~4 hours

Introduction

Web crawlers are the backbone of search engines and data collection systems. They systematically browse the web, collecting information from millions of pages.

In this tutorial, we'll build a web crawler from scratch that can fetch pages, extract links, manage URLs, and store the collected data.

What You'll Build
  • A modular web crawler with a thread-safe URL queue
  • HTML parser with link extraction
  • URL queue management system
  • Politeness policies to respect servers
  • Data storage for crawled content
What You'll Learn
  • How web crawlers work
  • HTTP requests and response handling
  • HTML parsing with BeautifulSoup
  • URL normalization and deduplication
  • Robots.txt compliance

Core Concepts

Before building, let's understand the key concepts behind web crawling.

How Web Crawlers Work

A web crawler starts with a list of URLs to visit, called the seed URLs. As it visits each URL, it extracts all links on that page and adds them to the queue. This process repeats, creating a breadth-first or depth-first traversal of the web.
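
In code, that loop is just a queue plus a visited set. Here is a minimal sketch of the idea (fetch_links is a hypothetical stand-in for the fetch-and-parse step we build later):

Python
# Minimal sketch of the core crawl loop (FIFO queue = breadth-first)
from collections import deque

def crawl(seed_urls, fetch_links, max_pages=100):
    queue = deque(seed_urls)   # the URL frontier
    seen = set(seed_urls)      # duplicate filter
    crawled = 0
    while queue and crawled < max_pages:
        url = queue.popleft()  # popleft() gives BFS; pop() would give DFS
        crawled += 1
        for link in fetch_links(url):  # download the page, return its links
            if link not in seen:
                seen.add(link)
                queue.append(link)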

Key Components

  • URL Frontier - The queue of URLs to visit
  • Fetcher - Downloads web pages
  • Parser - Extracts content and links
  • Duplicate Filter - Prevents revisiting URLs
  • Storage - Saves crawled data

Crawler Ethics

Responsible crawlers respect:

  • Robots.txt files - Site owner preferences (a sample file follows this list)
  • Rate limiting - Don't overload servers
  • User agents - Identify your crawler
  • Copyright - Don't misuse collected data
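
For reference, here is the shape of a typical robots.txt file (a made-up example for an imaginary site):

robots.txt
# All crawlers: keep out of two private sections, wait 2s between requests
User-agent: *
Disallow: /admin/
Disallow: /private/
Crawl-delay: 2

# One badly behaved crawler is blocked entirely
User-agent: BadBot
Disallow: /

Sitemap: https://example.com/sitemap.xml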

Project Setup

Bash
# Create project directory
mkdir web-crawler
cd web-crawler

# Create virtual environment
python -m venv venv

# Activate virtual environment
# Windows:
venv\Scripts\activate
# Mac/Linux:
source venv/bin/activate

# Install dependencies
pip install requests beautifulsoup4 lxml

Project Structure

File Structure
web-crawler/
├── crawler/
│   ├── __init__.py
│   ├── fetcher.py
│   ├── parser.py
│   ├── frontier.py
│   ├── link_filter.py
│   ├── robots.py
│   └── storage.py
├── main.py
├── requirements.txt
└── data/

Fetching Web Pages

Let's create the fetcher module to download web pages.

Python
# crawler/fetcher.py
import requests
from urllib.parse import urlparse
import time
from typing import Optional, Dict

class Fetcher:
    def __init__(self, delay: float = 1.0, timeout: int = 30):
        self.delay = delay
        self.timeout = timeout
        self.last_request_time = {}
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'MyWebCrawler/1.0 (Educational Purpose)'
        })
    
    def fetch(self, url: str) -> Optional[Dict]:
        if not self._can_fetch(url):
            return None
        
        self._wait_if_needed(url)
        
        try:
            response = self.session.get(
                url,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.last_request_time[self._get_domain(url)] = time.time()
            
            if response.status_code == 200:
                return {
                    'url': response.url,
                    'status_code': response.status_code,
                    'content': response.content,
                    'text': response.text,
                    'headers': dict(response.headers),
                    'encoding': response.encoding
                }
            else:
                return {
                    'url': url,
                    'status_code': response.status_code,
                    'error': f"HTTP ${response.status_code}"
                }
        except requests.RequestException as e:
            return {
                'url': url,
                'error': str(e)
            }
    
    def _can_fetch(self, url: str) -> bool:
        # Hook for per-domain blocking rules; rate limiting itself
        # happens in _wait_if_needed(), so fetching is always allowed here.
        return True
    
    def _wait_if_needed(self, url: str):
        domain = self._get_domain(url)
        if domain in self.last_request_time:
            elapsed = time.time() - self.last_request_time[domain]
            if elapsed < self.delay:
                time.sleep(self.delay - elapsed)
    
    def _get_domain(self, url: str) -> str:
        return urlparse(url).netloc
    
    def close(self):
        self.session.close()
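
To sanity-check the fetcher, point it at a page (a quick manual test, using example.com as elsewhere in this tutorial):

Python
# Quick manual test of the fetcher
from crawler.fetcher import Fetcher

fetcher = Fetcher(delay=1.0)
result = fetcher.fetch('https://example.com')
if result and 'error' not in result:
    print(result['status_code'], len(result['text']))
fetcher.close()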

Parsing HTML

Now let's create the parser to extract content and links from HTML.

Python
# crawler/parser.py
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse, urlunparse
from typing import List, Dict, Optional
import re

class Parser:
    def __init__(self):
        self.soup = None
        self.base_url = None
    
    def parse(self, html: str, base_url: str) -> Dict:
        self.soup = BeautifulSoup(html, 'lxml')
        self.base_url = base_url
        
        return {
            'title': self._extract_title(),
            'headings': self._extract_headings(),
            'links': self._extract_links(),
            'images': self._extract_images(),
            'metadata': self._extract_metadata(),
            'text': self._extract_text()
        }
    
    def _extract_title(self) -> str:
        title_tag = self.soup.find('title')
        return title_tag.get_text(strip=True) if title_tag else ''
    
    def _extract_headings(self) -> Dict[str, List[str]]:
        headings = {}
        for level in range(1, 7):
            tags = self.soup.find_all(f'h{level}')
            headings[f'h{level}'] = [tag.get_text(strip=True) for tag in tags]
        return headings
    
    def _extract_links(self) -> List[Dict[str, str]]:
        links = []
        for a_tag in self.soup.find_all('a', href=True):
            href = a_tag['href']
            absolute_url = urljoin(self.base_url, href)
            normalized_url = self._normalize_url(absolute_url)
            
            if normalized_url:
                links.append({
                    'url': normalized_url,
                    'text': a_tag.get_text(strip=True),
                    'title': a_tag.get('title', '')
                })
        return links
    
    def _extract_images(self) -> List[Dict[str, str]]:
        images = []
        for img in self.soup.find_all('img'):
            src = img.get('src', '')
            if src:
                absolute_url = urljoin(self.base_url, src)
                images.append({
                    'url': absolute_url,
                    'alt': img.get('alt', '')
                })
        return images
    
    def _extract_metadata(self) -> Dict[str, str]:
        metadata = {}
        
        meta_tags = self.soup.find_all('meta')
        for tag in meta_tags:
            name = tag.get('name') or tag.get('property')
            content = tag.get('content')
            if name and content:
                metadata[name] = content
        
        return metadata
    
    def _extract_text(self) -> str:
        for script in self.soup(['script', 'style']):
            script.decompose()
        text = self.soup.get_text(separator=' ', strip=True)
        return re.sub(r'\s+', ' ', text)
    
    def _normalize_url(self, url: str) -> Optional[str]:
        parsed = urlparse(url)
        
        if parsed.scheme not in ['http', 'https']:
            return None
        
        netloc = parsed.netloc.lower()
        
        path = parsed.path
        if path == '':
            path = '/'
        
        normalized = urlunparse((
            parsed.scheme,
            netloc,
            path,
            parsed.params,
            parsed.query,
            ''
        ))
        
        return normalized
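
A quick way to see the parser in action is to feed it a small HTML snippet (a made-up example):

Python
# Quick manual test of the parser
from crawler.parser import Parser

html = '<html><head><title>Demo</title></head><body><a href="/about">About us</a></body></html>'
result = Parser().parse(html, 'https://example.com')
print(result['title'])            # Demo
print(result['links'][0]['url'])  # https://example.com/about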

URL Queue Management

Let's create the URL frontier to manage the crawl queue.

Python
# crawler/frontier.py
from collections import deque
from threading import Lock
from typing import Optional, Set
import time

class URLFrontier:
    def __init__(self):
        self.queue = deque()
        self.in_progress: Set[str] = set()
        self.completed: Set[str] = set()
        self.failed: dict = {}
        self.lock = Lock()
    
    def add(self, url: str, priority: int = 0):
        with self.lock:
            if url not in self.completed and url not in self.in_progress:
                self.queue.append((priority, time.time(), url))
    
    def add_many(self, urls: list):
        for url in urls:
            self.add(url)
    
    def get_next(self) -> Optional[str]:
        with self.lock:
            while self.queue:
                priority, timestamp, url = self.queue.popleft()
                
                if url not in self.completed and url not in self.in_progress:
                    self.in_progress.add(url)
                    return url
            
            return None
    
    def mark_completed(self, url: str):
        with self.lock:
            if url in self.in_progress:
                self.in_progress.remove(url)
            self.completed.add(url)
    
    def mark_failed(self, url: str, error: Optional[str] = None):
        with self.lock:
            if url in self.in_progress:
                self.in_progress.remove(url)
            self.failed[url] = {
                'error': error,
                'timestamp': time.time()
            }
    
    def is_empty(self) -> bool:
        with self.lock:
            return len(self.queue) == 0
    
    def size(self) -> int:
        with self.lock:
            return len(self.queue)
    
    def get_stats(self) -> dict:
        with self.lock:
            return {
                'queued': len(self.queue),
                'in_progress': len(self.in_progress),
                'completed': len(self.completed),
                'failed': len(self.failed)
            }
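
The frontier is easy to exercise on its own:

Python
# Quick manual test of the frontier
from crawler.frontier import URLFrontier

frontier = URLFrontier()
frontier.add_many(['https://example.com', 'https://example.com/about'])
url = frontier.get_next()     # first in, first out
frontier.mark_completed(url)
print(frontier.get_stats())   # {'queued': 1, 'in_progress': 0, 'completed': 1, 'failed': 0}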

Crawler Politeness

Let's implement robots.txt compliance and rate limiting.

Python
# crawler/robots.py
import requests
from urllib.parse import urlparse
from typing import Dict
import re
import time
import logging

class RobotFileParser:
    def __init__(self):
        self.rules: Dict[str, list] = {}
        self.sitemaps: list = []
        self.default_delay: int = 1
    
    def can_fetch(self, url: str, user_agent: str = '*') -> bool:
        parsed = urlparse(url)
        
        path = parsed.path or '/'
        if not path.startswith('/'):
            path = '/' + path
        
        # Rules are stored per user agent; fall back to the wildcard group
        rules = self.rules.get(user_agent) or self.rules.get('*', [])
        
        for rule in rules:
            if rule['pattern'].match(path):
                return rule['allowed']
        
        return True
    
    def get_delay(self, user_agent: str = '*') -> int:
        return self.default_delay


class RobotsChecker:
    def __init__(self):
        self.parsers: Dict[str, RobotFileParser] = {}
        self.cache: Dict[str, tuple] = {}
        self.session = requests.Session()
    
    def can_fetch(self, url: str, user_agent: str = '*') -> bool:
        parsed = urlparse(url)
        domain = parsed.netloc
        
        if domain not in self.parsers:
            self._fetch_robots_txt(domain)
        
        parser = self.parsers.get(domain)
        if parser:
            return parser.can_fetch(url, user_agent)
        
        return True
    
    def _fetch_robots_txt(self, domain: str):
        if domain in self.cache:
            timestamp, parser = self.cache[domain]
            if time.time() - timestamp < 3600:
                self.parsers[domain] = parser
                return
        
        parser = RobotFileParser()
        
        try:
            robots_url = f"https://${domain}/robots.txt"
            response = self.session.get(robots_url, timeout=10)
            
            if response.status_code == 200:
                parser = self._parse_robots_txt(response.text)
        except Exception as e:
            logging.warning(f"Failed to fetch robots.txt: ${e}")
        
        self.parsers[domain] = parser
        self.cache[domain] = (time.time(), parser)
    
    def _parse_robots_txt(self, content: str) -> RobotFileParser:
        parser = RobotFileParser()
        current_user_agent = None
        
        for line in content.splitlines():
            line = line.strip()
            
            if not line or line.startswith('#'):
                continue
            
            if ':' not in line:
                continue
            
            key, value = line.split(':', 1)
            key = key.strip().lower()
            value = value.strip()
            
            if key == 'user-agent':
                current_user_agent = value
            elif key == 'disallow':
                # An empty Disallow value means nothing is disallowed
                if current_user_agent and value:
                    pattern = re.escape(value).replace(r'\*', '.*')
                    if current_user_agent not in parser.rules:
                        parser.rules[current_user_agent] = []
                    parser.rules[current_user_agent].append({
                        'pattern': re.compile('^' + pattern),
                        'allowed': False
                    })
            elif key == 'allow':
                if current_user_agent and value:
                    pattern = re.escape(value).replace(r'\*', '.*')
                    if current_user_agent not in parser.rules:
                        parser.rules[current_user_agent] = []
                    parser.rules[current_user_agent].append({
                        'pattern': re.compile('^' + pattern),
                        'allowed': True
                    })
            elif key == 'crawl-delay':
                # Honor the site's requested delay so get_delay() returns it
                try:
                    parser.default_delay = int(float(value))
                except ValueError:
                    pass
        
        return parser
    
    def close(self):
        self.session.close()
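
A quick way to exercise the checker; what it prints depends on the target site's robots.txt at the moment you run it:

Python
# Quick manual test of the robots checker
from crawler.robots import RobotsChecker

checker = RobotsChecker()
print(checker.can_fetch('https://example.com/'))        # True unless a rule disallows /
print(checker.can_fetch('https://example.com/admin/'))  # False only if /admin/ is disallowed
checker.close()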

Ethical Crawling
  • Always respect robots.txt
  • Add delays between requests
  • Identify your crawler with User-Agent
  • Don't crawl more than necessary

Data Storage

Let's create a storage system for crawled data.

Python
# crawler/storage.py
import sqlite3
from typing import Dict, List, Optional

class CrawlStorage:
    def __init__(self, db_path: str = 'crawled_data.db'):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self._create_tables()
    
    def _create_tables(self):
        cursor = self.conn.cursor()
        
        cursor.execute('''CREATE TABLE IF NOT EXISTS pages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT UNIQUE NOT NULL,
            title TEXT,
            content TEXT,
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            status_code INTEGER,
            error TEXT
        )''')
        
        cursor.execute('''CREATE TABLE IF NOT EXISTS links (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_url TEXT NOT NULL,
            target_url TEXT NOT NULL,
            link_text TEXT,
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (source_url) REFERENCES pages(url)
        )''')
        
        cursor.execute('''CREATE INDEX IF NOT EXISTS idx_url ON pages(url)''')
        cursor.execute('''CREATE INDEX IF NOT EXISTS idx_links ON links(source_url)''')
        
        self.conn.commit()
    
    def save_page(self, url: str, data: Dict):
        cursor = self.conn.cursor()
        
        cursor.execute('''INSERT OR REPLACE INTO pages 
            (url, title, content, status_code, error) 
            VALUES (?, ?, ?, ?, ?)''',
            (
                url,
                data.get('title', ''),
                data.get('text', ''),
                data.get('status_code'),
                data.get('error')
            )
        )
        
        if 'links' in data:
            for link in data['links']:
                cursor.execute('''INSERT INTO links (source_url, target_url, link_text) 
                    VALUES (?, ?, ?)''',
                    (url, link.get('url'), link.get('text'))
                )
        
        self.conn.commit()
    
    def get_page(self, url: str) -> Optional[Dict]:
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM pages WHERE url = ?', (url,))
        row = cursor.fetchone()
        
        if row:
            return {
                'id': row[0],
                'url': row[1],
                'title': row[2],
                'content': row[3],
                'fetched_at': row[4],
                'status_code': row[5],
                'error': row[6]
            }
        return None
    
    def get_all_pages(self, limit: int = 100) -> List[Dict]:
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM pages LIMIT ?', (limit,))
        
        return [{
            'id': row[0],
            'url': row[1],
            'title': row[2],
            'fetched_at': row[4]
        } for row in cursor.fetchall()]
    
    def close(self):
        self.conn.close()
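
A quick round-trip through the storage layer (using a throwaway database file):

Python
# Quick manual test of the storage layer
from crawler.storage import CrawlStorage

storage = CrawlStorage('test.db')
storage.save_page('https://example.com', {
    'title': 'Example Domain',
    'text': 'This domain is for use in illustrative examples.',
    'status_code': 200
})
print(storage.get_page('https://example.com')['title'])  # Example Domain
storage.close()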

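Link Filtering

The main script in the next section imports a LinkFilter from crawler/link_filter.py that the listings above don't show. Here is a minimal sketch consistent with how it's used: it takes the parser's link dicts and returns a deduplicated list of crawlable URL strings (the skip-list of file extensions is an assumption):

Python
# crawler/link_filter.py
from urllib.parse import urlparse
from typing import Dict, List, Set

class LinkFilter:
    def __init__(self):
        self.seen: Set[str] = set()
        # Assumed skip-list: extensions that rarely point to crawlable HTML
        self.skip_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.pdf',
                                '.zip', '.css', '.js', '.ico', '.svg')
    
    def filter_links(self, links: List[Dict[str, str]]) -> List[str]:
        valid = []
        for link in links:
            url = link.get('url', '')
            if not url or url in self.seen:
                continue
            parsed = urlparse(url)
            if parsed.scheme not in ('http', 'https'):
                continue
            if parsed.path.lower().endswith(self.skip_extensions):
                continue
            self.seen.add(url)
            valid.append(url)
        return valid
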
Testing

Let's put it all together in the main crawler script.

Python
# main.py
from crawler.fetcher import Fetcher
from crawler.parser import Parser
from crawler.frontier import URLFrontier
from crawler.robots import RobotsChecker
from crawler.link_filter import LinkFilter
from crawler.storage import CrawlStorage
import time

class WebCrawler:
    def __init__(self, start_urls, max_pages=100, delay=1.0):
        self.fetcher = Fetcher(delay=delay)
        self.parser = Parser()
        self.frontier = URLFrontier()
        self.robots_checker = RobotsChecker()
        self.link_filter = LinkFilter()
        self.storage = CrawlStorage()
        self.max_pages = max_pages
        self.crawled_count = 0
        
        for url in start_urls:
            self.frontier.add(url)
    
    def run(self):
        print(f"Starting crawl with {self.max_pages} max pages...")
        
        while self.crawled_count < self.max_pages and not self.frontier.is_empty():
            url = self.frontier.get_next()
            
            if not url:
                break
            
            if not self.robots_checker.can_fetch(url):
                print(f"Blocked by robots.txt: ${url}")
                self.frontier.mark_completed(url)
                continue
            
            print(f"Crawling: ${url}")
            
            result = self.fetcher.fetch(url)
            
            if result and 'error' not in result:
                parsed_data = self.parser.parse(
                    result['text'], 
                    result['url']
                )
                
                self.storage.save_page(url, parsed_data)
                self.frontier.mark_completed(url)
                
                valid_links = self.link_filter.filter_links(
                    parsed_data.get('links', [])
                )
                self.frontier.add_many(valid_links)
                
                self.crawled_count += 1
                print(f"Crawled ${self.crawled_count}/${self.max_pages} pages")
            else:
                error = result.get('error', 'Unknown error') if result else 'Fetch failed'
                print(f"Failed: ${url} - ${error}")
                self.frontier.mark_failed(url, error)
            
            stats = self.frontier.get_stats()
            print(f"Queue: ${stats['queued']}, Completed: ${stats['completed']}")
        
        print("Crawl complete!")
        self.close()
    
    def close(self):
        self.fetcher.close()
        self.robots_checker.close()
        self.storage.close()


if __name__ == '__main__':
    crawler = WebCrawler(
        start_urls=['https://example.com'],
        max_pages=10,
        delay=1.0
    )
    crawler.run()

Testing Checklist
  • Crawler starts from seed URLs
  • Links are extracted and queued
  • Robots.txt is respected
  • Rate limiting prevents server overload
  • Data is stored in database

Summary

Congratulations! You've built a complete web crawler from scratch.

What You Built

  • Fetcher - Downloads web pages with rate limiting
  • Parser - Extracts content, links, and metadata
  • URL Frontier - Manages crawl queue
  • Robots Checker - Respects robots.txt
  • Link Filter - Filters and deduplicates URLs
  • Storage - SQLite database for crawled data

Next Steps

  • Add multi-threading for faster crawling (sketched below)
  • Implement depth limiting
  • Add support for JavaScript-rendered pages
  • Implement distributed crawling
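
The frontier already guards its state with a lock, so multi-threading is mostly a matter of running the fetch-parse-store cycle in worker threads. A rough sketch of the shape this takes (not a drop-in replacement: the SQLite connection and the crawled_count counter would also need per-thread handling):

Python
# Hypothetical threaded crawl loop built on the thread-safe frontier
from concurrent.futures import ThreadPoolExecutor

def worker(crawler):
    while crawler.crawled_count < crawler.max_pages:
        url = crawler.frontier.get_next()
        if url is None:
            break  # a robust version would wait for in-progress URLs instead
        # ...fetch, parse, store, and enqueue new links as in run()...

with ThreadPoolExecutor(max_workers=4) as pool:
    for _ in range(4):
        pool.submit(worker, crawler)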

Continue Learning

Try these tutorials:

  • Build a Web Scraper
  • Build a Simple Search Engine
  • Build a Log Monitoring System