Skip to content

Commit cbaffca

Browse files
feat: add admin HTTP server, MQTT schema discovery, CLI Protobuf decoding, and float display (#3)
* fix(benchmarks): resolve missing dependencies and package inclusion The Python benchmark suite strictly relies on psutil for monitoring system memory footprints and CPU load during tests. Additionally, the internal common modules failed to package gracefully. This fixes the pyproject definitions to allow seamless uv execution across all environments. Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * feat(server): introduce optional HTTP Admin Server This adds conditional compilation for a non-blocking Admin Server that safely shares the MQTT event loop. It introduces 'POST /api/v1/schemas' to dynamically register protobuf schemas without restarting the broker, and various telemetry 'GET' endpoints for runtime observability. It tracks total message counts directly inside TopicBroker. It has zero overhead footprint when disabled via build flags. Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * test(admin): add integration suite for Admin Server This introduces a robust bash integration test that boots the server, verifies bearer token rejection on unauthorized attempts, executes schema loading via POST, and checks the integrity of the /metrics payload. Hooked into run_all.sh to guarantee standard execution. Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * docs(admin): outline Admin Server features and memory footprint Adding documentation to the main README detailing the optional nature of the Admin Server, the available endpoints for dynamic schemas, and the explicitly verified zero overhead memory profile when compiled for embedded deployment. Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * chore: add systemd service and remote deployment workflow Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * chore: update systemd service and remote deployment workflow Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * chore: move systemd service to deploy/systemd Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * feat(build): install systemd service on linux targets Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * docs: update deployment workflow to use zig build --prefix Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * docs: fix protomq-cli command in deploy workflow Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * feat(cli): implement schema discovery for subscribe command Add MQTT Service Discovery support to the command: - Subscribe to `$SYS/discovery/response` and publish to `$SYS/discovery/request` before entering the main message loop to fetch topic-to-schema mappings. - Parse the protobuf payload using the decoded tag and field tag numbers (1/2/3 for topic/message_type/schema_source). - Populate a holding a and topic mapping so that can decode incoming Protobuf payloads automatically. - Load any embedded from the discovery response into the registry so external schemas do not need to be pre-loaded manually. - Change callback signature to use (mutable) to allow the callback to call methods with mutable-receiver requirements (e.g. , ). - Fall back to printing raw bytes if no schema mapping is found. All 5 integration tests still pass. Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * feat(protobuf): add float32/float64 display in ProtoValue decoder Add two new variants to ProtoValue: - float32: f32 (for Protobuf 'float' fields, wire type Fixed32) - float64: f64 (for Protobuf 'double' fields, wire type Fixed64) Previously these were stored as raw u32/u64 integers, which resulted in unintuitive output like '1102315520' instead of '22.500000'. Changes: - types.zig: add float32/float64 union arms; debugPrint renders them with 6 decimal places using '{d:.6}' - decoder.zig: in decodeValue, check field.type == .Float/.Double when handling Fixed32/Fixed64 wire types and @bitcast the raw bits into the correct float type All 5 integration tests pass. Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> * feat(cli): add --type flag to subscribe for direct schema decoding Allow passing --type alongside --proto-dir to the subscribe command to directly force a topic->message_type mapping for the subscribed topic. This bypasses the need for the server to advertise the mapping via Service Discovery, which is useful when the server's topic_mapping does not include the topic being subscribed to. The --type flag supplements (not replaces) discovery: discovery still runs first, then --type overrides/adds the mapping for the exact subscribed topic. Example: protomq-cli subscribe -t sensor/temp --proto-dir schemas --type SensorData Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com> --------- Signed-off-by: Gyokhan Kochmarla <gokhan.kocmarli@gmail.com>
1 parent 4243b37 commit cbaffca

File tree

16 files changed

+716
-27
lines changed

16 files changed

+716
-27
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
---
2+
description: How to deploy ProtoMQ to a remote server and validate it
3+
---
4+
5+
# ProtoMQ Remote Deployment Guide
6+
7+
This workflow guides agents on how to deploy the ProtoMQ server to a remote machine, specifically configuring it to run as a systemd service, and how to validate that the deployment is working correctly.
8+
9+
> [!IMPORTANT]
10+
> Do not assume any prior knowledge of the target environment.
11+
12+
## Prerequisites
13+
- Ask the user which SSH connection string to use (e.g., `<ssh_user>@<remote_host>`).
14+
15+
## Deployment Steps
16+
17+
1. **Verify Zig Dependency**:
18+
Search for the `zig` binary on the remote machine (e.g., `ssh <ssh_target> "which zig"` or `ssh <ssh_target> "find / -name zig 2>/dev/null | grep bin/zig"`).
19+
If `zig` is not found, **STOP** and tell the user to install Zig on the remote machine before proceeding.
20+
21+
2. **Clone Repository to `/opt/protomq`**:
22+
Connect via the provided SSH connection and create the `/opt/protomq` directory, ensuring appropriate permissions, then clone the repository there.
23+
```bash
24+
ssh <ssh_target> "sudo mkdir -p /opt/protomq && sudo chown \$USER /opt/protomq && git clone https://github.com/electricalgorithm/protomq /opt/protomq || (cd /opt/protomq && git fetch --all)"
25+
```
26+
27+
3. **Checkout and Pull**:
28+
Checkout the correct branch and pull the latest changes.
29+
```bash
30+
ssh <ssh_target> "cd /opt/protomq && git checkout <branch_name> && git pull"
31+
```
32+
33+
4. **Build and Install the Application**:
34+
Build the Zig application on the remote server using the located `zig` binary. Ensure you build with the `-Dadmin_server=true` flag.
35+
Use the `--prefix /opt/protomq` flag so that it installs the `bin/` files and the systemd service file into `/opt/protomq`.
36+
```bash
37+
ssh <ssh_target> "cd /opt/protomq && sudo <path_to_zig_binary> build -Doptimize=ReleaseSafe -Dadmin_server=true --prefix /opt/protomq"
38+
```
39+
40+
5. **Configure systemd Service**:
41+
Since the build step installed the service file directly into `/opt/protomq/etc/systemd/system/protomq.service`, simply link it to the system bus, reload, and start it.
42+
```bash
43+
ssh <ssh_target> "sudo ln -sf /opt/protomq/etc/systemd/system/protomq.service /etc/systemd/system/protomq.service && sudo systemctl daemon-reload && sudo systemctl enable --now protomq && sudo systemctl restart protomq"
44+
```
45+
46+
6. **Verify Service Status**:
47+
Ensure the service is actively running.
48+
```bash
49+
ssh <ssh_target> "systemctl status protomq"
50+
```
51+
It should say `Active: active (running)`.
52+
53+
## Validation Steps
54+
55+
### 1. Local MQTT Client Validation
56+
Send a ProtoMQ request from the **host machine** (the machine you are running on) to the remote machine to verify basic functionality using the `protomq-cli` tool.
57+
First, build the project locally if necessary, then run the CLI (ensure you use the correct IP/host of the remote machine).
58+
```bash
59+
./zig-out/bin/protomq-cli connect --host <remote_host>
60+
```
61+
*(You can also use `publish` or `subscribe` commands with `-t <topic>` to test actual message flow).*
62+
63+
### 2. Admin Server Validation
64+
If the Admin Server is enabled, it will listen on `127.0.0.1:8080` on the remote server. Validate the endpoints directly on the remote machine over SSH using the default authorization token (`admin_secret` or check `ADMIN_TOKEN`):
65+
66+
- **Metrics Endpoint**:
67+
```bash
68+
ssh <ssh_target> 'curl -s -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/metrics'
69+
```
70+
*Expected Output*: JSON with connections, messages, schemas, etc. `{"connections":0,"messages_routed":0,"schemas":1,"memory_mb":0}`
71+
72+
- **Schemas Endpoint**:
73+
```bash
74+
ssh <ssh_target> 'curl -s -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/api/v1/schemas'
75+
```
76+
*Expected Output*: Topic-schema mapping JSON. e.g., `{"sensor/data":"SensorData"}`
77+
78+
If all responses match expectations and the remote CLI connection succeeds, the server is healthy and successfully deployed.

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ protomq-cli discover --proto-dir schemas
5858
```
5959
This allows clients to "bootstrap" themselves without needing pre-shared `.proto` files.
6060

61+
### Admin Server
62+
63+
ProtoMQ includes an optional HTTP Admin Server for runtime observability and dynamic schema management without polluting the core MQTT hot-paths.
64+
65+
- **Dynamic Schema Registration**: Register `.proto` files at runtime via `POST /api/v1/schemas`.
66+
- **Telemetry**: Monitor active connections, message throughput, and schemas via `GET /metrics`.
67+
- **Zero Overhead Footprint**: The Admin Server is disabled by default to preserve the absolute minimum memory footprint for embedded devices. It is strictly conditionally compiled via the `zig build -Dadmin_server=true` flag. Enabling it moderately increases the initial static memory baseline (e.g., from ~2.6 MB to ~4.0 MB) by safely running a parallel HTTP listener, but it executes cooperatively on the same event loop ensuring zero degradation to per-message MQTT performance. When the flag is deactivated, it incurs **zero overhead footprint**.
68+
6169
### Performance Results
6270

6371
ProtoMQ delivers high performance across both high-end and edge hardware:

benchmarks/pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ name = "protomq-bench-suite"
33
version = "0.1.0"
44
description = "ProtoMQ benchmark suite scripts"
55
requires-python = ">=3.11"
6-
dependencies = []
6+
dependencies = [
7+
"psutil>=7.2.2",
8+
]
79

810
[project.scripts]
911
protomq-bench-b1 = "b1_baseline_concurrency.benchmark:main"
@@ -23,6 +25,7 @@ packages = [
2325
"b5_protobuf_load",
2426
"b6_connection_churn",
2527
"b7_message_sizes",
28+
"common/protomq_benchmarks",
2629
]
2730

2831
[build-system]

benchmarks/uv.lock

Lines changed: 35 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

build.zig

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,22 @@ pub fn build(b: *std.Build) void {
44
const target = b.standardTargetOptions(.{});
55
const optimize = b.standardOptimizeOption(.{});
66

7+
const admin_server = b.option(bool, "admin_server", "Enable the HTTP Admin Server") orelse false;
8+
9+
const options = b.addOptions();
10+
options.addOption(bool, "admin_server", admin_server);
11+
const options_module = options.createModule();
12+
713
// Server executable
814
const server_exe = b.addExecutable(.{
915
.name = "protomq-server",
1016
.root_module = b.createModule(.{
1117
.root_source_file = b.path("src/main.zig"),
1218
.target = target,
1319
.optimize = optimize,
20+
.imports = &.{
21+
.{ .name = "build_options", .module = options_module },
22+
},
1423
}),
1524
});
1625
b.installArtifact(server_exe);
@@ -26,6 +35,15 @@ pub fn build(b: *std.Build) void {
2635
});
2736
b.installArtifact(client_exe);
2837

38+
// Install systemd service if building for Linux
39+
if (target.result.os.tag == .linux) {
40+
b.getInstallStep().dependOn(&b.addInstallFileWithDir(
41+
b.path("deploy/systemd/protomq.service"),
42+
.prefix,
43+
"etc/systemd/system/protomq.service",
44+
).step);
45+
}
46+
2947
// Run command for server
3048
const run_server_cmd = b.addRunArtifact(server_exe);
3149
run_server_cmd.step.dependOn(b.getInstallStep());
@@ -50,6 +68,9 @@ pub fn build(b: *std.Build) void {
5068
.root_source_file = b.path("src/main.zig"),
5169
.target = target,
5270
.optimize = optimize,
71+
.imports = &.{
72+
.{ .name = "build_options", .module = options_module },
73+
},
5374
}),
5475
});
5576
const run_unit_tests = b.addRunArtifact(unit_tests);

deploy/systemd/protomq.service

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[Unit]
2+
Description=ProtoMQ Server
3+
After=network.target
4+
5+
[Service]
6+
Type=simple
7+
User=root
8+
WorkingDirectory=/opt/protomq
9+
ExecStart=/opt/protomq/bin/protomq-server
10+
Restart=always
11+
RestartSec=5
12+
13+
[Install]
14+
WantedBy=multi-user.target

src/broker/broker.zig

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ const std = @import("std");
44
pub const TopicBroker = struct {
55
allocator: std.mem.Allocator,
66
subscriptions: std.StringHashMap(SubscriberList),
7+
total_messages_routed: u64,
78

89
const SubscriberList = std.ArrayList(usize); // List of client IDs
910

1011
pub fn init(allocator: std.mem.Allocator) TopicBroker {
1112
return TopicBroker{
1213
.allocator = allocator,
1314
.subscriptions = std.StringHashMap(SubscriberList).init(allocator),
15+
.total_messages_routed = 0,
1416
};
1517
}
1618

src/broker/mqtt_handler.zig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ pub const MqttHandler = struct {
155155
std.debug.print(" ⚠ Failed to forward to client {}: {}\n", .{ sub_index, err });
156156
continue;
157157
};
158+
broker.total_messages_routed += 1;
158159
std.debug.print(" → Forwarded to client {}\n", .{sub_index});
159160
}
160161
}

src/client/client.zig

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ pub const MqttClient = struct {
155155
}
156156

157157
/// Loop to receive messages (Blocking)
158-
/// Calls callback with (topic, message)
159-
pub fn run(self: *MqttClient, callback: *const fn (topic: []const u8, message: []const u8) void) !void {
158+
/// Calls callback with (context, topic, message)
159+
pub fn run(self: *MqttClient, context: *anyopaque, callback: *const fn (ctx: *anyopaque, topic: []const u8, message: []const u8) void) !void {
160160
if (self.connection == null) return error.NotConnected;
161161

162162
while (self.connection.?.isActive()) {
@@ -175,7 +175,7 @@ pub const MqttClient = struct {
175175
// Use parser to parse PUBLISH packet
176176
// Parser.parsePublish works for incoming PUBLISH
177177
const publish_pkt = try self.parser.parsePublish(buffer);
178-
callback(publish_pkt.topic, publish_pkt.payload);
178+
callback(context, publish_pkt.topic, publish_pkt.payload);
179179
} else if (header.packet_type == .PINGRESP) {
180180
// Ignore
181181
}

0 commit comments

Comments
 (0)