Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

observer: read the server version from HTTP status #510

Merged
merged 4 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 39 additions & 13 deletions pkg/balance/observer/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package observer

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"
Expand All @@ -27,6 +29,12 @@ const (
statusPathSuffix = "/status"
)

// backendHttpStatusRespBody is the JSON document decoded from the body of a
// backend's status-endpoint response during the health check.
type backendHttpStatusRespBody struct {
// Connections is decoded from the "connections" key; presumably the backend's
// current connection count — TODO confirm against the backend's status API.
Connections int `json:"connections"`
// Version is decoded from the "version" key; the health check records it as
// the backend's server version.
Version string `json:"version"`
// GitHash is decoded from the "git_hash" key; presumably the commit hash of
// the backend build — TODO confirm.
GitHash string `json:"git_hash"`
}

type DefaultHealthCheck struct {
cfg *config.HealthCheck
logger *zap.Logger
Expand Down Expand Up @@ -67,7 +75,6 @@ func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *Bac

func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, addr string, bh *BackendHealth) {
// Also dial the SQL port just in case that the SQL port hangs.
var serverVersion string
err := dhc.connectWithRetry(ctx, func() error {
startTime := monotime.Now()
conn, err := net.DialTimeout("tcp", addr, dhc.cfg.DialTimeout)
Expand All @@ -78,11 +85,9 @@ func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, addr string, bh
if err = conn.SetReadDeadline(time.Now().Add(dhc.cfg.DialTimeout)); err != nil {
return err
}
serverVersion, err = pnet.ReadServerVersion(conn)
if ignoredErr := conn.Close(); ignoredErr != nil && !pnet.IsDisconnectError(ignoredErr) {
dhc.logger.Warn("close connection in health check failed", zap.Error(ignoredErr))
}
bh.ServerVersion = serverVersion
return err
})
if err != nil {
Expand All @@ -91,6 +96,36 @@ func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, addr string, bh
}
}

// backendStatusCheck issues one GET against the backend's status URL and, on a
// 200 response, stores the version reported in the JSON body into
// bh.ServerVersion.
//
// It returns:
//   - the transport error from httpCli.Get, unchanged;
//   - a backoff.Permanent error "http status N" for any non-200 status, so a
//     retrying caller stops instead of retrying; the body is not parsed and
//     bh is left untouched in that case;
//   - the read or JSON decoding error when the 200 body cannot be consumed;
//   - nil on success.
//
// The response body is closed on every path once the request has succeeded.
func (dhc *DefaultHealthCheck) backendStatusCheck(httpCli *http.Client, url string, bh *BackendHealth) error {
	resp, err := httpCli.Get(url)
	if err != nil {
		return err
	}
	// Deferred so the early returns below cannot leak the connection.
	defer func() {
		if ignoredErr := resp.Body.Close(); ignoredErr != nil {
			dhc.logger.Warn("close http response in health check failed", zap.Error(ignoredErr))
		}
	}()

	// Check the status before touching the body: an unhealthy backend may reply
	// with an empty or non-JSON body, and a decoding error must not replace the
	// permanent status error.
	if resp.StatusCode != http.StatusOK {
		return backoff.Permanent(errors.Errorf("http status %d", resp.StatusCode))
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		dhc.logger.Error("read response body in healthy check failed ", zap.Error(err))
		return err
	}

	var respBody backendHttpStatusRespBody
	if err = json.Unmarshal(body, &respBody); err != nil {
		dhc.logger.Error("unmarshal body in healthy check failed", zap.String("resp body", string(body)), zap.Error(err))
		return err
	}

	bh.ServerVersion = respBody.Version
	return nil
}

// When a backend gracefully shuts down, the status port returns 500 but the SQL port still accepts
// new connections.
func (dhc *DefaultHealthCheck) checkStatusPort(ctx context.Context, info *BackendInfo, bh *BackendHealth) {
Expand All @@ -109,16 +144,7 @@ func (dhc *DefaultHealthCheck) checkStatusPort(ctx context.Context, info *Backen
httpCli.Timeout = dhc.cfg.DialTimeout
url := fmt.Sprintf("%s://%s:%d%s", schema, info.IP, info.StatusPort, statusPathSuffix)
err := dhc.connectWithRetry(ctx, func() error {
resp, err := httpCli.Get(url)
if err == nil {
if resp.StatusCode != http.StatusOK {
err = backoff.Permanent(errors.Errorf("http status %d", resp.StatusCode))
}
if ignoredErr := resp.Body.Close(); ignoredErr != nil {
dhc.logger.Warn("close http response in health check failed", zap.Error(ignoredErr))
}
}
return err
return dhc.backendStatusCheck(&httpCli, url, bh)
})
if err != nil {
bh.Status = StatusCannotConnect
Expand Down
46 changes: 32 additions & 14 deletions pkg/balance/observer/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,43 @@ package observer

import (
"context"
"encoding/json"
"net"
"net/http"
"strings"
"testing"
"time"

"github.com/pingcap/tiproxy/lib/util/logger"
"github.com/pingcap/tiproxy/lib/util/waitgroup"
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
"github.com/pingcap/tiproxy/pkg/testkit"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

// TestReadServerVersion checks that the health check reports the version served
// in the backend's HTTP status body, and that an empty status body results in
// StatusCannotConnect with a JSON parse error in PingErr.
func TestReadServerVersion(t *testing.T) {
lg, _ := logger.CreateLoggerForTest(t)
hc := NewDefaultHealthCheck(nil, newHealthCheckConfigForTest(), lg)
backend, info := newBackendServer(t)
// NOTE(review): both serverVersion.Store and setServerVersion appear for each
// version below; this looks like the removed and added sides of a diff rendered
// together — confirm which call is live.
backend.serverVersion.Store("1.0")
backend.setServerVersion("1.0")
//backend.serverVersion.Store("1.0")
health := hc.Check(context.Background(), backend.sqlAddr, info)
require.Equal(t, "1.0", health.ServerVersion)
// Change the advertised version while the SQL server is stopped, then restart
// it and check that the new version is picked up.
backend.stopSQLServer()
backend.serverVersion.Store("2.0")
//backend.serverVersion.Store("2.0")
backend.setServerVersion("2.0")
backend.startSQLServer()
health = hc.Check(context.Background(), backend.sqlAddr, info)
require.Equal(t, "2.0", health.ServerVersion)
backend.stopSQLServer()

// An empty status body is not valid JSON, so the check must fail with a parse error.
backend.mockHttpHandler.setHTTPRespBody("")
backend.startSQLServer()
health = hc.Check(context.Background(), backend.sqlAddr, info)
require.Equal(t, StatusCannotConnect, health.Status)
require.NotNil(t, health.PingErr)
require.Equal(t, true, strings.Contains(health.PingErr.Error(), "unexpected end of JSON input"))

backend.close()
}

Expand All @@ -39,6 +51,7 @@ func TestHealthCheck(t *testing.T) {
cfg := newHealthCheckConfigForTest()
hc := NewDefaultHealthCheck(nil, cfg, lg)
backend, info := newBackendServer(t)
backend.setServerVersion("1.0")
health := hc.Check(context.Background(), backend.sqlAddr, info)
require.Equal(t, StatusHealthy, health.Status)

Expand Down Expand Up @@ -67,12 +80,11 @@ func TestHealthCheck(t *testing.T) {
}

type backendServer struct {
t *testing.T
sqlListener net.Listener
sqlAddr string
statusServer *http.Server
statusAddr string
serverVersion atomic.String
t *testing.T
sqlListener net.Listener
sqlAddr string
statusServer *http.Server
statusAddr string
*mockHttpHandler
wg waitgroup.WaitGroup
ip string
Expand All @@ -85,13 +97,22 @@ func newBackendServer(t *testing.T) (*backendServer, *BackendInfo) {
}
backend.startHTTPServer()
backend.setHTTPResp(true)
backend.setHTTPRespBody("")
backend.startSQLServer()
return backend, &BackendInfo{
IP: backend.ip,
StatusPort: backend.statusPort,
}
}

// setServerVersion makes the mock status endpoint serve a JSON status body
// whose "version" field is the given version.
func (srv *backendServer) setServerVersion(version string) {
	// Connections and GitHash stay at their zero values, which marshal exactly
	// like an explicit 0 and "". The error is ignored because a struct made of
	// an int and two strings always marshals.
	payload, _ := json.Marshal(backendHttpStatusRespBody{Version: version})
	srv.setHTTPRespBody(string(payload))
}
func (srv *backendServer) startHTTPServer() {
if srv.mockHttpHandler == nil {
srv.mockHttpHandler = &mockHttpHandler{
Expand Down Expand Up @@ -121,9 +142,6 @@ func (srv *backendServer) startSQLServer() {
// listener is closed
break
}
if err = pnet.WriteServerVersion(conn, srv.serverVersion.Load()); err != nil {
break
}
_ = conn.Close()
}
})
Expand Down
14 changes: 10 additions & 4 deletions pkg/balance/observer/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"context"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/tiproxy/pkg/manager/infosync"
"go.uber.org/atomic"
)

type mockTpFetcher struct {
Expand Down Expand Up @@ -93,15 +93,20 @@ func (mhc *mockHealthCheck) removeBackend(addr string) {
}

// mockHttpHandler is a test HTTP handler whose reply is controlled through its
// setHTTPResp, setHTTPRespBody and setHTTPWait methods.
type mockHttpHandler struct {
// NOTE(review): t, httpOK and wait are each listed twice below; this looks like
// the removed and added sides of a diff rendered together — confirm which
// lines are live.
t *testing.T
httpOK atomic.Bool
wait atomic.Int64
t *testing.T
// httpOK selects the reply status: ServeHTTP answers 200 when it is true and
// 500 otherwise.
httpOK atomic.Bool
// respBody is the body ServeHTTP writes after a 200 status.
respBody atomic.String
// wait holds the time.Duration passed to setHTTPWait, stored as an int64.
wait atomic.Int64
}

// setHTTPResp chooses the status ServeHTTP replies with: 200 when succeed is
// true, 500 otherwise.
func (handler *mockHttpHandler) setHTTPResp(succeed bool) {
handler.httpOK.Store(succeed)
}

// setHTTPRespBody sets the body ServeHTTP writes after a 200 status; the 500
// path does not write it.
func (handler *mockHttpHandler) setHTTPRespBody(body string) {
handler.respBody.Store(body)
}

// setHTTPWait stores wait, converted to int64, in the handler's wait field.
// NOTE(review): presumably ServeHTTP delays its reply by this duration — the
// read side is not visible here; confirm.
func (handler *mockHttpHandler) setHTTPWait(wait time.Duration) {
handler.wait.Store(int64(wait))
}
Expand All @@ -113,6 +118,7 @@ func (handler *mockHttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
}
if handler.httpOK.Load() {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(handler.respBody.Load()))
} else {
w.WriteHeader(http.StatusInternalServerError)
}
Expand Down