Skip to content

Commit

Permalink
Split block clients queue into multi
Browse files Browse the repository at this point in the history
  • Loading branch information
blacktear23 committed Feb 19, 2022
1 parent d5915a1 commit cb98efa
Showing 1 changed file with 96 additions and 86 deletions.
182 changes: 96 additions & 86 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,11 @@ typedef struct RedisModuleBlockedClient {
Used for measuring latency of blocking cmds */
} RedisModuleBlockedClient;

static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
static list *moduleUnblockedClients;
#define BLOCKED_CLIENTS_QUEUES 2

static size_t moduleUnblockClientQueueIdx = 0;
static pthread_mutex_t moduleUnblockedClientsMutex[BLOCKED_CLIENTS_QUEUES];
static list *moduleUnblockedClients[BLOCKED_CLIENTS_QUEUES];

/* Pool for temporary client objects. Creating and destroying a client object is
* costly. We manage a pool of clients to avoid this cost. Pool expands when
Expand Down Expand Up @@ -983,7 +986,7 @@ int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc c
cp->rediscmd->arity = cmdfunc ? -1 : -2; /* Default value, can be changed later via dedicated API */

serverAssert(dictAdd(server.commands, sdsdup(declared_name), cp->rediscmd) == DICT_OK);
serverAssert(dictAdd(server.orig_commands, sdsdup(declared_name), cp->rediscmd) == DICT_OK);
dictAdd(server.orig_commands, sdsdup(declared_name), cp->rediscmd);
cp->rediscmd->id = ACLGetCommandID(declared_name); /* ID used for ACL. */
return REDISMODULE_OK;
}
Expand Down Expand Up @@ -6458,16 +6461,18 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {

/* Implements RM_UnblockClient() and moduleUnblockClient(). */
int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
int idx = moduleUnblockClientQueueIdx++;
int i = idx % BLOCKED_CLIENTS_QUEUES;
pthread_mutex_lock(&moduleUnblockedClientsMutex[i]);
if (!bc->blocked_on_keys) bc->privdata = privdata;
bc->unblocked = 1;
if (listLength(moduleUnblockedClients) == 0) {
if (listLength(moduleUnblockedClients[i]) == 0) {
if (write(server.module_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
}
listAddNodeTail(moduleUnblockedClients,bc);
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
listAddNodeTail(moduleUnblockedClients[i],bc);
pthread_mutex_unlock(&moduleUnblockedClientsMutex[i]);
return REDISMODULE_OK;
}

Expand Down Expand Up @@ -6556,90 +6561,92 @@ void moduleHandleBlockedClients(void) {
listNode *ln;
RedisModuleBlockedClient *bc;

pthread_mutex_lock(&moduleUnblockedClientsMutex);
while (listLength(moduleUnblockedClients)) {
ln = listFirst(moduleUnblockedClients);
bc = ln->value;
client *c = bc->client;
listDelNode(moduleUnblockedClients,ln);
pthread_mutex_unlock(&moduleUnblockedClientsMutex);

/* Release the lock during the loop, as long as we don't
* touch the shared list. */

/* Call the reply callback if the client is valid and we have
* any callback. However the callback is not called if the client
* was blocked on keys (RM_BlockClientOnKeys()), because we already
* called such callback in moduleTryServeClientBlockedOnKey() when
* the key was signaled as ready. */
uint64_t reply_us = 0;
if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx;
moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_REPLY);
ctx.blocked_privdata = bc->privdata;
ctx.blocked_ready_key = NULL;
ctx.client = bc->client;
ctx.blocked_client = bc;
monotime replyTimer;
elapsedStart(&replyTimer);
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
reply_us = elapsedUs(replyTimer);
moduleFreeContext(&ctx);
}
/* Update stats now that we've finished the blocking operation.
* This needs to be out of the reply callback above given that a
* module might not define any callback and still do blocking ops.
*/
if (c && !bc->blocked_on_keys) {
updateStatsOnUnblock(c, bc->background_duration, reply_us);
}
for (int i = 0; i < BLOCKED_CLIENTS_QUEUES; i++) {
pthread_mutex_lock(&moduleUnblockedClientsMutex[i]);
while (listLength(moduleUnblockedClients[i])) {
ln = listFirst(moduleUnblockedClients[i]);
bc = ln->value;
client *c = bc->client;
listDelNode(moduleUnblockedClients[i],ln);
pthread_mutex_unlock(&moduleUnblockedClientsMutex[i]);

/* Release the lock during the loop, as long as we don't
* touch the shared list. */

/* Call the reply callback if the client is valid and we have
* any callback. However the callback is not called if the client
* was blocked on keys (RM_BlockClientOnKeys()), because we already
* called such callback in moduleTryServeClientBlockedOnKey() when
* the key was signaled as ready. */
uint64_t reply_us = 0;
if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx;
moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_REPLY);
ctx.blocked_privdata = bc->privdata;
ctx.blocked_ready_key = NULL;
ctx.client = bc->client;
ctx.blocked_client = bc;
monotime replyTimer;
elapsedStart(&replyTimer);
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
reply_us = elapsedUs(replyTimer);
moduleFreeContext(&ctx);
}
/* Update stats now that we've finished the blocking operation.
* This needs to be out of the reply callback above given that a
* module might not define any callback and still do blocking ops.
*/
if (c && !bc->blocked_on_keys) {
updateStatsOnUnblock(c, bc->background_duration, reply_us);
}

/* Free privdata if any. */
if (bc->privdata && bc->free_privdata) {
RedisModuleCtx ctx;
int ctx_flags = c == NULL ? REDISMODULE_CTX_BLOCKED_DISCONNECTED : REDISMODULE_CTX_NONE;
moduleCreateContext(&ctx, bc->module, ctx_flags);
ctx.blocked_privdata = bc->privdata;
ctx.client = bc->client;
bc->free_privdata(&ctx,bc->privdata);
moduleFreeContext(&ctx);
}
/* Free privdata if any. */
if (bc->privdata && bc->free_privdata) {
RedisModuleCtx ctx;
int ctx_flags = c == NULL ? REDISMODULE_CTX_BLOCKED_DISCONNECTED : REDISMODULE_CTX_NONE;
moduleCreateContext(&ctx, bc->module, ctx_flags);
ctx.blocked_privdata = bc->privdata;
ctx.client = bc->client;
bc->free_privdata(&ctx,bc->privdata);
moduleFreeContext(&ctx);
}

/* It is possible that this blocked client object accumulated
* replies to send to the client in a thread safe context.
* We need to glue such replies to the client output buffer and
* free the temporary client we just used for the replies. */
if (c) AddReplyFromClient(c, bc->reply_client);
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);

if (c != NULL) {
/* Before unblocking the client, set the disconnect callback
* to NULL, because if we reached this point, the client was
* properly unblocked by the module. */
bc->disconnect_callback = NULL;
unblockClient(c);
/* Put the client in the list of clients that need to write
* if there are pending replies here. This is needed since
* during a non blocking command the client may receive output. */
if (clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE))
{
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
/* It is possible that this blocked client object accumulated
* replies to send to the client in a thread safe context.
* We need to glue such replies to the client output buffer and
* free the temporary client we just used for the replies. */
if (c) AddReplyFromClient(c, bc->reply_client);
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);

if (c != NULL) {
/* Before unblocking the client, set the disconnect callback
* to NULL, because if we reached this point, the client was
* properly unblocked by the module. */
bc->disconnect_callback = NULL;
unblockClient(c);
/* Put the client in the list of clients that need to write
* if there are pending replies here. This is needed since
* during a non blocking command the client may receive output. */
if (clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE))
{
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
}
}

/* Free 'bc' only after unblocking the client, since it is
* referenced in the client blocking context, and must be valid
* when calling unblockClient(). */
bc->module->blocked_clients--;
zfree(bc);
/* Free 'bc' only after unblocking the client, since it is
* referenced in the client blocking context, and must be valid
* when calling unblockClient(). */
bc->module->blocked_clients--;
zfree(bc);

/* Lock again before to iterate the loop. */
pthread_mutex_lock(&moduleUnblockedClientsMutex);
/* Lock again before to iterate the loop. */
pthread_mutex_lock(&moduleUnblockedClientsMutex[i]);
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex[i]);
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
}

/* Check if the specified client can be safely timed out using
Expand Down Expand Up @@ -9867,7 +9874,10 @@ void moduleInitModulesSystemLast(void) {
}

void moduleInitModulesSystem(void) {
moduleUnblockedClients = listCreate();
for (int i = 0; i < BLOCKED_CLIENTS_QUEUES; i++){
moduleUnblockedClients[i] = listCreate();
pthread_mutex_init(&moduleUnblockedClientsMutex[i], NULL);
}
server.loadmodule_queue = listCreate();
modules = dictCreate(&modulesDictType);

Expand Down

0 comments on commit cb98efa

Please sign in to comment.