← Back to Tutorials
Python

Build a Message Queue

Difficulty: Advanced Est. Time: ~5 hours

Introduction

Message queues are the backbone of asynchronous communication in modern distributed systems. They enable decoupled communication between services, background-job processing, and workload management.

In this tutorial, we'll build "MsgQueue" - a message queue system with persistent storage, pub/sub messaging, and clustering support.

What You'll Build
  • A message queue server
  • Persistent message storage
  • Publish/Subscribe system
  • Message acknowledgment
  • Queue clustering
What You'll Learn
  • Message queue architecture
  • Message serialization
  • Pub/Sub patterns
  • Distributed systems communication
  • Network programming

Core Concepts

Let's understand the fundamentals of message queues.

Queue Models

  • Point-to-Point - One producer, one consumer
  • Publish/Subscribe - One producer, multiple consumers
  • Request/Reply - Synchronous messaging

Delivery Guarantees

  • At-most-once - May lose messages
  • At-least-once - May duplicate messages
  • Exactly-once - Delivered exactly once, guaranteed

Message Patterns

  • FIFO - First in, first out
  • Priority - Higher priority first
  • Delayed - Schedule for later

Project Setup

Bash
# Create project directory
mkdir msgqueue
cd msgqueue

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
# NOTE: asyncio ships with the Python standard library — do NOT pip-install it
# (the PyPI package of that name is an obsolete shadow that can break 3.x installs).
pip install aiofiles jsonpickle

Project Structure

File Structure
msgqueue/
├── msgqueue/
│   ├── __init__.py
│   ├── message.py
│   ├── queue.py
│   ├── pubsub.py
│   ├── storage.py
│   └── server.py
├── client.py
└── requirements.txt

Queue Implementation

Let's create the core message queue.

Python
# msgqueue/message.py
import json
import time
import uuid
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional, Dict

class MessagePriority(Enum):
    """Relative priority levels that can be attached to a queued message."""

    LOW = 0
    NORMAL = 1
    HIGH = 2


class Message:
    """A single queue message: a payload plus delivery metadata.

    Supports priority, per-message TTL expiry, delayed visibility,
    retry accounting and acknowledgment tracking.
    """

    def __init__(self,
                 queue_name: str,
                 body: Any,
                 message_id: str = None,
                 priority: MessagePriority = MessagePriority.NORMAL,
                 ttl: int = None,
                 delay: int = 0,
                 headers: Dict = None):
        """Build a message.

        Args:
            queue_name: Name of the destination queue.
            body: Arbitrary payload.
            message_id: Explicit id; a random UUID4 is generated when omitted.
            priority: Relative priority of the message.
            ttl: Lifetime in seconds; None means the message never expires.
            delay: Seconds before the message becomes visible to consumers.
            headers: Optional per-message metadata (never shared between instances).
        """
        self.id = message_id or str(uuid.uuid4())
        self.queue_name = queue_name
        self.body = body
        self.priority = priority
        self.ttl = ttl
        self.delay = delay
        self.headers = headers or {}

        self.created_at = datetime.now()
        # Delayed delivery: the message stays hidden until this point in time.
        self.available_at = datetime.now() + timedelta(seconds=delay)
        self.acknowledged = False
        self.ack_id = None        # assigned by the queue when the message is handed out
        self.retry_count = 0
        self.max_retries = 3

    def to_dict(self) -> Dict:
        """Return a JSON-serializable snapshot (timestamps as ISO-8601 strings)."""
        return {
            'id': self.id,
            'queue_name': self.queue_name,
            'body': self.body,
            'priority': self.priority.value,
            'ttl': self.ttl,
            'delay': self.delay,
            'created_at': self.created_at.isoformat(),
            'available_at': self.available_at.isoformat(),
            'acknowledged': self.acknowledged,
            'retry_count': self.retry_count
        }

    def to_json(self) -> str:
        """Serialize to a JSON string; body must itself be JSON-serializable."""
        return json.dumps(self.to_dict())

    def to_bytes(self) -> bytes:
        """Serialize to UTF-8 encoded JSON bytes."""
        return self.to_json().encode('utf-8')

    def acknowledge(self):
        """Mark the message as successfully processed."""
        self.acknowledged = True

    def is_expired(self) -> bool:
        """True when a TTL is set and has elapsed since creation."""
        if self.ttl:
            return datetime.now() > self.created_at + timedelta(seconds=self.ttl)
        return False

    def is_available(self) -> bool:
        """True once the delayed-visibility window has passed."""
        return datetime.now() >= self.available_at
Python
# msgqueue/queue.py
import asyncio
from collections import deque
from threading import Lock
from typing import List, Optional, Callable
from datetime import datetime, timedelta
from .message import Message, MessagePriority
import uuid

class MessageQueue:
    """In-memory queue with priority-aware delivery and delayed visibility.

    Provides at-least-once semantics: a dequeued message moves to a pending
    set until it is acknowledged; a rejected message is requeued with
    exponential backoff up to its max_retries.
    """

    def __init__(self, name: str, max_size: int = 10000):
        self.name = name
        self.max_size = max_size

        self._messages = deque()   # messages awaiting delivery
        self._pending = {}         # ack_id -> message delivered but not yet acked
        self._lock = Lock()        # guards _messages and _pending

        self._waiting_consumers = []
        self.subscribers = []

    def enqueue(self, message: Message) -> bool:
        """Append a message; returns False when the queue is at capacity."""
        with self._lock:
            if len(self._messages) >= self.max_size:
                return False

            self._messages.append(message)
            return True

    def dequeue(self, timeout: int = 0) -> Optional[Message]:
        """Remove and return the best available message, or None.

        Expired messages are discarded first.  Among the visible,
        unacknowledged messages the highest priority wins; ties go to the
        oldest (FIFO).  The returned message is moved to the pending set and
        carries a fresh ack_id for acknowledge()/reject().

        NOTE(review): `timeout` is accepted for interface compatibility but
        no blocking wait is implemented yet.
        """
        with self._lock:
            # Purge expired messages so they can never be delivered.
            if any(m.is_expired() for m in self._messages):
                self._messages = deque(
                    m for m in self._messages if not m.is_expired()
                )

            best_idx = None
            for i, msg in enumerate(self._messages):
                if not msg.is_available() or msg.acknowledged:
                    continue
                if (best_idx is None
                        or msg.priority.value > self._messages[best_idx].priority.value):
                    best_idx = i

            if best_idx is None:
                return None

            # Remove from the queue so the message cannot be delivered twice.
            msg = self._messages[best_idx]
            del self._messages[best_idx]

            msg.ack_id = str(uuid.uuid4())
            self._pending[msg.ack_id] = msg
            return msg

    def acknowledge(self, ack_id: str) -> bool:
        """Confirm processing of a pending message; False for unknown ids."""
        with self._lock:
            if ack_id in self._pending:
                del self._pending[ack_id]
                return True
            return False

    def reject(self, ack_id: str, requeue: bool = True) -> bool:
        """Return a pending message to the queue (with backoff) or drop it.

        A requeued message becomes visible again after 2**retry_count
        seconds; once max_retries is exceeded it is dropped silently.
        """
        with self._lock:
            if ack_id in self._pending:
                msg = self._pending.pop(ack_id)

                if requeue and msg.retry_count < msg.max_retries:
                    msg.retry_count += 1
                    # Exponential backoff before the message is visible again.
                    msg.available_at = datetime.now() + timedelta(seconds=2 ** msg.retry_count)
                    self._messages.append(msg)
                return True
            return False

    def size(self) -> int:
        """Number of messages waiting for delivery (pending ones excluded)."""
        with self._lock:
            return len(self._messages)

    def purge(self):
        """Drop every waiting and pending message."""
        with self._lock:
            self._messages.clear()
            self._pending.clear()

    def get_stats(self) -> dict:
        """Snapshot of queue counters for monitoring."""
        with self._lock:
            return {
                'name': self.name,
                'total_messages': len(self._messages),
                'pending': len(self._pending),
                'available': sum(1 for m in self._messages if m.is_available())
            }

Message Types

Let's add support for different message types.

Python
# msgqueue/message_types.py
from enum import Enum
from typing import Any, Dict
import json
import pickle

class MessageType(Enum):
    """Wire formats a message body may be encoded as."""

    TEXT = "text"
    JSON = "json"
    BINARY = "binary"
    COMMAND = "command"
    EVENT = "event"


class MessageSerializer:
    """Encode/decode message bodies to and from bytes by MessageType.

    JSON is the fallback codec for unregistered types.
    """

    def __init__(self):
        # Codec dispatch tables; COMMAND and EVENT reuse the JSON codec.
        self.encoders = {
            MessageType.TEXT: self._encode_text,
            MessageType.JSON: self._encode_json,
            MessageType.BINARY: self._encode_binary,
            MessageType.COMMAND: self._encode_json,
            MessageType.EVENT: self._encode_json
        }

        self.decoders = {
            MessageType.TEXT: self._decode_text,
            MessageType.JSON: self._decode_json,
            MessageType.BINARY: self._decode_binary,
            MessageType.COMMAND: self._decode_json,
            MessageType.EVENT: self._decode_json
        }

    def encode(self, message_type: MessageType, data: Any) -> bytes:
        """Encode `data` with the codec registered for `message_type` (JSON fallback)."""
        encoder = self.encoders.get(message_type, self._encode_json)
        return encoder(data)

    def decode(self, message_type: MessageType, data: bytes) -> Any:
        """Decode `data` with the codec registered for `message_type` (JSON fallback)."""
        decoder = self.decoders.get(message_type, self._decode_json)
        return decoder(data)

    def _encode_text(self, data: Any) -> bytes:
        """str() the value and UTF-8 encode it."""
        return str(data).encode('utf-8')

    def _decode_text(self, data: bytes) -> str:
        return data.decode('utf-8')

    def _encode_json(self, data: Any) -> bytes:
        return json.dumps(data).encode('utf-8')

    def _decode_json(self, data: bytes) -> Any:
        return json.loads(data.decode('utf-8'))

    def _encode_binary(self, data: Any) -> bytes:
        return pickle.dumps(data)

    def _decode_binary(self, data: bytes) -> Any:
        # SECURITY: pickle.loads executes arbitrary code from the payload.
        # Only use the BINARY codec on trusted, internal data.
        return pickle.loads(data)


class MessageBuilder:
    """Convenience factory pairing Message construction with body serialization."""

    def __init__(self):
        self.serializer = MessageSerializer()

    def create_message(self, queue_name: str, data: Any,
                       message_type: MessageType = MessageType.JSON,
                       **kwargs) -> Message:
        """Encode `data` and wrap it in a Message.

        The chosen codec is recorded in the message headers so that
        decode_message() can pick the matching decoder later.
        Extra keyword arguments are forwarded to the Message constructor.
        """
        from .message import Message

        encoded_body = self.serializer.encode(message_type, data)

        message = Message(
            queue_name=queue_name,
            body=encoded_body,
            **kwargs
        )

        message.headers['message_type'] = message_type.value

        return message

    def decode_message(self, message: Message) -> Any:
        """Decode a message body using the codec named in its headers (JSON default)."""
        message_type = MessageType(
            message.headers.get('message_type', MessageType.JSON.value)
        )

        return self.serializer.decode(message_type, message.body)

Pub/Sub System

Let's implement publish/subscribe messaging.

Python
# msgqueue/pubsub.py
import asyncio
from typing import List, Dict, Callable, Any
from collections import defaultdict
from threading import Lock
import uuid

class Subscription:
    """A single subscriber's registration.

    With a `pattern`, channel names are matched by regex; without one, the
    subscription matches any channel it is registered on.
    """

    def __init__(self, subscriber_id: str, pattern: str = None):
        self.id = str(uuid.uuid4())        # unique handle, used for unsubscribing
        self.subscriber_id = subscriber_id
        self.pattern = pattern
        self.callback: Callable = None     # invoked with each delivered message
        self.active = True

    def matches(self, channel: str) -> bool:
        """True if this subscription should receive messages on `channel`."""
        if self.pattern:
            import re  # local import keeps regex support optional
            # Coerce the Match/None result to an actual bool, honoring -> bool.
            return re.match(self.pattern, channel) is not None
        return True


class PubSubManager:
    """Channel-based publish/subscribe hub with a bounded per-channel history."""

    def __init__(self):
        self._channels: Dict[str, List[Subscription]] = defaultdict(list)
        self._subscribers: Dict[str, Subscription] = {}
        self._lock = Lock()    # guards _channels, _subscribers and the history

        # Recent messages retained per channel, capped at history_limit.
        self.message_history: Dict[str, List[dict]] = defaultdict(list)
        self.history_limit = 100

    def subscribe(self, subscriber_id: str, channel: str,
                  callback: Callable = None, pattern: str = None) -> Subscription:
        """Register a subscriber on a channel and return its Subscription handle."""
        with self._lock:
            sub = Subscription(subscriber_id, pattern)
            sub.callback = callback

            self._channels[channel].append(sub)
            self._subscribers[sub.id] = sub

            return sub

    def unsubscribe(self, subscription_id: str) -> bool:
        """Deactivate and remove a subscription; returns False for unknown ids."""
        with self._lock:
            if subscription_id not in self._subscribers:
                return False

            sub = self._subscribers[subscription_id]
            sub.active = False

            # Strip the subscription from every channel it appears on.
            for channel, subs in self._channels.items():
                self._channels[channel] = [s for s in subs if s.id != subscription_id]

            del self._subscribers[subscription_id]

            return True

    async def publish(self, channel: str, message: Any) -> int:
        """Record the message in history and deliver it to active subscribers.

        Returns the number of callbacks successfully invoked.  Callbacks run
        outside the lock so a slow or async consumer cannot block publishers.
        """
        delivered = 0

        with self._lock:
            message_data = {
                'channel': channel,
                'message': message,
                # publish() always runs inside a coroutine, so a loop is running.
                'timestamp': str(asyncio.get_running_loop().time())
            }

            self.message_history[channel].append(message_data)

            if len(self.message_history[channel]) > self.history_limit:
                self.message_history[channel] = self.message_history[channel][-self.history_limit:]

            # Snapshot under the same lock; deliveries then happen outside it.
            subscriptions = self._channels.get(channel, []).copy()

        for sub in subscriptions:
            if not sub.active:
                continue

            if sub.callback:
                try:
                    if asyncio.iscoroutinefunction(sub.callback):
                        await sub.callback(message)
                    else:
                        sub.callback(message)
                    delivered += 1
                except Exception as e:
                    # Best-effort fan-out: one failing subscriber must not
                    # prevent delivery to the others.
                    print(f"Error delivering to {sub.id}: {e}")

        return delivered

    def get_channel_history(self, channel: str, limit: int = 10) -> List[dict]:
        """Return up to `limit` most recent messages for a channel."""
        with self._lock:
            return self.message_history.get(channel, [])[-limit:]

    def get_subscriber_count(self, channel: str) -> int:
        """Number of active subscriptions on a channel."""
        with self._lock:
            return sum(1 for s in self._channels.get(channel, []) if s.active)

    def get_stats(self) -> dict:
        """Aggregate counters across channels for monitoring."""
        with self._lock:
            return {
                'total_channels': len(self._channels),
                'total_subscriptions': len(self._subscribers),
                'channels': {
                    ch: len(subs)
                    for ch, subs in self._channels.items()
                }
            }

Message Persistence

Let's add message persistence to disk.

Python
# msgqueue/storage.py
import asyncio
import aiofiles
import os
import json
from typing import List, Optional
from datetime import datetime
from .message import Message
import uuid

class MessageStore:
    """File-backed persistence: one JSON file per message, plus per-queue state."""

    def __init__(self, storage_path: str = "./msgstore"):
        self.storage_path = storage_path
        self.queues_path = os.path.join(storage_path, "queues")
        self.messages_path = os.path.join(storage_path, "messages")

        os.makedirs(self.queues_path, exist_ok=True)
        os.makedirs(self.messages_path, exist_ok=True)

    def _message_file(self, message_id: str) -> str:
        # Single definition of the on-disk naming scheme: <id>.json
        return os.path.join(self.messages_path, f"{message_id}.json")

    async def save_message(self, message: Message):
        """Persist a message as <id>.json; body must be JSON-serializable."""
        async with aiofiles.open(self._message_file(message.id), 'w') as f:
            await f.write(json.dumps(message.to_dict()))

    async def load_message(self, message_id: str) -> Optional[Message]:
        """Rebuild a Message from disk, or None when the file does not exist.

        Timestamps and the retry counter are restored so TTL expiry keeps
        working across restarts.  NOTE(review): priority is not round-tripped
        here (MessagePriority is not imported in this module), so loaded
        messages fall back to the default priority — confirm acceptable.
        """
        message_file = self._message_file(message_id)

        if not os.path.exists(message_file):
            return None

        async with aiofiles.open(message_file, 'r') as f:
            content = await f.read()

        data = json.loads(content)

        msg = Message(
            queue_name=data['queue_name'],
            body=data['body'],
            message_id=data['id'],
            ttl=data.get('ttl'),
            delay=data.get('delay', 0) or 0
        )

        # Restore persisted timing/retry state instead of resetting it.
        msg.created_at = datetime.fromisoformat(data['created_at'])
        msg.available_at = datetime.fromisoformat(data['available_at'])
        msg.retry_count = data.get('retry_count', 0)

        return msg

    async def delete_message(self, message_id: str):
        """Remove a message file if present (idempotent)."""
        message_file = self._message_file(message_id)

        if os.path.exists(message_file):
            os.remove(message_file)

    async def get_queue_messages(self, queue_name: str) -> List[Message]:
        """Load every persisted message belonging to `queue_name`."""
        messages = []

        for filename in os.listdir(self.messages_path):
            if not filename.endswith('.json'):
                continue

            message_id = filename[:-5]   # strip the ".json" suffix
            msg = await self.load_message(message_id)

            if msg and msg.queue_name == queue_name:
                messages.append(msg)

        return messages

    async def save_queue_state(self, queue_name: str, state: dict):
        """Persist queue metadata as queues/<queue_name>.json."""
        queue_file = os.path.join(self.queues_path, f"{queue_name}.json")

        async with aiofiles.open(queue_file, 'w') as f:
            await f.write(json.dumps(state))

    async def load_queue_state(self, queue_name: str) -> Optional[dict]:
        """Load queue metadata, or None when none was saved."""
        queue_file = os.path.join(self.queues_path, f"{queue_name}.json")

        if not os.path.exists(queue_file):
            return None

        async with aiofiles.open(queue_file, 'r') as f:
            content = await f.read()
            return json.loads(content)

    async def cleanup_expired(self):
        """Delete every persisted message whose TTL has elapsed."""
        for filename in os.listdir(self.messages_path):
            if not filename.endswith('.json'):
                continue

            message_id = filename[:-5]
            msg = await self.load_message(message_id)

            if msg and msg.is_expired():
                await self.delete_message(message_id)

Queue Clustering

Let's add clustering support for high availability.

Python
# msgqueue/cluster.py
import asyncio
from typing import Dict, List, Optional
import aiohttp
import hashlib

class QueueCluster:
    """Hash-partitions queues across cluster nodes with simple replication.

    Each queue name hashes to a primary node; the next `replication_factor`
    nodes in ring order hold replicas.
    """

    def __init__(self, cluster_id: str, nodes: List[dict]):
        self.cluster_id = cluster_id
        self.nodes = nodes
        self.node_index = {n['id']: n for n in nodes}

        self.replication_factor = 2

    def _get_partition(self, queue_name: str) -> int:
        """Map a queue name to a node slot (MD5 used for stable hashing, not security)."""
        hash_value = int(hashlib.md5(queue_name.encode()).hexdigest(), 16)
        return hash_value % len(self.nodes)

    def get_primary_node(self, queue_name: str) -> Optional[dict]:
        """Node that owns the queue."""
        return self.nodes[self._get_partition(queue_name)]

    def get_replica_nodes(self, queue_name: str) -> List[dict]:
        """The replication_factor nodes following the primary in ring order."""
        partition = self._get_partition(queue_name)
        return [
            self.nodes[(partition + i) % len(self.nodes)]
            for i in range(1, self.replication_factor + 1)
        ]

    async def forward_to_node(self, node: dict, method: str, path: str, data: dict = None):
        """Proxy a GET/POST to another node; returns its JSON reply, or None on error."""
        url = f"http://{node['host']}:{node['port']}{path}"

        try:
            async with aiohttp.ClientSession() as session:
                if method == 'GET':
                    async with session.get(url, timeout=5) as resp:
                        return await resp.json()
                elif method == 'POST':
                    async with session.post(url, json=data, timeout=5) as resp:
                        return await resp.json()
        except Exception as e:
            # Best-effort forwarding: callers treat None as "node unreachable".
            print(f"Error forwarding to node: {e}")
            return None

    async def enqueue(self, queue_name: str, message: dict) -> bool:
        """Send the message to the primary, fan out to replicas; True on primary success."""
        primary = self.get_primary_node(queue_name)
        replicas = self.get_replica_nodes(queue_name)

        result = await self.forward_to_node(primary, 'POST',
                                            '/queue/enqueue', message)

        # Replication is fire-and-forget; failures are logged by forward_to_node.
        for replica in replicas:
            await self.forward_to_node(replica, 'POST',
                                       '/queue/replicate', message)

        return result is not None

    async def get_queue_stats(self, queue_name: str) -> dict:
        """Fetch queue statistics from the owning node."""
        primary = self.get_primary_node(queue_name)

        return await self.forward_to_node(primary, 'GET',
                                          f'/queue/{queue_name}/stats')

Client API

Let's create an easy-to-use client library.

Python
# client.py
import asyncio
import aiohttp
from typing import Any, Optional, Callable

class MessageQueueClient:
    """Thin async HTTP client for the MsgQueue server.

    Usable as an async context manager, which opens/closes the session:
        async with MessageQueueClient() as client: ...
    """

    def __init__(self, host: str = 'localhost', port: int = 5000):
        self.base_url = f"http://{host}:{port}"
        self.session = None   # aiohttp.ClientSession, created by connect()

    async def connect(self):
        """Open the underlying HTTP session."""
        self.session = aiohttp.ClientSession()

    async def close(self):
        """Close the HTTP session if one was opened."""
        if self.session:
            await self.session.close()

    async def enqueue(self, queue: str, message: Any,
                      priority: int = 1, ttl: int = None, delay: int = 0):
        """Enqueue a message; returns the server's JSON response."""
        payload = {
            'queue': queue,
            'message': message,
            'priority': priority,
            'ttl': ttl,
            'delay': delay
        }

        async with self.session.post(
            f"{self.base_url}/queue/enqueue",
            json=payload
        ) as resp:
            return await resp.json()

    async def dequeue(self, queue: str, timeout: int = 0) -> Optional[dict]:
        """Pop one message; None when the server replies with a non-200 status."""
        async with self.session.get(
            f"{self.base_url}/queue/dequeue?queue={queue}&timeout={timeout}"
        ) as resp:
            if resp.status == 200:
                return await resp.json()
            return None

    async def acknowledge(self, ack_id: str):
        """Acknowledge a previously dequeued message by its ack_id."""
        async with self.session.post(
            f"{self.base_url}/queue/ack",
            json={'ack_id': ack_id}
        ) as resp:
            return await resp.json()

    async def publish(self, channel: str, message: Any):
        """Publish a message to a pub/sub channel."""
        payload = {
            'channel': channel,
            'message': message
        }

        async with self.session.post(
            f"{self.base_url}/pubsub/publish",
            json=payload
        ) as resp:
            return await resp.json()

    async def subscribe(self, channel: str, callback: Callable):
        """Stream channel messages over a websocket, awaiting callback per message."""
        async with self.session.ws_connect(
            f"{self.base_url}/pubsub/subscribe?channel={channel}"
        ) as ws:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    await callback(msg.data)

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

Testing

Python
# test_msgqueue.py
import asyncio  # required for asyncio.run() in the pub/sub demo below

from msgqueue.queue import MessageQueue
from msgqueue.message import Message, MessagePriority
from msgqueue.pubsub import PubSubManager

# Test Queue
print("=== Testing Message Queue ===")

queue = MessageQueue("orders", max_size=100)

queue.enqueue(Message("orders", {"item": "Laptop", "qty": 1}))
queue.enqueue(Message("orders", {"item": "Mouse", "qty": 2}, priority=MessagePriority.HIGH))
queue.enqueue(Message("orders", {"item": "Keyboard", "qty": 1}))

print(f"Queue size: {queue.size()}")

msg = queue.dequeue()
print(f"Dequeued: {msg.body} (ack_id: {msg.ack_id})")

queue.acknowledge(msg.ack_id)
print("Message acknowledged")

# Test Pub/Sub
print("\n=== Testing Pub/Sub ===")

pubsub = PubSubManager()

async def handle_message(msg):
    print(f"Received: {msg}")

# First publish happens before anyone subscribes: it only lands in history.
asyncio.run(pubsub.publish("notifications", "Hello World!"))

sub = pubsub.subscribe("user1", "notifications", handle_message)

asyncio.run(pubsub.publish("notifications", "New message"))

# Single quotes inside the f-string avoid clashing with the outer quotes.
print(f"Subscribers: {pubsub.get_subscriber_count('notifications')}")
print(pubsub.get_stats())
Testing Checklist
  • Messages can be enqueued and dequeued
  • Acknowledgment works correctly
  • Pub/Sub delivers messages to subscribers
  • Message priorities are respected
  • Stats are tracked correctly

Summary

Congratulations! You've built a complete message queue system.

What You Built

  • Message Queue - FIFO with priority support
  • Message Types - JSON, binary, text serialization
  • Pub/Sub System - Channel-based messaging
  • Persistence - Message storage to disk
  • Clustering - Multi-node support
  • Client Library - Easy-to-use API

Next Steps

  • Add message filtering
  • Implement dead letter queues
  • Add message partitioning
  • Implement scheduled messages

Continue Learning

Try these tutorials:

  • Build a Distributed Cache
  • Build a Load Balancer
  • Build an API Gateway