Skip to content

Commit

Permalink
Merge pull request #153 from vponomaryov/fix-sequential-workload
Browse files Browse the repository at this point in the history
Fix row-thread mapping for `sequential` workloads
  • Loading branch information
dkropachev authored Nov 5, 2024
2 parents 86e2cca + e875ca7 commit e8d7933
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 41 deletions.
17 changes: 9 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,16 @@ func PrepareDatabase(session *gocql.Session, replicationFactor int) {
func GetWorkload(name string, threadId int, partitionOffset int64, mode string, writeRate int64, distribution string) WorkloadGenerator {
switch name {
case "sequential":
pksPerThread := partitionCount / int64(concurrency)
thisOffset := pksPerThread * int64(threadId)
var thisSize int64
if threadId+1 == concurrency {
thisSize = partitionCount - thisOffset
} else {
thisSize = pksPerThread
totalRowCount := partitionCount * clusteringRowCount
currentThreadId := int64(threadId)
rowCount, rowRemainder := totalRowCount/int64(concurrency), totalRowCount%int64(concurrency)
additionalRows, currentRemainderPartialOffset := int64(0), rowRemainder
if currentThreadId < rowRemainder {
additionalRows = 1
currentRemainderPartialOffset = currentThreadId
}
return NewSequentialVisitAll(thisOffset+partitionOffset, thisSize, clusteringRowCount)
rowOffset := partitionOffset*clusteringRowCount + currentThreadId*rowCount + currentRemainderPartialOffset
return NewSequentialVisitAll(rowOffset, rowCount+additionalRows, clusteringRowCount)
case "uniform":
return NewRandomUniform(threadId, partitionCount, partitionOffset, clusteringRowCount)
case "timeseries":
Expand Down
32 changes: 23 additions & 9 deletions pkg/workloads/workloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,27 @@ type WorkloadGenerator interface {
}

// SequentialVisitAll walks a contiguous range of rows in order, handing out
// one (partition key, clustering key) pair per call. Progress is tracked in
// rows rather than partitions, so a generator's range may begin or end in
// the middle of a partition (this is what lets rows be split evenly across
// threads even when partitionCount % concurrency != 0).
type SequentialVisitAll struct {
	RowCount           int64 // total number of rows this generator must visit
	ClusteringRowCount int64 // clustering rows per partition
	StartPartition     int64 // first partition of the range (kept for Restart)
	NextPartition      int64 // partition the cursor is currently in
	StartClusteringRow int64 // clustering row the range starts at (kept for Restart)
	NextClusteringRow  int64 // clustering row to be handed out next
	ProcessedRowCount  int64 // rows handed out so far; compared against RowCount
}

// NewSequentialVisitAll builds a generator that visits rowCount consecutive
// rows starting at the global row offset rowOffset, where every partition
// contains clusteringRowCount rows. The starting partition and clustering
// row are derived from the flat row offset.
func NewSequentialVisitAll(rowOffset int64, rowCount int64, clusteringRowCount int64) *SequentialVisitAll {
	startPartition := rowOffset / clusteringRowCount
	startRow := rowOffset % clusteringRowCount
	return &SequentialVisitAll{
		RowCount:           rowCount,
		ClusteringRowCount: clusteringRowCount,
		StartPartition:     startPartition,
		NextPartition:      startPartition,
		StartClusteringRow: startRow,
		NextClusteringRow:  startRow,
		ProcessedRowCount:  0,
	}
}

func (sva *SequentialVisitAll) NextTokenRange() TokenRange {
Expand All @@ -64,16 +76,18 @@ func (sva *SequentialVisitAll) NextPartitionKey() int64 {
// NextClusteringKey returns the clustering row to visit next, then advances
// both the row cursor and the processed-row counter by one.
func (sva *SequentialVisitAll) NextClusteringKey() int64 {
	row := sva.NextClusteringRow
	sva.NextClusteringRow = row + 1
	sva.ProcessedRowCount = sva.ProcessedRowCount + 1
	return row
}

// IsDone reports whether every row assigned to this generator has been
// visited. Completion is judged purely by the row counter, which is correct
// even when the range starts or ends mid-partition.
func (sva *SequentialVisitAll) IsDone() bool {
	return sva.ProcessedRowCount >= sva.RowCount
}

// Restart rewinds the generator to the beginning of its row range so the
// same rows can be visited again from scratch.
func (sva *SequentialVisitAll) Restart() {
	sva.NextPartition = sva.StartPartition
	sva.NextClusteringRow = sva.StartClusteringRow
	sva.ProcessedRowCount = 0
}

func (sva *SequentialVisitAll) IsPartitionDone() bool {
Expand Down
60 changes: 36 additions & 24 deletions pkg/workloads/workloads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,18 @@ import (
func TestSequentialWorkload(t *testing.T) {
generator := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
testCases := []struct {
partitionOffset int64
partitionCount int64
rowOffset int64
rowCount int64
clusteringRowCount int64
}{
{0, 51, 81},
{51, 51, 81},
{102, 51, 81},
{153, 51, 81},
{204, 51, 81},
{255, 50, 81},
{305, 50, 81},
{355, 50, 81},
{10, 20, 30},
{0, 1, 1},
{generator.Int63n(100), generator.Int63n(100) + 100, generator.Int63n(99) + 1},
Expand All @@ -25,49 +33,53 @@ func TestSequentialWorkload(t *testing.T) {

for i, tc := range testCases {
t.Run(fmt.Sprintf("rand%d", i), func(t *testing.T) {
wrkld := NewSequentialVisitAll(tc.partitionOffset, tc.partitionCount, tc.clusteringRowCount)

expectedPk := tc.partitionOffset
expectedCk := int64(0)

wrkld := NewSequentialVisitAll(tc.rowOffset, tc.rowCount, tc.clusteringRowCount)
currentPk := tc.rowOffset / tc.clusteringRowCount
currentCk := tc.rowOffset % tc.clusteringRowCount
lastPk := (tc.rowOffset + tc.rowCount) / tc.clusteringRowCount
lastCk := (tc.rowOffset + tc.rowCount) % tc.clusteringRowCount
rowCounter := int64(0)
for {
if wrkld.IsDone() && expectedPk < tc.partitionOffset+tc.partitionCount {
t.Errorf("got end of stream; expected %d", expectedPk)
}
if !wrkld.IsDone() && expectedPk == tc.partitionOffset+tc.partitionCount {
t.Errorf("expected end of stream at %d", expectedPk)
}

if wrkld.IsDone() {
t.Log("got end of stream")
if currentPk != lastPk {
t.Errorf("wrong last PK; got %d; expected %d", currentPk, lastPk)
}
if currentCk != lastCk {
t.Errorf("wrong last CK; got %d; expected %d", currentCk, lastCk)
}
if rowCounter != tc.rowCount {
t.Errorf("Expected '%d' rows to be processed, but got '%d'", tc.rowCount, rowCounter)
}
break
}

pk := wrkld.NextPartitionKey()
if pk != expectedPk {
t.Errorf("wrong PK; got %d; expected %d", pk, expectedPk)
if pk != currentPk {
t.Errorf("wrong PK; got %d; expected %d", pk, currentPk)
} else {
t.Logf("got PK %d", pk)
}

ck := wrkld.NextClusteringKey()
if ck != expectedCk {
t.Errorf("wrong CK; got %d; expected %d", pk, expectedCk)
if ck != currentCk {
t.Errorf("wrong CK; got %d; expected %d", pk, currentCk)
} else {
t.Logf("got CK %d", ck)
}

expectedCk++
if expectedCk == tc.clusteringRowCount {
currentCk++
rowCounter++
if currentCk == tc.clusteringRowCount {
if !wrkld.IsPartitionDone() {
t.Errorf("expected end of partition at %d", expectedCk)
t.Errorf("expected end of partition at %d", currentCk)
} else {
t.Log("got end of partition")
}
expectedCk = 0
expectedPk++
currentCk = 0
currentPk++
} else if wrkld.IsPartitionDone() {
t.Errorf("got end of partition; expected %d", expectedCk)
t.Errorf("got end of partition; expected %d", currentCk)
}
}
})
Expand Down

0 comments on commit e8d7933

Please sign in to comment.