Skip to content

Commit

Permalink
pkg/schedule: put merge operators together (#8050)
Browse files Browse the repository at this point in the history
ref #7897, close #8049

pkg/schedule: put merge operators together to maintain atomicity

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
nolouch and ti-chi-bot[bot] authored Apr 11, 2024
1 parent f0eb74b commit 96a69fc
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,16 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int {
}
continue
}
oc.wop.PutOperator(op)

if isMerge {
// count two merge operators as one, so wopStatus.ops[desc] should
// not be updated here
// TODO: call checkAddOperator ...
oc.wop.PutMergeOperators([]*Operator{op, ops[i+1]})
i++
added++
oc.wop.PutOperator(ops[i])
} else {
oc.wop.PutOperator(op)
}
operatorCounter.WithLabelValues(desc, "put").Inc()
oc.wopStatus.incCount(desc)
Expand Down
16 changes: 16 additions & 0 deletions pkg/schedule/operator/waiting_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var priorityWeight = []float64{1.0, 4.0, 9.0, 16.0}
// WaitingOperator is an interface of waiting operators.
type WaitingOperator interface {
PutOperator(op *Operator)
PutMergeOperators(op []*Operator)
GetOperator() []*Operator
ListOperator() []*Operator
}
Expand Down Expand Up @@ -66,6 +67,21 @@ func (b *randBuckets) PutOperator(op *Operator) {
bucket.ops = append(bucket.ops, op)
}

// PutMergeOperators puts two operators into the random buckets.
func (b *randBuckets) PutMergeOperators(ops []*Operator) {
b.mu.Lock()
defer b.mu.Unlock()
if len(ops) != 2 && (ops[0].Kind()&OpMerge == 0 || ops[1].Kind()&OpMerge == 0) {
return
}
priority := ops[0].GetPriorityLevel()
bucket := b.buckets[priority]
if len(bucket.ops) == 0 {
b.totalWeight += bucket.weight
}
bucket.ops = append(bucket.ops, ops...)
}

// ListOperator lists all operator in the random buckets.
func (b *randBuckets) ListOperator() []*Operator {
b.mu.Lock()
Expand Down

0 comments on commit 96a69fc

Please sign in to comment.