printf() cannot handle an argument of type va_list. #697

Open · wants to merge 19 commits into base: master

Commits (19)
18cc2ca  printf() can not handle a argument of type va_list. (Feb 3, 2020)
f69768d  Possible NULL pointer dereferencing fixed. (BlueAndi, Apr 3, 2020)
05292e2  Variable i shadowing fixed. (BlueAndi, Apr 3, 2020)
de6732e  Missing member initialization added. (BlueAndi, Apr 3, 2020)
7398942  Premature client close() will leak _itemBuffer fixed. (BlueAndi, Apr 3, 2020)
0e2f7de  Fix duplicated number of fragmented frames (fightingdreamer, Aug 22, 2020)
3a406f8  Explicitly close socket after response has entered finished state (avillacis, Aug 25, 2020)
8aefa08  Merge pull request #1 from yubox-node-org/explicit-close-on-response-… (BlueAndi, Aug 28, 2020)
11314e1  Merge pull request #2 from fightingdreamer/fix-duplicated-frame-number (BlueAndi, Aug 28, 2020)
1759b70  Merge branch 'master' of https://github.com/BlueAndi/ESPAsyncWebServer (BlueAndi, Aug 28, 2020)
6efd7fd  Using .html instead of .htm to avoid page loading error. (BlueAndi, Aug 31, 2020)
1178428  Fix #837 invalidated iterator when removing items from list (Nov 16, 2020)
344062a  Fix #884, protect list concurrent access with mutex (Nov 16, 2020)
dc6e2a3  Remove unused variable; add assertion for failed semaphore creation (Nov 16, 2020)
6744ad4  Merge pull request #3 from ul-gh/fix_#837 (BlueAndi, Dec 11, 2020)
06ed63e  Merge pull request #4 from ul-gh/fix_#884 (BlueAndi, Dec 11, 2020)
cf9d180  TEMPLATE_PLACEHOLDER moved to top of file. (BlueAndi, Dec 23, 2020)
0ee67c5  Change placeholder from '%' to '$' directly, because on the CI a erro… (BlueAndi, Dec 23, 2020)
d63ad6d  Merge pull request #5 from me-no-dev/master (BlueAndi, Apr 29, 2022)
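
The PR title names the core bug: a va_list captured with va_start() cannot be forwarded to a variadic function such as printf(); it has to be passed to a v-variant (vprintf, vsnprintf, ...). The diff below reworks AsyncWebSocket::printf() and printf_P() along those lines, formatting into a measured buffer with vsnprintf before queueing the text. As a standalone illustration, here is a minimal sketch of that two-pass pattern; sendText() and printfToClient() are placeholder names, not the library's API (the PR itself measures into a MAX_PRINTF_LEN scratch buffer and sends via makeBuffer() and client->text()).

```cpp
#include <cstdarg>
#include <cstdio>
#include <string>

// Placeholder for AsyncWebSocketClient::text() in this sketch.
static void sendText(const std::string &payload) {
    std::printf("sending %zu bytes: %s\n", payload.size(), payload.c_str());
}

// The bug in #697, shown for contrast:
//   va_list arg; va_start(arg, fmt);
//   printf(fmt, arg);   // undefined behaviour: the va_list is consumed as ONE argument
//
// The fix: use the v-variant twice, once to measure, once to format.
static size_t printfToClient(const char *fmt, ...) {
    va_list arg;
    va_start(arg, fmt);
    int len = vsnprintf(nullptr, 0, fmt, arg);   // measuring pass, writes nothing
    va_end(arg);
    if (len < 0) {
        return 0;
    }

    std::string buffer(static_cast<size_t>(len) + 1, '\0');
    va_start(arg, fmt);                          // the va_list must be restarted
    vsnprintf(&buffer[0], buffer.size(), fmt, arg);
    va_end(arg);
    buffer.resize(static_cast<size_t>(len));     // drop the trailing '\0'

    sendText(buffer);
    return static_cast<size_t>(len);
}

int main() {
    return printfToClient("temperature=%d.%01d C", 23, 5) == 0;
}
```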
96 changes: 64 additions & 32 deletions src/AsyncEventSource.cpp
@@ -173,7 +173,9 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
}

AsyncEventSourceClient::~AsyncEventSourceClient(){
_messageQueue.free();
_lockmq.lock();
_messageQueue.free();
_lockmq.unlock();
close();
}

@@ -184,33 +186,41 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
delete dataMessage;
return;
}
//length() is not thread-safe, thus acquiring the lock before this call..
_lockmq.lock();
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
ets_printf("ERROR: Too many messages queued\n");
delete dataMessage;
ets_printf("ERROR: Too many messages queued\n");
delete dataMessage;
} else {
_messageQueue.add(dataMessage);
_messageQueue.add(dataMessage);
// runqueue trigger when new messages added
if(_client->canSend()) {
_runQueue();
}
}
if(_client->canSend())
_runQueue();
_lockmq.unlock();
}

void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
// Same here, acquiring the lock early
_lockmq.lock();
while(len && !_messageQueue.isEmpty()){
len = _messageQueue.front()->ack(len, time);
if(_messageQueue.front()->finished())
_messageQueue.remove(_messageQueue.front());
}

_runQueue();
_lockmq.unlock();
}

void AsyncEventSourceClient::_onPoll(){
_lockmq.lock();
if(!_messageQueue.isEmpty()){
_runQueue();
}
_lockmq.unlock();
}


void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
_client->close(true);
}
@@ -225,7 +235,7 @@ void AsyncEventSourceClient::close(){
_client->close();
}

void AsyncEventSourceClient::write(const char * message, size_t len){
void AsyncEventSourceClient::_write(const char * message, size_t len){
_queueMessage(new AsyncEventSourceMessage(message, len));
}

@@ -234,15 +244,23 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
}

void AsyncEventSourceClient::_runQueue(){
while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){
_messageQueue.remove(_messageQueue.front());
}
size_t AsyncEventSourceClient::packetsWaiting() const {
size_t len;
_lockmq.lock();
len = _messageQueue.length();
_lockmq.unlock();
return len;
}

for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
{
if(!(*i)->sent())
void AsyncEventSourceClient::_runQueue() {
// Calls to this private method are already protected by _lockmq acquisition,
// so no extra _lockmq.lock() call is needed here.
for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) {
// If it crashes here, iterator (i) has been invalidated as _messageQueue
// has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) )
if (!(*i)->sent()) {
(*i)->send(_client);
}
}
}

@@ -276,56 +294,70 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
client->write((const char *)temp, 2053);
free(temp);
}*/

_client_queue_lock.lock();
_clients.add(client);
if(_connectcb)
_connectcb(client);
_client_queue_lock.unlock();
}

void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
_client_queue_lock.lock();
_clients.remove(client);
_client_queue_lock.unlock();
}

void AsyncEventSource::close(){
// While the whole loop is not done, the linked list is locked and so the
// iterator should remain valid even when AsyncEventSource::_handleDisconnect()
// is called very early
_client_queue_lock.lock();
for(const auto &c: _clients){
if(c->connected())
c->close();
}
_client_queue_lock.unlock();
}

// pmb fix
size_t AsyncEventSource::avgPacketsWaiting() const {
if(_clients.isEmpty())
size_t aql = 0;
uint32_t nConnectedClients = 0;
_client_queue_lock.lock();
if (_clients.isEmpty()) {
_client_queue_lock.unlock();
return 0;

size_t aql=0;
uint32_t nConnectedClients=0;

}
for(const auto &c: _clients){
if(c->connected()) {
aql+=c->packetsWaiting();
aql += c->packetsWaiting();
++nConnectedClients;
}
}
// return aql / nConnectedClients;
return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up
_client_queue_lock.unlock();
return ((aql) + (nConnectedClients/2)) / (nConnectedClients); // round up
}

void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){


void AsyncEventSource::send(
const char *message, const char *event, uint32_t id, uint32_t reconnect){
String ev = generateEventMessage(message, event, id, reconnect);
_client_queue_lock.lock();
for(const auto &c: _clients){
if(c->connected()) {
c->write(ev.c_str(), ev.length());
c->_write(ev.c_str(), ev.length());
}
}
_client_queue_lock.unlock();
}

size_t AsyncEventSource::count() const {
return _clients.count_if([](AsyncEventSourceClient *c){
return c->connected();
});
size_t n_clients;
_client_queue_lock.lock();
n_clients = _clients.count_if([](AsyncEventSourceClient *c){
return c->connected();
});
_client_queue_lock.unlock();
return n_clients;
}

bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
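The AsyncEventSource changes above follow one locking convention worth spelling out: public entry points (_queueMessage(), _onAck(), _onPoll(), packetsWaiting(), and the AsyncEventSource methods that walk _clients) acquire the lock, while private helpers such as _runQueue() assume it is already held, which also keeps the list iterators valid while close() or send() iterate. A compact sketch of that convention, using std::mutex purely to stay self-contained; the library itself uses AsyncPlainLock, the FreeRTOS semaphore wrapper added in AsyncWebSynchronization.h further down:

```cpp
#include <cstdio>
#include <list>
#include <mutex>
#include <string>

// Minimal sketch of the locking convention in the diff: public entry points
// take the queue lock, private helpers (like _runQueue in the PR) assume the
// caller already holds it. Names are illustrative only.
class EventQueue {
public:
    void enqueue(std::string msg) {
        std::lock_guard<std::mutex> guard(_lock);   // public API: lock first
        if (_queue.size() >= kMaxQueued) {
            std::puts("ERROR: Too many messages queued");
            return;
        }
        _queue.push_back(std::move(msg));
        _runQueue();                                // called with the lock held
    }

    size_t packetsWaiting() {
        std::lock_guard<std::mutex> guard(_lock);   // size() alone is not thread-safe
        return _queue.size();
    }

private:
    void _runQueue() {                              // private: no locking here
        for (const auto &msg : _queue) {
            std::printf("send: %s\n", msg.c_str());
        }
        _queue.clear();
    }

    static constexpr size_t kMaxQueued = 32;
    std::mutex _lock;
    std::list<std::string> _queue;
};

int main() {
    EventQueue q;
    q.enqueue("hello");
    return q.packetsWaiting() == 0 ? 0 : 1;
}
```
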
12 changes: 9 additions & 3 deletions src/AsyncEventSource.h
@@ -72,6 +72,8 @@ class AsyncEventSourceClient {
AsyncEventSource *_server;
uint32_t _lastId;
LinkedList<AsyncEventSourceMessage *> _messageQueue;
// ArFi 2020-08-27 for protecting/serializing _messageQueue
AsyncPlainLock _lockmq;
void _queueMessage(AsyncEventSourceMessage *dataMessage);
void _runQueue();

@@ -82,12 +84,12 @@

AsyncClient* client(){ return _client; }
void close();
void write(const char * message, size_t len);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
bool connected() const { return (_client != NULL) && _client->connected(); }
uint32_t lastId() const { return _lastId; }
size_t packetsWaiting() const { return _messageQueue.length(); }
size_t packetsWaiting() const;

void _write(const char * message, size_t len);
//system callbacks (do not call)
void _onAck(size_t len, uint32_t time);
void _onPoll();
@@ -99,7 +101,11 @@ class AsyncEventSource: public AsyncWebHandler {
private:
String _url;
LinkedList<AsyncEventSourceClient *> _clients;
// Same as for individual messages, protect mutations of _clients list
// since simultaneous access from different tasks is possible
AsyncPlainLock _client_queue_lock;
ArEventHandlerFunction _connectcb;

public:
AsyncEventSource(const String& url);
~AsyncEventSource();
@@ -108,7 +114,7 @@
void close();
void onConnect(ArEventHandlerFunction cb);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
size_t count() const; //number clinets connected
size_t count() const; //number clients connected
size_t avgPacketsWaiting() const;

//system callbacks (do not call)
45 changes: 40 additions & 5 deletions src/AsyncWebSocket.cpp
@@ -473,6 +473,7 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, Async
_clientId = _server->_getNextId();
_status = WS_CONNECTED;
_pstate = 0;
memset(&_pinfo,0,sizeof(_pinfo));
_lastMessageTime = millis();
_keepAlivePeriod = 0;
_client->setRxTimeout(0);
@@ -652,9 +653,9 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen){
if(_pinfo.opcode){
_pinfo.message_opcode = _pinfo.opcode;
_pinfo.num = 0;
} else _pinfo.num += 1;
}
}
_server->_handleEvent(this, WS_EVT_DATA, (void *)&_pinfo, (uint8_t*)data, datalen);
if (datalen > 0) _server->_handleEvent(this, WS_EVT_DATA, (void *)&_pinfo, (uint8_t*)data, datalen);

_pinfo.index += datalen;
} else if((datalen + _pinfo.index) == _pinfo.len){
@@ -682,6 +683,8 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen){
_server->_handleEvent(this, WS_EVT_PONG, NULL, data, datalen);
} else if(_pinfo.opcode < 8){//continuation or text/binary frame
_server->_handleEvent(this, WS_EVT_DATA, (void *)&_pinfo, data, datalen);
if (_pinfo.final) _pinfo.num = 0;
else _pinfo.num += 1;
}
} else {
//os_printf("frame error: len: %u, index: %llu, total: %llu\n", datalen, _pinfo.index, _pinfo.len);
@@ -1000,13 +1003,29 @@ void AsyncWebSocket::messageAll(AsyncWebSocketMultiMessage *message){
_cleanBuffers();
}

size_t AsyncWebSocket::printf(uint32_t id, const char *format, ...){
size_t AsyncWebSocket::printf(uint32_t id, const char *format, ...) {
AsyncWebSocketClient * c = client(id);
if(c){
va_list arg;
char* temp = new char[MAX_PRINTF_LEN];
if(!temp){
return 0;
}
va_start(arg, format);
size_t len = c->printf(format, arg);
size_t len = vsnprintf(temp, MAX_PRINTF_LEN, format, arg);
va_end(arg);
delete[] temp;

AsyncWebSocketMessageBuffer * buffer = makeBuffer(len);
if (!buffer) {
return 0;
}

va_start(arg, format);
vsnprintf( (char *)buffer->get(), len + 1, format, arg);
va_end(arg);

c->text(buffer);
return len;
}
return 0;
@@ -1041,9 +1060,25 @@ size_t AsyncWebSocket::printf_P(uint32_t id, PGM_P formatP, ...){
AsyncWebSocketClient * c = client(id);
if(c != NULL){
va_list arg;
char* temp = new char[MAX_PRINTF_LEN];
if(!temp){
return 0;
}
va_start(arg, formatP);
size_t len = vsnprintf_P(temp, MAX_PRINTF_LEN, formatP, arg);
va_end(arg);
delete[] temp;

AsyncWebSocketMessageBuffer * buffer = makeBuffer(len + 1);
if (!buffer) {
return 0;
}

va_start(arg, formatP);
size_t len = c->printf_P(formatP, arg);
vsnprintf_P((char *)buffer->get(), len + 1, formatP, arg);
va_end(arg);

c->text(buffer);
return len;
}
return 0;
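Two of the _onData() changes above are visible to application handlers: WS_EVT_DATA is no longer fired for empty payload chunks, and the frame counter is reset after a final frame, so fragmented messages number their frames correctly (the fix-duplicated-frame-number commit). A hedged handler sketch for an ESP32 Arduino environment, using the event callback signature and AwsFrameInfo fields as commonly documented for ESPAsyncWebServer; the reassembly buffer and echo behaviour are illustrative only, not part of the PR:

```cpp
#include <ESPAsyncWebServer.h>

static AsyncWebServer server(80);
static AsyncWebSocket ws("/ws");

// Buffer used to reassemble a fragmented text message; illustrative only.
static String messageBuffer;

void onWsEvent(AsyncWebSocket *socket, AsyncWebSocketClient *client,
               AwsEventType type, void *arg, uint8_t *data, size_t len) {
  if (type != WS_EVT_DATA) {
    return;
  }
  AwsFrameInfo *info = static_cast<AwsFrameInfo *>(arg);
  if (info->opcode != WS_TEXT && info->message_opcode != WS_TEXT) {
    return;  // this sketch only handles text frames
  }
  if (info->num == 0 && info->index == 0) {
    messageBuffer = "";  // first chunk of the first frame of a new message
  }
  for (size_t i = 0; i < len; i++) {
    messageBuffer += static_cast<char>(data[i]);
  }
  // Complete once the final frame's last chunk has arrived.
  if (info->final && (info->index + len) == info->len) {
    client->text(messageBuffer);  // echo the reassembled message back
  }
}

void setup() {
  // WiFi setup omitted for brevity.
  ws.onEvent(onWsEvent);
  server.addHandler(&ws);
  server.begin();
}

void loop() {}
```
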
41 changes: 40 additions & 1 deletion src/AsyncWebSynchronization.h
@@ -7,6 +7,38 @@

#ifdef ESP32

// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
// Modified 'AsyncWebLock' to use only a mutex, since pxCurrentTCB is not
// always available. Following an example by Arjan Filius: renamed the class and
// added an unimplemented version for ESP8266
class AsyncPlainLock
{
private:
SemaphoreHandle_t _lock;

public:
AsyncPlainLock() {
_lock = xSemaphoreCreateBinary();
// If this fails, the system is likely so short of memory that
// we should abort anyway. If assertions are disabled, nothing is lost.
assert(_lock);
xSemaphoreGive(_lock);
}

~AsyncPlainLock() {
vSemaphoreDelete(_lock);
}

bool lock() const {
xSemaphoreTake(_lock, portMAX_DELAY);
return true;
}

void unlock() const {
xSemaphoreGive(_lock);
}
};

// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
class AsyncWebLock
{
@@ -17,6 +49,9 @@ class AsyncWebLock
public:
AsyncWebLock() {
_lock = xSemaphoreCreateBinary();
// If this fails, the system is likely so short of memory that
// we should abort anyway. If assertions are disabled, nothing is lost.
assert(_lock);
_lockedBy = NULL;
xSemaphoreGive(_lock);
}
@@ -61,6 +96,10 @@ class AsyncWebLock
void unlock() const {
}
};

// Same for AsyncPlainLock, for ESP8266 this is just the unimplemented version above.
using AsyncPlainLock = AsyncWebLock;

#endif

class AsyncWebLockGuard
@@ -84,4 +123,4 @@
}
};

#endif // ASYNCWEBSYNCHRONIZATION_H_
#endif // ASYNCWEBSYNCHRONIZATION_H_
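
AsyncPlainLock wraps a FreeRTOS binary semaphore that is created, given once, and then exposed through lock()/unlock(); unlike AsyncWebLock it never inspects the current task handle, which, per the comment above, is what makes it usable where pxCurrentTCB is not available. A small usage sketch for an ESP32 Arduino environment; ListGuard, onClientConnected(), and clientCount() are hypothetical names that only illustrate how the manual lock()/unlock() pairs in the diff could also be written as an RAII guard (the library already ships AsyncWebLockGuard for AsyncWebLock):

```cpp
#include <ESPAsyncWebServer.h>

// Hypothetical RAII helper; the library ships AsyncWebLockGuard, this one only
// illustrates the lock()/unlock() pattern used throughout the diff.
class ListGuard {
 public:
  explicit ListGuard(const AsyncPlainLock &lock) : _lock(lock) { _lock.lock(); }
  ~ListGuard() { _lock.unlock(); }
 private:
  const AsyncPlainLock &_lock;
};

static AsyncPlainLock clientsLock;
static int connectedClients = 0;

// Imagined to run on the async_tcp task when a client attaches.
void onClientConnected() {
  ListGuard guard(clientsLock);  // held for the whole scope
  ++connectedClients;
}

// Imagined to run from loop() (a different FreeRTOS task), hence the same lock.
int clientCount() {
  ListGuard guard(clientsLock);
  return connectedClients;
}

void setup() {
  onClientConnected();
}

void loop() {
  (void)clientCount();
}
```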