Build a Message Queue
Introduction
Message queues are the backbone of asynchronous communication in modern distributed systems. They enable decoupled communication between services, handling background jobs, and managing workloads.
In this tutorial, we'll build "MsgQueue" - a message queue system with persistent storage, pub/sub messaging, and clustering support.
What You'll Build
- A message queue server
- Persistent message storage
- Publish/Subscribe system
- Message acknowledgment
- Queue clustering
What You'll Learn
- Message queue architecture
- Message serialization
- Pub/Sub patterns
- Distributed systems communication
- Network programming
Core Concepts
Let's understand the fundamentals of message queues.
Queue Models
- Point-to-Point - One producer, one consumer
- Publish/Subscribe - One producer, multiple consumers
- Request/Reply - Synchronous messaging
Delivery Guarantees
- At-most-once - May lose messages
- At-least-once - May duplicate messages
- Exactly-once - Guaranteed once delivery
Message Patterns
- FIFO - First in, first out
- Priority - Higher priority first
- Delayed - Schedule for later
Project Setup
Bash
# Create project directory
mkdir msgqueue
cd msgqueue
# Create virtual environment
python -m venv venv
source venv/bin/activate
# Install dependencies
pip install aiofiles aiohttp
Project Structure
File Structure
msgqueue/
├── msgqueue/
│ ├── __init__.py
│ ├── message.py
│ ├── message_types.py
│ ├── queue.py
│ ├── pubsub.py
│ ├── storage.py
│ ├── cluster.py
│ └── server.py
├── client.py
└── requirements.txt
Queue Implementation
Let's create the core message queue.
Python
# msgqueue/message.py
import json
import time
import uuid
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional, Dict
class MessagePriority(Enum):
    """Delivery priority; higher value is delivered first by priority-aware queues."""
    LOW = 0
    NORMAL = 1
    HIGH = 2


class Message:
    """A single queue message carrying a payload plus delivery metadata.

    Tracks TTL expiry, delayed availability, acknowledgement state and the
    retry counter used for redelivery backoff.
    """

    def __init__(self,
                 queue_name: str,
                 body: Any,
                 message_id: str = None,
                 priority: MessagePriority = MessagePriority.NORMAL,
                 ttl: int = None,
                 delay: int = 0,
                 headers: Dict = None):
        """Create a message for *queue_name*.

        Args:
            queue_name: Destination queue.
            body: Arbitrary payload (must be JSON-serializable for to_json()).
            message_id: Explicit id; a uuid4 string is generated when omitted.
            priority: Delivery priority (default NORMAL).
            ttl: Lifetime in seconds; None means the message never expires.
            delay: Seconds before the message becomes available for delivery.
            headers: Optional application metadata.
        """
        self.id = message_id or str(uuid.uuid4())
        self.queue_name = queue_name
        self.body = body
        self.priority = priority
        self.ttl = ttl
        self.delay = delay
        self.headers = headers or {}
        self.created_at = datetime.now()
        # Earliest moment a consumer may receive this message; derived from
        # created_at (not a second now() call) so the two timestamps agree.
        self.available_at = self.created_at + timedelta(seconds=delay)
        self.acknowledged = False
        self.ack_id = None        # set by the queue when the message is delivered
        self.retry_count = 0
        self.max_retries = 3

    def to_dict(self) -> Dict:
        """Serializable snapshot of the message (includes headers for round-tripping)."""
        return {
            'id': self.id,
            'queue_name': self.queue_name,
            'body': self.body,
            'priority': self.priority.value,
            'ttl': self.ttl,
            'delay': self.delay,
            'headers': self.headers,
            'created_at': self.created_at.isoformat(),
            'available_at': self.available_at.isoformat(),
            'acknowledged': self.acknowledged,
            'retry_count': self.retry_count
        }

    def to_json(self) -> str:
        """JSON-encode to_dict(); body and headers must be JSON-serializable."""
        return json.dumps(self.to_dict())

    def to_bytes(self) -> bytes:
        """UTF-8 bytes of the JSON representation (wire format)."""
        return self.to_json().encode('utf-8')

    def acknowledge(self):
        """Mark the message as successfully processed."""
        self.acknowledged = True

    def is_expired(self) -> bool:
        """True when a TTL is set and has elapsed.

        Uses `is not None` so that ttl=0 means "expires as soon as any time
        has passed" instead of silently meaning "never expires".
        """
        if self.ttl is not None:
            return datetime.now() > self.created_at + timedelta(seconds=self.ttl)
        return False

    def is_available(self) -> bool:
        """True once the delivery delay has passed."""
        return datetime.now() >= self.available_at
Python
# msgqueue/queue.py
import asyncio
from collections import deque
from threading import Lock
from typing import List, Optional, Callable
from datetime import datetime, timedelta
from .message import Message, MessagePriority
import uuid
class MessageQueue:
    """Thread-safe in-memory queue with priority delivery, ack/reject and retry backoff.

    Messages are delivered highest priority first (FIFO within a priority);
    a delivered message moves into a pending set until it is acknowledged
    or rejected.
    """

    def __init__(self, name: str, max_size: int = 10000):
        self.name = name
        self.max_size = max_size
        self._messages = deque()        # messages awaiting delivery
        self._pending = {}              # ack_id -> delivered but unacknowledged message
        self._lock = Lock()
        self._waiting_consumers = []
        self.subscribers = []

    def enqueue(self, message: "Message") -> bool:
        """Add *message*; returns False when the queue is at max_size."""
        with self._lock:
            if len(self._messages) >= self.max_size:
                return False
            self._messages.append(message)
            return True

    def dequeue(self, timeout: int = 0) -> Optional["Message"]:
        """Remove and return the best available message, or None.

        The message is removed from the queue (so it cannot be delivered
        twice) and parked in the pending set under a fresh ack_id; the
        consumer must acknowledge() or reject() it. Expired messages are
        skipped. *timeout* is accepted for API compatibility but unused
        by this in-memory implementation.
        """
        with self._lock:
            best = None
            for msg in self._messages:
                if msg.acknowledged or msg.is_expired() or not msg.is_available():
                    continue
                # strict '>' keeps FIFO order among equal priorities
                if best is None or msg.priority.value > best.priority.value:
                    best = msg
            if best is None:
                return None
            self._messages.remove(best)
            best.ack_id = str(uuid.uuid4())
            self._pending[best.ack_id] = best
            return best

    def acknowledge(self, ack_id: str) -> bool:
        """Confirm processing; the message leaves the pending set for good."""
        with self._lock:
            if ack_id in self._pending:
                del self._pending[ack_id]
                return True
            return False

    def reject(self, ack_id: str, requeue: bool = True) -> bool:
        """Return a delivered message, optionally requeueing with exponential backoff.

        When retries are exhausted the message is dropped (a dead-letter
        queue is a future extension).
        """
        with self._lock:
            if ack_id not in self._pending:
                return False
            msg = self._pending.pop(ack_id)
            if requeue and msg.retry_count < msg.max_retries:
                msg.retry_count += 1
                # 2, 4, 8 ... seconds before the message is deliverable again
                msg.available_at = datetime.now() + timedelta(seconds=2 ** msg.retry_count)
                self._messages.append(msg)
            return True

    def size(self) -> int:
        """Number of messages waiting (pending deliveries not included)."""
        with self._lock:
            return len(self._messages)

    def purge(self):
        """Drop all waiting and pending messages."""
        with self._lock:
            self._messages.clear()
            self._pending.clear()

    def get_stats(self) -> dict:
        """Snapshot of queue counters for monitoring."""
        with self._lock:
            return {
                'name': self.name,
                'total_messages': len(self._messages),
                'pending': len(self._pending),
                'available': sum(1 for m in self._messages if m.is_available())
            }
Message Types
Let's add support for different message types.
Python
# msgqueue/message_types.py
from enum import Enum
from typing import Any, Dict
import json
import pickle
class MessageType(Enum):
    """Wire-level payload encodings supported by the queue."""
    TEXT = "text"
    JSON = "json"
    BINARY = "binary"
    COMMAND = "command"
    EVENT = "event"


class MessageSerializer:
    """Encodes and decodes payloads to/from bytes according to their MessageType.

    COMMAND and EVENT payloads travel as JSON; BINARY uses pickle.
    SECURITY NOTE: pickle.loads must never be fed untrusted data -- it can
    execute arbitrary code during deserialization.
    """

    def __init__(self):
        # One (encoder, decoder) pair per type; unknown types fall back to JSON.
        codec_pairs = {
            MessageType.TEXT: (self._encode_text, self._decode_text),
            MessageType.JSON: (self._encode_json, self._decode_json),
            MessageType.BINARY: (self._encode_binary, self._decode_binary),
            MessageType.COMMAND: (self._encode_json, self._decode_json),
            MessageType.EVENT: (self._encode_json, self._decode_json),
        }
        self.encoders = {mtype: enc for mtype, (enc, _) in codec_pairs.items()}
        self.decoders = {mtype: dec for mtype, (_, dec) in codec_pairs.items()}

    def encode(self, message_type: MessageType, data: Any) -> bytes:
        """Serialize *data* using the codec registered for *message_type*."""
        return self.encoders.get(message_type, self._encode_json)(data)

    def decode(self, message_type: MessageType, data: bytes) -> Any:
        """Deserialize *data* using the codec registered for *message_type*."""
        return self.decoders.get(message_type, self._decode_json)(data)

    def _encode_text(self, data: Any) -> bytes:
        return str(data).encode('utf-8')

    def _decode_text(self, data: bytes) -> str:
        return data.decode('utf-8')

    def _encode_json(self, data: Any) -> bytes:
        return json.dumps(data).encode('utf-8')

    def _decode_json(self, data: bytes) -> Any:
        return json.loads(data.decode('utf-8'))

    def _encode_binary(self, data: Any) -> bytes:
        return pickle.dumps(data)

    def _decode_binary(self, data: bytes) -> Any:
        return pickle.loads(data)
class MessageBuilder:
    """Convenience factory pairing Message construction with payload serialization."""

    def __init__(self):
        self.serializer = MessageSerializer()

    def create_message(self, queue_name: str, data: Any,
                       message_type: MessageType = MessageType.JSON,
                       **kwargs) -> "Message":
        """Build a Message whose body is *data* encoded for *message_type*.

        The chosen type is recorded in the 'message_type' header so that
        decode_message can later pick the matching decoder. Extra keyword
        arguments are forwarded to the Message constructor.
        """
        from .message import Message  # local import avoids a circular dependency

        payload = self.serializer.encode(message_type, data)
        msg = Message(queue_name=queue_name, body=payload, **kwargs)
        msg.headers['message_type'] = message_type.value
        return msg

    def decode_message(self, message: "Message") -> Any:
        """Reverse create_message: decode body per the recorded type (JSON default)."""
        recorded = message.headers.get('message_type', MessageType.JSON.value)
        return self.serializer.decode(MessageType(recorded), message.body)
Pub/Sub System
Let's implement publish/subscribe messaging.
Python
# msgqueue/pubsub.py
import asyncio
from typing import List, Dict, Callable, Any
from collections import defaultdict
from threading import Lock
import uuid
class Subscription:
    """A single subscriber registration, optionally restricted by a regex pattern."""

    def __init__(self, subscriber_id: str, pattern: str = None):
        self.id = str(uuid.uuid4())      # unique handle used for unsubscribe
        self.subscriber_id = subscriber_id
        self.pattern = pattern           # optional regex matched against channel names
        self.callback: Callable = None
        self.active = True

    def matches(self, channel: str) -> bool:
        """True when this subscription applies to *channel* (no pattern = all channels)."""
        if self.pattern:
            import re
            # bool() so the declared return type holds: re.match returns Match/None
            return bool(re.match(self.pattern, channel))
        return True


class PubSubManager:
    """Channel-based publish/subscribe with bounded per-channel history.

    Registry mutations are guarded by a threading Lock; callbacks are
    invoked outside the lock so a slow or awaiting subscriber cannot
    block other threads.
    """

    def __init__(self):
        self._channels: Dict[str, List[Subscription]] = defaultdict(list)
        self._subscribers: Dict[str, Subscription] = {}
        self._lock = Lock()
        self.message_history: Dict[str, List[dict]] = defaultdict(list)
        self.history_limit = 100    # max retained messages per channel

    def subscribe(self, subscriber_id: str, channel: str,
                  callback: Callable = None, pattern: str = None) -> Subscription:
        """Register *callback* on *channel*; returns the Subscription handle."""
        with self._lock:
            sub = Subscription(subscriber_id, pattern)
            sub.callback = callback
            self._channels[channel].append(sub)
            self._subscribers[sub.id] = sub
            return sub

    def unsubscribe(self, subscription_id: str) -> bool:
        """Deactivate and remove a subscription; False when the id is unknown."""
        with self._lock:
            if subscription_id not in self._subscribers:
                return False
            sub = self._subscribers[subscription_id]
            sub.active = False
            for channel, subs in self._channels.items():
                self._channels[channel] = [s for s in subs if s.id != subscription_id]
            del self._subscribers[subscription_id]
            return True

    async def publish(self, channel: str, message: Any) -> int:
        """Deliver *message* to every active subscriber of *channel*.

        Records the message in the channel history, then invokes each
        callback (awaiting coroutine callbacks). Returns the number of
        successful deliveries; a failing subscriber is logged and does
        not stop delivery to the rest.
        """
        with self._lock:
            self.message_history[channel].append({
                'channel': channel,
                'message': message,
                # publish() is a coroutine, so a running loop always exists
                'timestamp': str(asyncio.get_running_loop().time())
            })
            if len(self.message_history[channel]) > self.history_limit:
                self.message_history[channel] = self.message_history[channel][-self.history_limit:]
            # snapshot subscribers, then deliver OUTSIDE the lock: awaiting a
            # callback while holding a threading Lock could deadlock the loop
            subscriptions = self._channels.get(channel, []).copy()
        delivered = 0
        for sub in subscriptions:
            if not sub.active or not sub.callback:
                continue
            try:
                if asyncio.iscoroutinefunction(sub.callback):
                    await sub.callback(message)
                else:
                    sub.callback(message)
                delivered += 1
            except Exception as e:
                print(f"Error delivering to {sub.id}: {e}")
        return delivered

    def get_channel_history(self, channel: str, limit: int = 10) -> List[dict]:
        """Up to *limit* most recent messages published on *channel*."""
        with self._lock:
            return self.message_history.get(channel, [])[-limit:]

    def get_subscriber_count(self, channel: str) -> int:
        """Number of active subscriptions on *channel*."""
        with self._lock:
            return sum(1 for s in self._channels.get(channel, []) if s.active)

    def get_stats(self) -> dict:
        """Aggregate channel/subscription counts for monitoring."""
        with self._lock:
            return {
                'total_channels': len(self._channels),
                'total_subscriptions': len(self._subscribers),
                'channels': {
                    ch: len(subs)
                    for ch, subs in self._channels.items()
                }
            }
Message Persistence
Let's add message persistence to disk.
Python
# msgqueue/storage.py
import asyncio
import aiofiles
import os
import json
from typing import List, Optional
from datetime import datetime
from .message import Message
import uuid
class MessageStore:
    """Async file-backed persistence: one JSON file per message plus per-queue state."""

    def __init__(self, storage_path: str = "./msgstore"):
        self.storage_path = storage_path
        self.queues_path = os.path.join(storage_path, "queues")
        self.messages_path = os.path.join(storage_path, "messages")
        os.makedirs(self.queues_path, exist_ok=True)
        os.makedirs(self.messages_path, exist_ok=True)

    def _message_file(self, message_id: str) -> str:
        # Single place defining the on-disk naming scheme: <id>.json
        return os.path.join(self.messages_path, f"{message_id}.json")

    async def save_message(self, message: Message):
        """Persist *message* as <id>.json under the messages directory."""
        async with aiofiles.open(self._message_file(message.id), 'w') as f:
            await f.write(json.dumps(message.to_dict()))

    async def load_message(self, message_id: str) -> Optional[Message]:
        """Rebuild a Message from disk; None when no file exists.

        NOTE(review): priority, headers and ack state are not restored
        here, and available_at is recomputed from load time -- confirm
        whether that loss is acceptable for recovery.
        """
        path = self._message_file(message_id)
        if not os.path.exists(path):
            return None
        async with aiofiles.open(path, 'r') as f:
            content = await f.read()
        data = json.loads(content)
        return Message(
            queue_name=data['queue_name'],
            body=data['body'],
            message_id=data['id'],
            ttl=data.get('ttl'),
            delay=data.get('delay', 0)
        )

    async def delete_message(self, message_id: str):
        """Remove the stored file for *message_id* (no-op when absent)."""
        path = self._message_file(message_id)
        if os.path.exists(path):
            os.remove(path)

    async def get_queue_messages(self, queue_name: str) -> List[Message]:
        """Load every stored message belonging to *queue_name*."""
        messages = []
        for filename in os.listdir(self.messages_path):
            if not filename.endswith('.json'):
                continue
            msg = await self.load_message(filename[:-5])  # strip ".json"
            if msg and msg.queue_name == queue_name:
                messages.append(msg)
        return messages

    async def save_queue_state(self, queue_name: str, state: dict):
        """Persist queue metadata as <queue_name>.json."""
        queue_file = os.path.join(self.queues_path, f"{queue_name}.json")
        async with aiofiles.open(queue_file, 'w') as f:
            await f.write(json.dumps(state))

    async def load_queue_state(self, queue_name: str) -> Optional[dict]:
        """Load queue metadata; None when never saved."""
        queue_file = os.path.join(self.queues_path, f"{queue_name}.json")
        if not os.path.exists(queue_file):
            return None
        async with aiofiles.open(queue_file, 'r') as f:
            content = await f.read()
        return json.loads(content)

    async def cleanup_expired(self):
        """Delete stored messages whose TTL has elapsed."""
        for filename in os.listdir(self.messages_path):
            if not filename.endswith('.json'):
                continue
            message_id = filename[:-5]
            msg = await self.load_message(message_id)
            if msg and msg.is_expired():
                await self.delete_message(message_id)
Queue Clustering
Let's add clustering support for high availability.
Python
# msgqueue/cluster.py
import asyncio
from typing import Dict, List, Optional
import aiohttp
import hashlib
class QueueCluster:
    """Hash-partitioned cluster routing: each queue maps to a primary node plus replicas."""

    def __init__(self, cluster_id: str, nodes: List[dict]):
        self.cluster_id = cluster_id
        self.nodes = nodes                               # ordered ring of node descriptors
        self.node_index = {n['id']: n for n in nodes}    # node id -> descriptor lookup
        self.replication_factor = 2                      # replicas per queue beyond the primary

    def _get_partition(self, queue_name: str) -> int:
        """Stable partition index for *queue_name* (md5 is for placement, not security)."""
        hash_value = int(hashlib.md5(queue_name.encode()).hexdigest(), 16)
        return hash_value % len(self.nodes)

    def get_primary_node(self, queue_name: str) -> Optional[dict]:
        """Node that owns *queue_name*."""
        return self.nodes[self._get_partition(queue_name)]

    def get_replica_nodes(self, queue_name: str) -> List[dict]:
        """The replication_factor nodes following the primary in ring order."""
        partition = self._get_partition(queue_name)
        return [
            self.nodes[(partition + i) % len(self.nodes)]
            for i in range(1, self.replication_factor + 1)
        ]

    async def forward_to_node(self, node: dict, method: str, path: str, data: dict = None):
        """Proxy an HTTP request to *node*; returns parsed JSON, or None on any failure."""
        url = f"http://{node['host']}:{node['port']}{path}"
        try:
            async with aiohttp.ClientSession() as session:
                if method == 'GET':
                    async with session.get(url, timeout=5) as resp:
                        return await resp.json()
                elif method == 'POST':
                    async with session.post(url, json=data, timeout=5) as resp:
                        return await resp.json()
        except Exception as e:
            # best-effort: cluster calls degrade to None rather than raising
            print(f"Error forwarding to node: {e}")
        return None

    async def enqueue(self, queue_name: str, message: dict) -> bool:
        """Enqueue on the primary node, then replicate (best effort) to replicas."""
        primary = self.get_primary_node(queue_name)
        result = await self.forward_to_node(primary, 'POST',
                                            '/queue/enqueue', message)
        for replica in self.get_replica_nodes(queue_name):
            await self.forward_to_node(replica, 'POST',
                                       '/queue/replicate', message)
        return result is not None

    async def get_queue_stats(self, queue_name: str) -> dict:
        """Fetch queue stats from the owning node."""
        primary = self.get_primary_node(queue_name)
        return await self.forward_to_node(primary, 'GET',
                                          f'/queue/{queue_name}/stats')
Client API
Let's create an easy-to-use client library.
Python
# client.py
import asyncio
import aiohttp
from typing import Any, Optional, Callable
class MessageQueueClient:
    """Async HTTP client for the MsgQueue server (queue + pub/sub endpoints).

    Usable as an async context manager: the session is opened in
    __aenter__/connect() and released in __aexit__/close().
    """

    def __init__(self, host: str = 'localhost', port: int = 5000):
        self.base_url = f"http://{host}:{port}"
        # created lazily in connect() so the client can be built without an event loop
        self.session: Optional["aiohttp.ClientSession"] = None

    async def connect(self):
        """Open the underlying HTTP session."""
        self.session = aiohttp.ClientSession()

    async def close(self):
        """Release the HTTP session."""
        if self.session:
            await self.session.close()

    async def enqueue(self, queue: str, message: Any,
                      priority: int = 1, ttl: int = None, delay: int = 0):
        """POST a message onto *queue*; returns the server's JSON response."""
        payload = {
            'queue': queue,
            'message': message,
            'priority': priority,
            'ttl': ttl,
            'delay': delay
        }
        async with self.session.post(
            f"{self.base_url}/queue/enqueue",
            json=payload
        ) as resp:
            return await resp.json()

    async def dequeue(self, queue: str, timeout: int = 0) -> Optional[dict]:
        """Fetch the next message from *queue*; None when empty or non-200.

        NOTE(review): assumes the server reads 'queue' and 'timeout' query
        parameters -- confirm against the server implementation.
        """
        async with self.session.get(
            f"{self.base_url}/queue/dequeue?queue={queue}&timeout={timeout}"
        ) as resp:
            if resp.status == 200:
                return await resp.json()
            return None

    async def acknowledge(self, ack_id: str):
        """Confirm processing of a delivered message."""
        async with self.session.post(
            f"{self.base_url}/queue/ack",
            json={'ack_id': ack_id}
        ) as resp:
            return await resp.json()

    async def publish(self, channel: str, message: Any):
        """Publish *message* on a pub/sub channel."""
        payload = {
            'channel': channel,
            'message': message
        }
        async with self.session.post(
            f"{self.base_url}/pubsub/publish",
            json=payload
        ) as resp:
            return await resp.json()

    async def subscribe(self, channel: str, callback: Callable):
        """Stream messages from *channel* over a websocket, invoking *callback* per message."""
        async with self.session.ws_connect(
            f"{self.base_url}/pubsub/subscribe?channel={channel}"
        ) as ws:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    await callback(msg.data)

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
Testing
Python
# test_msgqueue.py
"""Smoke-test script exercising the queue and pub/sub components."""
import asyncio

from msgqueue.queue import MessageQueue
from msgqueue.message import Message, MessagePriority
from msgqueue.pubsub import PubSubManager

# --- Queue: enqueue three orders, dequeue one and acknowledge it ---
print("=== Testing Message Queue ===")
queue = MessageQueue("orders", max_size=100)
queue.enqueue(Message("orders", {"item": "Laptop", "qty": 1}))
queue.enqueue(Message("orders", {"item": "Mouse", "qty": 2}, priority=MessagePriority.HIGH))
queue.enqueue(Message("orders", {"item": "Keyboard", "qty": 1}))
print(f"Queue size: {queue.size()}")
msg = queue.dequeue()
print(f"Dequeued: {msg.body} (ack_id: {msg.ack_id})")
queue.acknowledge(msg.ack_id)
print("Message acknowledged")

# --- Pub/Sub: the first publish has no subscribers; the second is delivered ---
print("\n=== Testing Pub/Sub ===")
pubsub = PubSubManager()

async def handle_message(msg):
    print(f"Received: {msg}")

asyncio.run(pubsub.publish("notifications", "Hello World!"))
sub = pubsub.subscribe("user1", "notifications", handle_message)
asyncio.run(pubsub.publish("notifications", "New message"))
print(f"Subscribers: {pubsub.get_subscriber_count('notifications')}")
print(pubsub.get_stats())
Testing Checklist
- Messages can be enqueued and dequeued
- Acknowledgment works correctly
- Pub/Sub delivers messages to subscribers
- Message priorities are respected
- Stats are tracked correctly
Summary
Congratulations! You've built a complete message queue system.
What You Built
- Message Queue - FIFO with priority support
- Message Types - JSON, binary, text serialization
- Pub/Sub System - Channel-based messaging
- Persistence - Message storage to disk
- Clustering - Multi-node support
- Client Library - Easy-to-use API
Next Steps
- Add message filtering
- Implement dead letter queues
- Add message partitioning
- Implement scheduled messages