From e875ca7e8ce42d5d9cf5bd869b17bda02951b03c Mon Sep 17 00:00:00 2001 From: Valerii Ponomarov Date: Mon, 4 Nov 2024 19:38:23 +0200 Subject: [PATCH] Fix row-thread mapping for sequential workloads Before this change scylla-bench was distributing rows among threads correctly only when number of partitions was divisible by number of threads without remainder. A bit worse case was appearing when there was division remainder. And the worst case was when number of threads was bigger than number of partitions. In this case last thread was handling all of the rows and all of other threads were idle. So, fix it by spreading rows evenly among all of the threads as much as possible. Closes: #138 --- main.go | 17 +++++----- pkg/workloads/workloads.go | 32 +++++++++++++----- pkg/workloads/workloads_test.go | 60 ++++++++++++++++++++------------- 3 files changed, 68 insertions(+), 41 deletions(-) diff --git a/main.go b/main.go index 2da58b1..0ec8ae0 100644 --- a/main.go +++ b/main.go @@ -143,15 +143,16 @@ func PrepareDatabase(session *gocql.Session, replicationFactor int) { func GetWorkload(name string, threadId int, partitionOffset int64, mode string, writeRate int64, distribution string) WorkloadGenerator { switch name { case "sequential": - pksPerThread := partitionCount / int64(concurrency) - thisOffset := pksPerThread * int64(threadId) - var thisSize int64 - if threadId+1 == concurrency { - thisSize = partitionCount - thisOffset - } else { - thisSize = pksPerThread + totalRowCount := partitionCount * clusteringRowCount + currentThreadId := int64(threadId) + rowCount, rowRemainder := totalRowCount/int64(concurrency), totalRowCount%int64(concurrency) + additionalRows, currentRemainderPartialOffset := int64(0), rowRemainder + if currentThreadId < rowRemainder { + additionalRows = 1 + currentRemainderPartialOffset = currentThreadId } - return NewSequentialVisitAll(thisOffset+partitionOffset, thisSize, clusteringRowCount) + rowOffset := partitionOffset*clusteringRowCount + currentThreadId*rowCount + currentRemainderPartialOffset + return NewSequentialVisitAll(rowOffset, rowCount+additionalRows, clusteringRowCount) case "uniform": return NewRandomUniform(threadId, partitionCount, partitionOffset, clusteringRowCount) case "timeseries": diff --git a/pkg/workloads/workloads.go b/pkg/workloads/workloads.go index ab3481f..705ca2c 100644 --- a/pkg/workloads/workloads.go +++ b/pkg/workloads/workloads.go @@ -36,15 +36,27 @@ type WorkloadGenerator interface { } type SequentialVisitAll struct { - PartitionOffset int64 - PartitionCount int64 + RowCount int64 ClusteringRowCount int64 + StartPartition int64 NextPartition int64 + StartClusteringRow int64 NextClusteringRow int64 -} - -func NewSequentialVisitAll(partitionOffset int64, partitionCount int64, clusteringRowCount int64) *SequentialVisitAll { - return &SequentialVisitAll{partitionOffset, partitionOffset + partitionCount, clusteringRowCount, partitionOffset, 0} + ProcessedRowCount int64 +} + +func NewSequentialVisitAll(rowOffset int64, rowCount int64, clusteringRowCount int64) *SequentialVisitAll { + currentPartition := rowOffset / clusteringRowCount + currentClusteringRow := rowOffset % clusteringRowCount + return &SequentialVisitAll{ + rowCount, + clusteringRowCount, + currentPartition, + currentPartition, + currentClusteringRow, + currentClusteringRow, + 0, + } } func (sva *SequentialVisitAll) NextTokenRange() TokenRange { @@ -64,16 +76,18 @@ func (sva *SequentialVisitAll) NextPartitionKey() int64 { func (sva *SequentialVisitAll) NextClusteringKey() int64 { ck := sva.NextClusteringRow sva.NextClusteringRow++ + sva.ProcessedRowCount++ return ck } func (sva *SequentialVisitAll) IsDone() bool { - return sva.NextPartition >= sva.PartitionCount || (sva.NextPartition+1 == sva.PartitionCount && sva.NextClusteringRow >= sva.ClusteringRowCount) + return sva.ProcessedRowCount >= sva.RowCount } func (sva *SequentialVisitAll) Restart() { - sva.NextClusteringRow = 0 - sva.NextPartition = sva.PartitionOffset + sva.NextPartition = sva.StartPartition + sva.NextClusteringRow = sva.StartClusteringRow + sva.ProcessedRowCount = 0 } func (sva *SequentialVisitAll) IsPartitionDone() bool { diff --git a/pkg/workloads/workloads_test.go b/pkg/workloads/workloads_test.go index 31334a3..31cd713 100644 --- a/pkg/workloads/workloads_test.go +++ b/pkg/workloads/workloads_test.go @@ -10,10 +10,18 @@ import ( func TestSequentialWorkload(t *testing.T) { generator := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) testCases := []struct { - partitionOffset int64 - partitionCount int64 + rowOffset int64 + rowCount int64 clusteringRowCount int64 }{ + {0, 51, 81}, + {51, 51, 81}, + {102, 51, 81}, + {153, 51, 81}, + {204, 51, 81}, + {255, 50, 81}, + {305, 50, 81}, + {355, 50, 81}, {10, 20, 30}, {0, 1, 1}, {generator.Int63n(100), generator.Int63n(100) + 100, generator.Int63n(99) + 1}, @@ -25,49 +33,53 @@ func TestSequentialWorkload(t *testing.T) { for i, tc := range testCases { t.Run(fmt.Sprintf("rand%d", i), func(t *testing.T) { - wrkld := NewSequentialVisitAll(tc.partitionOffset, tc.partitionCount, tc.clusteringRowCount) - - expectedPk := tc.partitionOffset - expectedCk := int64(0) - + wrkld := NewSequentialVisitAll(tc.rowOffset, tc.rowCount, tc.clusteringRowCount) + currentPk := tc.rowOffset / tc.clusteringRowCount + currentCk := tc.rowOffset % tc.clusteringRowCount + lastPk := (tc.rowOffset + tc.rowCount) / tc.clusteringRowCount + lastCk := (tc.rowOffset + tc.rowCount) % tc.clusteringRowCount + rowCounter := int64(0) for { - if wrkld.IsDone() && expectedPk < tc.partitionOffset+tc.partitionCount { - t.Errorf("got end of stream; expected %d", expectedPk) - } - if !wrkld.IsDone() && expectedPk == tc.partitionOffset+tc.partitionCount { - t.Errorf("expected end of stream at %d", expectedPk) - } - if wrkld.IsDone() { t.Log("got end of stream") + if currentPk != lastPk { + t.Errorf("wrong last PK; got %d; expected %d", currentPk, lastPk) + } + if currentCk != lastCk { + t.Errorf("wrong last CK; got %d; expected %d", currentCk, lastCk) + } + if rowCounter != tc.rowCount { + t.Errorf("Expected '%d' rows to be processed, but got '%d'", tc.rowCount, rowCounter) + } break } pk := wrkld.NextPartitionKey() - if pk != expectedPk { - t.Errorf("wrong PK; got %d; expected %d", pk, expectedPk) + if pk != currentPk { + t.Errorf("wrong PK; got %d; expected %d", pk, currentPk) } else { t.Logf("got PK %d", pk) } ck := wrkld.NextClusteringKey() - if ck != expectedCk { - t.Errorf("wrong CK; got %d; expected %d", pk, expectedCk) + if ck != currentCk { + t.Errorf("wrong CK; got %d; expected %d", pk, currentCk) } else { t.Logf("got CK %d", ck) } - expectedCk++ - if expectedCk == tc.clusteringRowCount { + currentCk++ + rowCounter++ + if currentCk == tc.clusteringRowCount { if !wrkld.IsPartitionDone() { - t.Errorf("expected end of partition at %d", expectedCk) + t.Errorf("expected end of partition at %d", currentCk) } else { t.Log("got end of partition") } - expectedCk = 0 - expectedPk++ + currentCk = 0 + currentPk++ } else if wrkld.IsPartitionDone() { - t.Errorf("got end of partition; expected %d", expectedCk) + t.Errorf("got end of partition; expected %d", currentCk) } } })