Skip to content

Commit

Permalink
telemetry: add transaction usage info (#23470)
Browse files Browse the repository at this point in the history
  • Loading branch information
youjiali1995 authored Mar 24, 2021
1 parent 86c8bf0 commit 906828b
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 15 deletions.
3 changes: 3 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
c.mu.RUnlock()
if !committed && !undetermined {
c.cleanup(ctx)
metrics.TwoPCTxnCounterError.Inc()
} else {
metrics.TwoPCTxnCounterOk.Inc()
}
c.txn.commitTS = c.commitTS
if binlogSkipped {
Expand Down
54 changes: 53 additions & 1 deletion store/tikv/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

package metrics

import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// Client metrics.
var (
Expand Down Expand Up @@ -48,6 +51,7 @@ var (
TiKVPessimisticLockKeysDuration prometheus.Histogram
TiKVTTLLifeTimeReachCounter prometheus.Counter
TiKVNoAvailableConnectionCounter prometheus.Counter
TiKVTwoPCTxnCounter *prometheus.CounterVec
TiKVAsyncCommitTxnCounter *prometheus.CounterVec
TiKVOnePCTxnCounter *prometheus.CounterVec
TiKVStoreLimitErrorCounter *prometheus.CounterVec
Expand Down Expand Up @@ -337,6 +341,14 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of no available batch client.",
})

TiKVTwoPCTxnCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "commit_txn_counter",
Help: "Counter of 2PC transactions.",
}, []string{LblType})

TiKVAsyncCommitTxnCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -440,6 +452,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVPessimisticLockKeysDuration)
prometheus.MustRegister(TiKVTTLLifeTimeReachCounter)
prometheus.MustRegister(TiKVNoAvailableConnectionCounter)
prometheus.MustRegister(TiKVTwoPCTxnCounter)
prometheus.MustRegister(TiKVAsyncCommitTxnCounter)
prometheus.MustRegister(TiKVOnePCTxnCounter)
prometheus.MustRegister(TiKVStoreLimitErrorCounter)
Expand All @@ -448,3 +461,42 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVForwardRequestCounter)
prometheus.MustRegister(TiKVTSFutureWaitDuration)
}

// readCounter reads the value of a prometheus.Counter.
// Returns -1 when failing to read the value.
func readCounter(m prometheus.Counter) int64 {
// Actually, it's not recommended to read the value of prometheus metric types directly:
// https://github.com/prometheus/client_golang/issues/486#issuecomment-433345239
pb := &dto.Metric{}
// It's impossible to return an error though.
if err := m.Write(pb); err != nil {
return -1
}
return int64(pb.GetCounter().GetValue())
}

// TxnCommitCounter is the counter of transactions committed with
// different protocols, i.e. 2PC, async-commit, 1PC.
type TxnCommitCounter struct {
TwoPC int64 `json:"twoPC"`
AsyncCommit int64 `json:"asyncCommit"`
OnePC int64 `json:"onePC"`
}

// Sub returns the difference of two counters.
func (c TxnCommitCounter) Sub(rhs TxnCommitCounter) TxnCommitCounter {
new := TxnCommitCounter{}
new.TwoPC = c.TwoPC - rhs.TwoPC
new.AsyncCommit = c.AsyncCommit - rhs.AsyncCommit
new.OnePC = c.OnePC - rhs.OnePC
return new
}

// GetTxnCommitCounter gets the TxnCommitCounter.
func GetTxnCommitCounter() TxnCommitCounter {
return TxnCommitCounter{
TwoPC: readCounter(TwoPCTxnCounterOk),
AsyncCommit: readCounter(AsyncCommitTxnCounterOk),
OnePC: readCounter(OnePCTxnCounterOk),
}
}
6 changes: 6 additions & 0 deletions store/tikv/metrics/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ var (
SecondaryLockCleanupFailureCounterCommit prometheus.Counter
SecondaryLockCleanupFailureCounterRollback prometheus.Counter

TwoPCTxnCounterOk prometheus.Counter
TwoPCTxnCounterError prometheus.Counter

AsyncCommitTxnCounterOk prometheus.Counter
AsyncCommitTxnCounterError prometheus.Counter

Expand Down Expand Up @@ -166,6 +169,9 @@ func initShortcuts() {
SecondaryLockCleanupFailureCounterCommit = TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit")
SecondaryLockCleanupFailureCounterRollback = TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback")

TwoPCTxnCounterOk = TiKVTwoPCTxnCounter.WithLabelValues("ok")
TwoPCTxnCounterError = TiKVTwoPCTxnCounter.WithLabelValues("err")

AsyncCommitTxnCounterOk = TiKVAsyncCommitTxnCounter.WithLabelValues("ok")
AsyncCommitTxnCounterError = TiKVAsyncCommitTxnCounter.WithLabelValues("err")

Expand Down
42 changes: 42 additions & 0 deletions store/tikv/tests/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/util"
)
Expand Down Expand Up @@ -268,3 +269,44 @@ func (s *testOnePCSuite) Test1PCWithMultiDC(c *C) {
c.Assert(err, IsNil)
c.Assert(globalTxn.GetCommitter().IsOnePC(), IsTrue)
}

func (s *testOnePCSuite) TestTxnCommitCounter(c *C) {
initial := metrics.GetTxnCommitCounter()

// 2PC
txn := s.begin(c)
err := txn.Set([]byte("k"), []byte("v"))
c.Assert(err, IsNil)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
err = txn.Commit(ctx)
c.Assert(err, IsNil)
curr := metrics.GetTxnCommitCounter()
diff := curr.Sub(initial)
c.Assert(diff.TwoPC, Equals, int64(1))
c.Assert(diff.AsyncCommit, Equals, int64(0))
c.Assert(diff.OnePC, Equals, int64(0))

// AsyncCommit
txn = s.beginAsyncCommit(c)
err = txn.Set([]byte("k1"), []byte("v1"))
c.Assert(err, IsNil)
err = txn.Commit(ctx)
c.Assert(err, IsNil)
curr = metrics.GetTxnCommitCounter()
diff = curr.Sub(initial)
c.Assert(diff.TwoPC, Equals, int64(1))
c.Assert(diff.AsyncCommit, Equals, int64(1))
c.Assert(diff.OnePC, Equals, int64(0))

// 1PC
txn = s.begin1PC(c)
err = txn.Set([]byte("k2"), []byte("v2"))
c.Assert(err, IsNil)
err = txn.Commit(ctx)
c.Assert(err, IsNil)
curr = metrics.GetTxnCommitCounter()
diff = curr.Sub(initial)
c.Assert(diff.TwoPC, Equals, int64(1))
c.Assert(diff.AsyncCommit, Equals, int64(1))
c.Assert(diff.OnePC, Equals, int64(1))
}
4 changes: 4 additions & 0 deletions telemetry/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ func generateTelemetryData(ctx sessionctx.Context, trackingID string) telemetryD
r.TelemetryHostExtra = getTelemetryHostExtraInfo()
return r
}

func postReportTelemetryData() {
postReportTxnUsage()
}
47 changes: 33 additions & 14 deletions telemetry/data_feature_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (

"github.com/cznic/mathutil"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/util/sqlexec"
)

type featureUsageInfo struct {
AsyncCommitUsed bool `json:"asyncCommitUsed"`
TxnUsageInfo *TxnUsageInfo `json:"txnUsageInfo"`
CoprCacheUsed []*CoprCacheUsedWindowItem `json:"coprCacheUsed"`
ClusterIndexUsed map[string]bool `json:"clusterIndexUsed"`
TiFlashUsed []*TiFlashUsageItem `json:"tiFlashUsed"`
Expand Down Expand Up @@ -225,20 +227,37 @@ func getTelemetryFeatureUsageInfo(ctx sessionctx.Context) (*featureUsageInfo, er
usageInfo.ClusterIndexUsed[row.GetString(0)] = isClustered
}

// async commit
stmt, err = exec.ParseWithParams(context.TODO(), `show config where name = 'storage.enable-async-apply-prewrite'`)
if err != nil {
return nil, err
}
rows, _, err = exec.ExecRestrictedStmt(context.TODO(), stmt)
if err != nil {
return nil, err
// transaction related feature
usageInfo.TxnUsageInfo = GetTxnUsageInfo(ctx)

return &usageInfo, nil
}

// TxnUsageInfo records the usage info of transaction related features, including
// async-commit, 1PC and counters of transactions committed with different protocols.
type TxnUsageInfo struct {
AsyncCommitUsed bool `json:"asyncCommitUsed"`
OnePCUsed bool `json:"onePCUsed"`
TxnCommitCounter metrics.TxnCommitCounter `json:"txnCommitCounter"`
}

var initialTxnCommitCounter metrics.TxnCommitCounter

// GetTxnUsageInfo gets the usage info of transaction related features. It's exported for tests.
func GetTxnUsageInfo(ctx sessionctx.Context) *TxnUsageInfo {
asyncCommitUsed := false
if val, err := variable.GetGlobalSystemVar(ctx.GetSessionVars(), variable.TiDBEnableAsyncCommit); err == nil {
asyncCommitUsed = val == variable.BoolOn
}
if len(rows) > 0 {
if rows[0].GetString(3) == "true" {
usageInfo.AsyncCommitUsed = true
}
onePCUsed := false
if val, err := variable.GetGlobalSystemVar(ctx.GetSessionVars(), variable.TiDBEnable1PC); err == nil {
onePCUsed = val == variable.BoolOn
}
curr := metrics.GetTxnCommitCounter()
diff := curr.Sub(initialTxnCommitCounter)
return &TxnUsageInfo{asyncCommitUsed, onePCUsed, diff}
}

return &usageInfo, nil
func postReportTxnUsage() {
initialTxnCommitCounter = metrics.GetTxnCommitCounter()
}
92 changes: 92 additions & 0 deletions telemetry/data_feature_usage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package telemetry_test

import (
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/util/testkit"
)

var _ = Suite(&testFeatureInfoSuite{})

type testFeatureInfoSuite struct {
cluster cluster.Cluster
store kv.Storage
dom *domain.Domain
se session.Session
}

func (s *testFeatureInfoSuite) SetUpTest(c *C) {
store, err := mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
mockstore.BootstrapWithSingleStore(c)
s.cluster = c
}),
mockstore.WithStoreType(mockstore.EmbedUnistore),
)
c.Assert(err, IsNil)
s.store = store
dom, err := session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.dom = dom
se, err := session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
s.se = se
}

func (s *testFeatureInfoSuite) TearDownSuite(c *C) {
s.se.Close()
s.dom.Close()
s.store.Close()
}

func (s *testFeatureInfoSuite) TestTxnUsageInfo(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(fmt.Sprintf("set global %s = 0", variable.TiDBEnableAsyncCommit))
tk.MustExec(fmt.Sprintf("set global %s = 0", variable.TiDBEnable1PC))
txnUsage := telemetry.GetTxnUsageInfo(s.se)
c.Assert(txnUsage.AsyncCommitUsed, IsFalse)
c.Assert(txnUsage.OnePCUsed, IsFalse)
tk.MustExec(fmt.Sprintf("set global %s = 1", variable.TiDBEnableAsyncCommit))
tk.MustExec(fmt.Sprintf("set global %s = 1", variable.TiDBEnable1PC))
txnUsage = telemetry.GetTxnUsageInfo(s.se)
c.Assert(txnUsage.AsyncCommitUsed, IsTrue)
c.Assert(txnUsage.OnePCUsed, IsTrue)

tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test")
tk1.MustExec("drop table if exists txn_usage_info")
tk1.MustExec("create table txn_usage_info (a int)")
tk1.MustExec(fmt.Sprintf("set %s = 1", variable.TiDBEnableAsyncCommit))
tk1.MustExec(fmt.Sprintf("set %s = 1", variable.TiDBEnable1PC))
tk1.MustExec("insert into txn_usage_info values (1)")
tk1.MustExec(fmt.Sprintf("set %s = 0", variable.TiDBEnable1PC))
tk1.MustExec("insert into txn_usage_info values (2)")
tk1.MustExec(fmt.Sprintf("set %s = 0", variable.TiDBEnableAsyncCommit))
tk1.MustExec("insert into txn_usage_info values (3)")
txnUsage = telemetry.GetTxnUsageInfo(tk1.Se)
c.Assert(txnUsage.AsyncCommitUsed, IsTrue)
c.Assert(txnUsage.OnePCUsed, IsTrue)
c.Assert(txnUsage.TxnCommitCounter.AsyncCommit, Greater, int64(0))
c.Assert(txnUsage.TxnCommitCounter.OnePC, Greater, int64(0))
c.Assert(txnUsage.TxnCommitCounter.TwoPC, Greater, int64(0))
}
1 change: 1 addition & 0 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func reportUsageData(ctx sessionctx.Context, etcdClient *clientv3.Client) (bool,
}

data := generateTelemetryData(ctx, trackingID)
postReportTelemetryData()

rawJSON, err := json.Marshal(data)
if err != nil {
Expand Down

0 comments on commit 906828b

Please sign in to comment.