diff --git a/client/client.go b/client/client.go index 800f97ed82c3..714f85b91f77 100644 --- a/client/client.go +++ b/client/client.go @@ -396,6 +396,9 @@ func createClientWithKeyspace( nil, keyspaceID, c.svrUrls, c.tlsCfg, c.option) if err := c.setup(); err != nil { c.cancel() + if c.pdSvcDiscovery != nil { + c.pdSvcDiscovery.Close() + } return nil, err } diff --git a/client/http/client.go b/client/http/client.go index 1235266a2713..9069ddf66a7a 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -238,6 +238,7 @@ type client struct { callerID string respHandler respHandleFunc bo *retry.Backoffer + defaultSD bool } // ClientOption configures the HTTP client. @@ -282,6 +283,7 @@ func NewClientWithServiceDiscovery( opt(c) } c.inner.init(sd) + c.defaultSD = false return c } @@ -303,12 +305,16 @@ func NewClient( return nil } c.inner.init(sd) + c.defaultSD = true return c } // Close gracefully closes the HTTP client. func (c *client) Close() { c.inner.close() + if c.defaultSD && c.inner.sd != nil { + c.inner.sd.Close() + } log.Info("[pd] http client closed", zap.String("source", c.inner.source)) } diff --git a/client/pd_service_discovery_test.go b/client/pd_service_discovery_test.go index 1dc73af1f5f5..d97f117790fe 100644 --- a/client/pd_service_discovery_test.go +++ b/client/pd_service_discovery_test.go @@ -158,6 +158,8 @@ func (suite *serviceClientTestSuite) TearDownTest() { func (suite *serviceClientTestSuite) TearDownSuite() { suite.leaderServer.grpcServer.GracefulStop() suite.followerServer.grpcServer.GracefulStop() + suite.leaderClient.GetClientConn().Close() + suite.followerClient.GetClientConn().Close() suite.clean() } diff --git a/client/testutil/leak.go b/client/testutil/leak.go index ec2a65439411..28b5baae60f1 100644 --- a/client/testutil/leak.go +++ b/client/testutil/leak.go @@ -23,9 +23,4 @@ var LeakOptions = []goleak.Option{ goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).createTransport"), goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransport"), goleak.IgnoreTopFunction("google.golang.org/grpc.(*Server).handleRawConn"), - // TODO: remove the below options once we fixed the http connection leak problems - goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), - goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"), - goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Server).keepalive"), - goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index cfc37f426288..a29e0ecc4566 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -77,19 +77,21 @@ func TestClientClusterIDCheck(t *testing.T) { defer cluster2.Destroy() endpoints2 := runServer(re, cluster2) // Try to create a client with the mixed endpoints. - _, err = pd.NewClientWithContext( + cli, err := pd.NewClientWithContext( ctx, append(endpoints1, endpoints2...), pd.SecurityOption{}, pd.WithMaxErrorRetry(1), ) re.Error(err) + defer cli.Close() re.Contains(err.Error(), "unmatched cluster id") // updateMember should fail due to unmatched cluster ID found. re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipClusterIDCheck", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipFirstUpdateMember", `return(true)`)) - _, err = pd.NewClientWithContext(ctx, []string{endpoints1[0], endpoints2[0]}, + cli, err = pd.NewClientWithContext(ctx, []string{endpoints1[0], endpoints2[0]}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1), ) re.Error(err) + defer cli.Close() re.Contains(err.Error(), "ErrClientGetMember") re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipFirstUpdateMember")) re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipClusterIDCheck")) @@ -105,6 +107,7 @@ func TestClientLeaderChange(t *testing.T) { endpoints := runServer(re, cluster) cli := setupCli(re, ctx, endpoints) + defer cli.Close() innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) @@ -165,6 +168,7 @@ func TestLeaderTransfer(t *testing.T) { endpoints := runServer(re, cluster) cli := setupCli(re, ctx, endpoints) + defer cli.Close() var lastTS uint64 testutil.Eventually(re, func() bool { @@ -254,6 +258,7 @@ func TestTSOAllocatorLeader(t *testing.T) { allocatorLeaderMap[dcLocation] = pdName } cli := setupCli(re, ctx, endpoints) + defer cli.Close() innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) @@ -287,7 +292,9 @@ func TestTSOFollowerProxy(t *testing.T) { endpoints := runServer(re, cluster) cli1 := setupCli(re, ctx, endpoints) + defer cli1.Close() cli2 := setupCli(re, ctx, endpoints) + defer cli2.Close() cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) var wg sync.WaitGroup @@ -325,6 +332,7 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { endpoints := runServer(re, cluster) cli := setupCli(re, ctx, endpoints) + defer cli.Close() var wg sync.WaitGroup var maxUnavailableTime, leaderReadyTime time.Time @@ -397,6 +405,7 @@ func TestGlobalAndLocalTSO(t *testing.T) { endpoints := runServer(re, cluster) cli := setupCli(re, ctx, endpoints) + defer cli.Close() // Wait for all nodes becoming healthy. time.Sleep(time.Second * 5) @@ -508,6 +517,7 @@ func TestCustomTimeout(t *testing.T) { endpoints := runServer(re, cluster) cli := setupCli(re, ctx, endpoints, pd.WithCustomTimeoutOption(time.Second)) + defer cli.Close() start := time.Now() re.NoError(failpoint.Enable("github.com/tikv/pd/server/customTimeout", "return(true)")) @@ -581,6 +591,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionByFollowerForwardin defer cancel() cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true)) + defer cli.Close() re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)")) time.Sleep(200 * time.Millisecond) r, err := cli.GetRegion(context.Background(), []byte("a")) @@ -600,6 +611,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding1( ctx, cancel := context.WithCancel(suite.ctx) defer cancel() cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true)) + defer cli.Close() re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)")) var lastTS uint64 @@ -634,6 +646,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding2( ctx, cancel := context.WithCancel(suite.ctx) defer cancel() cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true)) + defer cli.Close() re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)")) var lastTS uint64 @@ -670,6 +683,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoAndRegionByFollowerFor re.NoError(failpoint.Enable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2", fmt.Sprintf("return(\"%s\")", follower.GetAddr()))) cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true)) + defer cli.Close() var lastTS uint64 testutil.Eventually(re, func() bool { physical, logical, err := cli.GetTS(context.TODO()) @@ -732,6 +746,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() { cluster := suite.cluster cli := setupCli(re, ctx, suite.endpoints) + defer cli.Close() cli.UpdateOption(pd.EnableFollowerHandle, true) re.NotEmpty(cluster.WaitLeader()) leader := cluster.GetLeaderServer() diff --git a/tests/integrations/client/client_tls_test.go b/tests/integrations/client/client_tls_test.go index b46895f4f8cf..dea5b7d8e296 100644 --- a/tests/integrations/client/client_tls_test.go +++ b/tests/integrations/client/client_tls_test.go @@ -173,13 +173,13 @@ func testTLSReload( CertPath: testClientTLSInfo.CertFile, KeyPath: testClientTLSInfo.KeyFile, }, pd.WithGRPCDialOptions(grpc.WithBlock())) + cli.Close() if err != nil { errc <- err dcancel() return } dcancel() - cli.Close() } }() @@ -212,12 +212,13 @@ func testTLSReload( caData, certData, keyData := loadTLSContent(re, testClientTLSInfo.TrustedCAFile, testClientTLSInfo.CertFile, testClientTLSInfo.KeyFile) ctx1, cancel1 := context.WithTimeout(ctx, 2*time.Second) - _, err = pd.NewClientWithContext(ctx1, endpoints, pd.SecurityOption{ + cli, err = pd.NewClientWithContext(ctx1, endpoints, pd.SecurityOption{ SSLCABytes: caData, SSLCertBytes: certData, SSLKEYBytes: keyData, }, pd.WithGRPCDialOptions(grpc.WithBlock())) re.NoError(err) + defer cli.Close() cancel1() } diff --git a/tests/integrations/client/gc_client_test.go b/tests/integrations/client/gc_client_test.go index a2c3c3263f7d..737fd09a08fa 100644 --- a/tests/integrations/client/gc_client_test.go +++ b/tests/integrations/client/gc_client_test.go @@ -89,6 +89,7 @@ func (suite *gcClientTestSuite) TearDownSuite() { re := suite.Require() re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/gc/checkKeyspace")) suite.cleanup() + suite.client.Close() } func (suite *gcClientTestSuite) TearDownTest() { diff --git a/tests/integrations/client/global_config_test.go b/tests/integrations/client/global_config_test.go index 349b16579bdf..aeb704c33051 100644 --- a/tests/integrations/client/global_config_test.go +++ b/tests/integrations/client/global_config_test.go @@ -80,6 +80,7 @@ func (suite *globalConfigTestSuite) SetupSuite() { func (suite *globalConfigTestSuite) TearDownSuite() { suite.client.Close() suite.cleanup() + suite.client.Close() } func (suite *globalConfigTestSuite) GetEtcdPath(configPath string) string { diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 5ff5fef02222..4e467c1c113e 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -675,6 +675,7 @@ func (suite *httpClientTestSuite) TestRedirectWithMetrics() { env := suite.env[defaultServiceDiscovery] cli := setupCli(suite.Require(), env.ctx, env.endpoints) + defer cli.Close() sd := cli.GetServiceDiscovery() metricCnt := prometheus.NewCounterVec( diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 52248086249d..af33b85b1a1a 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -151,6 +151,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() { time.Sleep(3 * time.Second) }) + for _, client := range clients { + client.Close() + } } func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroups() { @@ -232,6 +235,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() { time.Sleep(3 * time.Second) }) + for _, client := range clients { + client.Close() + } } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index a6a2c42acc96..b175f63c8f49 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -186,6 +186,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) { defer cleanup() cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV2(""), []string{backendEndpoints}) + defer cli.Close() physical, logical, err := cli.GetTS(ctx) re.NoError(err) ts := tsoutil.ComposeTS(physical, logical) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index c8e8f5b2f523..c5cc6ec5d6dc 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -183,6 +183,9 @@ func (suite *tsoClientTestSuite) TearDownSuite() { suite.tsoCluster.Destroy() } suite.cluster.Destroy() + for _, client := range suite.clients { + client.Close() + } } func (suite *tsoClientTestSuite) TestGetTS() { @@ -252,6 +255,7 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { defer cancel() client := mcs.SetupClientWithKeyspaceID( ctx, re, keyspaceID, strings.Split(suite.backendEndpoints, ",")) + defer client.Close() var lastTS uint64 for j := 0; j < tsoRequestRound; j++ { physical, logical, err := client.GetTS(ctx) @@ -491,6 +495,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) { pdClient, err := pd.NewClientWithContext(context.Background(), []string{backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1)) re.NoError(err) + defer pdClient.Close() // Create a TSO cluster which has 2 servers tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints)