Skip to content
This repository was archived by the owner on Dec 6, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ LDClientInit(struct LDConfig *const config, const unsigned int maxwaitmilli)
client->shuttingdown = false;
client->config = config;

client->threadAwaken = false;
client->inClientInit = true;

if (!(client->eventProcessor = LDi_newEventProcessor(config))) {
LDStoreDestroy(client->store);
LDFree(client);
Expand All @@ -55,6 +58,8 @@ LDClientInit(struct LDConfig *const config, const unsigned int maxwaitmilli)
}

LDi_rwlock_init(&client->lock);
LDi_mutex_init(&client->threadSleepLock);
LDi_cond_init(&client->threadSleepCond);

LDi_thread_create(&client->thread, LDi_networkthread, client);

Expand All @@ -74,6 +79,9 @@ LDClientInit(struct LDConfig *const config, const unsigned int maxwaitmilli)
} while ((diff = now - start) < maxwaitmilli);
}
LD_LOG(LD_LOG_INFO, "initialized");
LDi_rwlock_wrlock(&client->lock);
client->inClientInit = false;
LDi_rwlock_wrunlock(&client->lock);

return client;
}
Expand All @@ -87,11 +95,18 @@ LDClientClose(struct LDClient *const client)
client->shuttingdown = true;
LDi_rwlock_wrunlock(&client->lock);

LDi_mutex_lock(&client->threadSleepLock);
client->threadAwaken = true;
LDi_cond_signal(&client->threadSleepCond);
LDi_mutex_unlock(&client->threadSleepLock);

/* wait until background exits */
LDi_thread_join(&client->thread);

/* cleanup resources */
LDi_rwlock_destroy(&client->lock);
LDi_mutex_destroy(&client->threadSleepLock);
LDi_cond_destroy(&client->threadSleepCond);
LDi_freeEventProcessor(client->eventProcessor);

LDStoreDestroy(client->store);
Expand Down Expand Up @@ -274,5 +289,10 @@ LDClientFlush(struct LDClient *const client)
client->shouldFlush = true;
LDi_rwlock_wrunlock(&client->lock);

LDi_mutex_lock(&client->threadSleepLock);
client->threadAwaken = true;
LDi_cond_signal(&client->threadSleepCond);
LDi_mutex_unlock(&client->threadSleepLock);

return true;
}
5 changes: 5 additions & 0 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ struct LDClient {
bool shouldFlush;
struct LDStore *store;
struct EventProcessor *eventProcessor;
bool inClientInit;

ld_mutex_t threadSleepLock;
ld_cond_t threadSleepCond;
bool threadAwaken;
};
44 changes: 43 additions & 1 deletion src/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,31 @@ LDi_removeAndFreeHandle(CURLM *const multi, CURL *const handle)
return true;
}

bool
LDi_interruptableSleep(struct LDClient *const client,
const unsigned long milliseconds)
{
double deadline;
double now;
bool awoken = false;
LDi_getMonotonicMilliseconds(&now);
deadline = now + milliseconds;
LDi_mutex_lock(&client->threadSleepLock);
while (now < deadline && !client->threadAwaken) {
int wait_ms = deadline - now;
if (wait_ms < 10) {
wait_ms = 10;
}

LDi_cond_wait(&client->threadSleepCond, &client->threadSleepLock, wait_ms);
LDi_getMonotonicMilliseconds(&now);
}
awoken = client->threadAwaken;
client->threadAwaken = false;
LDi_mutex_unlock(&client->threadSleepLock);
return awoken;
}

THREAD_RETURN
LDi_networkthread(void* const clientref)
{
Expand Down Expand Up @@ -205,11 +230,14 @@ LDi_networkthread(void* const clientref)
return THREAD_RETURN_DEFAULT;
}

unsigned long sleep_ms = 10;

while (true) {
struct CURLMsg *info;
int running_handles, active_events;
unsigned int i;
bool offline;
bool inClientInit;

info = NULL;
running_handles = 0;
Expand All @@ -222,6 +250,7 @@ LDi_networkthread(void* const clientref)
break;
}
offline = client->config->offline;
inClientInit = client->inClientInit;
LDi_rwlock_rdunlock(&client->lock);

curl_multi_perform(multihandle, &running_handles);
Expand Down Expand Up @@ -303,7 +332,20 @@ LDi_networkthread(void* const clientref)

if (!active_events) {
/* if curl is not doing anything wait so we don't burn CPU */
LDi_sleepMilliseconds(10);
/* If there are no active events, we aren't interrupted and we aren't
* intializing, we'll backoff. If we're doing something interesting
* we'll start over at 10ms for responsiveness.
*/
bool interrupted = LDi_interruptableSleep(client, sleep_ms);
if (inClientInit || interrupted) {
sleep_ms = 10;
} else if (sleep_ms * 2 < 100) {
sleep_ms *= 2;
} else {
sleep_ms = 100;
}
} else {
sleep_ms = 10;
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ bool validatePutBody(const struct LDJSON *const put);
bool LDi_addHandle(CURLM *const multi,
struct NetworkInterface *const networkInterface, CURL *const handle);

bool LDi_removeAndFreeHandle(CURLM *const multi, CURL *const handle);
bool LDi_removeAndFreeHandle(CURLM *const multi, CURL *const handle);

bool LDi_interruptableSleep(struct LDClient *const client,
const unsigned long milliseconds);