Skip to content

Commit

Permalink
*: use values to reduce GC pressure (#2474) (#2497)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 12, 2021
1 parent 89aa3d9 commit 89de355
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 102 deletions.
39 changes: 21 additions & 18 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,13 @@ type regionEvent struct {
resolvedTs *cdcpb.ResolvedTs
}

// A special event that indicates singleEventFeed is closed.
var emptyRegionEvent = regionEvent{}

type regionFeedState struct {
sri singleRegionInfo
requestID uint64
regionEventCh chan *regionEvent
regionEventCh chan regionEvent
stopped int32

lock sync.RWMutex
Expand All @@ -164,7 +167,7 @@ func newRegionFeedState(sri singleRegionInfo, requestID uint64) *regionFeedState
return &regionFeedState{
sri: sri,
requestID: requestID,
regionEventCh: make(chan *regionEvent, 16),
regionEventCh: make(chan regionEvent, 16),
stopped: 0,
}
}
Expand Down Expand Up @@ -352,7 +355,7 @@ type CDCKVClient interface {
enableOldValue bool,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
eventCh chan<- *model.RegionFeedEvent,
eventCh chan<- model.RegionFeedEvent,
) error
Close() error
}
Expand Down Expand Up @@ -492,7 +495,7 @@ func (c *CDCClient) EventFeed(
enableOldValue bool,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
eventCh chan<- *model.RegionFeedEvent,
eventCh chan<- model.RegionFeedEvent,
) error {
s := newEventFeedSession(ctx, c, c.regionCache, c.kvStorage, span,
lockResolver, isPullerInit,
Expand Down Expand Up @@ -523,7 +526,7 @@ type eventFeedSession struct {
totalSpan regionspan.ComparableSpan

// The channel to send the processed events.
eventCh chan<- *model.RegionFeedEvent
eventCh chan<- model.RegionFeedEvent
// The token based region router, it controls the uninitialized regions with
// a given size limit.
regionRouter LimitRegionRouter
Expand Down Expand Up @@ -565,7 +568,7 @@ func newEventFeedSession(
isPullerInit PullerInitialization,
enableOldValue bool,
startTs uint64,
eventCh chan<- *model.RegionFeedEvent,
eventCh chan<- model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
kvClientCfg := config.GetGlobalServerConfig().KVClient
Expand Down Expand Up @@ -950,7 +953,7 @@ func (s *eventFeedSession) dispatchRequest(
// distribution in puller, so this resolved ts event is needed.
// After this resolved ts event is sent, we don't need to send one more
// resolved ts event when the region starts to work.
resolvedEv := &model.RegionFeedEvent{
resolvedEv := model.RegionFeedEvent{
RegionID: sri.verID.GetID(),
Resolved: &model.ResolvedSpan{
Span: sri.span,
Expand Down Expand Up @@ -1291,7 +1294,7 @@ func (s *eventFeedSession) receiveFromStream(

for _, state := range regionStates {
select {
case state.regionEventCh <- nil:
case state.regionEventCh <- emptyRegionEvent:
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -1390,7 +1393,7 @@ func (s *eventFeedSession) sendRegionChangeEvent(
}

select {
case state.regionEventCh <- &regionEvent{
case state.regionEventCh <- regionEvent{
changeEvent: event,
}:
case <-ctx.Done():
Expand All @@ -1416,7 +1419,7 @@ func (s *eventFeedSession) sendResolvedTs(
continue
}
select {
case state.regionEventCh <- &regionEvent{
case state.regionEventCh <- regionEvent{
resolvedTs: resolvedTs,
}:
case <-ctx.Done():
Expand All @@ -1438,7 +1441,7 @@ func (s *eventFeedSession) singleEventFeed(
span regionspan.ComparableSpan,
startTs uint64,
storeAddr string,
receiverCh <-chan *regionEvent,
receiverCh <-chan regionEvent,
) (lastResolvedTs uint64, initialized bool, err error) {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
Expand Down Expand Up @@ -1471,7 +1474,7 @@ func (s *eventFeedSession) singleEventFeed(
return nil
}
// emit a checkpointTs
revent := &model.RegionFeedEvent{
revent := model.RegionFeedEvent{
RegionID: regionID,
Resolved: &model.ResolvedSpan{
Span: span,
Expand All @@ -1495,7 +1498,7 @@ func (s *eventFeedSession) singleEventFeed(
})

for {
var event *regionEvent
var event regionEvent
var ok bool
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1543,12 +1546,12 @@ func (s *eventFeedSession) singleEventFeed(
case event, ok = <-receiverCh:
}

if !ok || event == nil {
if !ok || event == emptyRegionEvent {
log.Debug("singleEventFeed closed by error")
err = cerror.ErrEventFeedAborted.GenWithStackByArgs()
return
}
var revent *model.RegionFeedEvent
var revent model.RegionFeedEvent
lastReceivedEventTime = time.Now()
if event.changeEvent != nil {
metricEventSize.Observe(float64(event.changeEvent.Event.Size()))
Expand Down Expand Up @@ -1703,18 +1706,18 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
return
}

func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (*model.RegionFeedEvent, error) {
func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
case cdcpb.Event_Row_DELETE:
opType = model.OpTypeDelete
case cdcpb.Event_Row_PUT:
opType = model.OpTypePut
default:
return nil, cerror.ErrUnknownKVEventType.GenWithStackByArgs(entry.GetOpType(), entry)
return model.RegionFeedEvent{}, cerror.ErrUnknownKVEventType.GenWithStackByArgs(entry.GetOpType(), entry)
}

revent := &model.RegionFeedEvent{
revent := model.RegionFeedEvent{
RegionID: regionID,
Val: &model.RawKVEntry{
OpType: opType,
Expand Down
8 changes: 4 additions & 4 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
*sync.Map, /* regionID -> requestID/storeID */
*sync.WaitGroup, /* ensure eventfeed routine exit */
context.CancelFunc, /* cancle both mock server and cdc kv client */
chan *model.RegionFeedEvent, /* kv client output channel */
chan model.RegionFeedEvent, /* kv client output channel */
[]chan *cdcpb.ChangeDataEvent, /* mock server data channels */
) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -189,7 +189,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 1000000)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
Expand Down Expand Up @@ -223,7 +223,7 @@ func prepareBench(b *testing.B, regionNum int) (
*sync.Map, /* regionID -> requestID */
*sync.WaitGroup, /* ensure eventfeed routine exit */
context.CancelFunc, /* cancle both mock server and cdc kv client */
chan *model.RegionFeedEvent, /* kv client output channel */
chan model.RegionFeedEvent, /* kv client output channel */
chan *cdcpb.ChangeDataEvent, /* mock server data channel */
) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -277,7 +277,7 @@ func prepareBench(b *testing.B, regionNum int) (
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 1000000)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
Expand Down
Loading

0 comments on commit 89de355

Please sign in to comment.