Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update couchbase membership rebalance #49

Closed
wants to merge 11 commits into from
8 changes: 4 additions & 4 deletions couchbase/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import (
"sync"
"time"

"github.com/couchbase/gocbcore/v10"
jsoniter "github.com/json-iterator/go"

"github.com/Trendyol/go-dcp-client/config"

"github.com/Trendyol/go-dcp-client/helpers"
"github.com/Trendyol/go-dcp-client/logger"
"github.com/Trendyol/go-dcp-client/membership"

"github.com/json-iterator/go"

"github.com/google/uuid"

"github.com/couchbase/gocbcore/v10"
"github.com/couchbase/gocbcore/v10/memd"
)

Expand Down Expand Up @@ -368,7 +368,7 @@ func (h *cbMembership) rebalance(instances []Instance) {
selfOrder := 0

for index, instance := range instances {
if *instance.ID == string(h.id) {
if *(instance.ID) == string(h.id) {
selfOrder = index + 1
break
}
Expand Down
1 change: 1 addition & 0 deletions stream/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

const (
CheckpointTypeAuto = "auto"
CheckpointTypeManual = "manual"
CheckpointAutoResetTypeLatest = "latest"
)

Expand Down
71 changes: 49 additions & 22 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ type stream struct {
stopCh chan struct{}
dirtyOffsets *wrapper.SyncMap[uint16, bool]
listener models.Listener
finishListenerBuffer chan struct{}
config *config.Dcp
metric *Metric
finishStreamWithCloseCh chan struct{}
collectionIDs map[uint32]string
activeStreams int
anyDirtyOffset bool
balancing bool
rebalanceLock sync.Mutex
}

func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) {
Expand Down Expand Up @@ -107,6 +109,7 @@ func (s *stream) listen() {
default:
}
}
s.finishListenerBuffer <- struct{}{}
}

func (s *stream) listenEnd() {
Expand Down Expand Up @@ -144,29 +147,38 @@ func (s *stream) Open() {
go s.wait()
}

func (s *stream) rebalance() {
s.balancing = true

s.Save()
s.Close()
s.Open()
func (s *stream) Rebalance() {
if s.balancing {
s.rebalanceTimer.Reset(s.config.Dcp.Group.Membership.RebalanceDelay)
logger.Log.Printf("latest rebalance time is resetted")
return
}
s.rebalanceLock.Lock()
if !s.balancing {
s.balancing = true
s.Save()
s.Close()
}

s.balancing = false
logger.Log.Printf("rebalance will start after %v", s.config.Dcp.Group.Membership.RebalanceDelay)

logger.Log.Printf("rebalance is finished")
s.rebalanceTimer = time.AfterFunc(s.config.Dcp.Group.Membership.RebalanceDelay, func() {
defer s.rebalanceLock.Unlock()
s.ResetStream()
s.Open()

s.metric.Rebalance++
}
s.balancing = false

func (s *stream) Rebalance() {
if s.rebalanceTimer != nil {
s.rebalanceTimer.Stop()
logger.Log.Printf("latest rebalance is canceled")
}
logger.Log.Printf("rebalance is finished")

s.rebalanceTimer = time.AfterFunc(s.config.Dcp.Group.Membership.RebalanceDelay, s.rebalance)
s.metric.Rebalance++
})
}

logger.Log.Printf("rebalance will start after %v", s.config.Dcp.Group.Membership.RebalanceDelay)
func (s *stream) ResetStream() {
s.finishStreamWithCloseCh = make(chan struct{}, 1)
s.finishStreamWithEndEventCh = make(chan struct{}, 1)
s.finishListenerBuffer = make(chan struct{})
}

func (s *stream) Save() {
Expand Down Expand Up @@ -235,15 +247,29 @@ func (s *stream) Close() {
s.rollbackMitigation.Stop()
}

err := s.closeAllStreams()
if err != nil {
logger.ErrorLog.Printf("cannot close all streams: %v", err)
}

s.observer.Close()
<-s.finishListenerBuffer

if s.checkpoint != nil {
s.checkpoint.StopSchedule()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

for s.anyDirtyOffset && s.config.Checkpoint.Type == CheckpointTypeManual {
select {
case <-ctx.Done():
logger.ErrorLog.Printf("there are dirty offsets while closing the stream")
break // avoid infinite loop
default:
continue
}
}

err := s.closeAllStreams()
if err != nil {
logger.ErrorLog.Printf("cannot close all streams: %v", err)
if s.checkpoint != nil {
s.checkpoint.StopSchedule()
}

s.finishStreamWithCloseCh <- struct{}{}
Expand Down Expand Up @@ -294,6 +320,7 @@ func NewStream(client couchbase.Client,
collectionIDs: collectionIDs,
finishStreamWithCloseCh: make(chan struct{}, 1),
finishStreamWithEndEventCh: make(chan struct{}, 1),
finishListenerBuffer: make(chan struct{}),
stopCh: stopCh,
bus: bus,
metric: &Metric{
Expand Down