Skip to content

Commit

Permalink
kvclient(ticdc): close the grpc client after all goroutine exited to …
Browse files Browse the repository at this point in the history
…prevent data race and panic (#10865) (#10906)

close #10799
  • Loading branch information
ti-chi-bot authored Apr 16, 2024
1 parent d331dd5 commit 0d6a661
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
6 changes: 4 additions & 2 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
}
}()

cc, err := c.grpcPool.Connect(ctx, rs.storeAddr)
// grpc stream can be canceled by this context when any goroutine meet error,
// the underline established grpc connections is unaffected.
g, gctx := errgroup.WithContext(ctx)
cc, err := c.grpcPool.Connect(gctx, rs.storeAddr)
if err != nil {
log.Warn("event feed create grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
Expand All @@ -178,7 +181,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
return isCanceled()
}

g, gctx := errgroup.WithContext(ctx)
if cc.Multiplexing() {
s.multiplexing = cc
g.Go(func() error { return s.receive(gctx, c, rs, s.multiplexing, invalidSubscriptionID) })
Expand Down
5 changes: 3 additions & 2 deletions cdc/kv/sharedconn/conn_and_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ConnAndClient struct {
conn *Conn
array *connArray
client cdcpb.ChangeData_EventFeedV2Client
closed atomic.Bool
}

// Conn is a connection.
Expand Down Expand Up @@ -161,9 +162,9 @@ func (c *ConnAndClient) Multiplexing() bool {

// Release releases a ConnAndClient object.
func (c *ConnAndClient) Release() {
if c.client != nil {
if c.client != nil && !c.closed.Load() {
_ = c.client.CloseSend()
c.client = nil
c.closed.Store(true)
}
if c.conn != nil && c.array != nil {
c.array.release(c.conn, false)
Expand Down
7 changes: 2 additions & 5 deletions tests/integration_tests/kafka_big_txn_v2/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@ function run() {
check_table_exists "big_txn.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "CREATE TABLE big_txn.usertable1 LIKE big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO big_txn.usertable1 SELECT * FROM big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
sleep 60
check_table_exists "big_txn.usertable1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_sql "CREATE TABLE big_txn.finish_mark_1 (a int primary key);"
run_sql "CREATE TABLE big_txn.finish_mark (a int primary key);"
sleep 120
check_table_exists "big_txn.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60
check_table_exists "big_txn.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

Expand Down

0 comments on commit 0d6a661

Please sign in to comment.