Skip to content

Commit aeef1b7

Browse files
feat(protomq): add initial implementation
Within this changeset, we introduce the initial implementation of the Zig-lang MQTT 3 server with Protobuf support. It is fast, it is good, and allows you to reduce your packet size by using binary data. The application logic is tested on Linux and MacOS. Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com>
0 parents  commit aeef1b7

36 files changed

+4174
-0
lines changed

.gitignore

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Zig build artifacts
2+
zig-cache/
3+
zig-out/
4+
.zig-cache/
5+
6+
# Build outputs
7+
*.o
8+
*.a
9+
*.so
10+
*.dylib
11+
*.exe
12+
13+
# IDE files
14+
.vscode/
15+
.idea/
16+
*.swp
17+
*.swo
18+
*~
19+
.DS_Store
20+
21+
# Test outputs
22+
test-results/
23+
coverage/
24+
25+
# Logs
26+
*.log
27+
28+
# Environment files
29+
.env
30+
.env.local
31+
32+
# Dependency cache
33+
deps/
34+
vendor/

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2026 Gökhan Koçmarlı (github.com/electricalgorithm)
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# ProtoMQ
2+
3+
<p align="center">
4+
<img src="assets/mascot.png" alt="ProtoMQ Mascot" width="300px" />
5+
<br />
6+
<b>Type-safe, bandwidth-efficient MQTT for the rest of us.</b>
7+
<br />
8+
<i>Stop sending bloated JSON over the wire.</i>
9+
</p>
10+
11+
---
12+
- MQTT v3.1.1 packet parsing (CONNECT, PUBLISH, SUBSCRIBE, etc.)
13+
- Thread-safe Topic Broker with wildcard support (`+`, `#`)
14+
- Custom Protobuf Engine with runtime `.proto` schema parsing
15+
- Topic-based Protobuf schema routing
16+
- MQTT CLI client with automatic JSON-to-Protobuf encoding
17+
- Structured diagnostic output for Protobuf payloads
18+
19+
### Building
20+
21+
One have to have Zig 0.15.2 or later installed. One can download it from [here](https://ziglang.org/download/).
22+
23+
```bash
24+
# Build server and client
25+
zig build
26+
27+
# Build and run server
28+
zig build run-server
29+
30+
# Build and run client
31+
zig build run-client
32+
33+
# Run tests
34+
zig build test
35+
36+
# Run integration tests
37+
zig build && \
38+
./tests/cli_test.sh && \
39+
./tests/integration_test.sh && \
40+
./tests/run_pubsub_test.sh
41+
```
42+
43+
### Limitations
44+
45+
For the initial release, we support:
46+
- QoS 0 only (at most once delivery)
47+
- No persistent sessions
48+
- No retained messages
49+
- Single-node deployment
50+
51+
### Performance Results
52+
53+
Verified metrics with 100 concurrent clients on Apple M2 (macOS):
54+
55+
- **Concurrency**: 100+ concurrent connections verified.
56+
- **Latency (p99)**: < 0.3ms (Measured 0.24ms).
57+
- **Memory Footprint**: ~2.4 MB for 100 clients.
58+
59+
For detailed methodology and full results, see [RESULTS.md](benchmarks/RESULTS.md).
60+
61+
### Contributing
62+
63+
This is currently a learning/development project. Contributions will be welcome after the MVP is complete.
64+
65+
### License
66+
67+
The project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
68+
69+
### Resources
70+
71+
- [Zig Documentation](https://ziglang.org/documentation/master/)
72+
- [MQTT v3.1.1 Specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html)
73+
- [Protocol Buffers](https://protobuf.dev/)
74+
75+
---
76+
77+
**Note**: This project is under active development. The API and architecture may change significantly.

assets/mascot.png

4.67 MB
Loading

benchmarks/RESULTS.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Benchmark Results
2+
3+
This document records the verified performance metrics of the ProtoMQ Server.
4+
5+
## Environment
6+
- **CPU**: Apple M2 Pro
7+
- **OS**: macOS 26.2 Darwin Kernel 25.2.0 (Using kqueue)
8+
- **Zig Version**: 0.15.2
9+
- **Test Date**: 2026-01-23
10+
11+
## Methodology
12+
Benchmarks were performed using the Python script provided in `benchmarks/mqtt_bench.py` within a virtual environment. The script uses raw async TCP sockets to simulate MQTT clients.
13+
14+
1. **Concurrency Test**: 100 clients connect simultaneously to the server. Connectivity is verified via CONNACK.
15+
2. **Latency Test**: 100 PUBLISH messages are sent on a subscribed topic. Latency is measured from the moment of publication to the moment of receipt by the subscriber (round-trip through the server).
16+
3. **Memory Measurement**: Server RSS (Resident Set Size) is measured using `psutil` while 100 clients are connected.
17+
18+
## Verified Results
19+
20+
| Metric | Result |
21+
|--------|--------|
22+
| Concurrent Connections | **100** |
23+
| p50 Latency | **0.16ms** |
24+
| p99 Latency | **0.24ms** |
25+
| Memory Usage (100 Clients) | **2.41 MB** |
26+
27+
## Reproducing the Results
28+
1. Start the server:
29+
```bash
30+
zig build && ./zig-out/bin/protomq-server
31+
```
32+
2. Create a virtual environment and install dependencies:
33+
```bash
34+
python3 -m venv benchmarks/venv
35+
pip install -r benchmarks/requirements.txt
36+
```
37+
3. In a separate terminal, run the benchmark:
38+
```bash
39+
source benchmarks/venv/bin/activate && python benchmarks/mqtt_bench.py
40+
```

benchmarks/mqtt_bench.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
import asyncio
2+
import time
3+
import struct
4+
import psutil
5+
import json
6+
7+
class SimpleMQTTClient:
8+
def __init__(self, host='127.0.0.1', port=1883, client_id='bench-client'):
9+
self.host = host
10+
self.port = port
11+
self.client_id = client_id
12+
self.reader = None
13+
self.writer = None
14+
15+
async def connect(self):
16+
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
17+
18+
# Fixed Header: Connect (0x10), Remaining Length
19+
# Variable Header: Protocol Name (0004 'MQTT'), Protocol Level (04), Connect Flags (02 = Clean Session), Keep Alive (003c = 60s)
20+
# Payload: Client ID
21+
22+
protocol_name = b'\x00\x04MQTT'
23+
protocol_level = b'\x04'
24+
connect_flags = b'\x02'
25+
keep_alive = b'\x00\x3c'
26+
27+
payload = struct.pack('!H', len(self.client_id)) + self.client_id.encode()
28+
variable_header = protocol_name + protocol_level + connect_flags + keep_alive
29+
30+
remaining_length = len(variable_header) + len(payload)
31+
fixed_header = struct.pack('!BB', 0x10, remaining_length)
32+
33+
packet = fixed_header + variable_header + payload
34+
self.writer.write(packet)
35+
await self.writer.drain()
36+
37+
# Wait for CONNACK
38+
connack = await self.reader.readexactly(4)
39+
if connack[0] != 0x20:
40+
raise Exception("Failed to connect")
41+
42+
async def subscribe(self, topic):
43+
# Fixed Header: Subscribe (0x82), Remaining Length
44+
# Variable Header: Packet Identifier (0001)
45+
# Payload: Topic Filter, Requested QoS (0)
46+
47+
packet_id = b'\x00\x01'
48+
payload = struct.pack('!H', len(topic)) + topic.encode() + b'\x00'
49+
remaining_length = len(packet_id) + len(payload)
50+
fixed_header = struct.pack('!BB', 0x82, remaining_length)
51+
52+
packet = fixed_header + packet_id + payload
53+
self.writer.write(packet)
54+
await self.writer.drain()
55+
56+
# Wait for SUBACK
57+
suback = await self.reader.readexactly(5)
58+
if suback[0] != 0x90:
59+
raise Exception("Failed to subscribe")
60+
61+
async def publish(self, topic, message):
62+
# Fixed Header: Publish (0x30), Remaining Length
63+
# Variable Header: Topic Name
64+
# Payload: Message
65+
66+
var_header = struct.pack('!H', len(topic)) + topic.encode()
67+
payload = message.encode()
68+
remaining_length = len(var_header) + len(payload)
69+
fixed_header = bytes([0x30, remaining_length]) # Simplified for small lengths
70+
71+
packet = fixed_header + var_header + payload
72+
self.writer.write(packet)
73+
await self.writer.drain()
74+
75+
async def wait_for_message(self):
76+
# Very simplified publish packet reading
77+
header = await self.reader.readexactly(1)
78+
if (header[0] & 0xF0) == 0x30:
79+
rem_len = await self.reader.readexactly(1) # Assuming < 128
80+
data = await self.reader.readexactly(rem_len[0])
81+
topic_len = struct.unpack('!H', data[:2])[0]
82+
msg = data[2 + topic_len:]
83+
return msg.decode()
84+
return None
85+
86+
async def disconnect(self):
87+
if self.writer:
88+
self.writer.write(b'\xe0\x00') # Fixed Header: Disconnect (0xe0), 0 length
89+
await self.writer.drain()
90+
self.writer.close()
91+
await self.writer.wait_closed()
92+
93+
async def benchmark_concurrency(target_clients=100):
94+
print(f"--- Concurrency Test: {target_clients} clients ---")
95+
clients = []
96+
start_time = time.time()
97+
98+
for i in range(target_clients):
99+
client = SimpleMQTTClient(client_id=f"bench-{i}")
100+
try:
101+
await client.connect()
102+
clients.append(client)
103+
if (i + 1) % 10 == 0:
104+
print(f"Connected {i + 1} clients...")
105+
except Exception as e:
106+
print(f"Failed to connect client {i}: {e}")
107+
break
108+
109+
duration = time.time() - start_time
110+
print(f"Successfully connected {len(clients)} clients in {duration:.2f}s")
111+
112+
server_pid = None
113+
for proc in psutil.process_iter(['name']):
114+
if proc.info['name'] == 'mqtt-server':
115+
server_pid = proc.pid
116+
break
117+
118+
if server_pid:
119+
process = psutil.Process(server_pid)
120+
mem_info = process.memory_info()
121+
print(f"Server RSS Memory: {mem_info.rss / 1024 / 1024:.2f} MB")
122+
else:
123+
print("Could not find mqtt-server process for memory measurement.")
124+
125+
# Keep alive for a bit
126+
await asyncio.sleep(2)
127+
128+
# Cleanup
129+
print("Disconnecting clients...")
130+
for client in clients:
131+
await client.disconnect()
132+
133+
return len(clients)
134+
135+
async def benchmark_latency(warmup=5, trials=50):
136+
print(f"--- Latency Test: {trials} trials ---")
137+
sub = SimpleMQTTClient(client_id="bench-sub")
138+
pub = SimpleMQTTClient(client_id="bench-pub")
139+
await sub.connect()
140+
await pub.connect()
141+
142+
topic = "bench/latency"
143+
await sub.subscribe(topic)
144+
145+
# Warmup
146+
for _ in range(warmup):
147+
await pub.publish(topic, "warmup")
148+
await sub.wait_for_message()
149+
150+
latencies = []
151+
for i in range(trials):
152+
msg = f"ping-{i}"
153+
start = time.perf_counter()
154+
await pub.publish(topic, msg)
155+
received = await sub.wait_for_message()
156+
end = time.perf_counter()
157+
158+
if received == msg:
159+
latencies.append((end - start) * 1000) # Convert to ms
160+
else:
161+
print(f"Unexpected message received: {received}")
162+
163+
latencies.sort()
164+
p50 = latencies[len(latencies)//2]
165+
p99 = latencies[int(len(latencies)*0.99)]
166+
avg = sum(latencies) / len(latencies)
167+
168+
print(f"Latency (ms): Avg={avg:.2f}, P50={p50:.2f}, P99={p99:.2f}")
169+
170+
await sub.disconnect()
171+
await pub.disconnect()
172+
return p50, p99
173+
174+
async def main():
175+
# Wait for server to be ready (caller should start the server)
176+
await asyncio.sleep(1)
177+
178+
conn_count = await benchmark_concurrency(100)
179+
p50, p99 = await benchmark_latency(trials=100)
180+
181+
results = {
182+
"concurrent_connections": conn_count,
183+
"latency_p50_ms": p50,
184+
"latency_p99_ms": p99
185+
}
186+
187+
with open("benchmarks/verified_results.json", "w") as f:
188+
json.dump(results, f, indent=4)
189+
190+
if __name__ == "__main__":
191+
asyncio.run(main())

benchmarks/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
psutil==7.2.1

benchmarks/verified_results.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"concurrent_connections": 100,
3+
"latency_p50_ms": 0.16229199536610395,
4+
"latency_p99_ms": 0.2430829918012023
5+
}

0 commit comments

Comments
 (0)