Skip to content

Commit

Permalink
add leak
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jan 31, 2024
1 parent 1c54865 commit 30ddb17
Show file tree
Hide file tree
Showing 21 changed files with 283 additions and 61 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,4 @@ replace google.golang.org/grpc v1.59.0 => google.golang.org/grpc v1.26.0
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
replace github.com/pingcap/tidb-dashboard => github.com/HuSharp/tidb-dashboard v0.0.0-20240130065036-5d37687b904c
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/AlekSi/gocov-xml v1.0.0 h1:4QctJBgXEkbzeKz6PJy6bt3JSPNSN4I2mITYW+eKUo
github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/HuSharp/tidb-dashboard v0.0.0-20240130065036-5d37687b904c h1:0w4RVGS3RFAKYyfhf8UpaPQz9YFG9VxEeCQ+n4ezypc=
github.com/HuSharp/tidb-dashboard v0.0.0-20240130065036-5d37687b904c/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
Expand Down Expand Up @@ -440,8 +442,6 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20240111062855-41f7c8011953 h1:vY/bY5vkSvvuXB1030AUmy0LFhuEA53ryVdF/bTbFXU=
github.com/pingcap/tidb-dashboard v0.0.0-20240111062855-41f7c8011953/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
3 changes: 2 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func CheckClusterID(localClusterID types.ID, um types.URLsMap, tlsConfig *tls.Co

for _, u := range peerURLs {
trp := &http.Transport{
TLSClientConfig: tlsConfig,
TLSClientConfig: tlsConfig,
DisableKeepAlives: true,
}
remoteCluster, gerr := etcdserver.GetClusterFromRemotePeers(nil, []string{u}, trp)
trp.CloseIdleConnections()
Expand Down
92 changes: 61 additions & 31 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ import (
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.LeakOptions...)
testutil.MustTestMainWithLeakDetection(m)
}

func TestMemberHelpers(t *testing.T) {
testutil.RegisterLeakDetection(t)
re := require.New(t)
servers, client1, clean := NewTestEtcdCluster(t, 1)
defer clean()
Expand Down Expand Up @@ -81,6 +81,7 @@ func TestMemberHelpers(t *testing.T) {
}

func TestEtcdKVGet(t *testing.T) {
testutil.RegisterLeakDetection(t)
re := require.New(t)
_, client, clean := NewTestEtcdCluster(t, 1)
defer clean()
Expand Down Expand Up @@ -119,6 +120,7 @@ func TestEtcdKVGet(t *testing.T) {
}

func TestEtcdKVPutWithTTL(t *testing.T) {
testutil.RegisterLeakDetection(t)
re := require.New(t)
_, client, clean := NewTestEtcdCluster(t, 1)
defer clean()
Expand Down Expand Up @@ -147,6 +149,7 @@ func TestEtcdKVPutWithTTL(t *testing.T) {
}

func TestInitClusterID(t *testing.T) {
testutil.RegisterLeakDetection(t)
re := require.New(t)
_, client, clean := NewTestEtcdCluster(t, 1)
defer clean()
Expand All @@ -166,6 +169,7 @@ func TestInitClusterID(t *testing.T) {
}

func TestEtcdClientSync(t *testing.T) {
testutil.RegisterLeakDetection(t)
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))

Expand Down Expand Up @@ -204,6 +208,7 @@ func checkEtcdClientHealth(re *require.Assertions, client *clientv3.Client) {
}

func TestEtcdScaleInAndOut(t *testing.T) {
testutil.RegisterLeakDetection(t)
re := require.New(t)
// Start a etcd server.
servers, _, clean := NewTestEtcdCluster(t, 1)
Expand All @@ -230,6 +235,7 @@ func TestEtcdScaleInAndOut(t *testing.T) {
}

func TestRandomKillEtcd(t *testing.T) {
testutil.RegisterLeakDetection(t)
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
// Start a etcd server.
Expand Down Expand Up @@ -260,6 +266,7 @@ func TestRandomKillEtcd(t *testing.T) {
}

func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
testutil.RegisterLeakDetection(t)
re := require.New(t)
var err error
// Test with enable check.
Expand All @@ -285,9 +292,11 @@ func checkEtcdWithHangLeader(t *testing.T) error {
// Create a proxy to etcd1.
proxyAddr := tempurl.Alloc()
var enableDiscard atomic.Bool
go proxyWithDiscard(re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go proxyWithDiscard(ctx, re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard)

// Create a etcd client with etcd1 as endpoint.
// Create an etcd client with etcd1 as endpoint.
urls, err := types.NewURLs([]string{proxyAddr})
re.NoError(err)
client1, err := CreateEtcdClient(nil, urls)
Expand All @@ -307,59 +316,79 @@ func checkEtcdWithHangLeader(t *testing.T) error {
return err
}

func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
func proxyWithDiscard(ctx context.Context, re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
server = strings.TrimPrefix(server, "http://")
proxy = strings.TrimPrefix(proxy, "http://")
l, err := net.Listen("tcp", proxy)
re.NoError(err)
defer l.Close()
for {
connect, err := l.Accept()
re.NoError(err)
go func(connect net.Conn) {
serverConnect, err := net.Dial("tcp", server)
re.NoError(err)
pipe(connect, serverConnect, enableDiscard)
}(connect)
type accepted struct {
conn net.Conn
err error
}
accept := make(chan accepted, 1)
go func() {
conn, err := l.Accept()
accept <- accepted{conn, err}
}()

select {
case <-ctx.Done():
return
case a := <-accept:
if a.err != nil {
return
}
go func(connect net.Conn) {
serverConnect, err := net.Dial("tcp", server)
re.NoError(err)
pipe(ctx, connect, serverConnect, enableDiscard)
}(a.conn)
}
}
}

func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
func pipe(ctx context.Context, src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
errChan := make(chan error, 1)
go func() {
err := ioCopy(src, dst, enableDiscard)
err := ioCopy(ctx, src, dst, enableDiscard)
errChan <- err
}()
go func() {
err := ioCopy(dst, src, enableDiscard)
err := ioCopy(ctx, dst, src, enableDiscard)
errChan <- err
}()
<-errChan
dst.Close()
src.Close()
}

func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) {
func ioCopy(ctx context.Context, dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) {
buffer := make([]byte, 32*1024)
for {
if enableDiscard.Load() {
io.Copy(io.Discard, src)
}
readNum, errRead := src.Read(buffer)
if readNum > 0 {
writeNum, errWrite := dst.Write(buffer[:readNum])
if errWrite != nil {
return errWrite
select {
case <-ctx.Done():
return nil
default:
if enableDiscard.Load() {
io.Copy(io.Discard, src)
}
if readNum != writeNum {
return io.ErrShortWrite
readNum, errRead := src.Read(buffer)
if readNum > 0 {
writeNum, errWrite := dst.Write(buffer[:readNum])
if errWrite != nil {
return errWrite
}
if readNum != writeNum {
return io.ErrShortWrite
}
}
if errRead != nil {
return err
}
}
if errRead != nil {
err = errRead
break
}
}
return err
}

type loopWatcherTestSuite struct {
Expand All @@ -374,6 +403,7 @@ type loopWatcherTestSuite struct {
}

func TestLoopWatcherTestSuite(t *testing.T) {
testutil.RegisterLeakDetection(t)
suite.Run(t, new(loopWatcherTestSuite))
}

Expand Down
1 change: 1 addition & 0 deletions pkg/utils/etcdutil/health_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type testCase struct {

func check(re *require.Assertions, testCases []*testCase) {
checker := &healthChecker{}
defer checker.close()
lastEps := []string{}
for idx, tc := range testCases {
// Send the health probes to the channel.
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/etcdutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func genRandName() string {
return "test_etcd_" + strconv.FormatInt(time.Now().UnixNano()%10000, 10)
}

// NewTestEtcdCluster is used to create a etcd cluster for the unit test purpose.
// NewTestEtcdCluster is used to create an etcd cluster for the unit test purpose.
func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdClient *clientv3.Client, clean func()) {
re := require.New(t)
servers = make([]*embed.Etcd, 0, count)
Expand Down
Loading

0 comments on commit 30ddb17

Please sign in to comment.