Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient(ticdc): close the grpc client after all goroutine exited to prevent data race and panic #10865

Merged
merged 5 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
}
}()

cc, err := c.grpcPool.Connect(ctx, rs.storeAddr)
// grpc stream can be canceled by this context when any goroutine meet error,
// the underline established grpc connections is unaffected.
g, gctx := errgroup.WithContext(ctx)
cc, err := c.grpcPool.Connect(gctx, rs.storeAddr)
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Warn("event feed create grpc stream failed",
zap.String("namespace", c.changefeed.Namespace),
Expand All @@ -181,7 +184,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
return isCanceled()
}

g, gctx := errgroup.WithContext(ctx)
if cc.Multiplexing() {
s.multiplexing = cc
g.Go(func() error { return s.receive(gctx, c, rs, s.multiplexing, invalidSubscriptionID) })
Expand Down
5 changes: 3 additions & 2 deletions cdc/kv/sharedconn/conn_and_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ConnAndClient struct {
conn *Conn
array *connArray
client cdcpb.ChangeData_EventFeedV2Client
closed atomic.Bool
}

// Conn is a connection.
Expand Down Expand Up @@ -161,9 +162,9 @@ func (c *ConnAndClient) Multiplexing() bool {

// Release releases a ConnAndClient object.
func (c *ConnAndClient) Release() {
if c.client != nil {
if c.client != nil && !c.closed.Load() {
_ = c.client.CloseSend()
c.client = nil
c.closed.Store(true)
}
if c.conn != nil && c.array != nil {
c.array.release(c.conn, false)
Expand Down
7 changes: 2 additions & 5 deletions tests/integration_tests/kafka_big_txn_v2/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@ function run() {
check_table_exists "big_txn.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "CREATE TABLE big_txn.usertable1 LIKE big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO big_txn.usertable1 SELECT * FROM big_txn.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
sleep 60
check_table_exists "big_txn.usertable1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_sql "CREATE TABLE big_txn.finish_mark_1 (a int primary key);"
run_sql "CREATE TABLE big_txn.finish_mark (a int primary key);"
sleep 120
check_table_exists "big_txn.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60
check_table_exists "big_txn.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

Expand Down
Loading