From 0d6a6613d2881af53a0839a592e11aafd747b12f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 16 Apr 2024 14:54:06 +0800 Subject: [PATCH] kvclient(ticdc): close the grpc client after all goroutine exited to prevent data race and panic (#10865) (#10906) close pingcap/tiflow#10799 --- cdc/kv/shared_stream.go | 6 ++++-- cdc/kv/sharedconn/conn_and_client.go | 5 +++-- tests/integration_tests/kafka_big_txn_v2/run.sh | 7 ++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go index 5e13007f096..a2a3f4f6540 100644 --- a/cdc/kv/shared_stream.go +++ b/cdc/kv/shared_stream.go @@ -166,7 +166,10 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste } }() - cc, err := c.grpcPool.Connect(ctx, rs.storeAddr) + // grpc stream can be canceled by this context when any goroutine meets an error, + // the underlying established grpc connections are unaffected. + g, gctx := errgroup.WithContext(ctx) + cc, err := c.grpcPool.Connect(gctx, rs.storeAddr) if err != nil { log.Warn("event feed create grpc stream failed", zap.String("namespace", c.changefeed.Namespace), @@ -178,7 +181,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste return isCanceled() } - g, gctx := errgroup.WithContext(ctx) if cc.Multiplexing() { s.multiplexing = cc g.Go(func() error { return s.receive(gctx, c, rs, s.multiplexing, invalidSubscriptionID) }) diff --git a/cdc/kv/sharedconn/conn_and_client.go b/cdc/kv/sharedconn/conn_and_client.go index c3a96473dec..eeff06d60b1 100644 --- a/cdc/kv/sharedconn/conn_and_client.go +++ b/cdc/kv/sharedconn/conn_and_client.go @@ -56,6 +56,7 @@ type ConnAndClient struct { conn *Conn array *connArray client cdcpb.ChangeData_EventFeedV2Client + closed atomic.Bool } // Conn is a connection. @@ -161,9 +162,9 @@ func (c *ConnAndClient) Multiplexing() bool { // Release releases a ConnAndClient object.
func (c *ConnAndClient) Release() { - if c.client != nil { + if c.client != nil && !c.closed.Load() { _ = c.client.CloseSend() - c.client = nil + c.closed.Store(true) } if c.conn != nil && c.array != nil { c.array.release(c.conn, false) diff --git a/tests/integration_tests/kafka_big_txn_v2/run.sh b/tests/integration_tests/kafka_big_txn_v2/run.sh index f0c88efa38a..5190a932167 100755 --- a/tests/integration_tests/kafka_big_txn_v2/run.sh +++ b/tests/integration_tests/kafka_big_txn_v2/run.sh @@ -45,12 +45,9 @@ function run() { check_table_exists "big_txn.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} run_sql "CREATE TABLE big_txn.usertable1 LIKE big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT INTO big_txn.usertable1 SELECT * FROM big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - sleep 60 - check_table_exists "big_txn.usertable1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - - run_sql "CREATE TABLE big_txn.finish_mark_1 (a int primary key);" + run_sql "CREATE TABLE big_txn.finish_mark (a int primary key);" sleep 120 - check_table_exists "big_txn.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + check_table_exists "big_txn.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml