feat: add per-monitor websocket transport#4830
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a per-monitor WebSocket server inside zmc to expose live monitor status, queued capture events, and JPEG/raw/H264 payload delivery directly from the capture process, using a port derived from MIN_STREAMING_PORT + MonitorId (as requested in #2875).
Changes:
- Introduces a lightweight WebSocket implementation (
ComputeAcceptKey/handshake, frame encode/decode) and a per-monitorMonitorWebSocketServerthread. - Extends
Monitorto generate WebSocket status JSON, queue capture failure/recovery events, and produce JPEG/raw snapshots and H.264 packet payloads. - Adds documentation + unit tests for the WebSocket transport and wires tests/build via CMake.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/zm_websocket.cpp | Adds unit tests for handshake key parsing, accept key, frame encode/decode, and port derivation. |
| tests/CMakeLists.txt | Registers the new websocket unit test file in the test suite. |
| src/zmc.cpp | Starts/stops the per-monitor WebSocket server and queues capture failure/recovery events during connect/prime/capture loops. |
| src/zm_websocket.h | Declares websocket helpers plus MonitorWebSocketServer and its client/subscription state. |
| src/zm_websocket.cpp | Implements the websocket protocol handling, polling loop, subscriptions, payload delivery, and event broadcasting. |
| src/zm_monitor.h | Adds WebSocket payload struct, server state, status/event APIs, and H264 iterator APIs to Monitor. |
| src/zm_monitor.cpp | Implements WebSocket server lifecycle, status JSON generation, queued events, and snapshot/H264 payload production. |
| src/CMakeLists.txt | Builds zm_websocket.cpp into the main binaries. |
| docs/userguide/options/options_network.rst | Updates MIN_STREAMING_PORT description to mention the new WebSocket transport usage. |
| docs/api.rst | Adds api_monitor_websocket to the documentation toctree. |
| docs/api_monitor_websocket.rst | Documents the new monitor WebSocket protocol, commands, and message formats. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
I addressed the practical part of this concern by documenting that the websocket listener reuses I’m not taking the suggested separate config/port range change in this PR because the original issue explicitly calls for using the |
|
@connortechnology It would be really helpful if you could guide me for end-to-end testing. |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (2)
src/zm_websocket.cpp:876
- When a client is marked closed by setting fd = -1 (e.g., on queueFrame failure in broadcastStatus/broadcastImages/broadcastEvents), removeClosedClients() erases it without calling freeClientResources(). This leaks the per-client H.264 packetqueue iterator (and any future resources added). Ensure freeClientResources() is called for every client being removed/closed (either immediately when setting fd=-1 or inside removeClosedClients before erase).
void MonitorWebSocketServer::removeClosedClients(std::vector<Client> *clients) {
clients->erase(
std::remove_if(
clients->begin(),
clients->end(),
[](const Client &client) { return client.fd < 0; }),
clients->end());
src/zm_monitor.cpp:1507
- CreateWebSocketH264Iterator() walks the packet queue from the first keyframe all the way to the end to find the latest keyframe. With max_video_packet_count==0 (effectively unlimited) this can become O(n) over a very large queue and can stall the websocket thread on subscribe/one-shot h264 requests. Consider adding a PacketQueue helper to locate the most recent keyframe more efficiently (e.g., track last keyframe as packets are queued, or scan backward under the queue lock).
packetqueue_iterator *it = packetqueue.get_video_it(false);
if (!it) {
return nullptr;
}
packetqueue_iterator latest_keyframe = *it;
bool have_keyframe = false;
while (true) {
ZMPacketLock packet_lock = packetqueue.get_packet_no_wait(it);
if (!packet_lock.packet_) {
break;
}
if (packet_lock.packet_->packet &&
packet_lock.packet_->packet->stream_index == video_stream_id &&
packet_lock.packet_->keyframe) {
latest_keyframe = *it;
have_keyframe = true;
}
if (!packetqueue.increment_it(it, video_stream_id)) {
break;
}
}
if (!have_keyframe) {
packetqueue.free_it(it);
return nullptr;
}
*it = latest_keyframe;
return it;
}
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (3)
src/zm_websocket.cpp:824
sendImagePayloadfailure during H.264 streaming setsclient.fd = -1but does not close the socket, and later removal drops the client without closing the fd. This can leak file descriptors under load/backpressure. Close the fd before marking the client closed (or ensure removal closes it).
break;
}
if (!sendImagePayload(&client, payload, "")) {
client.fd = -1;
break;
}
src/zm_websocket.cpp:839
- For non-H.264 image subscriptions, a failed
sendImagePayloadmarksclient.fd = -1without closing the socket; the removal path then erases the client and leaks the fd. Close the fd before invalidating it (or handle closing inremoveClosedClients).
Monitor::WebSocketPayload payload;
if (monitor->GetWebSocketPayload(client.image_format, &payload) && (payload.sequence != client.last_image_sequence)) {
if (!sendImagePayload(&client, payload, "")) {
client.fd = -1;
continue;
}
src/zm_websocket.cpp:871
- When event delivery queueing fails, the code sets
client.fd = -1but never closes the socket;removeClosedClients()will erase the entry without closing the fd. This can leak file descriptors when an events subscriber falls behind. Ensure the fd is closed when a client is force-closed.
}
for (const std::string &event : events) {
if (!queueFrame(&client, websocket::Opcode::TEXT, event)) {
client.fd = -1;
break;
}
| bool encodeJpegImage(const Image &image, std::string *output) { | ||
| std::vector<unsigned char> buffer(ZM_MAX_IMAGE_SIZE); | ||
| size_t encoded_size = 0; | ||
| if (!image.EncodeJpeg(buffer.data(), &encoded_size)) { | ||
| return false; | ||
| } | ||
| output->assign(reinterpret_cast<const char *>(buffer.data()), encoded_size); | ||
| return true; |
There was a problem hiding this comment.
I’m leaving this as-is for now. The current allocation pattern is not ideal, but this is a performance optimization rather than a correctness issue, and I’d prefer to address it separately with profiling or a more deliberate buffering strategy instead of changing the payload path in this feature PR.
There was a problem hiding this comment.
It's a pretty blatant bad idea which should be rectified prior to merge.
| bool writeFully(int fd, const std::string &payload) { | ||
| size_t offset = 0; | ||
| while (offset < payload.size()) { | ||
| const ssize_t bytes_sent = ::send(fd, payload.data() + offset, payload.size() - offset, MSG_NOSIGNAL); | ||
| if (bytes_sent < 0) { | ||
| if (errno == EINTR) { | ||
| continue; | ||
| } | ||
| return false; | ||
| } | ||
| if (bytes_sent == 0) { | ||
| return false; | ||
| } | ||
| offset += static_cast<size_t>(bytes_sent); | ||
| } | ||
| return true; | ||
| } |
There was a problem hiding this comment.
This is only used on the handshake failure path when the connection is about to be closed. I agree it is not a fully general non-blocking write helper, but I’m keeping it narrow here rather than adding polling/queueing complexity for an invalid-request response path.
| bool MonitorWebSocketServer::acceptClients(std::vector<Client> *clients) { | ||
| while (running) { | ||
| sockaddr_storage addr = {}; | ||
| socklen_t addr_len = sizeof(addr); | ||
| const int fd = ::accept(server.getReadDesc(), reinterpret_cast<sockaddr *>(&addr), &addr_len); | ||
| if (fd < 0) { | ||
| if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { | ||
| return true; | ||
| } | ||
| Error("accept() failed in websocket server for monitor %u: %s", monitor->Id(), strerror(errno)); | ||
| return false; | ||
| } | ||
|
|
||
| if (!setNonBlocking(fd)) { | ||
| ::close(fd); | ||
| continue; | ||
| } | ||
|
|
||
| clients->emplace_back(fd); | ||
| } | ||
|
|
||
| return true; | ||
| } |
There was a problem hiding this comment.
A per-monitor connection cap is reasonable hardening, but it introduces an explicit policy/limit choice beyond the scope of this feature implementation. I’d prefer to handle that as a follow-up once we decide the desired operational limit and behavior for rejected connections.
| if (!config.min_streaming_port) { | ||
| return false; | ||
| } | ||
|
|
||
| if (websocket_server) { | ||
| return true; | ||
| } | ||
|
|
||
| const unsigned int websocket_port = zm::websocket::MonitorStreamingPort(config.min_streaming_port, id); | ||
| if (!websocket_port) { | ||
| Warning("Unable to compute websocket port for monitor %u using base port %d", id, config.min_streaming_port); | ||
| return false; | ||
| } |
There was a problem hiding this comment.
I agree the shared port range has operational tradeoffs, but I’m not changing that in this PR because the original feature request explicitly called for using the MIN_STREAMING_PORT range. I’ve documented the conflict risk instead of introducing a new config surface here.
There was a problem hiding this comment.
Ignore the original request. WE need another port range unless this can handle the same requests as the zms cgi path.
| bool MonitorWebSocketServer::queueRaw(Client *client, const std::string &payload) { | ||
| if ((client->queued_bytes + payload.size()) > kMaxQueuedBytesPerClient) { | ||
| Warning( | ||
| "Closing websocket client for monitor %u after queue exceeded %zu bytes", | ||
| monitor->Id(), | ||
| kMaxQueuedBytesPerClient); | ||
| return false; | ||
| } | ||
| client->queued_bytes += payload.size(); | ||
| client->send_queue.push_back({payload, 0}); | ||
| return true; | ||
| } | ||
|
|
||
| bool MonitorWebSocketServer::queueFrame(Client *client, websocket::Opcode opcode, const std::string &payload) { | ||
| return queueRaw(client, websocket::EncodeFrame(opcode, payload)); | ||
| } |
There was a problem hiding this comment.
I’d prefer to keep this PR focused on protocol and lifecycle correctness and handle queue/payload move-optimization separately if we see it matter in profiling.
|
|
||
| * ``jpeg`` | ||
| * ``raw`` | ||
| * ``h264`` |
There was a problem hiding this comment.
Not really sure what an h264 image would be. image should be limited to actual image formats. I would be tempted to say long term that we should be able to return any image format that ffmpeg supports. Realistically raw needs to be either rgba, yuv etc. For now, because I intend to deprecate 24bit support, let's say yuv420p (which is most likely what we will be getting from a decoder), or rgba. Honeslty no one is going to want anything other than jpeg.
| * ``content_type`` | ||
| * ``monitor_id`` | ||
| * ``width`` | ||
| * ``height`` |
There was a problem hiding this comment.
due to alignment/padding, the raw data may have lines longer than width. Will need to commmunicate that as well.
| For subscription mode, ``interval_ms`` controls how often the server checks for | ||
| and sends a newer frame. | ||
|
|
||
| H264 behavior |
There was a problem hiding this comment.
I think you should be careful here using h264 to really mean a video stream =. I think we need clear separation between asking for an image, and asking for a video stream. Please note that I would choose to use mjpeg when we are talking about a stream of jpegs. So we are talking about a video stream, with some codec. We already will need support for h265 and av1. While ideally we should be able to ask for anything (and have it transcode) 99% of the time we will want to simply request the passthrough video data in whatever codec it happens to be in.
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
|
|
||
| ``jpeg`` and ``raw`` payloads are generated from the latest image in the | ||
| monitor shared-memory ring buffer. |
There was a problem hiding this comment.
For simplicity sake, you should be able to use the packetqueue for these as well. Everything in shm should also be in packetqueue.
| #include <cstring> | ||
| #include <sys/types.h> | ||
|
|
||
| namespace { |
There was a problem hiding this comment.
Why is this up here in the includes?
| static constexpr char kBase64Alphabet[] = | ||
| "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; | ||
|
|
||
| std::string base64EncodeBytes(const uint8_t *data, size_t length) { |
There was a problem hiding this comment.
These are a lot of general string utility functions which likely already exist in zm in zm_utils.cpp. Anything new should be located there.
|
@copilot we appear to be generating a websocket server from scratch rather than using a mature c++ websocket library like https://github.com/zaphoyd/websocketpp. Please document the tradeoffs. @AJ0070 This implementation will at the very least need to support TLS, authentication, etc. |
|
@connortechnology I’ve addressed the auth/TLS concerns in the current implementation. The websocket handshake now enforces ZoneMinder authentication whenever |
Summary
Adds a per-monitor websocket server to
zmcso clients can connect directly to a monitor and request live status, queued events, and image/video payloads.This implements the websocket transport requested in #2875.
Changes
zmcMIN_STREAMING_PORT + MonitorIdjpegrawh264Supported commands
{"command":"status"}{"command":"image","format":"jpeg|raw|h264"}{"command":"subscribe","topic":"status","interval_ms":1000}{"command":"subscribe","topic":"events"}{"command":"subscribe","topic":"image","format":"jpeg|raw|h264","interval_ms":1000}{"command":"unsubscribe","topic":"status|events|image"}Responses use:
Notes
jpegandrawpayloads are generated from the current shared-memory frameh264delivery uses the monitor packet queueh264requests return the latest packet snapshoth264subscriptions stream queued packets in order starting from the latest available keyframe