Commit
fix: observer snapshot rw race condition
erayarslan committed Mar 9, 2023
1 parent e50bcd4 · commit 2051619
Showing 2 changed files with 33 additions and 21 deletions.
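The race being fixed is on the observer's currentSnapshots map: SnapshotMarker and SeqNoAdvanced write to it while Mutation, Deletion and Expiration read from it, and those DCP callbacks can run on different goroutines. Unsynchronized concurrent map access is a data race in Go and can even panic with "concurrent map read and map write". Below is a minimal, self-contained sketch of the unguarded pattern, with illustrative names not taken from this repository; go run -race reports it. The commit removes the race by taking currentSnapshotsLock around every access, as the diffs below show.

package main

import "sync"

type snapshotMarker struct{ startSeqNo, endSeqNo uint64 }

func main() {
	currentSnapshots := map[uint16]*snapshotMarker{} // shared map with no lock

	var wg sync.WaitGroup
	wg.Add(2)

	// Writer goroutine: plays the role of SnapshotMarker / SeqNoAdvanced.
	go func() {
		defer wg.Done()
		for i := 0; i < 100000; i++ {
			currentSnapshots[uint16(i%1024)] = &snapshotMarker{startSeqNo: uint64(i), endSeqNo: uint64(i)}
		}
	}()

	// Reader goroutine: plays the role of Mutation / Deletion / Expiration.
	go func() {
		defer wg.Done()
		for i := 0; i < 100000; i++ {
			_ = currentSnapshots[uint16(i%1024)] // concurrent read; go run -race flags a data race here
		}
	}()

	wg.Wait()
}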
dcp/observer.go (26 changes: 22 additions & 4 deletions)
@@ -41,8 +41,7 @@ type observer struct {
 	currentSnapshots     map[uint16]*models.SnapshotMarker
 	currentSnapshotsLock sync.Mutex
 
-	uuIDs     map[uint16]gocbcore.VbUUID
-	uuIDsLock sync.Mutex
+	uuIDs map[uint16]gocbcore.VbUUID
 
 	metrics     map[uint16]ObserverMetric
 	metricsLock sync.Mutex
@@ -73,17 +72,23 @@ func (so *observer) sendOrSkip(args models.ListenerArgs) {
 }
 
 func (so *observer) SnapshotMarker(marker models.DcpSnapshotMarker) {
+	so.currentSnapshotsLock.Lock()
+
 	so.currentSnapshots[marker.VbID] = &models.SnapshotMarker{
 		StartSeqNo: marker.StartSeqNo,
 		EndSeqNo:   marker.EndSeqNo,
 	}
 
+	so.currentSnapshotsLock.Unlock()
+
 	so.sendOrSkip(models.ListenerArgs{
 		Event: marker,
 	})
 }
 
 func (so *observer) Mutation(mutation gocbcore.DcpMutation) {
+	so.currentSnapshotsLock.Lock()
+
 	if currentSnapshot, ok := so.currentSnapshots[mutation.VbID]; ok && currentSnapshot != nil {
 		so.sendOrSkip(models.ListenerArgs{
 			Event: models.InternalDcpMutation{
@@ -98,6 +103,8 @@ func (so *observer) Mutation(mutation gocbcore.DcpMutation) {
 		})
 	}
 
+	so.currentSnapshotsLock.Unlock()
+
 	so.metricsLock.Lock()
 	defer so.metricsLock.Unlock()
 
@@ -111,6 +118,8 @@ func (so *observer) Mutation(mutation gocbcore.DcpMutation) {
 }
 
 func (so *observer) Deletion(deletion gocbcore.DcpDeletion) {
+	so.currentSnapshotsLock.Lock()
+
 	if currentSnapshot, ok := so.currentSnapshots[deletion.VbID]; ok && currentSnapshot != nil {
 		so.sendOrSkip(models.ListenerArgs{
 			Event: models.InternalDcpDeletion{
@@ -125,6 +134,8 @@ func (so *observer) Deletion(deletion gocbcore.DcpDeletion) {
 		})
 	}
 
+	so.currentSnapshotsLock.Unlock()
+
 	so.metricsLock.Lock()
 	defer so.metricsLock.Unlock()
 
@@ -138,6 +149,8 @@ func (so *observer) Deletion(deletion gocbcore.DcpDeletion) {
 }
 
 func (so *observer) Expiration(expiration gocbcore.DcpExpiration) {
+	so.currentSnapshotsLock.Lock()
+
 	if currentSnapshot, ok := so.currentSnapshots[expiration.VbID]; ok && currentSnapshot != nil {
 		so.sendOrSkip(models.ListenerArgs{
 			Event: models.InternalDcpExpiration{
@@ -152,6 +165,8 @@ func (so *observer) Expiration(expiration gocbcore.DcpExpiration) {
 		})
 	}
 
+	so.currentSnapshotsLock.Unlock()
+
 	so.metricsLock.Lock()
 	defer so.metricsLock.Unlock()
 
@@ -211,11 +226,15 @@ func (so *observer) OSOSnapshot(snapshot models.DcpOSOSnapshot) {
 }
 
 func (so *observer) SeqNoAdvanced(advanced models.DcpSeqNoAdvanced) {
+	so.currentSnapshotsLock.Lock()
+
 	so.currentSnapshots[advanced.VbID] = &models.SnapshotMarker{
 		StartSeqNo: advanced.SeqNo,
 		EndSeqNo:   advanced.SeqNo,
 	}
 
+	so.currentSnapshotsLock.Unlock()
+
 	so.sendOrSkip(models.ListenerArgs{
 		Event: advanced,
 	})
@@ -267,8 +286,7 @@ func NewObserver(
 		currentSnapshots:     map[uint16]*models.SnapshotMarker{},
 		currentSnapshotsLock: sync.Mutex{},
 
-		uuIDs:     uuIDMap,
-		uuIDsLock: sync.Mutex{},
+		uuIDs: uuIDMap,
 
 		metrics:     map[uint16]ObserverMetric{},
 		metricsLock: sync.Mutex{},
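The fix mirrors the pattern already used for metrics and metricsLock: every read and write of currentSnapshots is bracketed by currentSnapshotsLock. The writers (SnapshotMarker, SeqNoAdvanced) release the lock before emitting the event, while the readers (Mutation, Deletion, Expiration) keep it held across sendOrSkip because the looked-up marker is used to build the event. A condensed sketch of that guarded-map pattern follows, with illustrative names rather than the library's actual API.

package main

import (
	"fmt"
	"sync"
)

type snapshotMarker struct{ startSeqNo, endSeqNo uint64 }

type observer struct {
	mu        sync.Mutex
	snapshots map[uint16]*snapshotMarker
}

// setMarker mirrors SnapshotMarker: write under the lock, release before any callback runs.
func (o *observer) setMarker(vbID uint16, start, end uint64) {
	o.mu.Lock()
	o.snapshots[vbID] = &snapshotMarker{startSeqNo: start, endSeqNo: end}
	o.mu.Unlock()
}

// withMarker mirrors Mutation/Deletion/Expiration: look up and use the marker while holding the lock.
func (o *observer) withMarker(vbID uint16, fn func(m *snapshotMarker)) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if m, ok := o.snapshots[vbID]; ok && m != nil {
		fn(m)
	}
}

func main() {
	o := &observer{snapshots: map[uint16]*snapshotMarker{}}
	o.setMarker(1, 10, 20)
	o.withMarker(1, func(m *snapshotMarker) {
		fmt.Println("current snapshot:", m.startSeqNo, m.endSeqNo)
	})
}

Because lookups dominate writes here, a sync.RWMutex with RLock in the read paths would be a possible variation; the commit sticks with a plain sync.Mutex, consistent with the existing metricsLock.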
dcp_test.go (28 changes: 11 additions & 17 deletions)
@@ -2,18 +2,14 @@ package godcpclient
 
 import (
 	"context"
-	"crypto/rand"
 	"fmt"
 	"log"
 	"math"
-	"math/big"
 	"os"
 	"sync"
 	"testing"
 	"time"
 
-	mathRand "math/rand"
-
 	gDcp "github.com/Trendyol/go-dcp-client/dcp"
 	"github.com/Trendyol/go-dcp-client/models"
 
@@ -87,11 +83,10 @@ func setupContainer(b *testing.B, config helpers.Config) func() {
 	}
 }
 
-func insertDataToContainer(b *testing.B, mockDataSize int64, config helpers.Config) {
+func insertDataToContainer(b *testing.B, mockDataSize int, config helpers.Config) {
 	client := gDcp.NewClient(config)
 
 	_ = client.Connect()
-	defer client.Close()
 
 	ids := make([]int, mockDataSize)
 	for i := range ids {
@@ -101,8 +96,8 @@ func insertDataToContainer(b *testing.B, mockDataSize int64, config helpers.Conf
 	// 2048 is the default value for the max queue size of the client, so we need to make sure that we don't exceed that
 	chunks := helpers.ChunkSlice[int](ids, int(math.Ceil(float64(mockDataSize)/float64(2048))))
 
-	// Concurrency is limited to 10 to avoid server overload
-	iterations := helpers.ChunkSlice(chunks, int(math.Ceil(float64(len(chunks))/float64(10))))
+	// Concurrency is limited to 8 to avoid server overload
+	iterations := helpers.ChunkSlice(chunks, int(math.Ceil(float64(len(chunks))/float64(8))))
 
 	for _, iteration := range iterations {
 		for _, chunk := range iteration {
@@ -142,20 +137,19 @@ func insertDataToContainer(b *testing.B, mockDataSize int64, config helpers.Conf
 
 			wg.Wait()
 		}
-		time.Sleep(720 * time.Millisecond)
+		time.Sleep(1 * time.Second)
 	}
 
+	client.Close()
+
+	time.Sleep(5 * time.Second)
+
 	log.Printf("Inserted %v items", mockDataSize)
 }
 
 func BenchmarkDcp(benchmark *testing.B) {
-	b, err := rand.Int(rand.Reader, big.NewInt(720000)) //nolint:gosec
-	if err != nil {
-		benchmark.Error(err)
-	}
-
-	mockDataSize := b.Int64() + 240000
-	saveTarget := mathRand.Intn(int(mockDataSize))
+	mockDataSize := 320000
+	saveTarget := mockDataSize / 2
 
 	configPath, configFileClean, err := createConfigFile()
 	if err != nil {
@@ -185,7 +179,7 @@ func BenchmarkDcp(benchmark *testing.B) {
 				ctx.Commit()
 			}
 
-			if counter == int(mockDataSize) {
+			if counter == mockDataSize {
 				finish <- true
 			}
 		}
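On the test side, the benchmark becomes deterministic: a fixed 320000-item data set with a mid-point save target replaces the randomized sizes, which also drops the crypto/rand and math/rand imports. The insert helper still splits the ids into chunks no larger than the client's default queue size (2048) and then groups those chunks so that roughly 8 are in flight per iteration. A small sketch of that two-level chunking follows, assuming helpers.ChunkSlice splits a slice into the given number of parts (its real signature may differ).

package main

import (
	"fmt"
	"math"
)

// chunkSlice splits s into chunkCount roughly equal parts. This mirrors how
// helpers.ChunkSlice appears to be used in the test; the real helper may differ.
func chunkSlice[T any](s []T, chunkCount int) [][]T {
	if chunkCount <= 0 {
		return [][]T{s}
	}
	size := int(math.Ceil(float64(len(s)) / float64(chunkCount)))
	var out [][]T
	for start := 0; start < len(s); start += size {
		end := start + size
		if end > len(s) {
			end = len(s)
		}
		out = append(out, s[start:end])
	}
	return out
}

func main() {
	mockDataSize := 320000
	ids := make([]int, mockDataSize)

	// Chunks of at most ~2048 ids, matching the client's default queue size.
	chunks := chunkSlice(ids, int(math.Ceil(float64(mockDataSize)/2048)))

	// Group the chunks so that at most 8 run concurrently per iteration.
	iterations := chunkSlice(chunks, int(math.Ceil(float64(len(chunks))/8)))

	fmt.Println(len(chunks), "chunks,", len(iterations), "iterations of up to 8 chunks each")
}

Running the benchmark under the race detector (for example, go test -race -bench BenchmarkDcp) is the usual way to confirm that the observer change actually removes the reported race.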
