Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions api/ws_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/minio/console/models"
Expand All @@ -29,15 +30,16 @@ import (

func (wsc *wsMinioClient) objectManager(session *models.Principal) {
// Storage of Cancel Contexts for this connection
cancelContexts := make(map[int64]context.CancelFunc)
var cancelContexts sync.Map
// Initial goroutine
defer func() {
// We close socket at the end of requests
wsc.conn.close()
for _, c := range cancelContexts {
// invoke cancel
c()
}
cancelContexts.Range(func(_, value interface{}) bool {
cancelFunc := value.(context.CancelFunc)
cancelFunc()
return true
})
}()

writeChannel := make(chan WSResponse)
Expand Down Expand Up @@ -80,26 +82,28 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
ctx, cancel := context.WithCancel(context.Background())

// We store the cancel func associated with this request
cancelContexts[messageRequest.RequestID] = cancel
cancelContexts.Store(messageRequest.RequestID, cancel)

const itemsPerBatch = 1000
switch messageRequest.Mode {
case "close":
return
case "cancel":
// if we have that request id, cancel it
if cancelFunc, ok := cancelContexts[messageRequest.RequestID]; ok {
cancelFunc()
delete(cancelContexts, messageRequest.RequestID)
if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok {
cancelFunc.(context.CancelFunc)()
cancelContexts.Delete(messageRequest.RequestID)
}
case "objects":
// cancel all previous open objects requests for listing
for rid, c := range cancelContexts {
cancelContexts.Range(func(key, value interface{}) bool {
rid := key.(int64)
if rid < messageRequest.RequestID {
// invoke cancel
c()
cancelFunc := value.(context.CancelFunc)
cancelFunc()
}
}
return true
})

// start listing and writing to web socket
go func() {
Expand All @@ -118,9 +122,10 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
}
var buffer []ObjectResponse
for lsObj := range startObjectsListing(ctx, wsc.client, objectRqConfigs) {
if cancelContexts[messageRequest.RequestID] == nil {
if _, ok := cancelContexts.Load(messageRequest.RequestID); !ok {
return
}

if lsObj.Err != nil {
sendWSResponse(WSResponse{
RequestID: messageRequest.RequestID,
Expand Down Expand Up @@ -162,16 +167,18 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
})

// remove the cancellation context
delete(cancelContexts, messageRequest.RequestID)
cancelContexts.Delete(messageRequest.RequestID)
}()
case "rewind":
// cancel all previous open objects requests for listing
for rid, c := range cancelContexts {
cancelContexts.Range(func(key, value interface{}) bool {
rid := key.(int64)
if rid < messageRequest.RequestID {
// invoke cancel
c()
cancelFunc := value.(context.CancelFunc)
cancelFunc()
}
}
return true
})

// start listing and writing to web socket
go func() {
Expand Down Expand Up @@ -253,7 +260,7 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
})

// remove the cancellation context
delete(cancelContexts, messageRequest.RequestID)
cancelContexts.Delete(messageRequest.RequestID)
}()
}
}
Expand Down