Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: optimize mpp probe (#39932) #40104

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ type MPPDispatchRequest struct {
type MPPClient interface {
// ConstructMPPTasks schedules task for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
<<<<<<< HEAD
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error)
=======
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)
>>>>>>> aeccf77637 (*: optimize mpp probe (#39932))

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, startTs uint64) Response
Expand Down
55 changes: 55 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TokenGauge)
prometheus.MustRegister(ConfigStatus)
prometheus.MustRegister(TiFlashQueryTotalCounter)
prometheus.MustRegister(TiFlashFailedMPPStoreState)
prometheus.MustRegister(SmallTxnWriteDuration)
prometheus.MustRegister(TxnWriteThroughput)
prometheus.MustRegister(LoadSysVarCacheCounter)
Expand All @@ -162,3 +163,57 @@ func RegisterMetrics() {
tikvmetrics.RegisterMetrics()
tikvmetrics.TiKVPanicCounter = PanicCounter // reset tidb metrics for tikv metrics
}
<<<<<<< HEAD
=======

var mode struct {
sync.Mutex
isSimplified bool
}

// ToggleSimplifiedMode is used to register/unregister the metrics that unused by grafana.
func ToggleSimplifiedMode(simplified bool) {
var unusedMetricsByGrafana = []prometheus.Collector{
StatementDeadlockDetectDuration,
ValidateReadTSFromPDCount,
LoadTableCacheDurationHistogram,
TxnWriteThroughput,
SmallTxnWriteDuration,
InfoCacheCounters,
ReadFromTableCacheCounter,
TiFlashQueryTotalCounter,
TiFlashFailedMPPStoreState,
CampaignOwnerCounter,
NonTransactionalDMLCount,
MemoryUsage,
TokenGauge,
tikvmetrics.TiKVRawkvSizeHistogram,
tikvmetrics.TiKVRawkvCmdHistogram,
tikvmetrics.TiKVReadThroughput,
tikvmetrics.TiKVSmallReadDuration,
tikvmetrics.TiKVBatchWaitOverLoad,
tikvmetrics.TiKVBatchClientRecycle,
tikvmetrics.TiKVRequestRetryTimesHistogram,
tikvmetrics.TiKVStatusDuration,
}
mode.Lock()
defer mode.Unlock()
if mode.isSimplified == simplified {
return
}
mode.isSimplified = simplified
if simplified {
for _, m := range unusedMetricsByGrafana {
prometheus.Unregister(m)
}
} else {
for _, m := range unusedMetricsByGrafana {
err := prometheus.Register(m)
if err != nil {
logutil.BgLogger().Error("cannot register metrics", zap.Error(err))
break
}
}
}
}
>>>>>>> aeccf77637 (*: optimize mpp probe (#39932))
53 changes: 53 additions & 0 deletions metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,59 @@ var (
Name: "tiflash_query_total",
Help: "Counter of TiFlash queries.",
}, []string{LblType, LblResult})
<<<<<<< HEAD
=======

TiFlashFailedMPPStoreState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "tiflash_failed_store",
Help: "Statues of failed tiflash mpp store,-1 means detector heartbeat,0 means reachable,1 means abnormal.",
}, []string{LblAddress})

PDAPIExecutionHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "pd_api_execution_duration_seconds",
Help: "Bucketed histogram of all pd api execution time (s)",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s
}, []string{LblType})

PDAPIRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "pd_api_request_total",
Help: "Counter of the pd http api requests",
}, []string{LblType, LblResult})

CPUProfileCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "cpu_profile_total",
Help: "Counter of cpu profiling",
})

LoadTableCacheDurationHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "load_table_cache_seconds",
Help: "Duration (us) for loading table cache.",
Buckets: prometheus.ExponentialBuckets(1, 2, 30), // 1us ~ 528s
})

RCCheckTSWriteConfilictCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "rc_check_ts_conflict_total",
Help: "Counter of WriteConflict caused by RCCheckTS.",
}, []string{LblType})
>>>>>>> aeccf77637 (*: optimize mpp probe (#39932))
)

// ExecuteErrorToLabel converts an execute error to label.
Expand Down
2 changes: 1 addition & 1 deletion planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (e *mppTaskGenerator) constructMPPTasksForSinglePartitionTable(ctx context.
logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err))
ttl = 30 * time.Second
}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl)
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, ttl)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
85 changes: 85 additions & 0 deletions sessionctx/sessionstates/session_states.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sessionstates

import (
"github.com/pingcap/tidb/errno"
ptypes "github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/dbterror"
)

// SessionStateType is the type of session states.
type SessionStateType int

var (
// ErrCannotMigrateSession indicates the session cannot be migrated.
ErrCannotMigrateSession = dbterror.ClassSession.NewStd(errno.ErrCannotMigrateSession)
)

// These enums represents the types of session state handlers.
const (
// StatePrepareStmt represents prepared statements.
StatePrepareStmt SessionStateType = iota
// StateBinding represents session SQL bindings.
StateBinding
)

// PreparedStmtInfo contains the information about prepared statements, both text and binary protocols.
type PreparedStmtInfo struct {
Name string `json:"name,omitempty"`
StmtText string `json:"text"`
StmtDB string `json:"db,omitempty"`
ParamTypes []byte `json:"types,omitempty"`
}

// QueryInfo represents the information of last executed query. It's used to expose information for test purpose.
type QueryInfo struct {
TxnScope string `json:"txn_scope"`
StartTS uint64 `json:"start_ts"`
ForUpdateTS uint64 `json:"for_update_ts"`
ErrMsg string `json:"error,omitempty"`
}

// LastDDLInfo represents the information of last DDL. It's used to expose information for test purpose.
type LastDDLInfo struct {
Query string `json:"query"`
SeqNum uint64 `json:"seq_num"`
}

// SessionStates contains all the states in the session that should be migrated when the session
// is migrated to another server. It is shown by `show session_states` and recovered by `set session_states`.
type SessionStates struct {
UserVars map[string]*types.Datum `json:"user-var-values,omitempty"`
UserVarTypes map[string]*ptypes.FieldType `json:"user-var-types,omitempty"`
SystemVars map[string]string `json:"sys-vars,omitempty"`
PreparedStmts map[uint32]*PreparedStmtInfo `json:"prepared-stmts,omitempty"`
PreparedStmtID uint32 `json:"prepared-stmt-id,omitempty"`
Status uint16 `json:"status,omitempty"`
CurrentDB string `json:"current-db,omitempty"`
LastTxnInfo string `json:"txn-info,omitempty"`
LastQueryInfo *QueryInfo `json:"query-info,omitempty"`
LastDDLInfo *LastDDLInfo `json:"ddl-info,omitempty"`
LastFoundRows uint64 `json:"found-rows,omitempty"`
FoundInPlanCache bool `json:"in-plan-cache,omitempty"`
FoundInBinding bool `json:"in-binding,omitempty"`
SequenceLatestValues map[int64]int64 `json:"seq-values,omitempty"`
LastAffectedRows int64 `json:"affected-rows,omitempty"`
LastInsertID uint64 `json:"last-insert-id,omitempty"`
Warnings []stmtctx.SQLWarn `json:"warnings,omitempty"`
// Define it as string to avoid cycle import.
Bindings string `json:"bindings,omitempty"`
}
Loading