← Back to Tutorials
Python

Build a File Indexing System

Difficulty: Intermediate Est. Time: ~4 hours

Introduction

File indexing systems scan directories and build searchable indexes of files and their contents. They're used in desktop search tools, code search engines, and document management systems.

In this tutorial, we'll build a complete file indexing system with full-text search, file watching for live updates, and a command-line interface.

What You'll Build
  • Directory scanner
  • Full-text indexer
  • Inverted index for search
  • File system watcher
  • CLI interface
What You'll Learn
  • How search indexes work
  • File system traversal
  • Text tokenization
  • Real-time file monitoring

Core Concepts

Inverted Index

An inverted index maps terms to the documents containing them. This allows fast keyword searches by looking up a term and retrieving all matching documents.

Tokenization

Tokenization breaks text into individual words (tokens). We normalize tokens by converting to lowercase and removing punctuation.

File Watching

File watchers monitor directories for changes and trigger re-indexing when files are added, modified, or deleted.

Project Overview

Our file indexer will support:

Feature | Description
Directory Scan | Recursive directory traversal
Full-Text Search | Search file contents
File Types | Text, code, documents
Live Updates | Watch for file changes

Prerequisites

  • Python 3.8+ - Installed on your system
  • watchdog - pip install watchdog

File Indexer

Create file_indexer/indexer.py:

import os
import re
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Set
from dataclasses import dataclass, field
import json


@dataclass
class Document:
    """Metadata and extracted text for a single indexed file."""

    path: str  # absolute path -- serves as the document's unique key
    name: str  # base filename including extension
    extension: str  # lowercased suffix, e.g. '.py'
    size: int  # size in bytes at index time
    modified: datetime  # last-modified timestamp at index time
    content: str = ''  # full decoded text ('' if the file could not be decoded)
    tokens: Set[str] = field(default_factory=set)  # normalized search tokens from content


class Tokenizer:
    STOPWORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
                 'of', 'with', 'by', 'from', 'as', 'is', 'was', 'are', 'were', 'been',
                 'be', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would',
                 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that',
                 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they'}

    @classmethod
    def tokenize(cls, text: str) -> Set[str]:
        text = text.lower()
        words = re.findall(r'\b[a-z0-9]+\b', text)
        tokens = {w for w in words if w not in cls.STOPWORDS and len(w) > 1}
        return tokens


class FileIndexer:
    """Scans directories and maintains a searchable in-memory index.

    Two structures are kept in sync:
      * ``documents``      -- absolute path -> Document
      * ``inverted_index`` -- token -> set of absolute paths containing it
    """

    # Extensions treated as indexable text files.
    SUPPORTED_EXTENSIONS = {
        '.txt', '.md', '.py', '.js', '.java', '.c', '.cpp', '.h', '.hpp',
        '.cs', '.go', '.rs', '.rb', '.php', '.swift', '.kt', '.scala',
        '.html', '.css', '.scss', '.json', '.yaml', '.yml', '.xml', '.sql',
        '.sh', '.bash', '.zsh', '.ps1', '.r', '.lua', '.pl', '.toml',
    }

    def __init__(self):
        self.documents: Dict[str, Document] = {}
        self.inverted_index: Dict[str, Set[str]] = {}

    def index_directory(self, root_path: str, extensions: Set[str] = None) -> int:
        """Recursively index every matching file under *root_path*.

        ``extensions`` defaults to SUPPORTED_EXTENSIONS; entries must be
        lowercase suffixes including the leading dot (e.g. ``'.py'``).
        Returns the total number of documents in the index.
        """
        extensions = extensions or self.SUPPORTED_EXTENSIONS

        for filepath in Path(root_path).rglob('*'):
            if filepath.is_file() and filepath.suffix.lower() in extensions:
                self.index_file(str(filepath))

        return len(self.documents)

    def index_file(self, filepath: str) -> bool:
        """Index (or re-index) a single file. Returns True on success."""
        try:
            stat = os.stat(filepath)
            path = Path(filepath)
            abs_path = str(path.absolute())

            # BUGFIX: purge any existing entry first so tokens that no
            # longer appear in the file's new content are removed from the
            # inverted index. Without this, re-indexing a modified file
            # left stale postings behind.
            self.remove_document(abs_path)

            content = self._read_file_content(filepath)

            doc = Document(
                path=abs_path,
                name=path.name,
                extension=path.suffix.lower(),
                size=stat.st_size,
                modified=datetime.fromtimestamp(stat.st_mtime),
                content=content,
                tokens=Tokenizer.tokenize(content)
            )

            self.documents[doc.path] = doc
            self._update_inverted_index(doc)
            return True

        except OSError:
            # Best-effort indexing: unreadable or vanished files are skipped.
            return False

    def _read_file_content(self, filepath: str) -> str:
        """Read file text, trying several encodings in order.

        Returns '' when the file cannot be decoded OR cannot be opened at
        all (e.g. deleted between stat and read) -- callers treat an
        unreadable file as an empty document rather than crashing.
        """
        for encoding in ('utf-8', 'latin-1', 'cp1252'):
            try:
                with open(filepath, 'r', encoding=encoding) as f:
                    return f.read()
            except UnicodeDecodeError:
                continue  # try the next encoding
            except OSError:
                break  # file is gone/unreadable; no encoding will help

        return ''

    def _update_inverted_index(self, doc: Document):
        """Add *doc*'s path to the posting set of each of its tokens."""
        for token in doc.tokens:
            self.inverted_index.setdefault(token, set()).add(doc.path)

    def remove_document(self, filepath: str):
        """Remove a document and all of its inverted-index postings.

        No-op if *filepath* is not indexed.
        """
        doc = self.documents.pop(filepath, None)
        if doc is None:
            return

        for token in doc.tokens:
            postings = self.inverted_index.get(token)
            if postings is not None:
                postings.discard(filepath)
                # Drop tokens with no remaining documents.
                if not postings:
                    del self.inverted_index[token]

    def save_index(self, filepath: str):
        """Persist document metadata and the inverted index as JSON.

        File contents are NOT saved; ``load_index`` re-reads them from disk.
        """
        data = {
            'documents': {
                path: {
                    'path': doc.path,
                    'name': doc.name,
                    'extension': doc.extension,
                    'size': doc.size,
                    'modified': doc.modified.isoformat(),
                }
                for path, doc in self.documents.items()
            },
            'inverted_index': {
                token: list(docs)
                for token, docs in self.inverted_index.items()
            }
        }

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f)

    def load_index(self, filepath: str):
        """Rebuild the index from a JSON file written by ``save_index``.

        Content is re-read from the original file paths; files that have
        since been deleted load with empty content (the saved inverted
        index is kept as-is for them).
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)

        self.documents = {}
        for path, doc_data in data['documents'].items():
            content = self._read_file_content(path)
            doc = Document(
                path=doc_data['path'],
                name=doc_data['name'],
                extension=doc_data['extension'],
                size=doc_data['size'],
                modified=datetime.fromisoformat(doc_data['modified']),
                content=content,
                tokens=Tokenizer.tokenize(content)
            )
            self.documents[path] = doc

        self.inverted_index = {
            token: set(docs)
            for token, docs in data['inverted_index'].items()
        }

File Watcher

Create file_indexer/watcher.py:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class FileIndexHandler(FileSystemEventHandler):
    """Bridges watchdog file-system events to index updates."""

    def __init__(self, indexer):
        self.indexer = indexer

    def on_created(self, event):
        # Directory events carry no indexable content.
        if event.is_directory:
            return
        print(f"File created: {event.src_path}")
        self.indexer.index_file(event.src_path)

    def on_modified(self, event):
        if event.is_directory:
            return
        print(f"File modified: {event.src_path}")
        self.indexer.index_file(event.src_path)

    def on_deleted(self, event):
        if event.is_directory:
            return
        print(f"File deleted: {event.src_path}")
        self.indexer.remove_document(event.src_path)

    def on_moved(self, event):
        if event.is_directory:
            return
        print(f"File moved: {event.src_path} -> {event.dest_path}")
        # A move is a delete at the old path plus an add at the new one.
        self.indexer.remove_document(event.src_path)
        self.indexer.index_file(event.dest_path)


class FileWatcher:
    """Owns a watchdog Observer and keeps an indexer up to date."""

    def __init__(self, indexer):
        self.indexer = indexer
        self.handler = FileIndexHandler(indexer)
        self.observer = Observer()

    def watch(self, path: str, recursive: bool = True):
        """Start watching *path* in a background thread (non-blocking)."""
        self.observer.schedule(self.handler, path, recursive=recursive)
        self.observer.start()
        print(f"Watching {path} for changes...")

    def watch_and_search(self, path: str, query: str, interval: int = 60):
        """Watch *path* and re-run *query* every *interval* seconds until
        interrupted with Ctrl+C.

        NOTE(review): this relies on the indexer exposing a ``search``
        method; the FileIndexer class shown earlier does not define one --
        confirm this is intended to be a SearchEngine-backed object.
        """
        self.watch(path)

        try:
            while True:
                time.sleep(interval)
                hits = self.indexer.search(query)
                print(f"\nFound {len(hits)} results for '{query}':")
                for hit in hits[:5]:
                    print(f"  {hit.document.path}")
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Stop the observer thread and wait for it to finish."""
        self.observer.stop()
        self.observer.join()
        print("Stopped watching")

CLI Interface

Create the command-line interface in cli.py:

import argparse
import sys
from pathlib import Path
from file_indexer import FileIndexer, SearchEngine, FileWatcher


def main():
    """Entry point for the file-indexer CLI: index, search, or watch."""
    parser = argparse.ArgumentParser(description='File Indexing System')
    subparsers = parser.add_subparsers(dest='command')

    index_parser = subparsers.add_parser('index', help='Index a directory')
    index_parser.add_argument('path', help='Directory to index')
    index_parser.add_argument('--save', help='Save index to file')
    index_parser.add_argument('--extensions', nargs='+', help='File extensions to index')

    search_parser = subparsers.add_parser('search', help='Search indexed files')
    search_parser.add_argument('query', help='Search query')
    search_parser.add_argument('--limit', type=int, default=10, help='Max results')
    search_parser.add_argument('--load', help='Load index from file')

    watch_parser = subparsers.add_parser('watch', help='Watch directory for changes')
    watch_parser.add_argument('path', help='Directory to watch')
    watch_parser.add_argument('--query', help='Search query to run periodically')

    args = parser.parse_args()

    if args.command == 'index':
        indexer = FileIndexer()

        # BUGFIX: normalize user-supplied extensions (lowercase, leading
        # dot). The indexer compares against lowercased suffixes like
        # '.py', so a bare "py" or ".PY" previously matched nothing.
        extensions = None
        if args.extensions:
            extensions = {
                ext.lower() if ext.startswith('.') else '.' + ext.lower()
                for ext in args.extensions
            }
        count = indexer.index_directory(args.path, extensions)

        print(f"Indexed {count} files")

        if args.save:
            indexer.save_index(args.save)
            print(f"Index saved to {args.save}")

    elif args.command == 'search':
        indexer = FileIndexer()

        if args.load:
            indexer.load_index(args.load)
            print(f"Loaded index from {args.load}")
        else:
            print("No index loaded. Use --load or run 'index' first.")
            sys.exit(1)

        engine = SearchEngine(indexer)
        results = engine.search(args.query, args.limit)

        print(f"\nFound {len(results)} results:")
        for result in results:
            print(f"\n{result.document.name}")
            print(f"  Path: {result.document.path}")
            print(f"  Score: {result.score:.2f}")
            print(f"  Matches: {', '.join(result.matches[:3])}")

    elif args.command == 'watch':
        indexer = FileIndexer()

        print("Initial indexing...")
        count = indexer.index_directory(args.path)
        print(f"Indexed {count} files")

        watcher = FileWatcher(indexer)

        if args.query:
            watcher.watch_and_search(args.path, args.query)
        else:
            watcher.watch(args.path)
            print("Watching for changes. Press Ctrl+C to stop.")
            # Keep the main thread alive; the observer runs in the
            # background until the user interrupts.
            import time
            try:
                while True:
                    time.sleep(1)
            except KeyboardInterrupt:
                watcher.stop()

    else:
        parser.print_help()


if __name__ == '__main__':
    main()

Testing the Indexer

Use the CLI:

# Index a directory
python cli.py index ./my-project --save index.json

# Search indexed files
python cli.py search "function" --load index.json

# Watch directory for changes
python cli.py watch ./my-project --query "class"

# Filter by extension
python cli.py index ./project --extensions .py .js
# Programmatic usage
from file_indexer import FileIndexer, SearchEngine

indexer = FileIndexer()
indexer.index_directory('./my-project')

engine = SearchEngine(indexer)
results = engine.search('python')

for result in results:
    print(f"{result.document.name} (score: {result.score})")
    print(f"  Path: {result.document.path}")

# Filter by extension
py_files = engine.search_by_extension('.py')
print(f"Found {len(py_files)} Python files")

# Search by filename
readmes = engine.search_by_name('README')
print(f"Found {len(readmes)} README files")

Summary

Congratulations! You've built a complete file indexing system. Here's what you learned:

  • Directory Scanning - How to traverse directories
  • Tokenization - How to process text into tokens
  • Inverted Index - How to build a search index
  • File Watching - How to monitor file changes

Possible Extensions

  • Add fuzzy matching
  • Implement ranking algorithms
  • Add document snippets
  • Implement parallel indexing