Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multiupload completed callback #1172

Merged
merged 17 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions mobilesdk/zbox/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ type MultiOperationOption struct {
}

type MultiUploadOption struct {
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsWebstreaming bool `json:"isWebstreaming,omitempty"`
}

type MultiDownloadOption struct {
Expand Down Expand Up @@ -336,6 +337,7 @@ func MultiUpload(allocationID string, workdir string, jsonMultiUploadOptions str
encrypts := make([]bool, totalUploads)
chunkNumbers := make([]int, totalUploads)
isUpdates := make([]bool, totalUploads)
isWebstreaming := make([]bool, totalUploads)
for idx, option := range options {
filePaths[idx] = option.FilePath
fileNames[idx] = option.FileName
Expand All @@ -344,13 +346,14 @@ func MultiUpload(allocationID string, workdir string, jsonMultiUploadOptions str
chunkNumbers[idx] = option.ChunkNumber
encrypts[idx] = option.Encrypt
isUpdates[idx] = false
isWebstreaming[idx] = option.IsWebstreaming
}

a, err := getAllocation(allocationID)
if err != nil {
return err
}
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, &StatusCallbackWrapped{Callback: statusCb})
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, &StatusCallbackWrapped{Callback: statusCb})

}

Expand All @@ -373,6 +376,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str
encrypts := make([]bool, totalUploads)
chunkNumbers := make([]int, totalUploads)
isUpdates := make([]bool, totalUploads)
isWebstreaming := make([]bool, totalUploads)
for idx, option := range options {
filePaths[idx] = option.FilePath
fileNames[idx] = option.FileName
Expand All @@ -381,6 +385,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str
chunkNumbers[idx] = option.ChunkNumber
encrypts[idx] = option.Encrypt
isUpdates[idx] = true
isWebstreaming[idx] = option.IsWebstreaming
}
if err != nil {
return err
Expand All @@ -390,7 +395,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str
if err != nil {
return err
}
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, &StatusCallbackWrapped{Callback: statusCb})
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, &StatusCallbackWrapped{Callback: statusCb})

}

Expand Down
12 changes: 7 additions & 5 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ type BulkUploadOption struct {
Webstreaming bool `json:"webstreaming,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsRepair bool `json:"isRepair,omitempty"`
IsWebstreaming bool `json:"isWebstreaming,omitempty"`

NumBlocks int `json:"numBlocks,omitempty"`
FileSize int64 `json:"fileSize,omitempty"`
Expand Down Expand Up @@ -671,11 +672,12 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
sdk.WithChunkNumber(numBlocks),
}
operationRequests[idx] = sdk.OperationRequest{
FileMeta: fileMeta,
FileReader: fileReader,
OperationType: FileOperationInsert,
Opts: options,
Workdir: "/",
FileMeta: fileMeta,
FileReader: fileReader,
OperationType: FileOperationInsert,
Opts: options,
Workdir: "/",
IsWebstreaming: option.IsWebstreaming,
}

}
Expand Down
7 changes: 4 additions & 3 deletions winsdk/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ type UploadFile struct {
Path string
ThumbnailPath string

RemotePath string
Encrypt bool
IsUpdate bool
RemotePath string
Encrypt bool
IsUpdate bool
IsWebstreaming bool

ChunkNumber int
}
19 changes: 11 additions & 8 deletions winsdk/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,14 @@ type MultiOperationOption struct {
}

type MultiUploadOption struct {
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsWebstreaming bool `json:"isWebstreaming,omitempty"`
}

// MultiOperation - do copy, move, delete and createdir operation together
Expand Down Expand Up @@ -323,6 +324,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char {
chunkNumbers := make([]int, totalUploads)
encrypts := make([]bool, totalUploads)
isUpdates := make([]bool, totalUploads)
isWebstreaming := make([]bool, totalUploads)

statusBar := &StatusCallback{
status: make(map[string]*Status),
Expand All @@ -335,6 +337,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char {
remotePaths[idx] = option.RemotePath
chunkNumbers[idx] = option.ChunkNumber
isUpdates[idx] = option.IsUpdate
isWebstreaming[idx] = option.IsWebstreaming
encrypts[idx] = option.Encrypt
statusBar.status[option.RemotePath+option.Name] = &Status{}
}
Expand All @@ -346,7 +349,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char {

statusCaches.Add(C.GoString(uploadID), statusBar)

err = a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, statusBar)
err = a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, statusBar)
if err != nil {
return WithJSON(nil, err)
}
Expand Down
35 changes: 24 additions & 11 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,13 @@ type Allocation struct {
}

type OperationRequest struct {
OperationType string
LocalPath string
RemotePath string
DestName string // Required only for rename operation
DestPath string // Required for copy and move operation
IsUpdate bool
OperationType string
LocalPath string
RemotePath string
DestName string // Required only for rename operation
DestPath string // Required for copy and move operation
IsUpdate bool
IsWebstreaming bool

// Required for uploads
Workdir string
Expand Down Expand Up @@ -387,6 +388,7 @@ func (a *Allocation) CreateDir(remotePath string) error {
remotePath: remotePath,
wg: &sync.WaitGroup{},
timestamp: timestamp,
alreadyExists: map[uint64]bool{},
Consensus: Consensus{
RWMutex: &sync.RWMutex{},
consensusThresh: a.consensusThreshold,
Expand Down Expand Up @@ -495,7 +497,7 @@ func (a *Allocation) EncryptAndUploadFileWithThumbnail(
)
}

func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileNames []string, thumbnailPaths []string, encrypts []bool, chunkNumbers []int, remotePaths []string, isUpdate []bool, status StatusCallback) error {
func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileNames []string, thumbnailPaths []string, encrypts []bool, chunkNumbers []int, remotePaths []string, isUpdate []bool, isWebstreaming []bool, status StatusCallback) error {
if len(localPaths) != len(thumbnailPaths) {
return errors.New("invalid_value", "length of localpaths and thumbnailpaths must be equal")
}
Expand Down Expand Up @@ -554,7 +556,7 @@ func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileN
if err != nil {
return err
}
fmt.Println("fullRemotepath and localpath", fullRemotePath, localPath)

fileMeta := FileMeta{
Path: localPath,
ActualSize: fileInfo.Size(),
Expand Down Expand Up @@ -588,6 +590,9 @@ func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileN
if isUpdate[idx] {
operationRequests[idx].OperationType = constants.FileOperationUpdate
}
if isWebstreaming[idx] {
operationRequests[idx].IsWebstreaming = true
}

}
err := a.DoMultiOperation(operationRequests)
Expand Down Expand Up @@ -831,6 +836,7 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
mo.connectionID = zboxutil.NewConnectionId()

previousPaths := make(map[string]bool)
connectionErrors := make([]error, len(mo.allocationObj.Blobbers))

var wg sync.WaitGroup
for blobberIdx := range mo.allocationObj.Blobbers {
Expand All @@ -840,14 +846,21 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
err := mo.createConnectionObj(pos)
if err != nil {
l.Logger.Error(err.Error())
connectionErrors[pos] = err
}
}(blobberIdx)
}
wg.Wait()
// Check consensus
if mo.operationMask.CountOnes() < mo.consensusThresh {
majorErr := zboxutil.MajorError(connectionErrors)
if majorErr != nil {
return errors.New("consensus_not_met",
fmt.Sprintf("Multioperation: create connection failed. Required consensus %d got %d. Major error: %s",
mo.consensusThresh, mo.operationMask.CountOnes(), majorErr.Error()))
}
return errors.New("consensus_not_met",
fmt.Sprintf("Multioperation failed. Required consensus %d got %d",
fmt.Sprintf("Multioperation: create connection failed. Required consensus %d got %d",
mo.consensusThresh, mo.operationMask.CountOnes()))
}

Expand All @@ -873,13 +886,13 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
operation = NewMoveOperation(op.RemotePath, op.DestPath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)

case constants.FileOperationInsert:
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, false, op.Opts...)
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, false, op.IsWebstreaming, op.Opts...)

case constants.FileOperationDelete:
operation = NewDeleteOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)

case constants.FileOperationUpdate:
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, true, op.Opts...)
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, true, op.IsWebstreaming, op.Opts...)

case constants.FileOperationCreateDir:
operation = NewDirOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)
Expand Down
7 changes: 6 additions & 1 deletion zboxcore/sdk/commitworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,15 @@ func (commitreq *CommitRequest) processCommit() {
var req *http.Request
var lR ReferencePathResult
req, err := zboxutil.NewReferencePathRequest(commitreq.blobber.Baseurl, commitreq.allocationID, commitreq.allocationTx, paths)
if err != nil || len(paths) == 0 {
if err != nil {
l.Logger.Error("Creating ref path req", err)
return
}
if len(paths) == 0 {
l.Logger.Info("Nothing to commit")
commitreq.result = SuccessCommitResult()
return
}

ctx, cncl := context.WithTimeout(context.Background(), (time.Second * 30))
err = zboxutil.HttpDo(ctx, cncl, req, func(resp *http.Response, err error) error {
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/deleteworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (dop *DeleteOperation) Process(allocObj *Allocation, connectionID string) (
fmt.Sprintf("Delete failed. Required consensus %d, got %d",
deleteReq.consensus.consensusThresh, deleteReq.consensus.consensus))
}
l.Logger.Info("Delete Processs Ended ")
l.Logger.Info("Delete Process Ended ")
return objectTreeRefs, deleteReq.deleteMask, nil
}

Expand Down
54 changes: 31 additions & 23 deletions zboxcore/sdk/dirworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type DirRequest struct {
mu *sync.Mutex
connectionID string
timestamp int64
alreadyExists map[uint64]bool
Consensus
}

Expand All @@ -63,6 +64,7 @@ func (req *DirRequest) ProcessWithBlobbers(a *Allocation) int {
}
if alreadyExists {
countMu.Lock()
req.alreadyExists[pos] = true
existingDirCount++
countMu.Unlock()
}
Expand Down Expand Up @@ -258,9 +260,6 @@ func (req *DirRequest) createDirInBlobber(blobber *blockchain.StorageNode, pos u
l.Logger.Error(blobber.Baseurl, " Response: ", msg)
if strings.Contains(msg, DirectoryExists) {
req.Consensus.Done()
req.mu.Lock()
req.dirMask = req.dirMask.And(zboxutil.NewUint128(1).Lsh(pos).Not())
req.mu.Unlock()
alreadyExists = true
return
}
Expand All @@ -286,28 +285,30 @@ func (req *DirRequest) createDirInBlobber(blobber *blockchain.StorageNode, pos u
}

type DirOperation struct {
remotePath string
ctx context.Context
ctxCncl context.CancelFunc
dirMask zboxutil.Uint128
maskMU *sync.Mutex
remotePath string
ctx context.Context
ctxCncl context.CancelFunc
dirMask zboxutil.Uint128
maskMU *sync.Mutex
alreadyExists map[uint64]bool

Consensus
}

func (dirOp *DirOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) {
refs := make([]fileref.RefEntity, len(allocObj.Blobbers))
dR := &DirRequest{
allocationID: allocObj.ID,
allocationTx: allocObj.Tx,
connectionID: connectionID,
blobbers: allocObj.Blobbers,
remotePath: dirOp.remotePath,
ctx: dirOp.ctx,
ctxCncl: dirOp.ctxCncl,
dirMask: dirOp.dirMask,
mu: dirOp.maskMU,
wg: &sync.WaitGroup{},
allocationID: allocObj.ID,
allocationTx: allocObj.Tx,
connectionID: connectionID,
blobbers: allocObj.Blobbers,
remotePath: dirOp.remotePath,
ctx: dirOp.ctx,
ctxCncl: dirOp.ctxCncl,
dirMask: dirOp.dirMask,
mu: dirOp.maskMU,
wg: &sync.WaitGroup{},
alreadyExists: make(map[uint64]bool),
}
dR.Consensus = Consensus{
RWMutex: &sync.RWMutex{},
Expand All @@ -316,6 +317,7 @@ func (dirOp *DirOperation) Process(allocObj *Allocation, connectionID string) ([
}

_ = dR.ProcessWithBlobbers(allocObj)
dirOp.alreadyExists = dR.alreadyExists

if !dR.isConsensusOk() {
return nil, dR.dirMask, errors.New("consensus_not_met", "directory creation failed due to consensus not met")
Expand All @@ -330,12 +332,17 @@ func (dirOp *DirOperation) buildChange(refs []fileref.RefEntity, uid uuid.UUID)
changes := make([]allocationchange.AllocationChange, len(refs))
for i := dirOp.dirMask; !i.Equals(zboxutil.NewUint128(0)); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
pos = uint64(i.TrailingZeros())
newChange := &allocationchange.DirCreateChange{
RemotePath: dirOp.remotePath,
Uuid: uid,
Timestamp: common.Now(),
if dirOp.alreadyExists[pos] {
newChange := &allocationchange.EmptyFileChange{}
changes[pos] = newChange
} else {
newChange := &allocationchange.DirCreateChange{
RemotePath: dirOp.remotePath,
Uuid: uid,
Timestamp: common.Now(),
}
changes[pos] = newChange
}
changes[pos] = newChange
}
return changes
}
Expand Down Expand Up @@ -367,5 +374,6 @@ func NewDirOperation(remotePath string, dirMask zboxutil.Uint128, maskMU *sync.M
dirOp.consensusThresh = consensusTh
dirOp.fullconsensus = fullConsensus
dirOp.ctx, dirOp.ctxCncl = context.WithCancel(ctx)
dirOp.alreadyExists = make(map[uint64]bool)
return dirOp
}
Loading
Loading