Skip to content

Commit

Permalink
Merge branch 'master' into incremental
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Dec 21, 2021
2 parents 504e09f + a8a858b commit 092237a
Show file tree
Hide file tree
Showing 7 changed files with 537 additions and 22 deletions.
2 changes: 2 additions & 0 deletions server/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/topsql"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -64,6 +65,7 @@ func NewRPCServer(config *config.Config, dom *domain.Domain, sm util.SessionMana
}
diagnosticspb.RegisterDiagnosticsServer(s, rpcSrv)
tikvpb.RegisterTikvServer(s, rpcSrv)
topsql.RegisterPubSubServer(s)
return s
}

Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,5 +832,5 @@ type TopSQL struct {

// TopSQLEnabled uses to check whether enabled the top SQL feature.
func TopSQLEnabled() bool {
return TopSQLVariable.Enable.Load() && config.GetGlobalConfig().TopSQL.ReceiverAddress != ""
return TopSQLVariable.Enable.Load()
}
67 changes: 67 additions & 0 deletions util/topsql/reporter/mock/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mock

import (
"fmt"
"net"

"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type mockPubSubServer struct {
addr string
listen net.Listener
grpcServer *grpc.Server
}

// NewMockPubSubServer creates a mock publisher server.
func NewMockPubSubServer() (*mockPubSubServer, error) {
addr := "127.0.0.1:0"
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
server := grpc.NewServer()

return &mockPubSubServer{
addr: fmt.Sprintf("127.0.0.1:%d", lis.Addr().(*net.TCPAddr).Port),
listen: lis,
grpcServer: server,
}, nil
}

func (svr *mockPubSubServer) Serve() {
err := svr.grpcServer.Serve(svr.listen)
if err != nil {
logutil.BgLogger().Warn("[top-sql] mock pubsub server serve failed", zap.Error(err))
}
}

func (svr *mockPubSubServer) Server() *grpc.Server {
return svr.grpcServer
}

func (svr *mockPubSubServer) Address() string {
return svr.addr
}

func (svr *mockPubSubServer) Stop() {
if svr.grpcServer != nil {
svr.grpcServer.Stop()
}
}
267 changes: 267 additions & 0 deletions util/topsql/reporter/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import (
"context"
"errors"
"time"

"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

// TopSQLPubSubService implements tipb.TopSQLPubSubServer.
//
// If a client subscribes to TopSQL records, the TopSQLPubSubService is responsible
// for registering an associated DataSink to the reporter. Then the DataSink sends
// data to the client periodically.
type TopSQLPubSubService struct {
dataSinkRegisterer DataSinkRegisterer
}

// NewTopSQLPubSubService creates a new TopSQLPubSubService.
func NewTopSQLPubSubService(dataSinkRegisterer DataSinkRegisterer) *TopSQLPubSubService {
return &TopSQLPubSubService{dataSinkRegisterer: dataSinkRegisterer}
}

var _ tipb.TopSQLPubSubServer = &TopSQLPubSubService{}

// Subscribe registers dataSinks to the reporter and redirects data received from reporter
// to subscribers associated with those dataSinks.
func (ps *TopSQLPubSubService) Subscribe(_ *tipb.TopSQLSubRequest, stream tipb.TopSQLPubSub_SubscribeServer) error {
ds := newPubSubDataSink(stream, ps.dataSinkRegisterer)
if err := ps.dataSinkRegisterer.Register(ds); err != nil {
return err
}
return ds.run()
}

type pubSubDataSink struct {
ctx context.Context
cancel context.CancelFunc

stream tipb.TopSQLPubSub_SubscribeServer
sendTaskCh chan sendTask

// for deregister
registerer DataSinkRegisterer
}

func newPubSubDataSink(stream tipb.TopSQLPubSub_SubscribeServer, registerer DataSinkRegisterer) *pubSubDataSink {
ctx, cancel := context.WithCancel(stream.Context())

return &pubSubDataSink{
ctx: ctx,
cancel: cancel,

stream: stream,
sendTaskCh: make(chan sendTask, 1),

registerer: registerer,
}
}

var _ DataSink = &pubSubDataSink{}

func (ds *pubSubDataSink) TrySend(data *ReportData, deadline time.Time) error {
select {
case ds.sendTaskCh <- sendTask{data: data, deadline: deadline}:
return nil
case <-ds.ctx.Done():
return ds.ctx.Err()
default:
ignoreReportChannelFullCounter.Inc()
return errors.New("the channel of pubsub dataSink is full")
}
}

func (ds *pubSubDataSink) OnReporterClosing() {
ds.cancel()
}

func (ds *pubSubDataSink) run() error {
defer func() {
ds.registerer.Deregister(ds)
ds.cancel()
}()

for {
select {
case task := <-ds.sendTaskCh:
ctx, cancel := context.WithDeadline(ds.ctx, task.deadline)
var err error

start := time.Now()
go util.WithRecovery(func() {
defer cancel()
err = ds.doSend(ctx, task.data)

if err != nil {
reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
}, nil)

// When the deadline is exceeded, the closure inside `go util.WithRecovery` above may not notice that
// immediately because it can be blocked by `stream.Send`.
// In order to clean up resources as quickly as possible, we let that closure run in an individual goroutine,
// and wait for timeout here.
<-ctx.Done()

if errors.Is(ctx.Err(), context.DeadlineExceeded) {
logutil.BgLogger().Warn(
"[top-sql] pubsub datasink failed to send data to subscriber due to deadline exceeded",
zap.Time("deadline", task.deadline),
)
return ctx.Err()
}

if err != nil {
logutil.BgLogger().Warn(
"[top-sql] pubsub datasink failed to send data to subscriber",
zap.Error(err),
)
return err
}
case <-ds.ctx.Done():
return ds.ctx.Err()
}
}
}

func (ds *pubSubDataSink) doSend(ctx context.Context, data *ReportData) error {
if err := ds.sendCPUTime(ctx, data.CPUTimeRecords); err != nil {
return err
}
if err := ds.sendSQLMeta(ctx, data.SQLMetas); err != nil {
return err
}
return ds.sendPlanMeta(ctx, data.PlanMetas)
}

func (ds *pubSubDataSink) sendCPUTime(ctx context.Context, records []tipb.CPUTimeRecord) (err error) {
if len(records) == 0 {
return
}

start := time.Now()
sentCount := 0
defer func() {
topSQLReportRecordCounterHistogram.Observe(float64(sentCount))
if err != nil {
reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
}()

cpuRecord := &tipb.TopSQLSubResponse_Record{}
r := &tipb.TopSQLSubResponse{RespOneof: cpuRecord}

for i := range records {
cpuRecord.Record = &records[i]
if err = ds.stream.Send(r); err != nil {
return
}
sentCount += 1

select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
}

return
}

func (ds *pubSubDataSink) sendSQLMeta(ctx context.Context, sqlMetas []tipb.SQLMeta) (err error) {
if len(sqlMetas) == 0 {
return
}

start := time.Now()
sentCount := 0
defer func() {
topSQLReportSQLCountHistogram.Observe(float64(sentCount))
if err != nil {
reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
}()

sqlMeta := &tipb.TopSQLSubResponse_SqlMeta{}
r := &tipb.TopSQLSubResponse{RespOneof: sqlMeta}

for i := range sqlMetas {
sqlMeta.SqlMeta = &sqlMetas[i]
if err = ds.stream.Send(r); err != nil {
return
}
sentCount += 1

select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
}

return
}

func (ds *pubSubDataSink) sendPlanMeta(ctx context.Context, planMetas []tipb.PlanMeta) (err error) {
if len(planMetas) == 0 {
return
}

start := time.Now()
sentCount := 0
defer func() {
topSQLReportPlanCountHistogram.Observe(float64(sentCount))
if err != nil {
reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
}()

planMeta := &tipb.TopSQLSubResponse_PlanMeta{}
r := &tipb.TopSQLSubResponse{RespOneof: planMeta}

for i := range planMetas {
planMeta.PlanMeta = &planMetas[i]
if err = ds.stream.Send(r); err != nil {
return
}
sentCount += 1

select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
}

return
}
Loading

0 comments on commit 092237a

Please sign in to comment.