Build a Simple Database Engine
Introduction
Database engines power every application that stores data. In this tutorial, we'll build a simple but functional database engine from scratch.
We'll create "MiniDB" - an embedded key-value store with SQL-like query support, indexing, and basic transactions.
What You'll Build
- A custom storage engine
- SQL-like query parser
- Index system for fast lookups
- Transaction support
- Query optimization
What You'll Learn
- How databases work internally
- Storage engine design
- Query parsing and execution
- B-Tree indexing
- ACID transactions
Core Concepts
Let's understand how database engines work.
Database Architecture
- Storage Layer - Manages reading/writing data to disk
- Query Parser - Converts SQL to executable operations
- Query Engine - Executes queries and returns results
- Index Manager - Maintains indexes for fast lookups
- Transaction Manager - Handles ACID properties
ACID Properties
- Atomicity - Transactions are all-or-nothing
- Consistency - Database stays in valid state
- Isolation - Concurrent transactions don't interfere
- Durability - Committed data is never lost
Project Setup
Bash
# Create project directory
mkdir minidb
cd minidb
# Create virtual environment
python -m venv venv
source venv/bin/activate
# Install dependencies
pip install pyparsing
Project Structure
File Structure
minidb/
├── minidb/
│ ├── __init__.py
│ ├── storage.py
│ ├── parser.py
│ ├── engine.py
│ ├── index.py
│ └── transaction.py
├── main.py
└── requirements.txt
Storage Layer
Let's create the storage engine.
Python
# minidb/storage.py
import os
import struct
import json
from pathlib import Path
from typing import Any, Optional, List, Dict
class Page:
PAGE_SIZE = 4096
__init__(self, page_id: int = 0):
self.page_id = page_id
self.data = bytearray(self.PAGE_SIZE)
read(self, offset: int, size: int) -> bytes:
return bytes(self.data[offset:offset + size])
write(self, offset: int, data: bytes):
self.data[offset:offset + len(data)] = data
class StorageEngine:
__init__(self, db_path: str):
self.db_path = Path(db_path)
self.db_path.mkdir(parents=True, exist_ok=True)
self.data_file = self.db_path / "data.db"
self.log_file = self.db_path / "wal.log"
if not self.data_file.exists():
self._init_db()
_init_db(self):
with open(self.data_file, 'wb') as f:
f.write(struct.pack('>I', 1))
write_record(self, key: str, value: Dict) -> int:
data = json.dumps(value).encode('utf-8')
with open(self.data_file, 'ab') as f:
f.write(struct.pack('>I', len(key)))
f.write(key.encode('utf-8'))
f.write(struct.pack('>I', len(data)))
f.write(data)
return f.tell()
read_all_records(self) -> Dict[str, Dict]:
records = {}
with open(self.data_file, 'rb') as f:
f.read(4)
while True:
key_len_data = f.read(4)
if len(key_len_data) < 4:
break
key_len = struct.unpack('>I', key_len_data)[0]
key = f.read(key_len).decode('utf-8')
val_len = struct.unpack('>I', f.read(4))[0]
data = f.read(val_len)
value = json.loads(data.decode('utf-8'))
records[key] = value
return records
get(self, key: str) -> Optional[Dict]:
records = self.read_all_records()
return records.get(key)
put(self, key: str, value: Dict):
self.write_record(key, value)
delete(self, key: str):
records = self.read_all_records()
if key in records:
del records[key]
self._rewrite_db(records)
_rewrite_db(self, records: Dict):
with open(self.data_file, 'wb') as f:
f.write(struct.pack('>I', 1))
for key, value in records.items():
data = json.dumps(value).encode('utf-8')
f.write(struct.pack('>I', len(key)))
f.write(key.encode('utf-8'))
f.write(struct.pack('>I', len(data)))
f.write(data)
Query Parser
Let's create a simple SQL-like query parser.
Python
# minidb/parser.py
import re
from typing import Dict, List, Any, Optional
class Query:
__init__(self, operation: str, table: str,
conditions: Dict = None,
values: Dict = None,
fields: List[str] = None):
self.operation = operation
self.table = table
self.conditions = conditions or {}
self.values = values or {}
self.fields = fields or []
class QueryParser:
__init__(self):
self.operations = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE']
parse(self, sql: str) -> Query:
sql = sql.strip()
for op in self.operations:
if sql.upper().startswith(op):
return getattr(self, f"_parse_${op.lower()}_")(sql)
raise ValueError(f"Unknown operation: ${sql}")
_parse_select_(self, sql: str) -> Query:
pattern = r'SELECT\s+(.*?)\s+FROM\s+(\w+)(?:\s+WHERE\s+(.*))?'
match = re.match(pattern, sql, re.IGNORECASE)
if not match:
raise ValueError("Invalid SELECT syntax")
fields_str, table, conditions_str = match.groups()
fields = [f.strip() for f in fields_str.split(',')] if fields_str.strip() != '*' else []
conditions = self._parse_conditions(conditions_str) if conditions_str else {}
return Query('SELECT', table, conditions, fields=fields)
_parse_insert_(self, sql: str) -> Query:
pattern = r'INSERT\s+INTO\s+(\w+)\s*\((.*?)\)\s*VALUES\s*\((.*?)\)'
match = re.match(pattern, sql, re.IGNORECASE)
if not match:
raise ValueError("Invalid INSERT syntax")
table, fields_str, values_str = match.groups()
fields = [f.strip() for f in fields_str.split(',')]
values = [v.strip().strip("'\"") for v in values_str.split(',')]
values_dict = dict(zip(fields, values))
return Query('INSERT', table, values=values_dict)
_parse_update_(self, sql: str) -> Query:
pattern = r'UPDATE\s+(\w+)\s+SET\s+(.*?)(?:\s+WHERE\s+(.*))?'
match = re.match(pattern, sql, re.IGNORECASE)
if not match:
raise ValueError("Invalid UPDATE syntax")
table, set_str, conditions_str = match.groups()
set_pairs = [s.strip() for s in set_str.split(',')]
values = {}
for pair in set_pairs:
key, val = pair.split('=')
values[key.strip()] = val.strip().strip("'\"")
conditions = self._parse_conditions(conditions_str) if conditions_str else {}
return Query('UPDATE', table, conditions, values)
_parse_delete_(self, sql: str) -> Query:
pattern = r'DELETE\s+FROM\s+(\w+)(?:\s+WHERE\s+(.*))?'
match = re.match(pattern, sql, re.IGNORECASE)
if not match:
raise ValueError("Invalid DELETE syntax")
table, conditions_str = match.groups()
conditions = self._parse_conditions(conditions_str) if conditions_str else {}
return Query('DELETE', table, conditions)
_parse_conditions(self, cond_str: str) -> Dict:
conditions = {}
if not cond_str:
return conditions
pattern = r'(\w+)\s*(=|!=|<|>|<=|>=|LIKE)\s*["\']?([^"\']+)["\']?'
for match in re.finditer(pattern, cond_str):
field, op, value = match.groups()
conditions[field] = {'op': op, 'value': value}
return conditions
Query Engine
Let's create the query execution engine.
Python
# minidb/engine.py
from typing import List, Dict, Any, Optional
from .storage import StorageEngine
from .parser import Query, QueryParser
class QueryEngine:
__init__(self, storage: StorageEngine):
self.storage = storage
self.parser = QueryParser()
self.tables: Dict[str, Dict] = {}
execute(self, sql: str) -> Any:
query = self.parser.parse(sql)
if query.operation == 'SELECT':
return self._execute_select(query)
elif query.operation == 'INSERT':
return self._execute_insert(query)
elif query.operation == 'UPDATE':
return self._execute_update(query)
elif query.operation == 'DELETE':
return self._execute_delete(query)
elif query.operation == 'CREATE':
return self._execute_create(query)
_execute_select(self, query: Query) -> List[Dict]:
table_name = f"${query.table}_table"
if table_name not in self.tables:
return []
records = self.tables[table_name]
results = []
for record in records.values():
if self._matches_conditions(record, query.conditions):
if query.fields:
results.append({k: v for k, v in record.items() if k in query.fields})
else:
results.append(record)
return results
_execute_insert(self, query: Query) -> Dict:
table_name = f"${query.table}_table"
if table_name not in self.tables:
self.tables[table_name] = {}
records = self.tables[table_name]
record_id = len(records) + 1
record = {'id': record_id, **query.values}
records[record_id] = record
return {'inserted': 1, 'id': record_id}
_execute_update(self, query: Query) -> Dict:
table_name = f"${query.table}_table"
if table_name not in self.tables:
return {'updated': 0}
records = self.tables[table_name]
updated = 0
for record in records.values():
if self._matches_conditions(record, query.conditions):
for key, value in query.values.items():
record[key] = value
updated += 1
return {'updated': updated}
_execute_delete(self, query: Query) -> Dict:
table_name = f"${query.table}_table"
if table_name not in self.tables:
return {'deleted': 0}
records = self.tables[table_name]
deleted = 0
to_delete = [
rid for rid, record in records.items()
if self._matches_conditions(record, query.conditions)
]
for rid in to_delete:
del records[rid]
deleted += 1
return {'deleted': deleted}
_execute_create(self, query: Query) -> Dict:
table_name = f"${query.table}_table"
if table_name in self.tables:
return {'created': 0, 'error': 'Table already exists'}
self.tables[table_name] = {}
return {'created': 1}
_matches_conditions(self, record: Dict, conditions: Dict) -> bool:
if not conditions:
return True
for field, condition in conditions.items():
if field not in record:
return False
op = condition['op']
value = condition['value']
record_value = str(record[field])
if op == '=' and record_value != value:
return False
if op == '!=' and record_value == value:
return False
if op == 'LIKE':
import re
pattern = value.replace('%', '.*')
if not re.match(pattern, record_value):
return False
return True
Indexing
Let's add B-Tree indexing for fast lookups.
Python
# minidb/index.py
from typing import List, Optional, Any
class BTreeNode:
__init__(self, leaf: bool = True):
self.leaf = leaf
self.keys: List[Any] = []
self.values: List[Any] = []
self.children: List['BTreeNode'] = []
class BTree:
__init__(self, degree: int = 3):
self.degree = degree
self.root = BTreeNode(leaf=True)
search(self, key: Any) -> Optional[Any]:
return self._search(self.root, key)
_search(self, node: BTreeNode, key: Any) -> Optional[Any]:
i = 0
while i < len(node.keys) and key > node.keys[i]:
i += 1
if i < len(node.keys) and key == node.keys[i]:
return node.values[i]
if node.leaf:
return None
return self._search(node.children[i], key)
insert(self, key: Any, value: Any):
if len(self.root.keys) == 2 * self.degree - 1:
new_root = BTreeNode(leaf=False)
new_root.children.append(self.root)
self._split_child(new_root, 0)
self.root = new_root
self._insert_non_full(self.root, key, value)
_split_child(self, parent: BTreeNode, index: int):
child = parent.children[index]
new_child = BTreeNode(leaf=child.leaf)
mid_key = child.keys[self.degree - 1]
mid_value = child.values[self.degree - 1]
new_child.keys = child.keys[self.degree:]
new_child.values = child.values[self.degree:]
if not child.leaf:
new_child.children = child.children[self.degree:]
child.keys = child.keys[:self.degree - 1]
child.values = child.values[:self.degree - 1]
if not child.leaf:
child.children = child.children[:self.degree]
parent.children.insert(index + 1, new_child)
parent.keys.insert(index, mid_key)
parent.values.insert(index, mid_value)
_insert_non_full(self, node: BTreeNode, key: Any, value: Any):
i = len(node.keys) - 1
if node.leaf:
while i >= 0 and key < node.keys[i]:
i -= 1
node.keys.insert(i + 1, key)
node.values.insert(i + 1, value)
else:
while i >= 0 and key < node.keys[i]:
i -= 1
i += 1
if len(node.children[i].keys) == 2 * self.degree - 1:
self._split_child(node, i)
if key > node.keys[i]:
i += 1
self._insert_non_full(node.children[i], key, value)
class IndexManager:
__init__(self):
self.indexes: Dict[str, BTree] = {}
create_index(self, table: str, field: str):
index_name = f"${table}_${field}_idx"
self.indexes[index_name] = BTree()
return index_name
insert(self, index_name: str, key: Any, value: Any):
if index_name in self.indexes:
self.indexes[index_name].insert(key, value)
search(self, index_name: str, key: Any) -> Optional[Any]:
if index_name in self.indexes:
return self.indexes[index_name].search(key)
return None
Transactions
Let's add transaction support with ACID properties.
Python
# minidb/transaction.py
from typing import Dict, Any, List
from enum import Enum
import threading
class TransactionState(Enum):
ACTIVE = "active"
COMMITTED = "committed"
ABORTED = "aborted"
class Transaction:
__init__(self, tx_id: int):
self.tx_id = tx_id
self.state = TransactionState.ACTIVE
self.operations: List[Dict] = []
self.locks: set = set()
add_operation(self, operation: Dict):
if self.state == TransactionState.ACTIVE:
self.operations.append(operation)
commit(self):
self.state = TransactionState.COMMITTED
abort(self):
self.state = TransactionState.ABORTED
self.operations.clear()
class TransactionManager:
__init__(self):
self.transactions: Dict[int, Transaction] = {}
self.lock = threading.Lock()
self.next_tx_id = 1
self.write_ahead_log: List[str] = []
begin(self) -> Transaction:
with self.lock:
tx = Transaction(self.next_tx_id)
self.transactions[self.next_tx_id] = tx
self.next_tx_id += 1
return tx
commit(self, tx_id: int) -> bool:
with self.lock:
if tx_id not in self.transactions:
return False
tx = self.transactions[tx_id]
if tx.state != TransactionState.ACTIVE:
return False
self._write_log(f"COMMIT ${tx_id}")
tx.commit()
return True
abort(self, tx_id: int) -> bool:
with self.lock:
if tx_id not in self.transactions:
return False
tx = self.transactions[tx_id]
if tx.state != TransactionState.ACTIVE:
return False
self._write_log(f"ABORT ${tx_id}")
tx.abort()
return True
_write_log(self, message: str):
self.write_ahead_log.append(message)
if len(self.write_ahead_log) > 1000:
self.write_ahead_log = self.write_ahead_log[-500:]
get_transaction(self, tx_id: int) -> Transaction:
return self.transactions.get(tx_id)
Testing
Python
# main.py
from minidb.storage import StorageEngine
from minidb.engine import QueryEngine
engine = QueryEngine(StorageEngine("./testdb"))
# Create table
result = engine.execute("CREATE TABLE users")
print("Create:", result)
# Insert data
result = engine.execute("INSERT INTO users (name, email) VALUES ('John', 'john@example.com')")
print("Insert:", result)
result = engine.execute("INSERT INTO users (name, email) VALUES ('Jane', 'jane@example.com')")
print("Insert:", result)
# Select all
result = engine.execute("SELECT * FROM users")
print("Select all:", result)
# Select with condition
result = engine.execute("SELECT * FROM users WHERE name = 'John'")
print("Select with WHERE:", result)
# Update
result = engine.execute("UPDATE users SET email = 'john.new@example.com' WHERE name = 'John'")
print("Update:", result)
# Delete
result = engine.execute("DELETE FROM users WHERE name = 'Jane'")
print("Delete:", result)
Testing Checklist
- CREATE TABLE works
- INSERT adds records
- SELECT returns data
- WHERE conditions filter correctly
- UPDATE modifies records
- DELETE removes records
Summary
Congratulations! You've built a simple database engine.
What You Built
- Storage Engine - Persistent data storage
- Query Parser - SQL-like query parsing
- Query Engine - Query execution
- B-Tree Index - Fast lookups
- Transaction Manager - ACID support
Next Steps
- Add JOIN operations
- Implement more index types
- Add query optimization
- Implement isolation levels