diff --git a/base/sequence_clock.go b/base/sequence_clock.go
index 59fc76257b..f6f1a4c62d 100644
--- a/base/sequence_clock.go
+++ b/base/sequence_clock.go
@@ -35,7 +35,7 @@ type SequenceClock interface {
GetHashedValue() string // Returns previously hashed value, if present. If not present, does NOT generate hash
SetHashedValue(value string) // Returns previously hashed value, if present. If not present, does NOT generate hash
Equals(otherClock SequenceClock) bool // Evaluates whether two clocks are identical
- IsEmptyClock () bool // Evaluates if this an empty clock
+ IsEmptyClock() bool // Evaluates if this an empty clock
AllAfter(otherClock SequenceClock) bool // True if all entries in clock are greater than or equal to the corresponding values in otherClock
AllBefore(otherClock SequenceClock) bool // True if all entries in clock are less than or equal to the corresponding values in otherClock
AnyAfter(otherClock SequenceClock) bool // True if any entries in clock are greater than the corresponding values in otherClock
@@ -327,9 +327,10 @@ func (c *SyncSequenceClock) SetMaxSequence(vbNo uint16, vbSequence uint64) {
defer c.lock.Unlock()
c.Clock.SetMaxSequence(vbNo, vbSequence)
}
+
func (c *SyncSequenceClock) GetSequence(vbNo uint16) (sequence uint64) {
- c.lock.Lock()
- defer c.lock.Unlock()
+ c.lock.RLock()
+ defer c.lock.RUnlock()
return c.Clock.GetSequence(vbNo)
}
diff --git a/base/util_test.go b/base/util_test.go
index 0dc6146c70..0edaad9816 100644
--- a/base/util_test.go
+++ b/base/util_test.go
@@ -159,9 +159,9 @@ func TestValueToStringArray(t *testing.T) {
result := ValueToStringArray("foobar")
assert.DeepEquals(t, result, []string{"foobar"})
- result = ValueToStringArray([]string{"foobar","moocar"})
- assert.DeepEquals(t, result, []string{"foobar","moocar"})
+ result = ValueToStringArray([]string{"foobar", "moocar"})
+ assert.DeepEquals(t, result, []string{"foobar", "moocar"})
- result = ValueToStringArray([]interface{}{"foobar",1,true})
+ result = ValueToStringArray([]interface{}{"foobar", 1, true})
assert.DeepEquals(t, result, []string{"foobar"})
}
diff --git a/db/kv_channel_index.go b/db/kv_channel_index.go
index 543b789ae9..d1250ce1e1 100644
--- a/db/kv_channel_index.go
+++ b/db/kv_channel_index.go
@@ -44,6 +44,7 @@ type KvChannelIndex struct {
onChange func(base.Set) // Notification callback
clock *base.SequenceClockImpl // Channel clock
channelStorage ChannelStorageReader // Channel storage - manages interaction with the index format
+ partitions *base.IndexPartitions // Partition map
}
func NewKvChannelIndex(channelName string, bucket base.Bucket, partitions *base.IndexPartitions, onChangeCallback func(base.Set)) *KvChannelIndex {
@@ -53,6 +54,7 @@ func NewKvChannelIndex(channelName string, bucket base.Bucket, partitions *base.
indexBucket: bucket,
onChange: onChangeCallback,
channelStorage: NewDenseStorageReader(bucket, channelName, partitions),
+ partitions: partitions,
}
// Initialize and load channel clock
@@ -92,13 +94,9 @@ func (k *KvChannelIndex) pollForChanges(stableClock base.SequenceClock, newChann
k.lastPolledValidTo = k.clock.Copy()
}
- // Ensure we haven't read a channel clock that's later than the stable sequence (since writers persist channel clocks
- // before the stable sequence). If so, set the channel clock as the minimum of (channel clock, stable clock) per vbucket
- if newChannelClock.AnyAfter(stableClock) {
- newChannelClock = base.GetMinimumClock(newChannelClock, stableClock)
- }
+ isChanged, changedPartitions := k.calculateChanges(k.lastPolledChannelClock, stableClock, newChannelClock)
- if !newChannelClock.AnyAfter(k.lastPolledChannelClock) {
+ if !isChanged {
// No changes to channel clock - update validTo based on the new stable sequence
k.lastPolledValidTo.SetTo(stableClock)
// If we've exceeded empty poll count, return hasChanges=true to trigger the "is
@@ -111,7 +109,7 @@ func (k *KvChannelIndex) pollForChanges(stableClock base.SequenceClock, newChann
}
// The clock has changed - load the changes and store in last polled
- if err := k.updateLastPolled(stableClock, newChannelClock); err != nil {
+ if err := k.updateLastPolled(stableClock, newChannelClock, changedPartitions); err != nil {
base.Warn("Error updating last polled for channel %s: %v", k.channelName, err)
return false, false
}
@@ -123,7 +121,41 @@ func (k *KvChannelIndex) pollForChanges(stableClock base.SequenceClock, newChann
return true, false
}
-func (k *KvChannelIndex) updateLastPolled(stableSequence base.SequenceClock, newChannelClock base.SequenceClock) error {
+func (k *KvChannelIndex) calculateChanges(lastPolledClock base.SequenceClock, stableClock base.SequenceClock, newChannelClock base.SequenceClock) (isChanged bool, changedPartitions []*PartitionRange) {
+
+ // One iteration through the new channel clock, to:
+ // 1. Check whether it's later than the previous channel clock
+ // 2. If it's later than the stable clock, roll back to the stable clock
+ // 3. If it still represents a change, add to set of changed vbs and partition ranges
+ changedPartitions = make([]*PartitionRange, k.partitions.PartitionCount())
+ for vbNoInt, newChannelClockSeq := range newChannelClock.Value() {
+ vbNo := uint16(vbNoInt)
+ lastPolledClockSeq := lastPolledClock.GetSequence(vbNo)
+ if lastPolledClockSeq < newChannelClockSeq {
+ // Handle rollback to stable sequence if needed
+ stableSeq := stableClock.GetSequence(vbNo)
+ if stableSeq < newChannelClockSeq {
+ newChannelClock.SetSequence(vbNo, stableSeq)
+ // We've rolled back the new channel clock to the stable sequence. Ensure that we still have a change for this vb.
+ if !(lastPolledClockSeq < stableSeq) {
+ continue
+ }
+ newChannelClockSeq = stableSeq
+ }
+ isChanged = true
+ partitionNo := k.partitions.PartitionForVb(vbNo)
+ if changedPartitions[partitionNo] == nil {
+ partitionRange := NewPartitionRange()
+ changedPartitions[partitionNo] = &partitionRange
+ }
+ changedPartitions[partitionNo].SetRange(vbNo, lastPolledClockSeq, newChannelClockSeq)
+ }
+ }
+ return isChanged, changedPartitions
+
+}
+
+func (k *KvChannelIndex) updateLastPolled(stableSequence base.SequenceClock, newChannelClock base.SequenceClock, changedPartitions []*PartitionRange) error {
timingFrom := uint64(0)
if base.TimingExpvarsEnabled {
@@ -131,7 +163,7 @@ func (k *KvChannelIndex) updateLastPolled(stableSequence base.SequenceClock, new
}
// Update the storage cache, if present
- err := k.channelStorage.UpdateCache(k.lastPolledChannelClock, newChannelClock)
+ err := k.channelStorage.UpdateCache(k.lastPolledChannelClock, newChannelClock, changedPartitions)
if err != nil {
return err
}
diff --git a/db/kv_channel_storage.go b/db/kv_channel_storage.go
index 04968f921d..540194cc7d 100644
--- a/db/kv_channel_storage.go
+++ b/db/kv_channel_storage.go
@@ -35,7 +35,7 @@ const (
type ChannelStorageReader interface {
// GetAllEntries returns all entries for the channel in the specified range, for all vbuckets
GetChanges(fromSeq base.SequenceClock, channelClock base.SequenceClock, limit int) ([]*LogEntry, error)
- UpdateCache(fromSeq base.SequenceClock, channelClock base.SequenceClock) error
+ UpdateCache(fromSeq base.SequenceClock, channelClock base.SequenceClock, changedPartitions []*PartitionRange) error
}
type ChannelStorageWriter interface {
@@ -323,7 +323,7 @@ func (b *BitFlagStorage) GetChanges(fromSeq base.SequenceClock, toSeq base.Seque
}
-func (b *BitFlagStorage) UpdateCache(sinceClock base.SequenceClock, toClock base.SequenceClock) error {
+func (b *BitFlagStorage) UpdateCache(sinceClock base.SequenceClock, toClock base.SequenceClock, changedPartitions []*PartitionRange) error {
// no-op, not a caching reader
return nil
}
diff --git a/db/kv_dense_channel_storage.go b/db/kv_dense_channel_storage.go
index ea3a04ab43..06130a021a 100644
--- a/db/kv_dense_channel_storage.go
+++ b/db/kv_dense_channel_storage.go
@@ -75,30 +75,32 @@ func (clock PartitionClock) String() string {
return result
}
+type SequenceRange struct {
+ since uint64
+ to uint64
+}
+
// PartitionRange is a pair of clocks defining a range of sequences with a partition.
// Defines helper functions for range comparison
type PartitionRange struct {
- Since PartitionClock
- To PartitionClock
+ seqRanges map[uint16]SequenceRange
}
func NewPartitionRange() PartitionRange {
return PartitionRange{
- Since: make(PartitionClock),
- To: make(PartitionClock),
+ seqRanges: make(map[uint16]SequenceRange),
}
}
func (p PartitionRange) SetRange(vbNo uint16, sinceSeq, toSeq uint64) {
- p.Since[vbNo] = sinceSeq
- p.To[vbNo] = toSeq
+ p.seqRanges[vbNo] = SequenceRange{sinceSeq, toSeq}
}
// StartsBefore returns true if any non-nil since sequences in the partition range
// are earlier than the partition clock
func (p PartitionRange) SinceBefore(clock PartitionClock) bool {
- for vbNo, sinceSeq := range p.Since {
- if sinceSeq < clock.GetSequence(vbNo) {
+ for vbNo, seqRange := range p.seqRanges {
+ if seqRange.since < clock.GetSequence(vbNo) {
return true
}
}
@@ -108,14 +110,18 @@ func (p PartitionRange) SinceBefore(clock PartitionClock) bool {
// StartsAfter returns true if all since sequences in the partition range are
// equal to or later than the partition clock
func (p PartitionRange) SinceAfter(clock PartitionClock) bool {
- for vbNo, sinceSeq := range p.Since {
- if sinceSeq < clock.GetSequence(vbNo) {
+ for vbNo, seqRange := range p.seqRanges {
+ if seqRange.since < clock.GetSequence(vbNo) {
return false
}
}
return true
}
+func (p PartitionRange) GetSequenceRange(vbNo uint16) SequenceRange {
+ return p.seqRanges[vbNo]
+}
+
// PartitionRange.Compare Outcomes:
// Within, Before, After are returned if the sequence is within/before/after the range
// Unknown is returned if the range doesn't include since/to values for the vbno
@@ -130,23 +136,17 @@ const (
// Identifies where the specified vbNo, sequence is relative to the partition range
func (p PartitionRange) Compare(vbNo uint16, sequence uint64) PartitionRangeCompare {
- var sinceSeq, toSeq uint64
- var ok bool
- sinceSeq, ok = p.Since[vbNo]
- if !ok {
- return PartitionRangeUnknown
- }
- toSeq, ok = p.To[vbNo]
+ seqRange, ok := p.seqRanges[vbNo]
if !ok {
return PartitionRangeUnknown
}
- if sequence <= sinceSeq {
+ if sequence <= seqRange.since {
return PartitionRangeBefore
}
- if sequence > toSeq {
+ if sequence > seqRange.to {
return PartitionRangeAfter
}
diff --git a/db/kv_dense_channel_storage_reader.go b/db/kv_dense_channel_storage_reader.go
index a8e8a392a9..ba48c47b09 100644
--- a/db/kv_dense_channel_storage_reader.go
+++ b/db/kv_dense_channel_storage_reader.go
@@ -40,15 +40,11 @@ func NewDenseStorageReader(bucket base.Bucket, channelName string, partitions *b
// Number of blocks to store in channel cache, per partition
const kCachedBlocksPerShard = 2
-func (ds *DenseStorageReader) UpdateCache(sinceClock base.SequenceClock, toClock base.SequenceClock) error {
+func (ds *DenseStorageReader) UpdateCache(sinceClock base.SequenceClock, toClock base.SequenceClock, changedPartitions []*PartitionRange) error {
- // Identify what's changed:
- // changedVbuckets: ordered list of vbuckets that have changes, based on the clock comparison
- // partitionRanges: array indexed by partitionNo of PartitionRange for each partition that's changed
- _, partitionRanges := ds.calculateChanged(sinceClock, toClock)
var wg sync.WaitGroup
- errCh := make(chan error, len(partitionRanges))
- for partitionNo, partitionRange := range partitionRanges {
+ errCh := make(chan error, len(changedPartitions))
+ for partitionNo, partitionRange := range changedPartitions {
if partitionRange == nil {
continue
}
@@ -63,7 +59,7 @@ func (ds *DenseStorageReader) UpdateCache(sinceClock base.SequenceClock, toClock
err := reader.UpdateCache(kCachedBlocksPerShard)
if err != nil {
base.Warn("Unable to update cache for channel:[%s] partition:[%d] : %v", ds.channelName, partitionNo, err)
- errCh <-err
+ errCh <- err
}
}(uint16(partitionNo), partitionRange)
}
@@ -86,10 +82,10 @@ func (ds *DenseStorageReader) GetChanges(sinceClock base.SequenceClock, toClock
// Identify what's changed:
// changedVbuckets: ordered list of vbuckets that have changes, based on the clock comparison
- // partitionRanges: array of PartitionRange, indexed by partitionNo
+ // partitionRanges: array of PartitionRange, indexed by partitionNo
changedVbuckets, partitionRanges := ds.calculateChanged(sinceClock, toClock)
-
+
// changed partitions is a cache of changes for a partition, for reuse by multiple vbs
changedPartitions := make(map[uint16]*PartitionChanges, len(partitionRanges))
@@ -102,7 +98,7 @@ func (ds *DenseStorageReader) GetChanges(sinceClock base.SequenceClock, toClock
return changes, err
}
changedPartitions[partitionNo] = partitionChanges
- }
+ }
changes = append(changes, partitionChanges.GetVbChanges(vbNo)...)
if limit > 0 && len(changes) > limit {
break
@@ -227,7 +223,6 @@ func (r *DensePartitionStorageReaderNonCaching) GetChanges(partitionRange Partit
changes := NewPartitionChanges()
-
// Initialize the block list to the starting range, then find the starting block for the partition range
blockList := r.GetBlockListForRange(partitionRange)
if blockList == nil {
@@ -453,8 +448,8 @@ func (pr *DensePartitionStorageReader) getCachedChanges(partitionRange Partition
}
switch compare := partitionRange.Compare(blockEntry.getVbNo(), blockEntry.getSequence()); compare {
case PartitionRangeAfter:
- // Possible when processing the most recent block in the range, when block is ahead of stable seq
- case PartitionRangeWithin:
+ // Possible when processing the most recent block in the range, when block is ahead of stable seq
+ case PartitionRangeWithin:
// Deduplication check
docIdString := string(blockEntry.getDocId())
if keySet[docIdString] {
@@ -504,12 +499,10 @@ func (pr *DensePartitionStorageReader) getIndexedChanges(partitionRange Partitio
changes = NewPartitionChanges()
keySet := make(map[string]bool, 0)
for i := len(blockList.blocks) - 1; i >= 0; i-- {
-
blockListEntry := blockList.blocks[i]
blockKey := blockListEntry.Key(blockList)
currBlock, err := pr.loadBlock(blockKey, blockListEntry.StartClock)
-
if err != nil {
base.Warn("Unexpected missing block from index. blockKey:[%s] err:[%v]",
blockKey, err)
@@ -583,4 +576,3 @@ func (pr *DensePartitionStorageReader) loadBlock(key string, startClock Partitio
IndexExpvars.Add("indexReader.blocksLoaded", 1)
return block, nil
}
-
diff --git a/db/kv_dense_channel_storage_test.go b/db/kv_dense_channel_storage_test.go
index 67e3f44c61..b58e9f5949 100644
--- a/db/kv_dense_channel_storage_test.go
+++ b/db/kv_dense_channel_storage_test.go
@@ -573,12 +573,12 @@ func TestCalculateChangedPartitions(t *testing.T) {
assertTrue(t, partition == 0 || partition == 6 || partition == 12, "Unexpected changed partition")
}
}
- assert.Equals(t, changedPartitions[0].Since.GetSequence(0), uint64(0))
- assert.Equals(t, changedPartitions[6].Since.GetSequence(100), uint64(0))
- assert.Equals(t, changedPartitions[12].Since.GetSequence(200), uint64(0))
- assert.Equals(t, changedPartitions[0].To.GetSequence(0), uint64(5))
- assert.Equals(t, changedPartitions[6].To.GetSequence(100), uint64(10))
- assert.Equals(t, changedPartitions[12].To.GetSequence(200), uint64(15))
+ assert.Equals(t, changedPartitions[0].GetSequenceRange(0).since, uint64(0))
+ assert.Equals(t, changedPartitions[6].GetSequenceRange(100).since, uint64(0))
+ assert.Equals(t, changedPartitions[12].GetSequenceRange(200).since, uint64(0))
+ assert.Equals(t, changedPartitions[0].GetSequenceRange(0).to, uint64(5))
+ assert.Equals(t, changedPartitions[6].GetSequenceRange(100).to, uint64(10))
+ assert.Equals(t, changedPartitions[12].GetSequenceRange(200).to, uint64(15))
assert.Equals(t, changedPartitionCount, 3)
}
diff --git a/manifest/default.xml b/manifest/default.xml
index 1e639617d2..0b31df77e3 100644
--- a/manifest/default.xml
+++ b/manifest/default.xml
@@ -31,7 +31,7 @@
-
+