Skip to content

Commit

Permalink
gcworker: export resolveLocks api for test (#18064)
Browse files Browse the repository at this point in the history
  • Loading branch information
youjiali1995 authored Jun 19, 2020
1 parent e581250 commit df6898d
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 11 deletions.
71 changes: 70 additions & 1 deletion server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/gcworker"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -1654,7 +1655,7 @@ type dbTableInfo struct {
SchemaVersion int64 `json:"schema_version"`
}

//ServeHTTP handles request of database information and table information by tableID.
// ServeHTTP handles request of database information and table information by tableID.
func (h dbTableHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
params := mux.Vars(req)
tableID := params[pTableID]
Expand Down Expand Up @@ -1696,3 +1697,71 @@ func (h dbTableHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
dbTblInfo.DBInfo = dbInfo
writeData(w, dbTblInfo)
}

// testHandler is the handler for tests. It's convenient to provide some APIs for integration tests.
type testHandler struct {
*tikvHandlerTool
gcIsRunning uint32
}

// ServeHTTP handles test related requests.
func (h *testHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
params := mux.Vars(req)
mod := strings.ToLower(params["mod"])
op := strings.ToLower(params["op"])

switch mod {
case "gc":
h.handleGC(op, w, req)
default:
writeError(w, errors.NotSupportedf("module(%s)", mod))
}
}

// Supported operations:
// * resolvelock?safepoint={uint64}&physical={bool}:
// * safepoint: resolve all locks whose timestamp is less than the safepoint.
// * physical: whether it uses physical(green GC) mode to scan locks. Default is true.
func (h *testHandler) handleGC(op string, w http.ResponseWriter, req *http.Request) {
if !atomic.CompareAndSwapUint32(&h.gcIsRunning, 0, 1) {
writeError(w, errors.New("GC is running"))
return
}
defer atomic.StoreUint32(&h.gcIsRunning, 0)

switch op {
case "resolvelock":
h.handleGCResolveLocks(w, req)
default:
writeError(w, errors.NotSupportedf("operation(%s)", op))
}
}

func (h *testHandler) handleGCResolveLocks(w http.ResponseWriter, req *http.Request) {
s := req.FormValue("safepoint")
safePoint, err := strconv.ParseUint(s, 10, 64)
if err != nil {
writeError(w, errors.Errorf("parse safePoint(%s) failed", s))
return
}
usePhysical := true
s = req.FormValue("physical")
if s != "" {
usePhysical, err = strconv.ParseBool(s)
if err != nil {
writeError(w, errors.Errorf("parse physical(%s) failed", s))
return
}
}

ctx := req.Context()
logutil.Logger(ctx).Info("start resolving locks", zap.Uint64("safePoint", safePoint), zap.Bool("physical", usePhysical))
physicalUsed, err := gcworker.RunResolveLocks(ctx, h.Store, h.RegionCache.PDClient(), safePoint, "testGCWorker", 3, usePhysical)
if err != nil {
writeError(w, errors.Annotate(err, "resolveLocks failed"))
} else {
writeData(w, map[string]interface{}{
"physicalUsed": physicalUsed,
})
}
}
49 changes: 47 additions & 2 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,13 +1191,58 @@ func (ts *HTTPHandlerTestSuite) TestFailpointHandler(c *C) {
ts.stopServer(c)

// enable failpoint integration and start server
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/integrateFailpoint", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/enableTestAPI", "return"), IsNil)
ts.startServer(c)
resp, err = ts.fetchStatus("/fail/")
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
b, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(strings.Contains(string(b), "github.com/pingcap/tidb/server/integrateFailpoint=return"), IsTrue)
c.Assert(strings.Contains(string(b), "github.com/pingcap/tidb/server/enableTestAPI=return"), IsTrue)
c.Assert(resp.Body.Close(), IsNil)
}

func (ts *HTTPHandlerTestSuite) TestTestHandler(c *C) {
defer ts.stopServer(c)

// start server without enabling failpoint integration
ts.startServer(c)
resp, err := ts.fetchStatus("/test")
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusNotFound)
ts.stopServer(c)

// enable failpoint integration and start server
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/enableTestAPI", "return"), IsNil)
ts.startServer(c)

resp, err = ts.fetchStatus("/test/gc/gc")
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusBadRequest)

resp, err = ts.fetchStatus("/test/gc/resolvelock")
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusBadRequest)

resp, err = ts.fetchStatus("/test/gc/resolvelock?safepoint=a")
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusBadRequest)

resp, err = ts.fetchStatus("/test/gc/resolvelock?physical=1")
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusBadRequest)

resp, err = ts.fetchStatus("/test/gc/resolvelock?physical=true")
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusBadRequest)

resp, err = ts.fetchStatus("/test/gc/resolvelock?safepoint=10000&physical=true")
c.Assert(err, IsNil)
resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
}
5 changes: 4 additions & 1 deletion server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,14 @@ func (s *Server) startHTTPServer() {
fetcher := sqlInfoFetcher{store: tikvHandlerTool.Store}
serverMux.HandleFunc("/debug/sub-optimal-plan", fetcher.zipInfoForSQL)

failpoint.Inject("integrateFailpoint", func() {
// failpoint is enabled only for tests so we can add some http APIs here for tests.
failpoint.Inject("enableTestAPI", func() {
serverMux.HandleFunc("/fail/", func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/fail")
new(failpoint.HttpHandler).ServeHTTP(w, r)
})

router.Handle("/test/{mod}/{op}", &testHandler{tikvHandlerTool, 0})
})

var (
Expand Down
25 changes: 18 additions & 7 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i
if err != nil {
return errors.Trace(err)
}
err = w.resolveLocks(ctx, safePoint, concurrency, usePhysical)
_, err = w.resolveLocks(ctx, safePoint, concurrency, usePhysical)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] resolve locks returns an error",
zap.String("uuid", w.uuid),
Expand Down Expand Up @@ -928,23 +928,23 @@ func (w *GCWorker) checkUsePhysicalScanLock() (bool, error) {
return false, nil
}

func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurrency int, usePhysical bool) error {
func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurrency int, usePhysical bool) (bool, error) {
if !usePhysical {
return w.legacyResolveLocks(ctx, safePoint, concurrency)
return false, w.legacyResolveLocks(ctx, safePoint, concurrency)
}

// First try resolve locks with physical scan
err := w.resolveLocksPhysical(ctx, safePoint)
if err == nil {
return nil
return true, nil
}

logutil.Logger(ctx).Error("[gc worker] resolve locks with physical scan failed, trying fallback to legacy resolve lock",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
zap.Error(err))

return w.legacyResolveLocks(ctx, safePoint, concurrency)
return false, w.legacyResolveLocks(ctx, safePoint, concurrency)
}

func (w *GCWorker) legacyResolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
Expand Down Expand Up @@ -1750,7 +1750,7 @@ func RunGCJob(ctx context.Context, s tikv.Storage, pd pd.Client, safePoint uint6
return errors.Trace(err)
}

err = gcWorker.resolveLocks(ctx, safePoint, concurrency, false)
_, err = gcWorker.resolveLocks(ctx, safePoint, concurrency, false)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1783,7 +1783,7 @@ func RunDistributedGCJob(ctx context.Context, s tikv.Storage, pd pd.Client, safe
return errors.Trace(err)
}

err = gcWorker.resolveLocks(ctx, safePoint, concurrency, false)
_, err = gcWorker.resolveLocks(ctx, safePoint, concurrency, false)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1803,6 +1803,17 @@ func RunDistributedGCJob(ctx context.Context, s tikv.Storage, pd pd.Client, safe
return nil
}

// RunResolveLocks resolves all locks before the safePoint and returns whether the physical scan mode is used.
// It is exported only for test, do not use it in the production environment.
func RunResolveLocks(ctx context.Context, s tikv.Storage, pd pd.Client, safePoint uint64, identifier string, concurrency int, usePhysical bool) (bool, error) {
gcWorker := &GCWorker{
store: s,
uuid: identifier,
pdClient: pd,
}
return gcWorker.resolveLocks(ctx, safePoint, concurrency, usePhysical)
}

// MockGCWorker is for test.
type MockGCWorker struct {
worker *GCWorker
Expand Down

0 comments on commit df6898d

Please sign in to comment.