Build a Web Crawler from Scratch
Introduction
Web crawlers are the backbone of search engines and data collection systems. They systematically browse the web, collecting information from millions of pages.
In this tutorial, we'll build a web crawler from scratch that can fetch pages, extract links, manage URLs, and store the collected data. By the end, you'll have:
- A multi-threaded web crawler
- HTML parser with link extraction
- URL queue management system
- Politeness policies to respect servers
- Data storage for crawled content
Along the way, you'll learn:
- How web crawlers work
- HTTP requests and response handling
- HTML parsing with BeautifulSoup
- URL normalization and deduplication
- Robots.txt compliance
Core Concepts
Before building, let's understand the key concepts behind web crawling.
How Web Crawlers Work
A web crawler starts with a list of URLs to visit, called the seed URLs. As it visits each URL, it extracts all links on that page and adds them to the queue. This process repeats, creating a breadth-first or depth-first traversal of the web.
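To make the traversal concrete, here is a minimal breadth-first crawl loop. It is only a sketch: fetch_links is a hypothetical helper standing in for the fetcher and parser we build later in this tutorial.
# bfs_sketch.py - minimal breadth-first crawl loop (illustration only)
from collections import deque

def crawl_bfs(seed_urls, max_pages=50):
    queue = deque(seed_urls)    # the URL frontier
    visited = set(seed_urls)    # duplicate filter
    crawled = []
    while queue and len(crawled) < max_pages:
        url = queue.popleft()             # FIFO order gives a breadth-first traversal
        links = fetch_links(url)          # hypothetical: download the page and return its links
        crawled.append(url)
        for link in links:
            if link not in visited:
                visited.add(link)
                queue.append(link)
    return crawled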
Key Components
- URL Frontier - The queue of URLs to visit
- Fetcher - Downloads web pages
- Parser - Extracts content and links
- Duplicate Filter - Prevents revisiting URLs
- Storage - Saves crawled data
Crawler Ethics
Responsible crawlers respect:
- Robots.txt files - Site owner preferences
- Rate limiting - Don't overload servers
- User agents - Identify your crawler
- Copyright - Don't misuse collected data
Project Setup
# Create project directory
mkdir web-crawler
cd web-crawler
# Create virtual environment
python -m venv venv
# Activate virtual environment
# Windows:
venv\Scripts\activate
# Mac/Linux:
source venv/bin/activate
# Install dependencies (urllib3 is installed automatically with requests)
pip install requests beautifulsoup4 lxml
# Optional extras for nicer console output
pip install colorama tqdm
Project Structure
web-crawler/
├── crawler/
│ ├── __init__.py
│ ├── fetcher.py
│ ├── parser.py
│ ├── link_filter.py
│ ├── frontier.py
│ ├── robots.py
│ └── storage.py
├── main.py
├── requirements.txt
└── data/
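The requirements.txt referenced above can simply list the third-party packages we installed; a minimal, unpinned version looks like this (add version pins if you want reproducible installs):
# requirements.txt
requests
beautifulsoup4
lxml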
Fetching Web Pages
Let's create the fetcher module to download web pages.
# crawler/fetcher.py
import requests
from urllib.parse import urlparse, urljoin
import time
from typing import Optional, Dict
class Fetcher:
    def __init__(self, delay: float = 1.0, timeout: int = 30):
        self.delay = delay
        self.timeout = timeout
        self.last_request_time = {}
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'MyWebCrawler/1.0 (Educational Purpose)'
        })

    def fetch(self, url: str) -> Optional[Dict]:
        if not self._can_fetch(url):
            return None
        self._wait_if_needed(url)
        try:
            response = self.session.get(
                url,
                timeout=self.timeout,
                allow_redirects=True
            )
            self.last_request_time[self._get_domain(url)] = time.time()
            if response.status_code == 200:
                return {
                    'url': response.url,
                    'status_code': response.status_code,
                    'content': response.content,
                    'text': response.text,
                    'headers': dict(response.headers),
                    'encoding': response.encoding
                }
            else:
                return {
                    'url': url,
                    'status_code': response.status_code,
                    'error': f"HTTP {response.status_code}"
                }
        except requests.RequestException as e:
            return {
                'url': url,
                'error': str(e)
            }

    def _can_fetch(self, url: str) -> bool:
        # Hook for extra per-domain checks; for now every URL is allowed.
        return True

    def _wait_if_needed(self, url: str):
        # Enforce the per-domain delay between consecutive requests.
        domain = self._get_domain(url)
        if domain in self.last_request_time:
            elapsed = time.time() - self.last_request_time[domain]
            if elapsed < self.delay:
                time.sleep(self.delay - elapsed)

    def _get_domain(self, url: str) -> str:
        return urlparse(url).netloc

    def close(self):
        self.session.close()
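With the module in place, a quick manual test from the project root might look like the following (example.com is just a placeholder URL; any page you're allowed to crawl works):
# try_fetcher.py - quick manual test of the Fetcher
from crawler.fetcher import Fetcher

fetcher = Fetcher(delay=1.0)
result = fetcher.fetch('https://example.com')
if result and 'error' not in result:
    print(result['status_code'], len(result['text']), 'characters fetched')
else:
    print('Fetch failed:', result)
fetcher.close()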
Parsing HTML
Now let's create the parser to extract content and links from HTML.
# crawler/parser.py
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse, urlunparse
from typing import List, Dict, Optional
import re

class Parser:
    def __init__(self):
        self.soup = None
        self.base_url = None

    def parse(self, html: str, base_url: str) -> Dict:
        self.soup = BeautifulSoup(html, 'lxml')
        self.base_url = base_url
        return {
            'title': self._extract_title(),
            'headings': self._extract_headings(),
            'links': self._extract_links(),
            'images': self._extract_images(),
            'metadata': self._extract_metadata(),
            'text': self._extract_text()
        }

    def _extract_title(self) -> str:
        title_tag = self.soup.find('title')
        return title_tag.get_text(strip=True) if title_tag else ''

    def _extract_headings(self) -> Dict[str, List[str]]:
        headings = {}
        for level in range(1, 7):
            tags = self.soup.find_all(f'h{level}')
            headings[f'h{level}'] = [tag.get_text(strip=True) for tag in tags]
        return headings

    def _extract_links(self) -> List[Dict[str, str]]:
        links = []
        for a_tag in self.soup.find_all('a', href=True):
            href = a_tag['href']
            absolute_url = urljoin(self.base_url, href)
            normalized_url = self._normalize_url(absolute_url)
            if normalized_url:
                links.append({
                    'url': normalized_url,
                    'text': a_tag.get_text(strip=True),
                    'title': a_tag.get('title', '')
                })
        return links

    def _extract_images(self) -> List[Dict[str, str]]:
        images = []
        for img in self.soup.find_all('img'):
            src = img.get('src', '')
            if src:
                absolute_url = urljoin(self.base_url, src)
                images.append({
                    'url': absolute_url,
                    'alt': img.get('alt', '')
                })
        return images

    def _extract_metadata(self) -> Dict[str, str]:
        metadata = {}
        meta_tags = self.soup.find_all('meta')
        for tag in meta_tags:
            name = tag.get('name') or tag.get('property')
            content = tag.get('content')
            if name and content:
                metadata[name] = content
        return metadata

    def _extract_text(self) -> str:
        # Remove script and style elements before extracting visible text.
        for script in self.soup(['script', 'style']):
            script.decompose()
        text = self.soup.get_text(separator=' ', strip=True)
        return re.sub(r'\s+', ' ', text)

    def _normalize_url(self, url: str) -> Optional[str]:
        parsed = urlparse(url)
        if parsed.scheme not in ['http', 'https']:
            return None
        netloc = parsed.netloc.lower()
        path = parsed.path or '/'
        # Drop the fragment so URLs differing only by #anchor are treated as one.
        normalized = urlunparse((
            parsed.scheme,
            netloc,
            path,
            parsed.params,
            parsed.query,
            ''
        ))
        return normalized
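To see what the parser produces, you can feed it a fetched page (again using example.com as a stand-in URL):
# try_parser.py - combine the Fetcher and Parser
from crawler.fetcher import Fetcher
from crawler.parser import Parser

fetcher = Fetcher()
parser = Parser()
result = fetcher.fetch('https://example.com')
if result and 'text' in result:
    data = parser.parse(result['text'], result['url'])
    print('Title:', data['title'])
    print('Links found:', len(data['links']))
fetcher.close()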
Link Filtering and Deduplication
Let's add URL filtering and deduplication.
# crawler/link_filter.py
from urllib.parse import urlparse
from typing import Set, List, Dict

class LinkFilter:
    def __init__(self, allowed_domains: List[str] = None,
                 disallowed_extensions: List[str] = None):
        self.allowed_domains = allowed_domains or []
        self.disallowed_extensions = disallowed_extensions or [
            '.jpg', '.jpeg', '.png', '.gif', '.svg',
            '.pdf', '.zip', '.tar', '.gz',
            '.mp3', '.mp4', '.avi', '.mov',
            '.exe', '.dmg', '.deb', '.rpm'
        ]
        self.seen_urls: Set[str] = set()

    def is_valid(self, url: str) -> bool:
        if url in self.seen_urls:
            return False
        try:
            parsed = urlparse(url)
            if parsed.scheme not in ['http', 'https']:
                return False
            if self.allowed_domains and parsed.netloc not in self.allowed_domains:
                return False
            if self._has_disallowed_extension(parsed.path):
                return False
            self.seen_urls.add(url)
            return True
        except Exception:
            return False

    def _has_disallowed_extension(self, path: str) -> bool:
        return any(path.lower().endswith(ext) for ext in self.disallowed_extensions)

    def filter_links(self, links: List[Dict]) -> List[str]:
        valid_urls = []
        for link in links:
            url = link.get('url')
            if url and self.is_valid(url):
                valid_urls.append(url)
        return valid_urls

    def reset(self):
        self.seen_urls.clear()
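For example, restricting the crawl to a single domain and filtering a handful of made-up links:
# try_link_filter.py - illustrative input, not real crawl data
from crawler.link_filter import LinkFilter

link_filter = LinkFilter(allowed_domains=['example.com'])
links = [
    {'url': 'https://example.com/about', 'text': 'About'},
    {'url': 'https://example.com/logo.png', 'text': ''},    # dropped: image extension
    {'url': 'https://other.org/page', 'text': 'Other'},     # dropped: not an allowed domain
    {'url': 'https://example.com/about', 'text': 'About'},  # dropped: duplicate
]
print(link_filter.filter_links(links))  # ['https://example.com/about']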
URL Queue Management
Let's create the URL frontier to manage the crawl queue.
# crawler/frontier.py
from collections import deque
from threading import Lock
from typing import Optional, Set
import time
class URLFrontier:
    def __init__(self):
        self.queue = deque()
        self.in_progress: Set[str] = set()
        self.completed: Set[str] = set()
        self.failed: dict = {}
        self.lock = Lock()
        # Placeholders for per-domain scheduling (not used yet).
        self.domain_times: dict = {}
        self.domain_delays: dict = {}

    def add(self, url: str, priority: int = 0):
        with self.lock:
            if url not in self.completed and url not in self.in_progress:
                self.queue.append((priority, time.time(), url))

    def add_many(self, urls: list):
        for url in urls:
            self.add(url)

    def get_next(self) -> Optional[str]:
        # Note: the deque is FIFO; priority is recorded but not yet honored.
        with self.lock:
            while self.queue:
                priority, timestamp, url = self.queue.popleft()
                if url not in self.completed and url not in self.in_progress:
                    self.in_progress.add(url)
                    return url
            return None

    def mark_completed(self, url: str):
        with self.lock:
            if url in self.in_progress:
                self.in_progress.remove(url)
            self.completed.add(url)

    def mark_failed(self, url: str, error: str = None):
        with self.lock:
            if url in self.in_progress:
                self.in_progress.remove(url)
            self.failed[url] = {
                'error': error,
                'timestamp': time.time()
            }

    def is_empty(self) -> bool:
        with self.lock:
            return len(self.queue) == 0

    def size(self) -> int:
        with self.lock:
            return len(self.queue)

    def get_stats(self) -> dict:
        with self.lock:
            return {
                'queued': len(self.queue),
                'in_progress': len(self.in_progress),
                'completed': len(self.completed),
                'failed': len(self.failed)
            }
Crawler Politeness
Let's implement robots.txt compliance and rate limiting.
# crawler/robots.py
import requests
from urllib.parse import urlparse, urljoin
from typing import Optional, Dict
import time
import logging
import re

class RobotFileParser:
    def __init__(self):
        # Rules are grouped by the user-agent they apply to.
        self.rules: Dict[str, list] = {}
        self.sitemaps: list = []
        self.default_delay: int = 1

    def can_fetch(self, url: str, user_agent: str = '*') -> bool:
        path = urlparse(url).path or '/'
        if not path.startswith('/'):
            path = '/' + path
        # Prefer rules for the specific user agent, then fall back to the wildcard group.
        rules = self.rules.get(user_agent) or self.rules.get('*', [])
        for rule in rules:
            if rule['pattern'].match(path):
                return rule['allowed']
        return True

    def get_delay(self, user_agent: str = '*') -> int:
        return self.default_delay

class RobotsChecker:
    def __init__(self):
        self.parsers: Dict[str, RobotFileParser] = {}
        self.cache: Dict[str, tuple] = {}
        self.session = requests.Session()

    def can_fetch(self, url: str, user_agent: str = '*') -> bool:
        domain = urlparse(url).netloc
        if domain not in self.parsers:
            self._fetch_robots_txt(domain)
        parser = self.parsers.get(domain)
        if parser:
            return parser.can_fetch(url, user_agent)
        return True

    def _fetch_robots_txt(self, domain: str):
        # Reuse a cached parser if it is less than an hour old.
        if domain in self.cache:
            timestamp, parser = self.cache[domain]
            if time.time() - timestamp < 3600:
                self.parsers[domain] = parser
                return
        parser = RobotFileParser()
        try:
            robots_url = f"https://{domain}/robots.txt"
            response = self.session.get(robots_url, timeout=10)
            if response.status_code == 200:
                parser = self._parse_robots_txt(response.text)
        except Exception as e:
            logging.warning(f"Failed to fetch robots.txt for {domain}: {e}")
        self.parsers[domain] = parser
        self.cache[domain] = (time.time(), parser)

    def _parse_robots_txt(self, content: str) -> RobotFileParser:
        parser = RobotFileParser()
        current_user_agent = None
        for line in content.splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            if ':' not in line:
                continue
            key, value = line.split(':', 1)
            key = key.strip().lower()
            value = value.strip()
            if key == 'user-agent':
                current_user_agent = value
            elif key == 'disallow' and current_user_agent:
                # An empty Disallow value means "allow everything", so skip it.
                if not value:
                    continue
                pattern = re.escape(value).replace(r'\*', '.*')
                parser.rules.setdefault(current_user_agent, []).append({
                    'pattern': re.compile('^' + pattern),
                    'allowed': False
                })
            elif key == 'allow' and current_user_agent:
                pattern = re.escape(value).replace(r'\*', '.*')
                parser.rules.setdefault(current_user_agent, []).append({
                    'pattern': re.compile('^' + pattern),
                    'allowed': True
                })
        return parser

    def close(self):
        self.session.close()
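You can exercise the checker against any live site; the answer simply depends on that site's current robots.txt, and example.com is only a placeholder here. Note that Python's standard library also ships urllib.robotparser, which you could use instead of this hand-rolled parser:
# try_robots.py
from crawler.robots import RobotsChecker

checker = RobotsChecker()
# The result depends on the target site's live robots.txt rules.
print(checker.can_fetch('https://example.com/'))
checker.close()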
Key politeness rules:
- Always respect robots.txt
- Add delays between requests
- Identify your crawler with User-Agent
- Don't crawl more than necessary
Data Storage
Let's create a storage system for crawled data.
# crawler/storage.py
import json
import os
from datetime import datetime
from typing import Dict, List, Optional
import sqlite3

class CrawlStorage:
    def __init__(self, db_path: str = 'crawled_data.db'):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self._create_tables()

    def _create_tables(self):
        cursor = self.conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS pages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT UNIQUE NOT NULL,
            title TEXT,
            content TEXT,
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            status_code INTEGER,
            error TEXT
        )''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS links (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_url TEXT NOT NULL,
            target_url TEXT NOT NULL,
            link_text TEXT,
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (source_url) REFERENCES pages(url)
        )''')
        cursor.execute('''CREATE INDEX IF NOT EXISTS idx_url ON pages(url)''')
        cursor.execute('''CREATE INDEX IF NOT EXISTS idx_links ON links(source_url)''')
        self.conn.commit()

    def save_page(self, url: str, data: Dict):
        cursor = self.conn.cursor()
        cursor.execute('''INSERT OR REPLACE INTO pages
            (url, title, content, status_code, error)
            VALUES (?, ?, ?, ?, ?)''',
            (
                url,
                data.get('title', ''),
                data.get('text', ''),
                data.get('status_code'),
                data.get('error')
            )
        )
        if 'links' in data:
            for link in data['links']:
                cursor.execute('''INSERT INTO links (source_url, target_url, link_text)
                    VALUES (?, ?, ?)''',
                    (url, link.get('url'), link.get('text'))
                )
        self.conn.commit()

    def get_page(self, url: str) -> Optional[Dict]:
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM pages WHERE url = ?', (url,))
        row = cursor.fetchone()
        if row:
            return {
                'id': row[0],
                'url': row[1],
                'title': row[2],
                'content': row[3],
                'fetched_at': row[4],
                'status_code': row[5],
                'error': row[6]
            }
        return None

    def get_all_pages(self, limit: int = 100) -> List[Dict]:
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM pages LIMIT ?', (limit,))
        return [{
            'id': row[0],
            'url': row[1],
            'title': row[2],
            'fetched_at': row[4]
        } for row in cursor.fetchall()]

    def close(self):
        self.conn.close()
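A minimal round trip through the storage layer, using hand-written page data rather than a real crawl:
# try_storage.py
from crawler.storage import CrawlStorage

storage = CrawlStorage('test_crawl.db')
storage.save_page('https://example.com/', {
    'title': 'Example Domain',
    'text': 'This domain is for use in illustrative examples.',
    'links': [{'url': 'https://www.iana.org/domains/example', 'text': 'More information'}],
})
print(storage.get_page('https://example.com/')['title'])  # Example Domain
print(len(storage.get_all_pages()))                       # 1
storage.close()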
Putting It All Together
Let's put it all together in the main crawler script.
# main.py
from crawler.fetcher import Fetcher
from crawler.parser import Parser
from crawler.frontier import URLFrontier
from crawler.robots import RobotsChecker
from crawler.link_filter import LinkFilter
from crawler.storage import CrawlStorage
import time
class WebCrawler:
    def __init__(self, start_urls, max_pages=100, delay=1.0):
        self.fetcher = Fetcher(delay=delay)
        self.parser = Parser()
        self.frontier = URLFrontier()
        self.robots_checker = RobotsChecker()
        self.link_filter = LinkFilter()
        self.storage = CrawlStorage()
        self.max_pages = max_pages
        self.crawled_count = 0
        for url in start_urls:
            self.frontier.add(url)

    def run(self):
        print(f"Starting crawl with {self.max_pages} max pages...")
        while self.crawled_count < self.max_pages and not self.frontier.is_empty():
            url = self.frontier.get_next()
            if not url:
                break
            if not self.robots_checker.can_fetch(url):
                print(f"Blocked by robots.txt: {url}")
                self.frontier.mark_completed(url)
                continue
            print(f"Crawling: {url}")
            result = self.fetcher.fetch(url)
            if result and 'error' not in result:
                parsed_data = self.parser.parse(
                    result['text'],
                    result['url']
                )
                self.storage.save_page(url, parsed_data)
                valid_links = self.link_filter.filter_links(
                    parsed_data.get('links', [])
                )
                self.frontier.add_many(valid_links)
                self.frontier.mark_completed(url)
                self.crawled_count += 1
                print(f"Crawled {self.crawled_count}/{self.max_pages} pages")
            else:
                error = result.get('error', 'Unknown error') if result else 'Fetch failed'
                print(f"Failed: {url} - {error}")
                self.frontier.mark_failed(url, error)
            stats = self.frontier.get_stats()
            print(f"Queue: {stats['queued']}, Completed: {stats['completed']}")
        print("Crawl complete!")
        self.close()

    def close(self):
        self.fetcher.close()
        self.robots_checker.close()
        self.storage.close()

if __name__ == '__main__':
    crawler = WebCrawler(
        start_urls=['https://example.com'],
        max_pages=10,
        delay=1.0
    )
    crawler.run()
Running main.py exercises the whole pipeline:
- Crawler starts from seed URLs
- Links are extracted and queued
- Robots.txt is respected
- Rate limiting prevents server overload
- Data is stored in database
Summary
Congratulations! You've built a complete web crawler from scratch.
What You Built
- Fetcher - Downloads web pages with rate limiting
- Parser - Extracts content, links, and metadata
- URL Frontier - Manages crawl queue
- Robots Checker - Respects robots.txt
- Link Filter - Filters and deduplicates URLs
- Storage - SQLite database for crawled data
Next Steps
- Add multi-threading for faster crawling (see the sketch after this list)
- Implement depth limiting
- Add support for JavaScript-rendered pages
- Implement distributed crawling
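As a starting point for the multi-threading item, here is a rough sketch using concurrent.futures. It is not integrated into the WebCrawler class above, it skips the robots.txt check, and the shared requests session and per-domain rate limiting need extra care once fetches run in parallel, so treat it as an outline rather than a finished implementation:
# parallel_sketch.py - one possible shape for multi-threaded crawling
from concurrent.futures import ThreadPoolExecutor, as_completed

def crawl_parallel(crawler, num_workers=4, batch_size=8):
    """Fetch a batch of frontier URLs in parallel, then parse and store in the main thread."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        while crawler.crawled_count < crawler.max_pages and not crawler.frontier.is_empty():
            # Pull a small batch of URLs so the workers have something to run concurrently.
            batch = [u for u in (crawler.frontier.get_next() for _ in range(batch_size)) if u]
            if not batch:
                break
            # Robots.txt checking is omitted here for brevity; add it before submitting.
            futures = {pool.submit(crawler.fetcher.fetch, url): url for url in batch}
            for future in as_completed(futures):
                url = futures[future]
                result = future.result()
                if result and 'error' not in result:
                    data = crawler.parser.parse(result['text'], result['url'])
                    crawler.storage.save_page(url, data)
                    crawler.frontier.add_many(crawler.link_filter.filter_links(data['links']))
                    crawler.frontier.mark_completed(url)
                    crawler.crawled_count += 1
                else:
                    error = result.get('error', 'Unknown error') if result else 'Fetch failed'
                    crawler.frontier.mark_failed(url, error)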