Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type changeCache struct {
started base.AtomicBool // Set by the Start method
stopped base.AtomicBool // Set by the Stop method
skippedSeqs *SkippedSequenceSkiplist // Skipped sequences still pending on the DCP caching feed
lock sync.RWMutex // Coordinates access to struct fields
lock sync.Mutex // Coordinates access to struct fields
options CacheOptions // Cache config
terminator chan bool // Signal termination of background goroutines
backgroundTasks []BackgroundTask // List of background tasks.
Expand All @@ -93,8 +93,8 @@ type changeCacheStats struct {

func (c *changeCache) updateStats(ctx context.Context) {

c.lock.RLock()
defer c.lock.RUnlock()
c.lock.Lock()
defer c.lock.Unlock()
if c.db == nil {
return
}
Expand Down Expand Up @@ -990,9 +990,9 @@ func (c *changeCache) _setInitialSequence(initialSequence uint64) {

// Concurrent-safe get value of nextSequence
func (c *changeCache) getNextSequence() (nextSequence uint64) {
c.lock.RLock()
c.lock.Lock()
nextSequence = c.nextSequence
c.lock.RUnlock()
c.lock.Unlock()
return nextSequence
}

Expand Down
5 changes: 4 additions & 1 deletion db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ func (db *DatabaseContext) PushSkipped(ctx context.Context, seq uint64) {
}

func (db *DatabaseContext) RemoveSkipped(seq uint64) {
db.changeCache.RemoveSkipped(seq)
err := db.changeCache.RemoveSkipped(seq)
if err != nil {
base.WarnfCtx(context.TODO(), "Error removing skipped sequence %d: %v", seq, err)
}
}

type Scope struct {
Expand Down