Skip to content

Commit

Permalink
* FIX [nmq_mqtt] more strict lock of pipe map.
Browse files Browse the repository at this point in the history
Signed-off-by: jaylin <jaylin@emqx.io>
  • Loading branch information
JaylinYu committed Aug 30, 2024
1 parent 3886a2d commit c98f554
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions src/sp/protocol/mqtt/nmq_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ nano_ctx_send(void *arg, nni_aio *aio)
}

// 2 locks here cause performance degradation
nni_mtx_unlock(&s->lk);
nni_mtx_lock(&p->lk);
nni_mtx_unlock(&s->lk);

if (p->pipe->cache) {
if (nni_msg_get_type(msg) == CMD_PUBLISH) {
Expand Down Expand Up @@ -568,12 +568,23 @@ static void
nano_pipe_fini(void *arg)
{
nano_pipe *p = arg;
nano_sock *s = p->broker;
nng_msg *msg;

log_trace(" ########## nano_pipe_fini ########## ");
if (p->pipe->cache) {
return; // your time is yet to come
}
nni_mtx_lock(&s->lk);
nano_pipe *tpipe = NULL;
if ((tpipe = nni_id_get(&s->pipes, p->id)) != NULL) {
// Pipe is gone. Make this look like a good send to avoid
// disrupting the state machine. We don't care if the peer
// lost interest in our reply.
nni_id_remove(&s->pipes, p->id);
log_warn("removing pipe id %ld again", p->id);
}
nni_mtx_lock(&p->lk);
if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) {
nni_aio_set_msg(&p->aio_recv, NULL);
}
Expand All @@ -583,7 +594,6 @@ nano_pipe_fini(void *arg)
}

//Safely free the msgs in qos_db, only when nano_qos_db is not taken by new pipe
nni_mtx_lock(&p->lk);
void *nano_qos_db = p->pipe->nano_qos_db;
if (p->event == true) {
if (!p->broker->conf->sqlite.enable && nano_qos_db != NULL) {
Expand All @@ -603,6 +613,7 @@ nano_pipe_fini(void *arg)
nni_aio_fini(&p->aio_recv);
nni_aio_fini(&p->aio_timer);
nano_nni_lmq_fini(&p->rlmq);
nni_mtx_unlock(&s->lk);
}

static int
Expand Down

0 comments on commit c98f554

Please sign in to comment.