Build a File Indexing System
Introduction
File indexing systems scan directories and build searchable indexes of files and their contents. They're used in desktop search tools, code search engines, and document management systems.
In this tutorial, we'll build a complete file indexing system with full-text search, file watching for live updates, and a command-line interface.
- Directory scanner
- Full-text indexer
- Inverted index for search
- File system watcher
- CLI interface
- How search indexes work
- File system traversal
- Text tokenization
- Real-time file monitoring
Core Concepts
Inverted Index
An inverted index maps terms to the documents containing them. This allows fast keyword searches by looking up a term and retrieving all matching documents.
Tokenization
Tokenization breaks text into individual words (tokens). We normalize tokens by converting to lowercase and removing punctuation.
File Watching
File watchers monitor directories for changes and trigger re-indexing when files are added, modified, or deleted.
Project Overview
Our file indexer will support:
| Feature | Description |
|---|---|
| Directory Scan | Recursive directory traversal |
| Full-Text Search | Search file contents |
| File Types | Text, code, documents |
| Live Updates | Watch for file changes |
Prerequisites
- Python 3.8+ - Installed on your system
- watchdog - third-party file-watching library; install with `pip install watchdog`
File Indexer
Create file_indexer/indexer.py:
import os
import re
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Set
from dataclasses import dataclass, field
import json
@dataclass
class Document:
    """One indexed file: filesystem metadata plus extracted text and search tokens."""
    path: str  # absolute path (str(Path.absolute())); also the key in FileIndexer.documents
    name: str  # base filename, e.g. "indexer.py"
    extension: str  # lowercase suffix including the dot, e.g. ".py"
    size: int  # file size in bytes (os.stat().st_size)
    modified: datetime  # last-modified time, from st_mtime (naive local time)
    content: str = ''  # full decoded text of the file
    tokens: Set[str] = field(default_factory=set)  # normalized tokens produced by Tokenizer.tokenize
class Tokenizer:
    """Turns raw text into a set of normalized, searchable tokens."""

    # Common English words that carry no search value and are excluded.
    STOPWORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
                 'of', 'with', 'by', 'from', 'as', 'is', 'was', 'are', 'were', 'been',
                 'be', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would',
                 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that',
                 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they'}

    @classmethod
    def tokenize(cls, text: str) -> Set[str]:
        """Lowercase *text*, split it into alphanumeric words, and drop
        stopwords and single-character tokens."""
        words = re.findall(r'\b[a-z0-9]+\b', text.lower())
        return {word for word in words if len(word) > 1 and word not in cls.STOPWORDS}
class FileIndexer:
    """Scans directories and maintains a full-text index.

    ``documents`` maps absolute file path -> Document; ``inverted_index``
    maps token -> set of absolute paths of documents containing it.
    """

    # Extensions treated as text and indexed by default (lowercase, with dot).
    SUPPORTED_EXTENSIONS = {
        '.txt', '.md', '.py', '.js', '.java', '.c', '.cpp', '.h', '.hpp',
        '.cs', '.go', '.rs', '.rb', '.php', '.swift', '.kt', '.scala',
        '.html', '.css', '.scss', '.json', '.yaml', '.yml', '.xml', '.sql',
        '.sh', '.bash', '.zsh', '.ps1', '.r', '.lua', '.pl', '.toml',
    }

    def __init__(self):
        self.documents: Dict[str, Document] = {}
        self.inverted_index: Dict[str, Set[str]] = {}

    def index_directory(self, root_path: str, extensions: Set[str] = None) -> int:
        """Recursively index every matching file under ``root_path``.

        ``extensions`` defaults to SUPPORTED_EXTENSIONS and is compared
        case-insensitively (file suffixes are lowercased before matching).
        Returns the TOTAL number of indexed documents, not only those
        added by this call.
        """
        wanted = {ext.lower() for ext in (extensions or self.SUPPORTED_EXTENSIONS)}
        for filepath in Path(root_path).rglob('*'):
            if filepath.is_file() and filepath.suffix.lower() in wanted:
                self.index_file(str(filepath))
        return len(self.documents)

    def index_file(self, filepath: str) -> bool:
        """Index a single file. Returns True on success, False if it
        could not be read or processed."""
        try:
            stat = os.stat(filepath)
            path = Path(filepath)
            content = self._read_file_content(filepath)
            doc = Document(
                path=str(path.absolute()),
                name=path.name,
                extension=path.suffix.lower(),
                size=stat.st_size,
                modified=datetime.fromtimestamp(stat.st_mtime),
                content=content,
                tokens=Tokenizer.tokenize(content),
            )
            self.documents[doc.path] = doc
            self._update_inverted_index(doc)
            return True
        except Exception:
            # Best-effort by design: a vanished or unreadable file must
            # not abort a whole directory scan.
            return False

    def _read_file_content(self, filepath: str) -> str:
        """Read a text file, trying encodings from strictest to loosest.

        latin-1 must come LAST: it decodes any byte sequence, so placing
        it before cp1252 (as the original did) made the cp1252 attempt
        and the empty-string fallback unreachable.
        Raises OSError if the file cannot be opened.
        """
        for encoding in ('utf-8', 'cp1252', 'latin-1'):
            try:
                with open(filepath, 'r', encoding=encoding) as f:
                    return f.read()
            except UnicodeDecodeError:
                continue
        return ''

    def _update_inverted_index(self, doc: Document):
        """Add ``doc`` to the postings of each of its tokens."""
        for token in doc.tokens:
            self.inverted_index.setdefault(token, set()).add(doc.path)

    def remove_document(self, filepath: str):
        """Remove a document and its postings.

        Documents are keyed by absolute path, so a relative path (as a
        file-system watcher may deliver) is normalized before lookup.
        Unknown paths are ignored.
        """
        if filepath not in self.documents:
            filepath = str(Path(filepath).absolute())
        if filepath not in self.documents:
            return
        doc = self.documents.pop(filepath)
        for token in doc.tokens:
            postings = self.inverted_index.get(token)
            if postings is not None:
                postings.discard(filepath)
                if not postings:
                    del self.inverted_index[token]

    def save_index(self, filepath: str):
        """Persist document metadata and the inverted index as JSON.

        File contents/tokens are not stored; load_index re-reads files.
        """
        data = {
            'documents': {
                path: {
                    'path': doc.path,
                    'name': doc.name,
                    'extension': doc.extension,
                    'size': doc.size,
                    'modified': doc.modified.isoformat(),
                }
                for path, doc in self.documents.items()
            },
            'inverted_index': {
                token: list(docs)
                for token, docs in self.inverted_index.items()
            }
        }
        with open(filepath, 'w') as f:
            json.dump(data, f)

    def load_index(self, filepath: str):
        """Restore an index written by save_index, re-reading file contents.

        Files deleted since the save no longer crash the load (the
        original let FileNotFoundError propagate); they are kept with
        empty content so the saved inverted index still resolves.
        """
        with open(filepath, 'r') as f:
            data = json.load(f)
        self.documents = {}
        for path, doc_data in data['documents'].items():
            try:
                content = self._read_file_content(path)
            except OSError:
                # File vanished or became unreadable since the save.
                content = ''
            self.documents[path] = Document(
                path=doc_data['path'],
                name=doc_data['name'],
                extension=doc_data['extension'],
                size=doc_data['size'],
                modified=datetime.fromisoformat(doc_data['modified']),
                content=content,
                tokens=Tokenizer.tokenize(content),
            )
        self.inverted_index = {
            token: set(docs)
            for token, docs in data['inverted_index'].items()
        }
Search Engine
Create file_indexer/search.py:
from typing import List, Dict, Set
from dataclasses import dataclass
from .indexer import Document
@dataclass
class SearchResult:
    """A single search hit returned by SearchEngine."""
    document: Document  # the matched document
    score: float  # relevance: summed tf-idf for token search; match count for regex search
    matches: List[str]  # matched query tokens (or up to 5 regex matches for search_regex)
class SearchEngine:
    """Query layer over a FileIndexer's documents and inverted index."""

    def __init__(self, indexer):
        self.indexer = indexer

    def search(self, query: str, limit: int = 10) -> List[SearchResult]:
        """Rank documents matching ANY query token by summed tf-idf;
        return at most ``limit`` results, best first."""
        tokens = self._parse_query(query)
        if not tokens:
            return []
        doc_scores = self._calculate_scores(tokens)
        results = []
        for path, (score, matches) in doc_scores.items():
            if path in self.indexer.documents:
                results.append(SearchResult(self.indexer.documents[path], score, matches))
        results.sort(key=lambda r: r.score, reverse=True)
        return results[:limit]

    def _parse_query(self, query: str) -> Set[str]:
        """Lowercase the query and split it into alphanumeric tokens."""
        import re
        return set(re.findall(r'\b[a-z0-9]+\b', query.lower()))

    def _calculate_scores(self, tokens: Set[str]) -> Dict[str, tuple]:
        """Return {doc_path: (tf-idf score, matched tokens)}.

        Term frequency is counted with the same tokenization used at
        index time. The original ``content.lower().split().count(token)``
        split on whitespace only, so a token adjacent to punctuation
        (e.g. "word" inside "word.") was never counted and real matches
        scored zero. Stale postings (documents removed from the index)
        are skipped instead of raising KeyError.
        """
        import re
        from collections import Counter
        token_re = re.compile(r'\b[a-z0-9]+\b')
        scores: Dict[str, tuple] = {}
        term_counts = {}  # doc_path -> Counter of its tokens, computed once per doc
        for token in tokens:
            matching_docs = self.indexer.inverted_index.get(token)
            if not matching_docs:
                continue
            idf = self._calculate_idf(len(matching_docs))
            for doc_path in matching_docs:
                doc = self.indexer.documents.get(doc_path)
                if doc is None:
                    # Posting refers to a document no longer indexed.
                    continue
                if doc_path not in term_counts:
                    term_counts[doc_path] = Counter(token_re.findall(doc.content.lower()))
                tf = term_counts[doc_path][token]
                old_score, old_matches = scores.get(doc_path, (0, []))
                old_matches.append(token)
                scores[doc_path] = (old_score + tf * idf, old_matches)
        return scores

    def _calculate_idf(self, doc_count: int) -> float:
        """Simplified inverse document frequency: total docs / docs with
        the term (no log damping — kept for tutorial simplicity)."""
        total_docs = len(self.indexer.documents)
        if doc_count == 0:
            return 0
        return total_docs / doc_count

    def search_by_extension(self, extension: str) -> List[Document]:
        """All documents whose extension equals ``extension`` exactly
        (extensions are stored lowercase with a leading dot)."""
        return [doc for doc in self.indexer.documents.values()
                if doc.extension == extension]

    def search_by_name(self, name_query: str) -> List[Document]:
        """Documents whose filename contains ``name_query``, case-insensitively."""
        name_query = name_query.lower()
        return [doc for doc in self.indexer.documents.values()
                if name_query in doc.name.lower()]

    def search_regex(self, pattern: str) -> List[SearchResult]:
        """Case-insensitive regex search over raw document content.

        NOTE: if ``pattern`` contains capture groups, re.findall returns
        tuples, so ``matches`` may hold tuples rather than strings.
        Raises re.error if the pattern is invalid.
        """
        import re
        results = []
        for doc in self.indexer.documents.values():
            matches = re.findall(pattern, doc.content, re.IGNORECASE)
            if matches:
                results.append(SearchResult(doc, len(matches), matches[:5]))
        results.sort(key=lambda r: r.score, reverse=True)
        return results
File Watcher
Create file_indexer/watcher.py:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileIndexHandler(FileSystemEventHandler):
    """Watchdog event handler that keeps a FileIndexer in sync with disk."""

    def __init__(self, indexer):
        self.indexer = indexer

    def _is_indexable(self, path: str) -> bool:
        """True if the file's extension is one the indexer supports.

        The original handler indexed EVERY created/modified file,
        inconsistent with index_directory's extension filter (binaries,
        VCS internals, etc. would be indexed). If the indexer declares
        no SUPPORTED_EXTENSIONS, everything is accepted as before.
        """
        from pathlib import Path
        supported = getattr(self.indexer, 'SUPPORTED_EXTENSIONS', None)
        return supported is None or Path(path).suffix.lower() in supported

    def on_created(self, event):
        if not event.is_directory and self._is_indexable(event.src_path):
            print(f"File created: {event.src_path}")
            self.indexer.index_file(event.src_path)

    def on_modified(self, event):
        if not event.is_directory and self._is_indexable(event.src_path):
            print(f"File modified: {event.src_path}")
            self.indexer.index_file(event.src_path)

    def on_deleted(self, event):
        # Removal is harmless for never-indexed paths, so no filter here.
        if not event.is_directory:
            print(f"File deleted: {event.src_path}")
            self.indexer.remove_document(event.src_path)

    def on_moved(self, event):
        if not event.is_directory:
            print(f"File moved: {event.src_path} -> {event.dest_path}")
            self.indexer.remove_document(event.src_path)
            if self._is_indexable(event.dest_path):
                self.indexer.index_file(event.dest_path)
class FileWatcher:
    """Owns a watchdog Observer that keeps an index synchronized with disk."""

    def __init__(self, indexer):
        self.indexer = indexer
        self.observer = Observer()
        self.handler = FileIndexHandler(indexer)

    def watch(self, path: str, recursive: bool = True):
        """Start watching ``path`` (non-blocking; the observer runs in a thread)."""
        self.observer.schedule(self.handler, path, recursive=recursive)
        self.observer.start()
        print(f"Watching {path} for changes...")

    def watch_and_search(self, path: str, query: str, interval: int = 60):
        """Watch ``path`` and print matching documents every ``interval`` seconds.

        BUG FIX: the original called ``self.indexer.search(query)``, but
        FileIndexer has no ``search`` method (that lives on SearchEngine),
        so this always raised AttributeError. The lookup is now done
        directly against the indexer's inverted index.
        """
        self.watch(path)
        try:
            while True:
                time.sleep(interval)
                matches = self._find_matches(query)
                print(f"\nFound {len(matches)} results for '{query}':")
                for doc_path in matches[:5]:
                    print(f"  {doc_path}")
        except KeyboardInterrupt:
            self.stop()

    def _find_matches(self, query: str) -> list:
        """Return sorted paths of documents containing ALL query tokens.

        Tokenizes the query the same way the indexer does; an empty or
        non-matching query yields an empty list.
        """
        import re
        tokens = set(re.findall(r'\b[a-z0-9]+\b', query.lower()))
        paths = None
        for token in tokens:
            posting = self.indexer.inverted_index.get(token, set())
            paths = set(posting) if paths is None else paths & posting
        return sorted(paths) if paths else []

    def stop(self):
        """Stop the observer thread and block until it shuts down."""
        self.observer.stop()
        self.observer.join()
        print("Stopped watching")
CLI Interface
Create command-line interface:
import argparse
import sys
from pathlib import Path
from file_indexer import FileIndexer, SearchEngine, FileWatcher
def main():
    """Command-line entry point: ``index``, ``search``, or ``watch``."""
    parser = argparse.ArgumentParser(description='File Indexing System')
    subparsers = parser.add_subparsers(dest='command')

    index_parser = subparsers.add_parser('index', help='Index a directory')
    index_parser.add_argument('path', help='Directory to index')
    index_parser.add_argument('--save', help='Save index to file')
    index_parser.add_argument('--extensions', nargs='+', help='File extensions to index')

    search_parser = subparsers.add_parser('search', help='Search indexed files')
    search_parser.add_argument('query', help='Search query')
    search_parser.add_argument('--limit', type=int, default=10, help='Max results')
    search_parser.add_argument('--load', help='Load index from file')

    watch_parser = subparsers.add_parser('watch', help='Watch directory for changes')
    watch_parser.add_argument('path', help='Directory to watch')
    watch_parser.add_argument('--query', help='Search query to run periodically')

    args = parser.parse_args()

    if args.command == 'index':
        indexer = FileIndexer()
        extensions = None
        if args.extensions:
            # Normalize user input: the indexer matches lowercase,
            # dot-prefixed suffixes, so "py" or ".PY" would otherwise
            # silently match nothing.
            extensions = {
                ext.lower() if ext.startswith('.') else '.' + ext.lower()
                for ext in args.extensions
            }
        count = indexer.index_directory(args.path, extensions)
        print(f"Indexed {count} files")
        if args.save:
            indexer.save_index(args.save)
            print(f"Index saved to {args.save}")
    elif args.command == 'search':
        indexer = FileIndexer()
        if args.load:
            indexer.load_index(args.load)
            print(f"Loaded index from {args.load}")
        else:
            print("No index loaded. Use --load or run 'index' first.")
            sys.exit(1)
        engine = SearchEngine(indexer)
        results = engine.search(args.query, args.limit)
        print(f"\nFound {len(results)} results:")
        for result in results:
            print(f"\n{result.document.name}")
            print(f"  Path: {result.document.path}")
            print(f"  Score: {result.score:.2f}")
            print(f"  Matches: {', '.join(result.matches[:3])}")
    elif args.command == 'watch':
        indexer = FileIndexer()
        print("Initial indexing...")
        count = indexer.index_directory(args.path)
        print(f"Indexed {count} files")
        watcher = FileWatcher(indexer)
        if args.query:
            watcher.watch_and_search(args.path, args.query)
        else:
            watcher.watch(args.path)
            print("Watching for changes. Press Ctrl+C to stop.")
            try:
                import time  # local import: only needed for this idle loop
                while True:
                    time.sleep(1)
            except KeyboardInterrupt:
                watcher.stop()
    else:
        parser.print_help()


if __name__ == '__main__':
    main()
Testing the Indexer
Use the CLI:
# Index a directory
python cli.py index ./my-project --save index.json
# Search indexed files
python cli.py search "function" --load index.json
# Watch directory for changes
python cli.py watch ./my-project --query "class"
# Filter by extension
python cli.py index ./project --extensions .py .js
# Programmatic usage
from file_indexer import FileIndexer, SearchEngine
indexer = FileIndexer()
indexer.index_directory('./my-project')
engine = SearchEngine(indexer)
results = engine.search('python')
for result in results:
print(f"{result.document.name} (score: {result.score})")
print(f" Path: {result.document.path}")
# Filter by extension
py_files = engine.search_by_extension('.py')
print(f"Found {len(py_files)} Python files")
# Search by filename
readmes = engine.search_by_name('README')
print(f"Found {len(readmes)} README files")
Summary
Congratulations! You've built a complete file indexing system. Here's what you learned:
- Directory Scanning - How to traverse directories
- Tokenization - How to process text into tokens
- Inverted Index - How to build a search index
- File Watching - How to monitor file changes
Possible Extensions
- Add fuzzy matching
- Implement ranking algorithms
- Add document snippets
- Implement parallel indexing