Skip to content

Commit

Permalink
Add concurrency control (#1245)
Browse files Browse the repository at this point in the history
* control file ops concurrency

* cleanup

* add batch size to multi op
  • Loading branch information
Hitenjain14 authored Oct 5, 2023
1 parent a0aa9f7 commit 160f7df
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 12 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ require (
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/klauspost/compress v1.16.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/remeh/sizedwaitgroup v1.0.0
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E=
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=
Expand Down
15 changes: 10 additions & 5 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import (
)

var (
noBLOBBERS = errors.New("", "No Blobbers set in this allocation")
notInitialized = errors.New("sdk_not_initialized", "Please call InitStorageSDK Init and use GetAllocation to get the allocation object")
IsWasm = false
noBLOBBERS = errors.New("", "No Blobbers set in this allocation")
notInitialized = errors.New("sdk_not_initialized", "Please call InitStorageSDK Init and use GetAllocation to get the allocation object")
IsWasm = false
MultiOpBatchSize = 10
)

const (
Expand Down Expand Up @@ -828,8 +829,12 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
fmt.Sprintf("Multioperation: create connection failed. Required consensus %d got %d",
mo.consensusThresh, mo.operationMask.CountOnes()))
}

ops := 0
for ; i < len(operations); i++ {
if ops > MultiOpBatchSize {
// max batch size reached, commit
break
}
op := operations[i]
remotePath := op.RemotePath
parentPaths := GenerateParentPaths(remotePath)
Expand Down Expand Up @@ -878,7 +883,7 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
connectionID = newConnectionID
break
}

ops++
err = operation.Verify(a)
if err != nil {
return err
Expand Down
12 changes: 7 additions & 5 deletions zboxcore/sdk/multi_operation_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/0chain/errors"
"github.com/remeh/sizedwaitgroup"

"github.com/0chain/gosdk/core/common"
"github.com/0chain/gosdk/core/util"
Expand All @@ -28,6 +29,8 @@ const (
DefaultCreateConnectionTimeOut = 2 * time.Minute
)

var BatchSize = 5

type Operationer interface {
Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error)
buildChange(refs []fileref.RefEntity, uid uuid.UUID) []allocationchange.AllocationChange
Expand Down Expand Up @@ -150,14 +153,14 @@ func (mo *MultiOperation) Process() error {
ctx := mo.ctx
ctxCncl := mo.ctxCncl
defer ctxCncl()

swg := sizedwaitgroup.New(BatchSize)
errsSlice := make([]error, len(mo.operations))
mo.operationMask = zboxutil.NewUint128(0)
for idx, op := range mo.operations {
uid := util.GetNewUUID()
wg.Add(1)
swg.Add()
go func(op Operationer, idx int) {
defer wg.Done()
defer swg.Done()

// Check for other goroutines signal
select {
Expand All @@ -177,11 +180,10 @@ func (mo *MultiOperation) Process() error {
mo.operationMask = mo.operationMask.Or(mask)
mo.maskMU.Unlock()
changes := op.buildChange(refs, uid)

mo.changes[idx] = changes
}(op, idx)
}
wg.Wait()
swg.Wait()

// Check consensus
if mo.operationMask.CountOnes() < mo.consensusThresh || ctx.Err() != nil {
Expand Down
6 changes: 4 additions & 2 deletions zboxcore/sdk/upload_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"github.com/0chain/gosdk/constants"
"github.com/0chain/gosdk/zboxcore/allocationchange"
"github.com/0chain/gosdk/zboxcore/fileref"
l "github.com/0chain/gosdk/zboxcore/logger"
"github.com/0chain/gosdk/zboxcore/zboxutil"
"github.com/google/uuid"
"go.uber.org/zap"
)

type UploadOperation struct {
Expand All @@ -21,9 +23,9 @@ type UploadOperation struct {
func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) {
err := uo.chunkedUpload.process()
if err != nil {
l.Logger.Error("UploadOperation Failed", zap.String("name", uo.chunkedUpload.fileMeta.RemoteName), zap.Error(err))
return nil, uo.chunkedUpload.uploadMask, err
}

var pos uint64
numList := len(uo.chunkedUpload.blobbers)
uo.refs = make([]*fileref.FileRef, numList)
Expand All @@ -32,7 +34,7 @@ func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([
uo.refs[pos] = uo.chunkedUpload.blobbers[pos].fileRef
uo.refs[pos].ChunkSize = uo.chunkedUpload.chunkSize
}

l.Logger.Info("UploadOperation Success", zap.String("name", uo.chunkedUpload.fileMeta.RemoteName))
return nil, uo.chunkedUpload.uploadMask, nil
}

Expand Down

0 comments on commit 160f7df

Please sign in to comment.