From dd1bbae1d4ef930b6f072c89ccb1d6babb4a5f87 Mon Sep 17 00:00:00 2001 From: Nick Kinnan Date: Mon, 7 Oct 2024 18:38:11 -0700 Subject: [PATCH] Bugfix: AsyncEventSource writes multiple events per tcp send, including partial events that straddle buffers; Improvement: don't hold onto event items until ack, immediately remove them from queue (#41) --- src/AsyncEventSource.cpp | 57 ++++++++++++++++++++++++++-------------- src/AsyncEventSource.h | 2 ++ 2 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index 5473f085..1e0920a4 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -124,7 +124,11 @@ AsyncEventSourceMessage::~AsyncEventSourceMessage() { } size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) { - (void)time; + (void)time; + return ack(len); +} + +size_t AsyncEventSourceMessage::ack(size_t len) { // If the whole message is now acked... if(_acked + len > _len){ // Return the number of extra bytes acked (they will be carried on to the next message) @@ -137,19 +141,24 @@ size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) { return 0; } -size_t AsyncEventSourceMessage::send(AsyncClient *client) { - if (!client->canSend()) +size_t AsyncEventSourceMessage::write_buffer(AsyncClient *client) { + if (!client->canSend() || client->space() <= 0) return 0; - const size_t len = _len - _sent; + size_t len = _len - _sent; if(client->space() < len){ - return 0; + len = client->space(); } - size_t sent = client->add((const char *)_data, len); - client->send(); + size_t sent = client->add((const char *)_data + _sent, len); _sent += sent; return sent; } +size_t AsyncEventSourceMessage::send(AsyncClient *client) { + size_t sent = write_buffer(client); + client->send(); + return sent; +} + // Client AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server) @@ -171,6 +180,8 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A _server->_addClient(this); delete request; + + _client->setNoDelay(true); } AsyncEventSourceClient::~AsyncEventSourceClient(){ @@ -185,8 +196,9 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage) delete dataMessage; return; } + if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){ - ets_printf("ERROR: Too many messages queued\n"); + ets_printf("AsyncEventSourceClient: ERROR: Queue is full, communications too slow, dropping event"); delete dataMessage; } else { _messageQueue.add(dataMessage); @@ -196,12 +208,6 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage) } void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){ - while(len && !_messageQueue.isEmpty()){ - len = _messageQueue.front()->ack(len, time); - if(_messageQueue.front()->finished()) - _messageQueue.remove(_messageQueue.front()); - } - _runQueue(); } @@ -247,14 +253,25 @@ void AsyncEventSourceClient::_runQueue(){ this->_messageQueue_processing = true; #endif // ESP32 - while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){ - _messageQueue.remove(_messageQueue.front()); - } - + size_t total_bytes_written = 0; for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) { - if(!(*i)->sent()) - (*i)->send(_client); + if(!(*i)->sent()) { + size_t bytes_written = (*i)->write_buffer(_client); + total_bytes_written += bytes_written; + if(bytes_written == 0) + break; + } + } + if(total_bytes_written > 0) + _client->send(); + + size_t len = total_bytes_written; + while(len && !_messageQueue.isEmpty()){ + len = _messageQueue.front()->ack(len); + if(_messageQueue.front()->finished()){ + _messageQueue.remove(_messageQueue.front()); + } } #if defined(ESP32) diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index 7dd559de..d1d78823 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -69,6 +69,8 @@ class AsyncEventSourceMessage { AsyncEventSourceMessage(const char * data, size_t len); ~AsyncEventSourceMessage(); size_t ack(size_t len, uint32_t time __attribute__((unused))); + size_t ack(size_t len); + size_t write_buffer(AsyncClient *client); size_t send(AsyncClient *client); bool finished(){ return _acked == _len; } bool sent() { return _sent == _len; }