Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#39932
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hackersean authored and ti-chi-bot committed Dec 22, 2022
1 parent 1164063 commit 97354fb
Show file tree
Hide file tree
Showing 17 changed files with 571 additions and 85 deletions.
3 changes: 1 addition & 2 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package kv

import (
"context"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/mpp"
Expand Down Expand Up @@ -81,7 +80,7 @@ type MPPDispatchRequest struct {
type MPPClient interface {
// ConstructMPPTasks schedules task for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, *sync.Map, time.Duration) ([]MPPTaskMeta, error)
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response
Expand Down
2 changes: 2 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TokenGauge)
prometheus.MustRegister(ConfigStatus)
prometheus.MustRegister(TiFlashQueryTotalCounter)
prometheus.MustRegister(TiFlashFailedMPPStoreState)
prometheus.MustRegister(SmallTxnWriteDuration)
prometheus.MustRegister(TxnWriteThroughput)
prometheus.MustRegister(LoadSysVarCacheCounter)
Expand Down Expand Up @@ -236,6 +237,7 @@ func ToggleSimplifiedMode(simplified bool) {
InfoCacheCounters,
ReadFromTableCacheCounter,
TiFlashQueryTotalCounter,
TiFlashFailedMPPStoreState,
CampaignOwnerCounter,
NonTransactionalDMLCount,
MemoryUsage,
Expand Down
8 changes: 8 additions & 0 deletions metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ var (
Help: "Counter of TiFlash queries.",
}, []string{LblType, LblResult})

TiFlashFailedMPPStoreState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "tiflash_failed_store",
Help: "Statues of failed tiflash mpp store,-1 means detector heartbeat,0 means reachable,1 means abnormal.",
}, []string{LblAddress})

PDAPIExecutionHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Expand Down
2 changes: 1 addition & 1 deletion planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err))
ttl = 30 * time.Second
}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl)
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, ttl)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
3 changes: 0 additions & 3 deletions sessionctx/sessionstates/session_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package sessionstates

import (
"time"

"github.com/pingcap/tidb/errno"
ptypes "github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand Down Expand Up @@ -79,7 +77,6 @@ type SessionStates struct {
FoundInPlanCache bool `json:"in-plan-cache,omitempty"`
FoundInBinding bool `json:"in-binding,omitempty"`
SequenceLatestValues map[int64]int64 `json:"seq-values,omitempty"`
MPPStoreLastFailTime map[string]time.Time `json:"store-fail-time,omitempty"`
LastAffectedRows int64 `json:"affected-rows,omitempty"`
LastInsertID uint64 `json:"last-insert-id,omitempty"`
Warnings []stmtctx.SQLWarn `json:"warnings,omitempty"`
Expand Down
19 changes: 0 additions & 19 deletions sessionctx/sessionstates/session_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -378,23 +376,6 @@ func TestSessionCtx(t *testing.T) {
tk.MustQuery("select nextval(test.s)").Check(testkit.Rows("2"))
},
},
{
// check MPPStoreLastFailTime
setFunc: func(tk *testkit.TestKit) any {
m := sync.Map{}
m.Store("store1", time.Now())
tk.Session().GetSessionVars().MPPStoreLastFailTime = &m
return tk.Session().GetSessionVars().MPPStoreLastFailTime
},
checkFunc: func(tk *testkit.TestKit, param any) {
failTime := tk.Session().GetSessionVars().MPPStoreLastFailTime
tm, ok := failTime.Load("store1")
require.True(t, ok)
v, ok := (param.(*sync.Map)).Load("store1")
require.True(t, ok)
require.True(t, tm.(time.Time).Equal(v.(time.Time)))
},
},
{
// check FoundInPlanCache
setFunc: func(tk *testkit.TestKit) any {
Expand Down
13 changes: 0 additions & 13 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1170,9 +1170,6 @@ type SessionVars struct {
// TemporaryTableData stores committed kv values for temporary table for current session.
TemporaryTableData TemporaryTableData

// MPPStoreLastFailTime records the lastest fail time that a TiFlash store failed. It maps store address(string) to fail time(time.Time).
MPPStoreLastFailTime *sync.Map

// MPPStoreFailTTL indicates the duration that protect TiDB from sending task to a new recovered TiFlash.
MPPStoreFailTTL string

Expand Down Expand Up @@ -1660,7 +1657,6 @@ func NewSessionVars(hctx HookContext) *SessionVars {
AllowFallbackToTiKV: make(map[kv.StoreType]struct{}),
CTEMaxRecursionDepth: DefCTEMaxRecursionDepth,
TMPTableSize: DefTiDBTmpTableMaxSize,
MPPStoreLastFailTime: new(sync.Map),
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
Rng: mathutil.NewWithTime(),
StatsLoadSyncWait: StatsLoadSyncWait.Load(),
Expand Down Expand Up @@ -2287,12 +2283,6 @@ func (s *SessionVars) EncodeSessionStates(ctx context.Context, sessionStates *se
}
sessionStates.LastFoundRows = s.LastFoundRows
sessionStates.SequenceLatestValues = s.SequenceState.GetAllStates()
sessionStates.MPPStoreLastFailTime = make(map[string]time.Time, 0)
s.MPPStoreLastFailTime.Range(
func(key, value interface{}) bool {
sessionStates.MPPStoreLastFailTime[key.(string)] = value.(time.Time)
return true
})
sessionStates.FoundInPlanCache = s.PrevFoundInPlanCache
sessionStates.FoundInBinding = s.PrevFoundInBinding

Expand Down Expand Up @@ -2328,9 +2318,6 @@ func (s *SessionVars) DecodeSessionStates(ctx context.Context, sessionStates *se
}
s.LastFoundRows = sessionStates.LastFoundRows
s.SequenceState.SetAllStates(sessionStates.SequenceLatestValues)
for k, v := range sessionStates.MPPStoreLastFailTime {
s.MPPStoreLastFailTime.Store(k, v)
}
s.FoundInPlanCache = sessionStates.FoundInPlanCache
s.FoundInBinding = sessionStates.FoundInBinding

Expand Down
9 changes: 9 additions & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"coprocessor_cache.go",
"key_ranges.go",
"mpp.go",
"mpp_probe.go",
"region_cache.go",
"store.go",
],
Expand Down Expand Up @@ -65,6 +66,7 @@ go_test(
"coprocessor_test.go",
"key_ranges_test.go",
"main_test.go",
"mpp_probe_test.go",
],
embed = [":copr"],
flaky = True,
Expand All @@ -74,11 +76,18 @@ go_test(
"//store/driver/backoff",
"//testkit/testsetup",
"//util/paging",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
<<<<<<< HEAD
=======
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_stathat_consistent//:consistent",
>>>>>>> aeccf77637 (*: optimize mpp probe (#39932))
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@org_uber_go_goleak//:goleak",
],
)
Loading

0 comments on commit 97354fb

Please sign in to comment.