Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve TSO proxy based on the existing TSO Follower Batching framework #6565

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add TSODispatchingStats
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Jun 8, 2023
commit e4b6fe25707f5f5bbd8931026479788f0a89cbfb
6 changes: 6 additions & 0 deletions pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type TSODispatcher struct {
tsoProxyHandleDuration prometheus.Histogram
tsoProxyBatchSize prometheus.Histogram

*TSODispatchingStats

ctx context.Context
// dispatchChs is used to dispatch different TSO requests to the corresponding forwarding TSO channels.
dispatchChs sync.Map // Store as map[string]chan Request (forwardedHost -> dispatch channel)
Expand All @@ -53,6 +55,7 @@ func NewTSODispatcher(
ctx: ctx,
tsoProxyHandleDuration: tsoProxyHandleDuration,
tsoProxyBatchSize: tsoProxyBatchSize,
TSODispatchingStats: &TSODispatchingStats{},
}
return tsoDispatcher
}
Expand Down Expand Up @@ -117,13 +120,16 @@ func (s *TSODispatcher) startDispatchLoop(
pendingTSOReqCount := 0

log.Info("start the dispatch loop", zap.String("forwarded-host", forwardedHost))
s.EnterDispatcher()

defer func() {
log.Info("exiting from the dispatch loop. cleaning up the pending requests",
zap.String("forwarded-host", forwardedHost))
if forwardStream != nil {
forwardStream.closeSend()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that a new stream is created for each request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't. We start only one dispatch loop (this function) for all requests with the same forwarded host, and it keeps running until a forwarding-related error causes it to exit. The nil check and forwardStream.closeSend() run in the deferred function, i.e. only when the loop exits.

}
s.cleanup(forwardedHost, forwardErr, pendingRequests[:pendingTSOReqCount])
s.LeaveDispatcher()
log.Info("the dispatch loop exited", zap.String("forwarded-host", forwardedHost))
}()

Expand Down
123 changes: 123 additions & 0 deletions pkg/utils/tsoutil/tso_dispatching_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsoutil

import (
"sync"
"sync/atomic"
)

// TSODispatchingStats records the statistics of TSO dispatching: alive and
// peak counts for TSO streaming routines and for dispatchers, plus a running
// total of dispatcher exits.
type TSODispatchingStats struct {
	// streamingRoutinesLock is held by the streaming-routine accessors.
	streamingRoutinesLock sync.RWMutex

	// Current and highest-seen number of TSO streaming routines.
	aliveTSOStreamingRoutines, peakTSOStreamingRoutines int64

	// dispatcherCountLock is held by the dispatcher-count accessors.
	dispatcherCountLock sync.RWMutex

	// Current and highest-seen number of dispatchers.
	aliveDispatcherCount, peakDispatcherCount int64

	// dispatcherExitCount is the total number of dispatcher exits; it is
	// updated atomically and needs no lock.
	dispatcherExitCount atomic.Int64
}

// NewTSODispatchingStats creates a TSODispatchingStats whose counters start
// from the supplied values instead of zero.
func NewTSODispatchingStats(
	aliveTSOStreamingRoutine int64,
	peakTSOStreamingRoutines int64,
	aliveDispatcherCount int64,
	peakDispatcherCount int64,
	dispatcherExitCount int64,
) *TSODispatchingStats {
	seeded := new(TSODispatchingStats)

	// Streaming-routine counters.
	seeded.aliveTSOStreamingRoutines = aliveTSOStreamingRoutine
	seeded.peakTSOStreamingRoutines = peakTSOStreamingRoutines

	// Dispatcher counters.
	seeded.aliveDispatcherCount = aliveDispatcherCount
	seeded.peakDispatcherCount = peakDispatcherCount
	seeded.dispatcherExitCount.Store(dispatcherExitCount)

	return seeded
}

// GetAliveTSOStreamingRoutines returns the current value of
// aliveTSOStreamingRoutines, read under the streaming-routines read lock.
func (s *TSODispatchingStats) GetAliveTSOStreamingRoutines() int64 {
	s.streamingRoutinesLock.RLock()
	alive := s.aliveTSOStreamingRoutines
	s.streamingRoutinesLock.RUnlock()
	return alive
}

// GetPeakTSOStreamingRoutines returns the current value of
// peakTSOStreamingRoutines, read under the streaming-routines read lock.
func (s *TSODispatchingStats) GetPeakTSOStreamingRoutines() int64 {
	s.streamingRoutinesLock.RLock()
	peak := s.peakTSOStreamingRoutines
	s.streamingRoutinesLock.RUnlock()
	return peak
}

// GetAliveDispatcherCount returns the current value of aliveDispatcherCount,
// read under the dispatcher-count read lock.
func (s *TSODispatchingStats) GetAliveDispatcherCount() int64 {
	s.dispatcherCountLock.RLock()
	alive := s.aliveDispatcherCount
	s.dispatcherCountLock.RUnlock()
	return alive
}

// GetPeakDispatcherCount returns the current value of peakDispatcherCount,
// read under the dispatcher-count read lock.
func (s *TSODispatchingStats) GetPeakDispatcherCount() int64 {
	s.dispatcherCountLock.RLock()
	peak := s.peakDispatcherCount
	s.dispatcherCountLock.RUnlock()
	return peak
}

// GetDispatcherExitCount returns the current value of dispatcherExitCount.
// The counter is loaded atomically, so no lock is taken.
func (s *TSODispatchingStats) GetDispatcherExitCount() int64 {
	exits := s.dispatcherExitCount.Load()
	return exits
}

// EnterTSOStreamingRoutine is called when entering into a TSO streaming routine.
// It increments aliveTSOStreamingRoutines and raises peakTSOStreamingRoutines
// when the new alive count exceeds it.
func (s *TSODispatchingStats) EnterTSOStreamingRoutine() {
	s.streamingRoutinesLock.Lock()
	defer s.streamingRoutinesLock.Unlock()
	// Update the streaming-routine counters, which are the ones read under
	// streamingRoutinesLock by the getters. The dispatcher counters belong to
	// dispatcherCountLock and must not be touched here.
	s.aliveTSOStreamingRoutines++
	if s.aliveTSOStreamingRoutines > s.peakTSOStreamingRoutines {
		s.peakTSOStreamingRoutines = s.aliveTSOStreamingRoutines
	}
}

// LeaveTSOStreamingRoutine is called when a TSO streaming routine exits.
// It decrements aliveTSOStreamingRoutines; the peak value is left unchanged.
func (s *TSODispatchingStats) LeaveTSOStreamingRoutine() {
	s.streamingRoutinesLock.Lock()
	defer s.streamingRoutinesLock.Unlock()
	s.aliveTSOStreamingRoutines--
}

// EnterDispatcher is called when entering into a dispatcher. It bumps the
// alive dispatcher count and, if that exceeds the recorded peak, updates the
// peak as well. Both updates happen under dispatcherCountLock.
func (s *TSODispatchingStats) EnterDispatcher() {
	s.dispatcherCountLock.Lock()
	defer s.dispatcherCountLock.Unlock()

	s.aliveDispatcherCount += 1
	if nowAlive := s.aliveDispatcherCount; s.peakDispatcherCount < nowAlive {
		s.peakDispatcherCount = nowAlive
	}
}

// LeaveDispatcher is called when a dispatcher exits. It decrements the alive
// dispatcher count under dispatcherCountLock and then, after the lock has
// been released, adds one to the atomic dispatcher exit counter.
func (s *TSODispatchingStats) LeaveDispatcher() {
	func() {
		s.dispatcherCountLock.Lock()
		defer s.dispatcherCountLock.Unlock()
		s.aliveDispatcherCount -= 1
	}()

	s.dispatcherExitCount.Add(1)
}
5 changes: 5 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {

// forwardTSO forwards the incoming TSO requests to the TSO microservice.
func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to update getGlobalTSOFromTSOServer?

Copy link
Contributor Author

@binshi-bing binshi-bing Jun 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Until the change in this PR is proven to work as expected in dev/staging, I plan to keep the other RPCs unchanged and unaffected.

if s.IsAPIServiceMode() {
s.tsoDispatcher.EnterTSOStreamingRoutine()
defer s.tsoDispatcher.LeaveTSOStreamingRoutine()
}

streamCtx := stream.Context()
responseCh := make(chan *pdpb.TsoResponse, 1)

Expand Down
16 changes: 4 additions & 12 deletions tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *tsoProxyTestSuite) TearDownSuite() {

// TestTSOProxyBasic tests the TSO Proxy's basic function to forward TSO requests to TSO microservice.
func (s *tsoProxyTestSuite) TestTSOProxyBasic() {
s.verifyTSOProxy(s.streams, 100)
s.verifyTSOProxy(s.streams, 1000)
}

func (s *tsoProxyTestSuite) cleanupGRPCStreams(
Expand All @@ -117,7 +117,7 @@ func (s *tsoProxyTestSuite) verifyTSOProxy(
for i := 0; i < requestsPerClient; i++ {
reqs[i] = &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{ClusterId: s.apiLeader.GetClusterID()},
Count: uint32(i) + 1, // Make sure the count is not zero.
Count: uint32(i) + 1, // Make sure the count is positive.
}
}

Expand Down Expand Up @@ -273,7 +273,7 @@ func benchmarkTSOProxyNClients(clientCount int, sameContext bool, b *testing.B)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

grpcClientConns, streams, cancelFuns := createTSOStreams(re, ctx, suite.backendEndpoints, clientCount, sameContext)
grpcClientConns, streams, cancelFuncs := createTSOStreams(re, ctx, suite.backendEndpoints, clientCount, sameContext)

// Benchmark TSO proxy
b.ResetTimer()
Expand All @@ -293,15 +293,7 @@ func benchmarkTSOProxyNClients(clientCount int, sameContext bool, b *testing.B)
}
b.StopTimer()

for _, stream := range streams {
stream.CloseSend()
}
for _, conn := range grpcClientConns {
conn.Close()
}
for _, cancelFun := range cancelFuns {
cancelFun()
}
suite.cleanupGRPCStreams(grpcClientConns, streams, cancelFuncs)

suite.TearDownSuite()
}