← Back to Tutorials
Python

Build a Simple Database Engine

Difficulty: Advanced Est. Time: ~5 hours

Introduction

Database engines power every application that stores data. In this tutorial, we'll build a simple but functional database engine from scratch.

We'll create "MiniDB" - an embedded key-value store with SQL-like query support, indexing, and basic transactions.

What You'll Build
  • A custom storage engine
  • SQL-like query parser
  • Index system for fast lookups
  • Transaction support
  • Query optimization
What You'll Learn
  • How databases work internally
  • Storage engine design
  • Query parsing and execution
  • B-Tree indexing
  • ACID transactions

Core Concepts

Let's understand how database engines work.

Database Architecture

  • Storage Layer - Manages reading/writing data to disk
  • Query Parser - Converts SQL to executable operations
  • Query Engine - Executes queries and returns results
  • Index Manager - Maintains indexes for fast lookups
  • Transaction Manager - Handles ACID properties

ACID Properties

  • Atomicity - Transactions are all-or-nothing
  • Consistency - Database stays in valid state
  • Isolation - Concurrent transactions don't interfere
  • Durability - Committed data is never lost

Project Setup

Bash
# Create project directory
mkdir minidb
cd minidb

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
pip install pyparsing

Project Structure

File Structure
minidb/
├── minidb/
│   ├── __init__.py
│   ├── storage.py
│   ├── parser.py
│   ├── engine.py
│   ├── index.py
│   └── transaction.py
├── main.py
└── requirements.txt

Storage Layer

Let's create the storage engine.

Python
# minidb/storage.py
import os
import struct
import json
from pathlib import Path
from typing import Any, Optional, List, Dict

class Page:
    PAGE_SIZE = 4096
    
    __init__(self, page_id: int = 0):
        self.page_id = page_id
        self.data = bytearray(self.PAGE_SIZE)
    
    read(self, offset: int, size: int) -> bytes:
        return bytes(self.data[offset:offset + size])
    
    write(self, offset: int, data: bytes):
        self.data[offset:offset + len(data)] = data


class StorageEngine:
    __init__(self, db_path: str):
        self.db_path = Path(db_path)
        self.db_path.mkdir(parents=True, exist_ok=True)
        self.data_file = self.db_path / "data.db"
        self.log_file = self.db_path / "wal.log"
        
        if not self.data_file.exists():
            self._init_db()
    
    _init_db(self):
        with open(self.data_file, 'wb') as f:
            f.write(struct.pack('>I', 1))
    
    write_record(self, key: str, value: Dict) -> int:
        data = json.dumps(value).encode('utf-8')
        
        with open(self.data_file, 'ab') as f:
            f.write(struct.pack('>I', len(key)))
            f.write(key.encode('utf-8'))
            f.write(struct.pack('>I', len(data)))
            f.write(data)
            return f.tell()
    
    read_all_records(self) -> Dict[str, Dict]:
        records = {}
        
        with open(self.data_file, 'rb') as f:
            f.read(4)
            
            while True:
                key_len_data = f.read(4)
                if len(key_len_data) < 4:
                    break
                
                key_len = struct.unpack('>I', key_len_data)[0]
                key = f.read(key_len).decode('utf-8')
                
                val_len = struct.unpack('>I', f.read(4))[0]
                data = f.read(val_len)
                value = json.loads(data.decode('utf-8'))
                
                records[key] = value
        
        return records
    
    get(self, key: str) -> Optional[Dict]:
        records = self.read_all_records()
        return records.get(key)
    
    put(self, key: str, value: Dict):
        self.write_record(key, value)
    
    delete(self, key: str):
        records = self.read_all_records()
        if key in records:
            del records[key]
            self._rewrite_db(records)
    
    _rewrite_db(self, records: Dict):
        with open(self.data_file, 'wb') as f:
            f.write(struct.pack('>I', 1))
            for key, value in records.items():
                data = json.dumps(value).encode('utf-8')
                f.write(struct.pack('>I', len(key)))
                f.write(key.encode('utf-8'))
                f.write(struct.pack('>I', len(data)))
                f.write(data)

Query Parser

Let's create a simple SQL-like query parser.

Python
# minidb/parser.py
import re
from typing import Dict, List, Any, Optional

class Query:
    __init__(self, operation: str, table: str, 
                 conditions: Dict = None, 
                 values: Dict = None,
                 fields: List[str] = None):
        self.operation = operation
        self.table = table
        self.conditions = conditions or {}
        self.values = values or {}
        self.fields = fields or []


class QueryParser:
    __init__(self):
        self.operations = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE']
    
    parse(self, sql: str) -> Query:
        sql = sql.strip()
        
        for op in self.operations:
            if sql.upper().startswith(op):
                return getattr(self, f"_parse_${op.lower()}_")(sql)
        
        raise ValueError(f"Unknown operation: ${sql}")
    
    _parse_select_(self, sql: str) -> Query:
        pattern = r'SELECT\s+(.*?)\s+FROM\s+(\w+)(?:\s+WHERE\s+(.*))?'
        match = re.match(pattern, sql, re.IGNORECASE)
        
        if not match:
            raise ValueError("Invalid SELECT syntax")
        
        fields_str, table, conditions_str = match.groups()
        
        fields = [f.strip() for f in fields_str.split(',')] if fields_str.strip() != '*' else []
        conditions = self._parse_conditions(conditions_str) if conditions_str else {}
        
        return Query('SELECT', table, conditions, fields=fields)
    
    _parse_insert_(self, sql: str) -> Query:
        pattern = r'INSERT\s+INTO\s+(\w+)\s*\((.*?)\)\s*VALUES\s*\((.*?)\)'
        match = re.match(pattern, sql, re.IGNORECASE)
        
        if not match:
            raise ValueError("Invalid INSERT syntax")
        
        table, fields_str, values_str = match.groups()
        
        fields = [f.strip() for f in fields_str.split(',')]
        values = [v.strip().strip("'\"") for v in values_str.split(',')]
        
        values_dict = dict(zip(fields, values))
        
        return Query('INSERT', table, values=values_dict)
    
    _parse_update_(self, sql: str) -> Query:
        pattern = r'UPDATE\s+(\w+)\s+SET\s+(.*?)(?:\s+WHERE\s+(.*))?'
        match = re.match(pattern, sql, re.IGNORECASE)
        
        if not match:
            raise ValueError("Invalid UPDATE syntax")
        
        table, set_str, conditions_str = match.groups()
        
        set_pairs = [s.strip() for s in set_str.split(',')]
        values = {}
        for pair in set_pairs:
            key, val = pair.split('=')
            values[key.strip()] = val.strip().strip("'\"")
        
        conditions = self._parse_conditions(conditions_str) if conditions_str else {}
        
        return Query('UPDATE', table, conditions, values)
    
    _parse_delete_(self, sql: str) -> Query:
        pattern = r'DELETE\s+FROM\s+(\w+)(?:\s+WHERE\s+(.*))?'
        match = re.match(pattern, sql, re.IGNORECASE)
        
        if not match:
            raise ValueError("Invalid DELETE syntax")
        
        table, conditions_str = match.groups()
        conditions = self._parse_conditions(conditions_str) if conditions_str else {}
        
        return Query('DELETE', table, conditions)
    
    _parse_conditions(self, cond_str: str) -> Dict:
        conditions = {}
        
        if not cond_str:
            return conditions
        
        pattern = r'(\w+)\s*(=|!=|<|>|<=|>=|LIKE)\s*["\']?([^"\']+)["\']?'
        
        for match in re.finditer(pattern, cond_str):
            field, op, value = match.groups()
            conditions[field] = {'op': op, 'value': value}
        
        return conditions

Query Engine

Let's create the query execution engine.

Python
# minidb/engine.py
from typing import List, Dict, Any, Optional
from .storage import StorageEngine
from .parser import Query, QueryParser

class QueryEngine:
    __init__(self, storage: StorageEngine):
        self.storage = storage
        self.parser = QueryParser()
        self.tables: Dict[str, Dict] = {}
    
    execute(self, sql: str) -> Any:
        query = self.parser.parse(sql)
        
        if query.operation == 'SELECT':
            return self._execute_select(query)
        elif query.operation == 'INSERT':
            return self._execute_insert(query)
        elif query.operation == 'UPDATE':
            return self._execute_update(query)
        elif query.operation == 'DELETE':
            return self._execute_delete(query)
        elif query.operation == 'CREATE':
            return self._execute_create(query)
    
    _execute_select(self, query: Query) -> List[Dict]:
        table_name = f"${query.table}_table"
        
        if table_name not in self.tables:
            return []
        
        records = self.tables[table_name]
        
        results = []
        for record in records.values():
            if self._matches_conditions(record, query.conditions):
                if query.fields:
                    results.append({k: v for k, v in record.items() if k in query.fields})
                else:
                    results.append(record)
        
        return results
    
    _execute_insert(self, query: Query) -> Dict:
        table_name = f"${query.table}_table"
        
        if table_name not in self.tables:
            self.tables[table_name] = {}
        
        records = self.tables[table_name]
        
        record_id = len(records) + 1
        record = {'id': record_id, **query.values}
        
        records[record_id] = record
        
        return {'inserted': 1, 'id': record_id}
    
    _execute_update(self, query: Query) -> Dict:
        table_name = f"${query.table}_table"
        
        if table_name not in self.tables:
            return {'updated': 0}
        
        records = self.tables[table_name]
        updated = 0
        
        for record in records.values():
            if self._matches_conditions(record, query.conditions):
                for key, value in query.values.items():
                    record[key] = value
                updated += 1
        
        return {'updated': updated}
    
    _execute_delete(self, query: Query) -> Dict:
        table_name = f"${query.table}_table"
        
        if table_name not in self.tables:
            return {'deleted': 0}
        
        records = self.tables[table_name]
        deleted = 0
        
        to_delete = [
            rid for rid, record in records.items()
            if self._matches_conditions(record, query.conditions)
        ]
        
        for rid in to_delete:
            del records[rid]
            deleted += 1
        
        return {'deleted': deleted}
    
    _execute_create(self, query: Query) -> Dict:
        table_name = f"${query.table}_table"
        
        if table_name in self.tables:
            return {'created': 0, 'error': 'Table already exists'}
        
        self.tables[table_name] = {}
        
        return {'created': 1}
    
    _matches_conditions(self, record: Dict, conditions: Dict) -> bool:
        if not conditions:
            return True
        
        for field, condition in conditions.items():
            if field not in record:
                return False
            
            op = condition['op']
            value = condition['value']
            record_value = str(record[field])
            
            if op == '=' and record_value != value:
                return False
            if op == '!=' and record_value == value:
                return False
            if op == 'LIKE':
                import re
                pattern = value.replace('%', '.*')
                if not re.match(pattern, record_value):
                    return False
        
        return True

Indexing

Let's add B-Tree indexing for fast lookups.

Python
# minidb/index.py
from typing import List, Optional, Any

class BTreeNode:
    __init__(self, leaf: bool = True):
        self.leaf = leaf
        self.keys: List[Any] = []
        self.values: List[Any] = []
        self.children: List['BTreeNode'] = []


class BTree:
    __init__(self, degree: int = 3):
        self.degree = degree
        self.root = BTreeNode(leaf=True)
    
    search(self, key: Any) -> Optional[Any]:
        return self._search(self.root, key)
    
    _search(self, node: BTreeNode, key: Any) -> Optional[Any]:
        i = 0
        while i < len(node.keys) and key > node.keys[i]:
            i += 1
        
        if i < len(node.keys) and key == node.keys[i]:
            return node.values[i]
        
        if node.leaf:
            return None
        
        return self._search(node.children[i], key)
    
    insert(self, key: Any, value: Any):
        if len(self.root.keys) == 2 * self.degree - 1:
            new_root = BTreeNode(leaf=False)
            new_root.children.append(self.root)
            self._split_child(new_root, 0)
            self.root = new_root
        
        self._insert_non_full(self.root, key, value)
    
    _split_child(self, parent: BTreeNode, index: int):
        child = parent.children[index]
        new_child = BTreeNode(leaf=child.leaf)
        
        mid_key = child.keys[self.degree - 1]
        mid_value = child.values[self.degree - 1]
        
        new_child.keys = child.keys[self.degree:]
        new_child.values = child.values[self.degree:]
        
        if not child.leaf:
            new_child.children = child.children[self.degree:]
        
        child.keys = child.keys[:self.degree - 1]
        child.values = child.values[:self.degree - 1]
        
        if not child.leaf:
            child.children = child.children[:self.degree]
        
        parent.children.insert(index + 1, new_child)
        parent.keys.insert(index, mid_key)
        parent.values.insert(index, mid_value)
    
    _insert_non_full(self, node: BTreeNode, key: Any, value: Any):
        i = len(node.keys) - 1
        
        if node.leaf:
            while i >= 0 and key < node.keys[i]:
                i -= 1
            
            node.keys.insert(i + 1, key)
            node.values.insert(i + 1, value)
        else:
            while i >= 0 and key < node.keys[i]:
                i -= 1
            
            i += 1
            
            if len(node.children[i].keys) == 2 * self.degree - 1:
                self._split_child(node, i)
                
                if key > node.keys[i]:
                    i += 1
            
            self._insert_non_full(node.children[i], key, value)


class IndexManager:
    __init__(self):
        self.indexes: Dict[str, BTree] = {}
    
    create_index(self, table: str, field: str):
        index_name = f"${table}_${field}_idx"
        self.indexes[index_name] = BTree()
        return index_name
    
    insert(self, index_name: str, key: Any, value: Any):
        if index_name in self.indexes:
            self.indexes[index_name].insert(key, value)
    
    search(self, index_name: str, key: Any) -> Optional[Any]:
        if index_name in self.indexes:
            return self.indexes[index_name].search(key)
        return None

Transactions

Let's add transaction support with ACID properties.

Python
# minidb/transaction.py
from typing import Dict, Any, List
from enum import Enum
import threading

class TransactionState(Enum):
    ACTIVE = "active"
    COMMITTED = "committed"
    ABORTED = "aborted"


class Transaction:
    __init__(self, tx_id: int):
        self.tx_id = tx_id
        self.state = TransactionState.ACTIVE
        self.operations: List[Dict] = []
        self.locks: set = set()
    
    add_operation(self, operation: Dict):
        if self.state == TransactionState.ACTIVE:
            self.operations.append(operation)
    
    commit(self):
        self.state = TransactionState.COMMITTED
    
    abort(self):
        self.state = TransactionState.ABORTED
        self.operations.clear()


class TransactionManager:
    __init__(self):
        self.transactions: Dict[int, Transaction] = {}
        self.lock = threading.Lock()
        self.next_tx_id = 1
        self.write_ahead_log: List[str] = []
    
    begin(self) -> Transaction:
        with self.lock:
            tx = Transaction(self.next_tx_id)
            self.transactions[self.next_tx_id] = tx
            self.next_tx_id += 1
            return tx
    
    commit(self, tx_id: int) -> bool:
        with self.lock:
            if tx_id not in self.transactions:
                return False
            
            tx = self.transactions[tx_id]
            
            if tx.state != TransactionState.ACTIVE:
                return False
            
            self._write_log(f"COMMIT ${tx_id}")
            
            tx.commit()
            
            return True
    
    abort(self, tx_id: int) -> bool:
        with self.lock:
            if tx_id not in self.transactions:
                return False
            
            tx = self.transactions[tx_id]
            
            if tx.state != TransactionState.ACTIVE:
                return False
            
            self._write_log(f"ABORT ${tx_id}")
            
            tx.abort()
            
            return True
    
    _write_log(self, message: str):
        self.write_ahead_log.append(message)
        
        if len(self.write_ahead_log) > 1000:
            self.write_ahead_log = self.write_ahead_log[-500:]
    
    get_transaction(self, tx_id: int) -> Transaction:
        return self.transactions.get(tx_id)

Testing

Python
# main.py
from minidb.storage import StorageEngine
from minidb.engine import QueryEngine

engine = QueryEngine(StorageEngine("./testdb"))

# Create table
result = engine.execute("CREATE TABLE users")
print("Create:", result)

# Insert data
result = engine.execute("INSERT INTO users (name, email) VALUES ('John', 'john@example.com')")
print("Insert:", result)

result = engine.execute("INSERT INTO users (name, email) VALUES ('Jane', 'jane@example.com')")
print("Insert:", result)

# Select all
result = engine.execute("SELECT * FROM users")
print("Select all:", result)

# Select with condition
result = engine.execute("SELECT * FROM users WHERE name = 'John'")
print("Select with WHERE:", result)

# Update
result = engine.execute("UPDATE users SET email = 'john.new@example.com' WHERE name = 'John'")
print("Update:", result)

# Delete
result = engine.execute("DELETE FROM users WHERE name = 'Jane'")
print("Delete:", result)
Testing Checklist
  • CREATE TABLE works
  • INSERT adds records
  • SELECT returns data
  • WHERE conditions filter correctly
  • UPDATE modifies records
  • DELETE removes records

Summary

Congratulations! You've built a simple database engine.

What You Built

  • Storage Engine - Persistent data storage
  • Query Parser - SQL-like query parsing
  • Query Engine - Query execution
  • B-Tree Index - Fast lookups
  • Transaction Manager - ACID support

Next Steps

  • Add JOIN operations
  • Implement more index types
  • Add query optimization
  • Implement isolation levels

Continue Learning

Try these tutorials:

  • Build a Search Engine
  • Build a Caching System
  • Build a Background Job Queue