From ec9f2012943785e0d1e1474759df59e956b68f96 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 6 Jul 2022 16:03:02 +0800 Subject: [PATCH] topsql: compress plan instead of dropping it (#35973) ref pingcap/tidb#35964 --- DEPS.bzl | 4 +-- executor/seqtest/BUILD.bazel | 1 + go.mod | 2 +- go.sum | 4 +-- meta/BUILD.bazel | 2 +- session/BUILD.bazel | 1 + util/topsql/collector/mock/mock.go | 6 +++- util/topsql/reporter/BUILD.bazel | 1 + util/topsql/reporter/datamodel.go | 43 ++++++++++++++++++++------ util/topsql/reporter/datamodel_test.go | 42 +++++++++++++++---------- util/topsql/reporter/reporter.go | 15 ++++++--- util/topsql/reporter/reporter_test.go | 18 ++++++----- util/topsql/topsql.go | 9 ++---- util/topsql/topsql_test.go | 10 ++++-- 14 files changed, 102 insertions(+), 56 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 70b5a1e3cd504..909a95484d13c 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2523,8 +2523,8 @@ def go_deps(): name = "com_github_pingcap_tipb", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/tipb", - sum = "h1:oYn6UiUSnVlMBr4rLOweNWtdAon5wCLnLGDSFf/8kMA=", - version = "v0.0.0-20220704030114-0f4f873beca8", + sum = "h1:XaTE4ZhQbQtQZtAVzlZh/Pf6SjFfMSTe1ia2nGcl36Y=", + version = "v0.0.0-20220706024432-7be3cc83a7d5", ) go_repository( name = "com_github_pkg_browser", diff --git a/executor/seqtest/BUILD.bazel b/executor/seqtest/BUILD.bazel index 50c4bbdb65c55..c248e2e1fd30a 100644 --- a/executor/seqtest/BUILD.bazel +++ b/executor/seqtest/BUILD.bazel @@ -13,6 +13,7 @@ go_test( "//ddl/util", "//errno", "//executor", + "//infoschema", "//kv", "//meta/autoid", "//metrics", diff --git a/go.mod b/go.mod index 2eb56cd25c14e..2d67c63904fc3 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/pingcap/log v1.1.0 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e - github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8 + github.com/pingcap/tipb v0.0.0-20220706024432-7be3cc83a7d5 github.com/prometheus/client_golang v1.12.2 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.32.1 diff --git a/go.sum b/go.sum index bf3c60774f1c1..f4e4fb8b40f4f 100644 --- a/go.sum +++ b/go.sum @@ -675,8 +675,8 @@ github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM= github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops= -github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8 h1:oYn6UiUSnVlMBr4rLOweNWtdAon5wCLnLGDSFf/8kMA= -github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20220706024432-7be3cc83a7d5 h1:XaTE4ZhQbQtQZtAVzlZh/Pf6SjFfMSTe1ia2nGcl36Y= +github.com/pingcap/tipb v0.0.0-20220706024432-7be3cc83a7d5/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/meta/BUILD.bazel b/meta/BUILD.bazel index 281b570cb29f5..a266aa48d2494 100644 --- a/meta/BUILD.bazel +++ b/meta/BUILD.bazel @@ -19,7 +19,6 @@ go_library( "//util/logutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/kvrpcpb", - "@org_golang_x_exp//slices", "@org_uber_go_zap//:zap", ], ) @@ -33,6 +32,7 @@ go_test( embed = [":meta"], flaky = True, deps = [ + "//ddl", "//kv", "//parser/model", "//store/mockstore", diff --git a/session/BUILD.bazel b/session/BUILD.bazel index 864e2d563679d..b75f26dc03634 100644 --- a/session/BUILD.bazel +++ b/session/BUILD.bazel @@ -123,6 +123,7 @@ go_test( "//domain", "//errno", "//executor", + "//infoschema", "//kv", "//meta", "//parser/ast", diff --git a/util/topsql/collector/mock/mock.go b/util/topsql/collector/mock/mock.go index 50d44bdca0b86..0320ca2c60aa8 100644 --- a/util/topsql/collector/mock/mock.go +++ b/util/topsql/collector/mock/mock.go @@ -161,7 +161,11 @@ func (c *TopSQLCollector) RegisterSQL(sqlDigest []byte, normalizedSQL string, is } // RegisterPlan uses for testing. -func (c *TopSQLCollector) RegisterPlan(planDigest []byte, normalizedPlan string) { +func (c *TopSQLCollector) RegisterPlan(planDigest []byte, normalizedPlan string, isLarge bool) { + if isLarge { + return + } + digestStr := string(hack.String(planDigest)) c.Lock() _, ok := c.planMap[digestStr] diff --git a/util/topsql/reporter/BUILD.bazel b/util/topsql/reporter/BUILD.bazel index 26cedc90a0bcf..04f402018c9a2 100644 --- a/util/topsql/reporter/BUILD.bazel +++ b/util/topsql/reporter/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//config", "//metrics", "//util", + "//util/hack", "//util/logutil", "//util/topsql/collector", "//util/topsql/state", diff --git a/util/topsql/reporter/datamodel.go b/util/topsql/reporter/datamodel.go index 7de6f4e4392e3..747d9ceeae77f 100644 --- a/util/topsql/reporter/datamodel.go +++ b/util/topsql/reporter/datamodel.go @@ -20,6 +20,7 @@ import ( "sync" "sync/atomic" + "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/topsql/collector" topsqlstate "github.com/pingcap/tidb/util/topsql/state" @@ -594,6 +595,13 @@ type sqlMeta struct { isInternal bool } +// planMeta contains a binaryNormalizedPlan and a bool field isLarge to indicate +// whether that binaryNormalizedPlan is too large to decode quickly +type planMeta struct { + binaryNormalizedPlan string + isLarge bool +} + // normalizedSQLMap is a wrapped map used to register normalizedSQL. type normalizedSQLMap struct { data atomic.Value // *sync.Map @@ -654,6 +662,10 @@ func (m *normalizedSQLMap) toProto() []tipb.SQLMeta { // normalizedPlanMap to protobuf representation. type planBinaryDecodeFunc func(string) (string, error) +// planBinaryCompressFunc is used to compress large normalized plan +// into encoded format +type planBinaryCompressFunc func([]byte) string + // normalizedSQLMap is a wrapped map used to register normalizedPlan. type normalizedPlanMap struct { data atomic.Value // *sync.Map @@ -668,13 +680,16 @@ func newNormalizedPlanMap() *normalizedPlanMap { // register saves the relationship between planDigest and normalizedPlan. // If the internal map size exceeds the limit, the relationship will be discarded. -func (m *normalizedPlanMap) register(planDigest []byte, normalizedPlan string) { +func (m *normalizedPlanMap) register(planDigest []byte, normalizedPlan string, isLarge bool) { if m.length.Load() >= topsqlstate.GlobalState.MaxCollect.Load() { ignoreExceedPlanCounter.Inc() return } data := m.data.Load().(*sync.Map) - _, loaded := data.LoadOrStore(string(planDigest), normalizedPlan) + _, loaded := data.LoadOrStore(string(planDigest), planMeta{ + binaryNormalizedPlan: normalizedPlan, + isLarge: isLarge, + }) if !loaded { m.length.Add(1) } @@ -693,18 +708,26 @@ func (m *normalizedPlanMap) take() *normalizedPlanMap { } // toProto converts the normalizedPlanMap to the corresponding protobuf representation. -func (m *normalizedPlanMap) toProto(decodePlan planBinaryDecodeFunc) []tipb.PlanMeta { +func (m *normalizedPlanMap) toProto(decodePlan planBinaryDecodeFunc, compressPlan planBinaryCompressFunc) []tipb.PlanMeta { metas := make([]tipb.PlanMeta, 0, m.length.Load()) m.data.Load().(*sync.Map).Range(func(k, v interface{}) bool { - planDecoded, errDecode := decodePlan(v.(string)) - if errDecode != nil { - logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode)) + originalMeta := v.(planMeta) + protoMeta := tipb.PlanMeta{ + PlanDigest: hack.Slice(k.(string)), + } + + var err error + if originalMeta.isLarge { + protoMeta.EncodedNormalizedPlan = compressPlan(hack.Slice(originalMeta.binaryNormalizedPlan)) + } else { + protoMeta.NormalizedPlan, err = decodePlan(originalMeta.binaryNormalizedPlan) + } + if err != nil { + logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(err)) return true } - metas = append(metas, tipb.PlanMeta{ - PlanDigest: []byte(k.(string)), - NormalizedPlan: planDecoded, - }) + + metas = append(metas, protoMeta) return true }) return metas diff --git a/util/topsql/reporter/datamodel_test.go b/util/topsql/reporter/datamodel_test.go index b898fa152260e..82e09bbb07a23 100644 --- a/util/topsql/reporter/datamodel_test.go +++ b/util/topsql/reporter/datamodel_test.go @@ -400,16 +400,22 @@ func Test_normalizedSQLMap_toProto(t *testing.T) { func Test_normalizedPlanMap_register(t *testing.T) { topsqlstate.GlobalState.MaxCollect.Store(2) m := newNormalizedPlanMap() - m.register([]byte("PLAN-1"), "PLAN-1") - m.register([]byte("PLAN-2"), "PLAN-2") - m.register([]byte("PLAN-3"), "PLAN-3") + m.register([]byte("PLAN-1"), "PLAN-1", false) + m.register([]byte("PLAN-2"), "PLAN-2", true) + m.register([]byte("PLAN-3"), "PLAN-3", false) require.Equal(t, int64(2), m.length.Load()) v, ok := m.data.Load().(*sync.Map).Load("PLAN-1") require.True(t, ok) - require.Equal(t, "PLAN-1", v.(string)) + require.Equal(t, planMeta{ + binaryNormalizedPlan: "PLAN-1", + isLarge: false, + }, v.(planMeta)) v, ok = m.data.Load().(*sync.Map).Load("PLAN-2") require.True(t, ok) - require.Equal(t, "PLAN-2", v.(string)) + require.Equal(t, planMeta{ + binaryNormalizedPlan: "PLAN-2", + isLarge: true, + }, v.(planMeta)) _, ok = m.data.Load().(*sync.Map).Load("PLAN-3") require.False(t, ok) } @@ -417,9 +423,9 @@ func Test_normalizedPlanMap_register(t *testing.T) { func Test_normalizedPlanMap_take(t *testing.T) { topsqlstate.GlobalState.MaxCollect.Store(999) m1 := newNormalizedPlanMap() - m1.register([]byte("PLAN-1"), "PLAN-1") - m1.register([]byte("PLAN-2"), "PLAN-2") - m1.register([]byte("PLAN-3"), "PLAN-3") + m1.register([]byte("PLAN-1"), "PLAN-1", false) + m1.register([]byte("PLAN-2"), "PLAN-2", false) + m1.register([]byte("PLAN-3"), "PLAN-3", false) m2 := m1.take() require.Equal(t, int64(0), m1.length.Load()) require.Equal(t, int64(3), m2.length.Load()) @@ -442,26 +448,28 @@ func Test_normalizedPlanMap_take(t *testing.T) { func Test_normalizedPlanMap_toProto(t *testing.T) { topsqlstate.GlobalState.MaxCollect.Store(999) m := newNormalizedPlanMap() - m.register([]byte("PLAN-1"), "PLAN-1") - m.register([]byte("PLAN-2"), "PLAN-2") - m.register([]byte("PLAN-3"), "PLAN-3") - pb := m.toProto(func(s string) (string, error) { return s, nil }) + m.register([]byte("PLAN-1"), "PLAN-1", false) + m.register([]byte("PLAN-2"), "PLAN-2", true) + m.register([]byte("PLAN-3"), "PLAN-3", false) + pb := m.toProto( + func(s string) (string, error) { return "[decoded] " + s, nil }, + func(s []byte) string { return "[encoded] " + string(s) }) require.Len(t, pb, 3) hash := map[string]tipb.PlanMeta{} for _, meta := range pb { - hash[meta.NormalizedPlan] = meta + hash[string(meta.PlanDigest)] = meta } require.Equal(t, tipb.PlanMeta{ PlanDigest: []byte("PLAN-1"), - NormalizedPlan: "PLAN-1", + NormalizedPlan: "[decoded] PLAN-1", }, hash["PLAN-1"]) require.Equal(t, tipb.PlanMeta{ - PlanDigest: []byte("PLAN-2"), - NormalizedPlan: "PLAN-2", + PlanDigest: []byte("PLAN-2"), + EncodedNormalizedPlan: "[encoded] PLAN-2", }, hash["PLAN-2"]) require.Equal(t, tipb.PlanMeta{ PlanDigest: []byte("PLAN-3"), - NormalizedPlan: "PLAN-3", + NormalizedPlan: "[decoded] PLAN-3", }, hash["PLAN-3"]) } diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index 501d9bd64b0b2..d4b7cf571b0d1 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -51,7 +51,8 @@ type TopSQLReporter interface { RegisterSQL(sqlDigest []byte, normalizedSQL string, isInternal bool) // RegisterPlan like RegisterSQL, but for normalized plan strings. - RegisterPlan(planDigest []byte, normalizedPlan string) + // isLarge indicates the size of normalizedPlan is big. + RegisterPlan(planDigest []byte, normalizedPlan string, isLarge bool) // Close uses to close and release the reporter resource. Close() @@ -80,12 +81,15 @@ type RemoteTopSQLReporter struct { // calling decodePlan this can take a while, so should not block critical paths. decodePlan planBinaryDecodeFunc + + // Instead of dropping large plans, we compress it into encoded format and report + compressPlan planBinaryCompressFunc } // NewRemoteTopSQLReporter creates a new RemoteTopSQLReporter. // // decodePlan is a decoding function which will be called asynchronously to decode the plan binary to string. -func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter { +func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc, compressPlan planBinaryCompressFunc) *RemoteTopSQLReporter { ctx, cancel := context.WithCancel(context.Background()) tsr := &RemoteTopSQLReporter{ DefaultDataSinkRegisterer: NewDefaultDataSinkRegisterer(ctx), @@ -99,6 +103,7 @@ func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc) *RemoteTopSQLRepor normalizedPlanMap: newNormalizedPlanMap(), stmtStatsBuffer: map[uint64]stmtstats.StatementStatsMap{}, decodePlan: decodePlan, + compressPlan: compressPlan, } tsr.sqlCPUCollector = collector.NewSQLCPUCollector(tsr) return tsr @@ -153,8 +158,8 @@ func (tsr *RemoteTopSQLReporter) RegisterSQL(sqlDigest []byte, normalizedSQL str // RegisterPlan implements TopSQLReporter. // // This function is thread-safe and efficient. -func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedPlan string) { - tsr.normalizedPlanMap.register(planDigest, normalizedPlan) +func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedPlan string, isLarge bool) { + tsr.normalizedPlanMap.register(planDigest, normalizedPlan, isLarge) } // Close implements TopSQLReporter. @@ -270,7 +275,7 @@ func (tsr *RemoteTopSQLReporter) reportWorker() { tsr.doReport(&ReportData{ DataRecords: rs.toProto(), SQLMetas: data.normalizedSQLMap.toProto(), - PlanMetas: data.normalizedPlanMap.toProto(tsr.decodePlan), + PlanMetas: data.normalizedPlanMap.toProto(tsr.decodePlan, tsr.compressPlan), }) case <-tsr.ctx.Done(): return diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go index 7dce8079eeaf3..c57e4ba3ff906 100644 --- a/util/topsql/reporter/reporter_test.go +++ b/util/topsql/reporter/reporter_test.go @@ -44,7 +44,7 @@ func populateCache(tsr *RemoteTopSQLReporter, begin, end int, timestamp uint64) for i := begin; i < end; i++ { key := []byte("planDigest" + strconv.Itoa(i+1)) value := "planNormalized" + strconv.Itoa(i+1) - tsr.RegisterPlan(key, value) + tsr.RegisterPlan(key, value, false) } // collect var records []collector.SQLCPUTimeRecord @@ -63,7 +63,7 @@ func reportCache(tsr *RemoteTopSQLReporter) { tsr.doReport(&ReportData{ DataRecords: tsr.collecting.take().getReportRecords().toProto(), SQLMetas: tsr.normalizedSQLMap.take().toProto(), - PlanMetas: tsr.normalizedPlanMap.take().toProto(tsr.decodePlan), + PlanMetas: tsr.normalizedPlanMap.take().toProto(tsr.decodePlan, tsr.compressPlan), }) } @@ -71,6 +71,10 @@ func mockPlanBinaryDecoderFunc(plan string) (string, error) { return plan, nil } +func mockPlanBinaryCompressFunc(plan []byte) string { + return string(plan) +} + type mockDataSink struct { ch chan *ReportData } @@ -94,7 +98,7 @@ func setupRemoteTopSQLReporter(maxStatementsNum, interval int) (*RemoteTopSQLRep topsqlstate.GlobalState.MaxCollect.Store(10000) topsqlstate.GlobalState.ReportIntervalSeconds.Store(int64(interval)) topsqlstate.EnableTopSQL() - ts := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + ts := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc) ds := newMockDataSink2() if err := ts.Register(ds); err != nil { panic(err) @@ -194,7 +198,7 @@ func newSQLCPUTimeRecord(tsr *RemoteTopSQLReporter, sqlID int, cpuTimeMs uint32) key = []byte("planDigest" + strconv.Itoa(sqlID)) value = "planNormalized" + strconv.Itoa(sqlID) - tsr.RegisterPlan(key, value) + tsr.RegisterPlan(key, value, false) return collector.SQLCPUTimeRecord{ SQLDigest: []byte("sqlDigest" + strconv.Itoa(sqlID)), @@ -317,7 +321,7 @@ func TestCollectCapacity(t *testing.T) { for i := 0; i < n; i++ { key := []byte("planDigest" + strconv.Itoa(i)) value := "planNormalized" + strconv.Itoa(i) - tsr.RegisterPlan(key, value) + tsr.RegisterPlan(key, value, false) } } genRecord := func(n int) []collector.SQLCPUTimeRecord { @@ -391,7 +395,7 @@ func TestMultipleDataSinks(t *testing.T) { topsqlstate.GlobalState.ReportIntervalSeconds.Store(1) topsqlstate.EnableTopSQL() - tsr := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + tsr := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc) var chs []chan *ReportData for i := 0; i < 7; i++ { @@ -477,7 +481,7 @@ func TestMultipleDataSinks(t *testing.T) { func TestReporterWorker(t *testing.T) { topsqlstate.GlobalState.ReportIntervalSeconds.Store(3) - r := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + r := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc) r.Start() defer r.Close() diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index f416de28ed7f6..61ffca2b0d460 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -45,7 +45,7 @@ var ( ) func init() { - remoteReporter := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan) + remoteReporter := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan, plancodec.Compress) globalTopSQLReport = remoteReporter singleTargetDataSink = reporter.NewSingleTargetDataSink(remoteReporter) } @@ -182,10 +182,5 @@ func linkSQLTextWithDigest(sqlDigest []byte, normalizedSQL string, isInternal bo } func linkPlanTextWithDigest(planDigest []byte, normalizedBinaryPlan string) { - if len(normalizedBinaryPlan) > MaxBinaryPlanSize { - // ignore the huge size plan - return - } - - globalTopSQLReport.RegisterPlan(planDigest, normalizedBinaryPlan) + globalTopSQLReport.RegisterPlan(planDigest, normalizedBinaryPlan, len(normalizedBinaryPlan) > MaxBinaryPlanSize) } diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index 1d9c1ccdda5f9..04855ac163011 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -86,6 +86,10 @@ func mockPlanBinaryDecoderFunc(plan string) (string, error) { return plan, nil } +func mockPlanBinaryCompressFunc(plan []byte) string { + return string(plan) +} + func TestTopSQLReporter(t *testing.T) { err := cpuprofile.StartCPUProfiler() require.NoError(t, err) @@ -100,7 +104,7 @@ func TestTopSQLReporter(t *testing.T) { }) topsqlstate.EnableTopSQL() - report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc) report.Start() ds := reporter.NewSingleTargetDataSink(report) ds.Start() @@ -222,7 +226,7 @@ func TestTopSQLPubSub(t *testing.T) { topsqlstate.GlobalState.ReportIntervalSeconds.Store(1) topsqlstate.EnableTopSQL() - report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc) report.Start() defer report.Close() topsql.SetupTopSQLForTest(report) @@ -341,7 +345,7 @@ func TestTopSQLPubSub(t *testing.T) { func TestPubSubWhenReporterIsStopped(t *testing.T) { topsqlstate.EnableTopSQL() - report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc) + report := reporter.NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc) report.Start() server, err := mockServer.NewMockPubSubServer()