Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](third party) fix hang when destroy of rdkafka instances #44913

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix hang when destroy of rdkafka instances
  • Loading branch information
sollhui committed Dec 3, 2024
commit 3d140c546ade6d73217b709807074c47cf1d15a0
111 changes: 110 additions & 1 deletion thirdparty/patches/librdkafka-1.9.2.patch
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,19 @@

--- src/rdkafka_broker.c
+++ src/rdkafka_broker.c
@@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
@@ -3288,6 +3288,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
: (topic_err
? topic_err
: RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION));
+
+ if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
+ rd_kafka_toppar_purge_internal_fetch_queue_maybe(
+ rktp);
+ }
}

rd_kafka_toppar_unlock(rktp);
@@ -5461,7 +5466,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
*/
void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {

Expand All @@ -78,3 +90,100 @@
rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
--- src/rdkafka_cgrp.c
+++ src/rdkafka_cgrp.c
@@ -2734,6 +2734,9 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
rd_kafka_toppar_lock(rktp);
rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
+
+ rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp);
+
rd_kafka_toppar_unlock(rktp);

rd_list_remove(&rkcg->rkcg_toppars, rktp);
--- src/rdkafka_partition.c
+++ src/rdkafka_partition.c
@@ -959,7 +959,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp,
rd_kafka_toppar_unlock(rktp);
}

+/**
+ * @brief Purge internal fetch queue if toppar is stopped
+ * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster
+ * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's
+ * removed starting from a metadata response and stopped from a rebalance or a
+ * consumer close.
+ *
+ * @remark Avoids circular dependencies in from `rktp_fetchq` ops to the same
+ * toppar that stop destroying a consumer.
+ *
+ * @locks rd_kafka_toppar_lock() MUST be held
+ */
+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) {
+ rd_kafka_q_t *rkq;
+ rkq = rktp->rktp_fetchq;
+ mtx_lock(&rkq->rkq_lock);
+ if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE &&
+ !rktp->rktp_fetchq->rkq_fwdq) {
+ rd_kafka_op_t *rko;
+ int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
+
+ /* Partition is being removed from the cluster and it's stopped,
+ * so rktp->rktp_fetchq->rkq_fwdq is NULL.
+ * Purge remaining operations in rktp->rktp_fetchq->rkq_q,
+ * while holding lock, to avoid circular references */
+ rko = TAILQ_FIRST(&rkq->rkq_q);
+ while (rko) {
+ if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
+ rko->rko_type != RD_KAFKA_OP_FETCH) {
+ rd_kafka_log(
+ rktp->rktp_rkt->rkt_rk, LOG_WARNING,
+ "PARTDEL",
+ "Purging toppar fetch queue buffer op"
+ "with unexpected type: %s",
+ rd_kafka_op2str(rko->rko_type));
+ }
+
+ if (rko->rko_type == RD_KAFKA_OP_BARRIER)
+ barrier_cnt++;
+ else if (rko->rko_type == RD_KAFKA_OP_FETCH)
+ message_cnt++;
+ else
+ other_cnt++;

+ rko = TAILQ_NEXT(rko, rko_link);
+ cnt++;
+ }
+
+ if (cnt) {
+ rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
+ "Purge toppar fetch queue buffer "
+ "containing %d op(s) "
+ "(%d barrier(s), %d message(s), %d other)"
+ " to avoid "
+ "circular references",
+ cnt, barrier_cnt, message_cnt, other_cnt);
+ rd_kafka_q_purge0(rktp->rktp_fetchq, rd_false);
+ } else {
+ rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
+ "Not purging toppar fetch queue buffer."
+ " No ops present in the buffer.");
+ }
+ }
+ mtx_unlock(&rkq->rkq_lock);
+}

/**
* Helper method for purging queues when removing a toppar.
--- src/rdkafka_partition.h
+++ src/rdkafka_partition.h
@@ -541,6 +541,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp,
int64_t query_offset,
int backoff_ms);

+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp);
+
int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp,
int purge_flags,
rd_bool_t include_xmit_msgq);
Loading