Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jan 30, 2024
1 parent 1c54865 commit 580a77d
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 46 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,4 @@ replace google.golang.org/grpc v1.59.0 => google.golang.org/grpc v1.26.0
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
replace github.com/pingcap/tidb-dashboard => github.com/HuSharp/tidb-dashboard v0.0.0-20240130065036-5d37687b904c
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/AlekSi/gocov-xml v1.0.0 h1:4QctJBgXEkbzeKz6PJy6bt3JSPNSN4I2mITYW+eKUo
github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/HuSharp/tidb-dashboard v0.0.0-20240130065036-5d37687b904c h1:0w4RVGS3RFAKYyfhf8UpaPQz9YFG9VxEeCQ+n4ezypc=
github.com/HuSharp/tidb-dashboard v0.0.0-20240130065036-5d37687b904c/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
Expand Down Expand Up @@ -440,8 +442,6 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20240111062855-41f7c8011953 h1:vY/bY5vkSvvuXB1030AUmy0LFhuEA53ryVdF/bTbFXU=
github.com/pingcap/tidb-dashboard v0.0.0-20240111062855-41f7c8011953/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
3 changes: 2 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func CheckClusterID(localClusterID types.ID, um types.URLsMap, tlsConfig *tls.Co

for _, u := range peerURLs {
trp := &http.Transport{
TLSClientConfig: tlsConfig,
TLSClientConfig: tlsConfig,
DisableKeepAlives: true,
}
remoteCluster, gerr := etcdserver.GetClusterFromRemotePeers(nil, []string{u}, trp)
trp.CloseIdleConnections()
Expand Down
81 changes: 52 additions & 29 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
err = checkEtcdWithHangLeader(t)
re.Error(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick"))
time.Sleep(5 * time.Second)
}

func checkEtcdWithHangLeader(t *testing.T) error {
Expand All @@ -285,9 +286,11 @@ func checkEtcdWithHangLeader(t *testing.T) error {
// Create a proxy to etcd1.
proxyAddr := tempurl.Alloc()
var enableDiscard atomic.Bool
go proxyWithDiscard(re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go proxyWithDiscard(ctx, re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard)

// Create a etcd client with etcd1 as endpoint.
// Create an etcd client with etcd1 as endpoint.
urls, err := types.NewURLs([]string{proxyAddr})
re.NoError(err)
client1, err := CreateEtcdClient(nil, urls)
Expand All @@ -307,59 +310,79 @@ func checkEtcdWithHangLeader(t *testing.T) error {
return err
}

func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
func proxyWithDiscard(ctx context.Context, re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
server = strings.TrimPrefix(server, "http://")
proxy = strings.TrimPrefix(proxy, "http://")
l, err := net.Listen("tcp", proxy)
re.NoError(err)
defer l.Close()
for {
connect, err := l.Accept()
re.NoError(err)
go func(connect net.Conn) {
serverConnect, err := net.Dial("tcp", server)
re.NoError(err)
pipe(connect, serverConnect, enableDiscard)
}(connect)
type accepted struct {
conn net.Conn
err error
}
accept := make(chan accepted, 1)
go func() {
conn, err := l.Accept()
accept <- accepted{conn, err}
}()

select {
case <-ctx.Done():
return
case a := <-accept:
if a.err != nil {
return
}
go func(connect net.Conn) {
serverConnect, err := net.Dial("tcp", server)
re.NoError(err)
pipe(ctx, connect, serverConnect, enableDiscard)
}(a.conn)
}
}
}

func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
func pipe(ctx context.Context, src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
errChan := make(chan error, 1)
go func() {
err := ioCopy(src, dst, enableDiscard)
err := ioCopy(ctx, src, dst, enableDiscard)
errChan <- err
}()
go func() {
err := ioCopy(dst, src, enableDiscard)
err := ioCopy(ctx, dst, src, enableDiscard)
errChan <- err
}()
<-errChan
dst.Close()
src.Close()
}

func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) {
func ioCopy(ctx context.Context, dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) {
buffer := make([]byte, 32*1024)
for {
if enableDiscard.Load() {
io.Copy(io.Discard, src)
}
readNum, errRead := src.Read(buffer)
if readNum > 0 {
writeNum, errWrite := dst.Write(buffer[:readNum])
if errWrite != nil {
return errWrite
select {
case <-ctx.Done():
return nil
default:
if enableDiscard.Load() {
io.Copy(io.Discard, src)
}
if readNum != writeNum {
return io.ErrShortWrite
readNum, errRead := src.Read(buffer)
if readNum > 0 {
writeNum, errWrite := dst.Write(buffer[:readNum])
if errWrite != nil {
return errWrite
}
if readNum != writeNum {
return io.ErrShortWrite
}
}
if errRead != nil {
return err
}
}
if errRead != nil {
err = errRead
break
}
}
return err
}

type loopWatcherTestSuite struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/utils/etcdutil/health_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func check(re *require.Assertions, testCases []*testCase) {
re.Equal(tc.expectedPickedEps, pickedEps, "case %d", idx)
lastEps = pickedEps
}
checker.close()
}

// Test the endpoint picking and evicting logic.
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/etcdutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func genRandName() string {
return "test_etcd_" + strconv.FormatInt(time.Now().UnixNano()%10000, 10)
}

// NewTestEtcdCluster is used to create a etcd cluster for the unit test purpose.
// NewTestEtcdCluster is used to create an etcd cluster for the unit test purpose.
func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdClient *clientv3.Client, clean func()) {
re := require.New(t)
servers = make([]*embed.Etcd, 0, count)
Expand Down
5 changes: 0 additions & 5 deletions pkg/utils/testutil/leak.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,4 @@ var LeakOptions = []goleak.Option{
goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransport"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("sync.runtime_notifyListWait"),
// TODO: remove the below options once we fixed the http connection leak problems
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"),
// natefinch/lumberjack#56, It's a goroutine leak bug. Another ignore option PR https://github.com/pingcap/tidb/pull/27405/
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
}
2 changes: 1 addition & 1 deletion scripts/ci-subtask.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

# ./ci-subtask.sh <TOTAL_TASK_N> <TASK_INDEX>

Expand Down
23 changes: 18 additions & 5 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type leaderServerTestSuite struct {
cancel context.CancelFunc
svrs map[string]*Server
leaderPath string

httpClient *http.Client
}

func TestLeaderServerTestSuite(t *testing.T) {
Expand Down Expand Up @@ -78,6 +80,11 @@ func (suite *leaderServerTestSuite) SetupSuite() {
suite.svrs[svr.GetAddr()] = svr
suite.leaderPath = svr.GetMember().GetLeaderPath()
}
suite.httpClient = &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
},
}
}

func (suite *leaderServerTestSuite) TearDownSuite() {
Expand All @@ -86,6 +93,7 @@ func (suite *leaderServerTestSuite) TearDownSuite() {
svr.Close()
testutil.CleanServer(svr.cfg.DataDir)
}
suite.httpClient.CloseIdleConnections()
}

func (suite *leaderServerTestSuite) newTestServersWithCfgs(
Expand Down Expand Up @@ -160,8 +168,13 @@ func (suite *leaderServerTestSuite) TestCheckClusterID() {
}

// Start another cluster.
_, cleanB := suite.newTestServersWithCfgs(ctx, []*config.Config{cfgB}, re)
defer cleanB()
servers, cleanB := suite.newTestServersWithCfgs(ctx, []*config.Config{cfgB}, re)
defer func() {
cleanB()
for _, svr := range servers {
svr.Close()
}
}()

// Start previous cluster, expect an error.
cfgA.InitialCluster = originInitial
Expand Down Expand Up @@ -229,7 +242,7 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderForwarded() {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/pd/apis/mock/v1/hello", svr.GetAddr()), http.NoBody)
re.NoError(err)
req.Header.Add(apiutil.XForwardedForHeader, "127.0.0.2")
resp, err := http.DefaultClient.Do(req)
resp, err := suite.httpClient.Do(req)
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode)
defer resp.Body.Close()
Expand Down Expand Up @@ -260,7 +273,7 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderXReal() {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/pd/apis/mock/v1/hello", svr.GetAddr()), http.NoBody)
re.NoError(err)
req.Header.Add(apiutil.XRealIPHeader, "127.0.0.2")
resp, err := http.DefaultClient.Do(req)
resp, err := suite.httpClient.Do(req)
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode)
defer resp.Body.Close()
Expand Down Expand Up @@ -292,7 +305,7 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderBoth() {
re.NoError(err)
req.Header.Add(apiutil.XForwardedForHeader, "127.0.0.2")
req.Header.Add(apiutil.XRealIPHeader, "127.0.0.3")
resp, err := http.DefaultClient.Do(req)
resp, err := suite.httpClient.Do(req)
re.NoError(err)
re.Equal(http.StatusOK, resp.StatusCode)
defer resp.Body.Close()
Expand Down
5 changes: 5 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,11 @@ func (c *TestCluster) Destroy() {
if err != nil {
log.Error("failed to destroy the cluster:", errs.ZapError(err))
}
// related https://github.com/etcd-io/etcd/blob/ea035471ce5a1ed940a4da0c62f81bff3d9cc9ff/server/config/config.go#L330
// https://github.com/etcd-io/etcd/blob/0b81fdf4186f1e6f9a3ee3c696289d53c899bbac/client/pkg/transport/timeout_conn.go#L37
// go.etcd.io/etcd/pkg/transport.timeoutConn.Read({{0x106cf7120?, 0x140016b81c0?}, 0x14001141c78?, 0x1041334cc?}, {0x1400159a000?, 0x104133074?, 0x1400039b068?})
// go/pkg/mod/go.etcd.io/etcd@v0.5.0-alpha.5.0.20220915004622-85b640cee793/pkg/transport/timeout_conn.go:43 +0xa8
time.Sleep(2 * time.Second)
}
if c.schedulingCluster != nil {
c.schedulingCluster.Destroy()
Expand Down
7 changes: 5 additions & 2 deletions tests/dashboard/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (suite *dashboardTestSuite) SetupSuite() {
// unclosed.
return http.ErrUseLastResponse
},
Transport: &http.Transport{
DisableKeepAlives: true,
},
}
}

Expand Down Expand Up @@ -139,7 +142,7 @@ func (suite *dashboardTestSuite) testDashboard(re *require.Assertions, internalP
// auto select node
dashboardAddress1 := suite.checkServiceIsStarted(re, internalProxy, servers, leader)

// pd-ctl set another addr
// set another addr
var dashboardAddress2 string
for _, srv := range servers {
if srv.GetAddr() != dashboardAddress1 {
Expand All @@ -160,7 +163,7 @@ func (suite *dashboardTestSuite) testDashboard(re *require.Assertions, internalP
suite.checkServiceIsStarted(re, internalProxy, servers, leader)
re.Equal(dashboardAddress2, leader.GetServer().GetPersistOptions().GetDashboardAddress())

// pd-ctl set stop
// set stop
input = map[string]interface{}{
"dashboard-address": "none",
}
Expand Down
1 change: 1 addition & 0 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestMemberDelete(t *testing.T) {
}

httpClient := &http.Client{Timeout: 15 * time.Second}
defer httpClient.CloseIdleConnections()
for _, table := range tables {
t.Log(time.Now(), "try to delete:", table.path)
testutil.Eventually(re, func() bool {
Expand Down

0 comments on commit 580a77d

Please sign in to comment.