Commit

Performance improvements
 - Refactors PartitionRange to use a single map instead of two
 - Switches Lock to RLock on SyncSequenceClock.GetSequence
 - Refactors pollForChanges to reduce the number of clock iterations over the stable sequence, last polled clock, and channel clock
adamcfraser committed Jan 18, 2017
1 parent c74f93d commit adedf1a
Showing 8 changed files with 85 additions and 60 deletions.
7 changes: 4 additions & 3 deletions base/sequence_clock.go
@@ -35,7 +35,7 @@ type SequenceClock interface {
GetHashedValue() string // Returns previously hashed value, if present. If not present, does NOT generate hash
SetHashedValue(value string) // Returns previously hashed value, if present. If not present, does NOT generate hash
Equals(otherClock SequenceClock) bool // Evaluates whether two clocks are identical
IsEmptyClock () bool // Evaluates if this an empty clock
IsEmptyClock() bool // Evaluates if this an empty clock
AllAfter(otherClock SequenceClock) bool // True if all entries in clock are greater than or equal to the corresponding values in otherClock
AllBefore(otherClock SequenceClock) bool // True if all entries in clock are less than or equal to the corresponding values in otherClock
AnyAfter(otherClock SequenceClock) bool // True if any entries in clock are greater than the corresponding values in otherClock
@@ -327,9 +327,10 @@ func (c *SyncSequenceClock) SetMaxSequence(vbNo uint16, vbSequence uint64) {
defer c.lock.Unlock()
c.Clock.SetMaxSequence(vbNo, vbSequence)
}

func (c *SyncSequenceClock) GetSequence(vbNo uint16) (sequence uint64) {
c.lock.Lock()
defer c.lock.Unlock()
c.lock.RLock()
defer c.lock.RUnlock()
return c.Clock.GetSequence(vbNo)
}

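The switch from Lock to RLock above lets concurrent callers of GetSequence read in parallel, while writers such as SetMaxSequence still take the exclusive lock. A minimal standalone sketch of that read/write split, using illustrative names rather than the actual Sync Gateway types:

```go
package main

import (
	"fmt"
	"sync"
)

// clockSketch mimics the locking split: reads share the RLock,
// writes take the exclusive Lock.
type clockSketch struct {
	lock  sync.RWMutex
	value map[uint16]uint64
}

func (c *clockSketch) GetSequence(vbNo uint16) uint64 {
	c.lock.RLock() // concurrent readers do not block each other
	defer c.lock.RUnlock()
	return c.value[vbNo]
}

func (c *clockSketch) SetMaxSequence(vbNo uint16, seq uint64) {
	c.lock.Lock() // writers still serialize
	defer c.lock.Unlock()
	if seq > c.value[vbNo] {
		c.value[vbNo] = seq
	}
}

func main() {
	c := &clockSketch{value: make(map[uint16]uint64)}
	c.SetMaxSequence(3, 42)
	fmt.Println(c.GetSequence(3)) // 42
}
```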
6 changes: 3 additions & 3 deletions base/util_test.go
@@ -159,9 +159,9 @@ func TestValueToStringArray(t *testing.T) {
result := ValueToStringArray("foobar")
assert.DeepEquals(t, result, []string{"foobar"})

result = ValueToStringArray([]string{"foobar","moocar"})
assert.DeepEquals(t, result, []string{"foobar","moocar"})
result = ValueToStringArray([]string{"foobar", "moocar"})
assert.DeepEquals(t, result, []string{"foobar", "moocar"})

result = ValueToStringArray([]interface{}{"foobar",1,true})
result = ValueToStringArray([]interface{}{"foobar", 1, true})
assert.DeepEquals(t, result, []string{"foobar"})
}
50 changes: 41 additions & 9 deletions db/kv_channel_index.go
@@ -44,6 +44,7 @@ type KvChannelIndex struct {
onChange func(base.Set) // Notification callback
clock *base.SequenceClockImpl // Channel clock
channelStorage ChannelStorageReader // Channel storage - manages interaction with the index format
partitions *base.IndexPartitions // Partition map
}

func NewKvChannelIndex(channelName string, bucket base.Bucket, partitions *base.IndexPartitions, onChangeCallback func(base.Set)) *KvChannelIndex {
@@ -53,6 +54,7 @@ func NewKvChannelIndex(channelName string, bucket base.Bucket, partitions *base.
indexBucket: bucket,
onChange: onChangeCallback,
channelStorage: NewDenseStorageReader(bucket, channelName, partitions),
partitions: partitions,
}

// Initialize and load channel clock
@@ -92,13 +94,9 @@ func (k *KvChannelIndex) pollForChanges(stableClock base.SequenceClock, newChann
k.lastPolledValidTo = k.clock.Copy()
}

// Ensure we haven't read a channel clock that's later than the stable sequence (since writers persist channel clocks
// before the stable sequence). If so, set the channel clock as the minimum of (channel clock, stable clock) per vbucket
if newChannelClock.AnyAfter(stableClock) {
newChannelClock = base.GetMinimumClock(newChannelClock, stableClock)
}
isChanged, changedPartitions := k.calculateChanges(k.lastPolledChannelClock, stableClock, newChannelClock)

if !newChannelClock.AnyAfter(k.lastPolledChannelClock) {
if !isChanged {
// No changes to channel clock - update validTo based on the new stable sequence
k.lastPolledValidTo.SetTo(stableClock)
// If we've exceeded empty poll count, return hasChanges=true to trigger the "is
@@ -111,7 +109,7 @@ }
}

// The clock has changed - load the changes and store in last polled
if err := k.updateLastPolled(stableClock, newChannelClock); err != nil {
if err := k.updateLastPolled(stableClock, newChannelClock, changedPartitions); err != nil {
base.Warn("Error updating last polled for channel %s: %v", k.channelName, err)
return false, false
}
@@ -123,15 +121,49 @@
return true, false
}

func (k *KvChannelIndex) updateLastPolled(stableSequence base.SequenceClock, newChannelClock base.SequenceClock) error {
func (k *KvChannelIndex) calculateChanges(lastPolledClock base.SequenceClock, stableClock base.SequenceClock, newChannelClock base.SequenceClock) (isChanged bool, changedPartitions []*PartitionRange) {

// One iteration through the new channel clock, to:
// 1. Check whether it's later than the previous channel clock
// 2. If it's later than the stable clock, roll back to the stable clock
// 3. If it still represents a change, add to set of changed vbs and partition ranges
changedPartitions = make([]*PartitionRange, k.partitions.PartitionCount())
for vbNoInt, newChannelClockSeq := range newChannelClock.Value() {
vbNo := uint16(vbNoInt)
lastPolledClockSeq := lastPolledClock.GetSequence(vbNo)
if lastPolledClockSeq < newChannelClockSeq {
// Handle rollback to stable sequence if needed
stableSeq := stableClock.GetSequence(vbNo)
if stableSeq < newChannelClockSeq {
newChannelClock.SetSequence(vbNo, stableSeq)
// We've rolled back the new channel clock to the stable sequence. Ensure that we still have a change for this vb.
if !(lastPolledClockSeq < stableSeq) {
continue
}
newChannelClockSeq = stableSeq
}
isChanged = true
partitionNo := k.partitions.PartitionForVb(vbNo)
if changedPartitions[partitionNo] == nil {
partitionRange := NewPartitionRange()
changedPartitions[partitionNo] = &partitionRange
}
changedPartitions[partitionNo].SetRange(vbNo, lastPolledClockSeq, newChannelClockSeq)
}
}
return isChanged, changedPartitions

}

func (k *KvChannelIndex) updateLastPolled(stableSequence base.SequenceClock, newChannelClock base.SequenceClock, changedPartitions []*PartitionRange) error {

timingFrom := uint64(0)
if base.TimingExpvarsEnabled {
timingFrom = k.lastPolledChannelClock.GetSequence(base.KTimingExpvarVbNo)
}

// Update the storage cache, if present
err := k.channelStorage.UpdateCache(k.lastPolledChannelClock, newChannelClock)
err := k.channelStorage.UpdateCache(k.lastPolledChannelClock, newChannelClock, changedPartitions)
if err != nil {
return err
}
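The new calculateChanges folds what were separate clock comparisons into a single pass over the new channel clock: skip vbuckets that have not moved since the last poll, clamp anything ahead of the stable sequence back to it, and record a since/to range per changed partition. A rough standalone sketch of that single pass over plain maps (the helper names, map-based clocks, and fixed vbuckets-per-partition value are assumptions for illustration, not the real index types):

```go
package main

import "fmt"

// seqRange mirrors the since/to pair tracked per vbucket.
type seqRange struct{ since, to uint64 }

// calcChanges walks the new clock once: it skips vbuckets that have not
// moved, clamps sequences that are ahead of the stable clock, and groups
// the surviving changes by partition (here simply vbNo / vbsPerPartition).
func calcChanges(lastPolled, stable, newClock map[uint16]uint64, vbsPerPartition uint16) (bool, map[uint16][]seqRange) {
	changed := false
	byPartition := make(map[uint16][]seqRange)
	for vbNo, newSeq := range newClock {
		last := lastPolled[vbNo]
		if newSeq <= last {
			continue // no movement for this vbucket since the last poll
		}
		if stableSeq := stable[vbNo]; stableSeq < newSeq {
			// Writers persist channel clocks before the stable sequence,
			// so roll the value back to the stable sequence...
			newSeq = stableSeq
			if newSeq <= last {
				continue // ...and confirm it still represents a change
			}
		}
		changed = true
		p := vbNo / vbsPerPartition
		byPartition[p] = append(byPartition[p], seqRange{since: last, to: newSeq})
	}
	return changed, byPartition
}

func main() {
	lastPolled := map[uint16]uint64{0: 5, 1: 5}
	stable := map[uint16]uint64{0: 8, 1: 20}
	newClock := map[uint16]uint64{0: 10, 1: 5} // vb 0 ahead of stable, vb 1 unchanged
	changed, parts := calcChanges(lastPolled, stable, newClock, 4)
	fmt.Println(changed, parts) // true map[0:[{5 8}]]
}
```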
4 changes: 2 additions & 2 deletions db/kv_channel_storage.go
@@ -35,7 +35,7 @@ const (
type ChannelStorageReader interface {
// GetAllEntries returns all entries for the channel in the specified range, for all vbuckets
GetChanges(fromSeq base.SequenceClock, channelClock base.SequenceClock, limit int) ([]*LogEntry, error)
UpdateCache(fromSeq base.SequenceClock, channelClock base.SequenceClock) error
UpdateCache(fromSeq base.SequenceClock, channelClock base.SequenceClock, changedPartitions []*PartitionRange) error
}

type ChannelStorageWriter interface {
@@ -323,7 +323,7 @@ func (b *BitFlagStorage) GetChanges(fromSeq base.SequenceClock, toSeq base.Seque

}

func (b *BitFlagStorage) UpdateCache(sinceClock base.SequenceClock, toClock base.SequenceClock) error {
func (b *BitFlagStorage) UpdateCache(sinceClock base.SequenceClock, toClock base.SequenceClock, changedPartitions []*PartitionRange) error {
// no-op, not a caching reader
return nil
}
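The reader interface change above threads the precomputed changed-partition list through to every implementation; non-caching readers simply ignore it, as the BitFlagStorage no-op shows. A small sketch of a widened interface and a no-op implementation (simplified, illustrative types, not the real ChannelStorageReader):

```go
package main

import "fmt"

// partitionRangeStub stands in for the real PartitionRange type.
type partitionRangeStub struct{}

// cacheUpdater shows the shape of the widened method: the changed
// partitions are computed once by the poller and passed straight through.
type cacheUpdater interface {
	UpdateCache(changedPartitions []*partitionRangeStub) error
}

// noopReader mirrors the non-caching case: it accepts the argument to
// satisfy the interface but has nothing to refresh.
type noopReader struct{}

func (noopReader) UpdateCache(changedPartitions []*partitionRangeStub) error {
	return nil
}

func main() {
	var r cacheUpdater = noopReader{}
	fmt.Println(r.UpdateCache([]*partitionRangeStub{nil, {}})) // <nil>
}
```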
38 changes: 19 additions & 19 deletions db/kv_dense_channel_storage.go
@@ -75,30 +75,32 @@ func (clock PartitionClock) String() string {
return result
}

type SequenceRange struct {
since uint64
to uint64
}

// PartitionRange is a pair of clocks defining a range of sequences with a partition.
// Defines helper functions for range comparison
type PartitionRange struct {
Since PartitionClock
To PartitionClock
seqRanges map[uint16]SequenceRange
}

func NewPartitionRange() PartitionRange {
return PartitionRange{
Since: make(PartitionClock),
To: make(PartitionClock),
seqRanges: make(map[uint16]SequenceRange),
}
}

func (p PartitionRange) SetRange(vbNo uint16, sinceSeq, toSeq uint64) {
p.Since[vbNo] = sinceSeq
p.To[vbNo] = toSeq
p.seqRanges[vbNo] = SequenceRange{sinceSeq, toSeq}
}

// StartsBefore returns true if any non-nil since sequences in the partition range
// are earlier than the partition clock
func (p PartitionRange) SinceBefore(clock PartitionClock) bool {
for vbNo, sinceSeq := range p.Since {
if sinceSeq < clock.GetSequence(vbNo) {
for vbNo, seqRange := range p.seqRanges {
if seqRange.since < clock.GetSequence(vbNo) {
return true
}
}
@@ -108,14 +110,18 @@ func (p PartitionRange) SinceBefore(clock PartitionClock) bool {
// StartsAfter returns true if all since sequences in the partition range are
// equal to or later than the partition clock
func (p PartitionRange) SinceAfter(clock PartitionClock) bool {
for vbNo, sinceSeq := range p.Since {
if sinceSeq < clock.GetSequence(vbNo) {
for vbNo, seqRange := range p.seqRanges {
if seqRange.since < clock.GetSequence(vbNo) {
return false
}
}
return true
}

func (p PartitionRange) GetSequenceRange(vbNo uint16) SequenceRange {
return p.seqRanges[vbNo]
}

// PartitionRange.Compare Outcomes:
// Within, Before, After are returned if the sequence is within/before/after the range
// Unknown is returned if the range doesn't include since/to values for the vbno
@@ -130,23 +136,17 @@ const (

// Identifies where the specified vbNo, sequence is relative to the partition range
func (p PartitionRange) Compare(vbNo uint16, sequence uint64) PartitionRangeCompare {
var sinceSeq, toSeq uint64
var ok bool

sinceSeq, ok = p.Since[vbNo]
if !ok {
return PartitionRangeUnknown
}
toSeq, ok = p.To[vbNo]
seqRange, ok := p.seqRanges[vbNo]
if !ok {
return PartitionRangeUnknown
}

if sequence <= sinceSeq {
if sequence <= seqRange.since {
return PartitionRangeBefore
}

if sequence > toSeq {
if sequence > seqRange.to {
return PartitionRangeAfter
}

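Collapsing the Since/To pair of PartitionClocks into one map of per-vbucket ranges means SetRange does a single assignment and Compare a single lookup instead of two. A simplified sketch of the new shape (lower-cased, illustrative names, not the full type):

```go
package main

import "fmt"

type sequenceRange struct{ since, to uint64 }

type rangeCompare int

const (
	rangeWithin rangeCompare = iota
	rangeBefore
	rangeAfter
	rangeUnknown
)

// partitionRange keeps one entry per vbucket instead of parallel
// since/to clocks, so every access touches a single map.
type partitionRange struct {
	seqRanges map[uint16]sequenceRange
}

func newPartitionRange() partitionRange {
	return partitionRange{seqRanges: make(map[uint16]sequenceRange)}
}

func (p partitionRange) SetRange(vbNo uint16, since, to uint64) {
	p.seqRanges[vbNo] = sequenceRange{since, to}
}

// Compare needs only one lookup to classify a (vbNo, sequence) pair.
func (p partitionRange) Compare(vbNo uint16, seq uint64) rangeCompare {
	r, ok := p.seqRanges[vbNo]
	if !ok {
		return rangeUnknown
	}
	switch {
	case seq <= r.since:
		return rangeBefore
	case seq > r.to:
		return rangeAfter
	default:
		return rangeWithin
	}
}

func main() {
	pr := newPartitionRange()
	pr.SetRange(0, 5, 10)
	fmt.Println(pr.Compare(0, 7), pr.Compare(0, 12), pr.Compare(3, 1)) // 0 (within) 2 (after) 3 (unknown)
}
```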
26 changes: 9 additions & 17 deletions db/kv_dense_channel_storage_reader.go
@@ -40,15 +40,11 @@ func NewDenseStorageReader(bucket base.Bucket, channelName string, partitions *b
// Number of blocks to store in channel cache, per partition
const kCachedBlocksPerShard = 2

func (ds *DenseStorageReader) UpdateCache(sinceClock base.SequenceClock, toClock base.SequenceClock) error {
func (ds *DenseStorageReader) UpdateCache(sinceClock base.SequenceClock, toClock base.SequenceClock, changedPartitions []*PartitionRange) error {

// Identify what's changed:
// changedVbuckets: ordered list of vbuckets that have changes, based on the clock comparison
// partitionRanges: array indexed by partitionNo of PartitionRange for each partition that's changed
_, partitionRanges := ds.calculateChanged(sinceClock, toClock)
var wg sync.WaitGroup
errCh := make(chan error, len(partitionRanges))
for partitionNo, partitionRange := range partitionRanges {
errCh := make(chan error, len(changedPartitions))
for partitionNo, partitionRange := range changedPartitions {
if partitionRange == nil {
continue
}
@@ -63,7 +59,7 @@ func (ds *DenseStorageReader) UpdateCache(sinceClock base.SequenceClock, toClock
err := reader.UpdateCache(kCachedBlocksPerShard)
if err != nil {
base.Warn("Unable to update cache for channel:[%s] partition:[%d] : %v", ds.channelName, partitionNo, err)
errCh <-err
errCh <- err
}
}(uint16(partitionNo), partitionRange)
}
@@ -86,10 +82,10 @@ func (ds *DenseStorageReader) GetChanges(sinceClock base.SequenceClock, toClock

// Identify what's changed:
// changedVbuckets: ordered list of vbuckets that have changes, based on the clock comparison
// partitionRanges: array of PartitionRange, indexed by partitionNo
// partitionRanges: array of PartitionRange, indexed by partitionNo

changedVbuckets, partitionRanges := ds.calculateChanged(sinceClock, toClock)

// changed partitions is a cache of changes for a partition, for reuse by multiple vbs
changedPartitions := make(map[uint16]*PartitionChanges, len(partitionRanges))

Expand All @@ -102,7 +98,7 @@ func (ds *DenseStorageReader) GetChanges(sinceClock base.SequenceClock, toClock
return changes, err
}
changedPartitions[partitionNo] = partitionChanges
}
}
changes = append(changes, partitionChanges.GetVbChanges(vbNo)...)
if limit > 0 && len(changes) > limit {
break
Expand Down Expand Up @@ -227,7 +223,6 @@ func (r *DensePartitionStorageReaderNonCaching) GetChanges(partitionRange Partit

changes := NewPartitionChanges()


// Initialize the block list to the starting range, then find the starting block for the partition range
blockList := r.GetBlockListForRange(partitionRange)
if blockList == nil {
@@ -453,8 +448,8 @@ func (pr *DensePartitionStorageReader) getCachedChanges(partitionRange Partition
}
switch compare := partitionRange.Compare(blockEntry.getVbNo(), blockEntry.getSequence()); compare {
case PartitionRangeAfter:
// Possible when processing the most recent block in the range, when block is ahead of stable seq
case PartitionRangeWithin:
// Possible when processing the most recent block in the range, when block is ahead of stable seq
case PartitionRangeWithin:
// Deduplication check
docIdString := string(blockEntry.getDocId())
if keySet[docIdString] {
@@ -504,12 +499,10 @@ func (pr *DensePartitionStorageReader) getIndexedChanges(partitionRange Partitio
changes = NewPartitionChanges()
keySet := make(map[string]bool, 0)
for i := len(blockList.blocks) - 1; i >= 0; i-- {

blockListEntry := blockList.blocks[i]
blockKey := blockListEntry.Key(blockList)

currBlock, err := pr.loadBlock(blockKey, blockListEntry.StartClock)

if err != nil {
base.Warn("Unexpected missing block from index. blockKey:[%s] err:[%v]",
blockKey, err)
@@ -583,4 +576,3 @@ func (pr *DensePartitionStorageReader) loadBlock(key string, startClock Partitio
IndexExpvars.Add("indexReader.blocksLoaded", 1)
return block, nil
}

12 changes: 6 additions & 6 deletions db/kv_dense_channel_storage_test.go
@@ -573,12 +573,12 @@ func TestCalculateChangedPartitions(t *testing.T) {
assertTrue(t, partition == 0 || partition == 6 || partition == 12, "Unexpected changed partition")
}
}
assert.Equals(t, changedPartitions[0].Since.GetSequence(0), uint64(0))
assert.Equals(t, changedPartitions[6].Since.GetSequence(100), uint64(0))
assert.Equals(t, changedPartitions[12].Since.GetSequence(200), uint64(0))
assert.Equals(t, changedPartitions[0].To.GetSequence(0), uint64(5))
assert.Equals(t, changedPartitions[6].To.GetSequence(100), uint64(10))
assert.Equals(t, changedPartitions[12].To.GetSequence(200), uint64(15))
assert.Equals(t, changedPartitions[0].GetSequenceRange(0).since, uint64(0))
assert.Equals(t, changedPartitions[6].GetSequenceRange(100).since, uint64(0))
assert.Equals(t, changedPartitions[12].GetSequenceRange(200).since, uint64(0))
assert.Equals(t, changedPartitions[0].GetSequenceRange(0).to, uint64(5))
assert.Equals(t, changedPartitions[6].GetSequenceRange(100).to, uint64(10))
assert.Equals(t, changedPartitions[12].GetSequenceRange(200).to, uint64(15))
assert.Equals(t, changedPartitionCount, 3)

}
2 changes: 1 addition & 1 deletion manifest/default.xml
@@ -31,7 +31,7 @@


<!-- Sync Gateway Accel-->
<project groups="notdefault,sg-accel" name="sync-gateway-accel" path="godeps/src/github.com/couchbaselabs/sync-gateway-accel" remote="couchbaselabs_private" revision="9ca9f8aaf101668372b224db175e5e65cbb9f6bd"/>
<project groups="notdefault,sg-accel" name="sync-gateway-accel" path="godeps/src/github.com/couchbaselabs/sync-gateway-accel" remote="couchbaselabs_private" revision="64abf2a5e335150ed129a18f3ceac4cbf31b9f4f"/>


<!-- Dependencies specific to Sync Gateway Accel-->
