Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: AsyncEventSource writes multiple events per tcp send, including partial events that straddle buffers; Improvement: don't hold onto event items until ack, immediately remove them from queue #41

Merged
merged 2 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ AsyncEventSourceMessage::~AsyncEventSourceMessage() {
}

size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) {
(void)time;
(void)time;
return ack(len);
}

size_t AsyncEventSourceMessage::ack(size_t len) {
// If the whole message is now acked...
if(_acked + len > _len){
// Return the number of extra bytes acked (they will be carried on to the next message)
Expand All @@ -137,19 +141,24 @@ size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) {
return 0;
}

size_t AsyncEventSourceMessage::send(AsyncClient *client) {
if (!client->canSend())
size_t AsyncEventSourceMessage::write_buffer(AsyncClient *client) {
if (!client->canSend() || client->space() <= 0)
return 0;
const size_t len = _len - _sent;
size_t len = _len - _sent;
if(client->space() < len){
return 0;
len = client->space();
nkinnan marked this conversation as resolved.
Show resolved Hide resolved
}
size_t sent = client->add((const char *)_data, len);
client->send();
size_t sent = client->add((const char *)_data + _sent, len);
_sent += sent;
return sent;
}

size_t AsyncEventSourceMessage::send(AsyncClient *client) {
size_t sent = write_buffer(client);
client->send();
return sent;
}

// Client

AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server)
Expand All @@ -171,6 +180,8 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A

_server->_addClient(this);
delete request;

_client->setNoDelay(true);
}

AsyncEventSourceClient::~AsyncEventSourceClient(){
Expand All @@ -185,8 +196,9 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
delete dataMessage;
return;
}

if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
ets_printf("ERROR: Too many messages queued\n");
ets_printf("AsyncEventSourceClient: ERROR: Queue is full, communications too slow, dropping event");
delete dataMessage;
} else {
_messageQueue.add(dataMessage);
Expand All @@ -196,12 +208,6 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
}

void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
while(len && !_messageQueue.isEmpty()){
len = _messageQueue.front()->ack(len, time);
if(_messageQueue.front()->finished())
_messageQueue.remove(_messageQueue.front());
}

_runQueue();
}

Expand Down Expand Up @@ -247,14 +253,25 @@ void AsyncEventSourceClient::_runQueue(){
this->_messageQueue_processing = true;
#endif // ESP32

while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){
_messageQueue.remove(_messageQueue.front());
}

size_t total_bytes_written = 0;
for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
{
if(!(*i)->sent())
(*i)->send(_client);
if(!(*i)->sent()) {
size_t bytes_written = (*i)->write_buffer(_client);
total_bytes_written += bytes_written;
if(bytes_written == 0)
break;
}
}
if(total_bytes_written > 0)
_client->send();

size_t len = total_bytes_written;
while(len && !_messageQueue.isEmpty()){
len = _messageQueue.front()->ack(len);
if(_messageQueue.front()->finished()){
_messageQueue.remove(_messageQueue.front());
}
}

#if defined(ESP32)
Expand Down
2 changes: 2 additions & 0 deletions src/AsyncEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class AsyncEventSourceMessage {
AsyncEventSourceMessage(const char * data, size_t len);
~AsyncEventSourceMessage();
size_t ack(size_t len, uint32_t time __attribute__((unused)));
size_t ack(size_t len);
nkinnan marked this conversation as resolved.
Show resolved Hide resolved
size_t write_buffer(AsyncClient *client);
size_t send(AsyncClient *client);
bool finished(){ return _acked == _len; }
bool sent() { return _sent == _len; }
Expand Down