Transfer - Make curProcessedUploadChunks local variable (#1087)
yahavi authored Dec 28, 2023
1 parent c486274 commit 040b65d
Showing 5 changed files with 76 additions and 69 deletions.
16 changes: 8 additions & 8 deletions artifactory/commands/transferfiles/fulltransfer.go
@@ -103,7 +103,7 @@ func (m *fullTransferPhase) run() error {
return err
}

- folderHandler := m.createFolderFullTransferHandlerFunc(node, *pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
+ folderHandler := m.createFolderFullTransferHandlerFunc(node, pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
_, err = pcWrapper.chunkBuilderProducerConsumer.AddTaskWithError(folderHandler(folderParams{relativePath: "."}), pcWrapper.errorsQueue.AddError)
return err
}
@@ -124,7 +124,7 @@ type folderParams struct {
relativePath string
}

- func (m *fullTransferPhase) createFolderFullTransferHandlerFunc(node *reposnapshot.Node, pcWrapper producerConsumerWrapper, uploadChunkChan chan UploadedChunk,
+ func (m *fullTransferPhase) createFolderFullTransferHandlerFunc(node *reposnapshot.Node, pcWrapper *producerConsumerWrapper, uploadChunkChan chan UploadedChunk,
delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng) folderFullTransferHandlerFunc {
return func(params folderParams) parallel.TaskFunc {
return func(threadId int) error {
@@ -134,7 +134,7 @@ func (m *fullTransferPhase) createFolderFullTransferHandlerFunc(node *reposnapsh
}
}

- func (m *fullTransferPhase) transferFolder(node *reposnapshot.Node, params folderParams, logMsgPrefix string, pcWrapper producerConsumerWrapper,
+ func (m *fullTransferPhase) transferFolder(node *reposnapshot.Node, params folderParams, logMsgPrefix string, pcWrapper *producerConsumerWrapper,
uploadChunkChan chan UploadedChunk, delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng) (err error) {
log.Debug(logMsgPrefix+"Handling folder:", path.Join(m.repoKey, params.relativePath))

@@ -159,15 +159,15 @@ func (m *fullTransferPhase) transferFolder(node *reposnapshot.Node, params folde

// Chunk didn't reach full size. Upload the remaining files.
if len(curUploadChunk.UploadCandidates) > 0 {
- if _, err = pcWrapper.chunkUploaderProducerConsumer.AddTaskWithError(uploadChunkWhenPossibleHandler(&m.phaseBase, curUploadChunk, uploadChunkChan, errorsChannelMng), pcWrapper.errorsQueue.AddError); err != nil {
+ if _, err = pcWrapper.chunkUploaderProducerConsumer.AddTaskWithError(uploadChunkWhenPossibleHandler(pcWrapper, &m.phaseBase, curUploadChunk, uploadChunkChan, errorsChannelMng), pcWrapper.errorsQueue.AddError); err != nil {
return
}
}
log.Debug(logMsgPrefix+"Done transferring folder:", path.Join(m.repoKey, params.relativePath))
return
}

- func (m *fullTransferPhase) searchAndHandleFolderContents(params folderParams, pcWrapper producerConsumerWrapper,
+ func (m *fullTransferPhase) searchAndHandleFolderContents(params folderParams, pcWrapper *producerConsumerWrapper,
uploadChunkChan chan UploadedChunk, delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng,
node *reposnapshot.Node) (curUploadChunk api.UploadChunk, err error) {
curUploadChunk = api.UploadChunk{
@@ -223,7 +223,7 @@ func (m *fullTransferPhase) searchAndHandleFolderContents(params folderParams, p
return
}

- func (m *fullTransferPhase) handleFoundChildFolder(params folderParams, pcWrapper producerConsumerWrapper,
+ func (m *fullTransferPhase) handleFoundChildFolder(params folderParams, pcWrapper *producerConsumerWrapper,
uploadChunkChan chan UploadedChunk, delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng,
item servicesUtils.ResultItem) (err error) {
newRelativePath := getFolderRelativePath(item.Name, params.relativePath)
@@ -239,7 +239,7 @@ func (m *fullTransferPhase) handleFoundChildFolder(params folderParams, pcWrappe
return
}

- func (m *fullTransferPhase) handleFoundFile(pcWrapper producerConsumerWrapper,
+ func (m *fullTransferPhase) handleFoundFile(pcWrapper *producerConsumerWrapper,
uploadChunkChan chan UploadedChunk, delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng,
node *reposnapshot.Node, item servicesUtils.ResultItem, curUploadChunk *api.UploadChunk) (err error) {
file := api.FileRepresentation{Repo: item.Repo, Path: item.Path, Name: item.Name, Size: item.Size}
@@ -255,7 +255,7 @@ func (m *fullTransferPhase) handleFoundFile(pcWrapper producerConsumerWrapper,
}
curUploadChunk.AppendUploadCandidateIfNeeded(file, m.buildInfoRepo)
if curUploadChunk.IsChunkFull() {
- _, err = pcWrapper.chunkUploaderProducerConsumer.AddTaskWithError(uploadChunkWhenPossibleHandler(&m.phaseBase, *curUploadChunk, uploadChunkChan, errorsChannelMng), pcWrapper.errorsQueue.AddError)
+ _, err = pcWrapper.chunkUploaderProducerConsumer.AddTaskWithError(uploadChunkWhenPossibleHandler(pcWrapper, &m.phaseBase, *curUploadChunk, uploadChunkChan, errorsChannelMng), pcWrapper.errorsQueue.AddError)
if err != nil {
return
}
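Every helper in fulltransfer.go now receives pcWrapper as a *producerConsumerWrapper instead of a copy. Since the wrapper now carries the shared chunk counter and its sync.Mutex (see producerconsumer.go below), passing it by value would hand each callee its own counter and a copy of the lock. A minimal, self-contained Go sketch of that failure mode; the counterWrapper type and function names are illustrative stand-ins, not code from the repository:

```go
package main

import (
	"fmt"
	"sync"
)

// counterWrapper mimics the shape of producerConsumerWrapper: a counter guarded by a mutex.
type counterWrapper struct {
	mu    sync.Mutex
	total int
}

// incByValue receives a copy, so the increment (and the copied mutex) never reach the
// caller's wrapper. go vet's copylocks check also flags this kind of copy.
func incByValue(w counterWrapper) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.total++
}

// incByPointer mutates the shared instance, which is what the commit switches to.
func incByPointer(w *counterWrapper) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.total++
}

func main() {
	w := counterWrapper{}
	incByValue(w)
	fmt.Println("after by-value increment:", w.total) // 0: the shared counter was not updated
	incByPointer(&w)
	fmt.Println("after by-pointer increment:", w.total) // 1
}
```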
31 changes: 10 additions & 21 deletions artifactory/commands/transferfiles/manager.go
@@ -175,7 +175,7 @@ func (ptm *PollingTasksManager) start(phaseBase *phaseBase, runWaitGroup *sync.W
}
go func() {
defer runWaitGroup.Done()
- pollUploads(phaseBase, phaseBase.srcUpService, uploadChunkChan, ptm.doneChannel, errorsChannelMng)
+ pollUploads(pcWrapper, phaseBase, phaseBase.srcUpService, uploadChunkChan, ptm.doneChannel, errorsChannelMng)
}()
return nil
}
@@ -195,16 +195,6 @@ func (ptm *PollingTasksManager) stop() {
}
}

- type producerConsumerWrapper struct {
- // This Producer-Consumer is used to upload chunks, initialized in newProducerConsumerWrapper; each uploading thread waits to be given tasks from the queue.
- chunkUploaderProducerConsumer parallel.Runner
- // This Producer-Consumer is used to execute AQLs and build chunks from the AQLs' results. The chunks data is sent to the go routines that will upload them.
- // Initialized in newProducerConsumerWrapper; each builder thread waits to be given tasks from the queue.
- chunkBuilderProducerConsumer parallel.Runner
- // Errors related to chunkUploaderProducerConsumer and chunkBuilderProducerConsumer are logged in this queue.
- errorsQueue *clientUtils.ErrorsQueue
- }

func newProducerConsumerWrapper() producerConsumerWrapper {
chunkUploaderProducerConsumer := parallel.NewRunner(GetChunkUploaderThreads(), tasksMaxCapacity, false)
chunkBuilderProducerConsumer := parallel.NewRunner(GetChunkBuilderThreads(), tasksMaxCapacity, false)
@@ -257,13 +247,12 @@ func runProducerConsumers(pcWrapper *producerConsumerWrapper) (executionErr erro
// Number of chunks is limited by the number of threads.
// Whenever the status of a chunk was received and is DONE, its token is removed from the tokens batch, making room for a new chunk to be uploaded
// and a new token to be polled on.
- func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploadChunkChan chan UploadedChunk, doneChan chan bool, errorsChannelMng *ErrorsChannelMng) {
+ func pollUploads(pcWrapper *producerConsumerWrapper, phaseBase *phaseBase, srcUpService *srcUserPluginService, uploadChunkChan chan UploadedChunk, doneChan chan bool, errorsChannelMng *ErrorsChannelMng) {
curTokensBatch := api.UploadChunksStatusBody{}
chunksLifeCycleManager := ChunksLifeCycleManager{
deletedChunksSet: datastructures.MakeSet[api.ChunkId](),
nodeToChunksMap: make(map[api.NodeId]map[api.ChunkId]UploadedChunkData),
}
- curProcessedUploadChunks = 0
var timeEstMng *state.TimeEstimationManager
if phaseBase != nil {
timeEstMng = &phaseBase.stateManager.TimeEstimationManager
@@ -276,9 +265,9 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa
time.Sleep(waitTimeBetweenChunkStatusSeconds * time.Second)

// Run once per 5 minutes
- if i%100 == 0 {
+ if i%60 == 0 {
// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
- if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
+ if err := phaseBase.stateManager.SetWorkingThreads(pcWrapper.totalProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
}
log.Debug("There are", len(phaseBase.stateManager.StaleChunks), "chunks in transit for more than 30 minutes")
@@ -310,7 +299,7 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa
// Clear body for the next request
curTokensBatch = api.UploadChunksStatusBody{}
removeDeletedChunksFromSet(chunksStatus.DeletedChunks, chunksLifeCycleManager.deletedChunksSet)
- toStop := handleChunksStatuses(phaseBase, &chunksStatus, &chunksLifeCycleManager, timeEstMng, errorsChannelMng)
+ toStop := handleChunksStatuses(pcWrapper, phaseBase, &chunksStatus, &chunksLifeCycleManager, timeEstMng, errorsChannelMng)
if toStop {
return
}
@@ -375,9 +364,9 @@ func removeDeletedChunksFromSet(deletedChunks []string, deletedChunksSet *datast
// handleChunksStatuses handles the chunk statuses from the response received from the source Artifactory Instance.
// It syncs the chunk status between the CLI and the source Artifactory instance,
// When a chunk is DONE, the progress bar is updated, and the number of working threads is decreased.
- func handleChunksStatuses(phase *phaseBase, chunksStatus *api.UploadChunksStatusResponse,
+ func handleChunksStatuses(pcWrapper *producerConsumerWrapper, phase *phaseBase, chunksStatus *api.UploadChunksStatusResponse,
chunksLifeCycleManager *ChunksLifeCycleManager, timeEstMng *state.TimeEstimationManager, errorsChannelMng *ErrorsChannelMng) bool {
- checkChunkStatusSync(chunksStatus, chunksLifeCycleManager, errorsChannelMng)
+ checkChunkStatusSync(pcWrapper, chunksStatus, chunksLifeCycleManager, errorsChannelMng)
for _, chunk := range chunksStatus.ChunksStatus {
if chunk.UuidToken == "" {
log.Error("Unexpected empty uuid token in status")
@@ -387,7 +376,7 @@ func handleChunksStatuses(phase *phaseBase, chunksStatus *api.UploadChunksStatus
case api.InProgress:
continue
case api.Done:
- reduceCurProcessedChunks()
+ pcWrapper.decProcessedChunks()
log.Debug("Received status DONE for chunk '" + chunk.UuidToken + "'")

chunkSentTime := chunksLifeCycleManager.nodeToChunksMap[api.NodeId(chunksStatus.NodeId)][api.ChunkId(chunk.UuidToken)].TimeSent
Expand Down Expand Up @@ -433,7 +422,7 @@ func updateProgress(phase *phaseBase, timeEstMng *state.TimeEstimationManager,
}

// Verify and handle in progress chunks synchronization between the CLI and the Source Artifactory instance
- func checkChunkStatusSync(chunkStatus *api.UploadChunksStatusResponse, chunksLifeCycleManager *ChunksLifeCycleManager, errorsChannelMng *ErrorsChannelMng) {
+ func checkChunkStatusSync(pcWrapper *producerConsumerWrapper, chunkStatus *api.UploadChunksStatusResponse, chunksLifeCycleManager *ChunksLifeCycleManager, errorsChannelMng *ErrorsChannelMng) {
// Compare between the number of chunks received from the latest syncChunks request to the chunks data we handle locally in nodeToChunksMap.
// If the number of the in progress chunks of a node within nodeToChunksMap differs from the chunkStatus received, there is missing data on the source side.
expectedChunksInNode := len(chunksLifeCycleManager.nodeToChunksMap[api.NodeId(chunkStatus.NodeId)])
@@ -463,7 +452,7 @@ func checkChunkStatusSync(chunkStatus *api.UploadChunksStatusResponse, chunksLif
addErrorToChannel(errorsChannelMng, failedFile)
}
delete(chunksLifeCycleManager.nodeToChunksMap[api.NodeId(chunkStatus.NodeId)], chunkUuid)
- reduceCurProcessedChunks()
+ pcWrapper.decProcessedChunks()
}
}
}
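As the doc comments above describe, the number of in-flight chunks is capped by the uploader thread count, and a chunk's slot is released once its status comes back DONE. With the counter now living on the wrapper, handleChunksStatuses releases the slot through pcWrapper.decProcessedChunks() and the periodic report reads pcWrapper.totalProcessedUploadChunks instead of a package-level variable. A rough, self-contained sketch of one polling pass under those assumptions; the wrapper and chunkStatus types below are simplified stand-ins for producerConsumerWrapper and the api types:

```go
package main

import (
	"fmt"
	"sync"
)

// chunkStatus is a simplified stand-in for the api package's chunk status values.
type chunkStatus string

const (
	inProgress chunkStatus = "IN_PROGRESS"
	done       chunkStatus = "DONE"
)

// wrapper mimics the counter fields that the commit moves onto producerConsumerWrapper.
type wrapper struct {
	mu    sync.Mutex
	total int
}

func (w *wrapper) decProcessedChunks() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.total--
}

func (w *wrapper) workingThreads() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.total
}

func main() {
	w := &wrapper{total: 2} // two chunks were sent for processing earlier
	inFlight := map[string]chunkStatus{
		"chunk-1": done,
		"chunk-2": inProgress,
	}

	// One polling pass: a DONE chunk leaves the in-flight map and frees its processing
	// slot, mirroring handleChunksStatuses calling pcWrapper.decProcessedChunks().
	for id, status := range inFlight {
		if status == done {
			delete(inFlight, id)
			w.decProcessedChunks()
		}
	}

	fmt.Println("chunks still in transit:", len(inFlight))        // 1
	fmt.Println("working threads to report:", w.workingThreads()) // 1, what SetWorkingThreads would receive
}
```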
42 changes: 42 additions & 0 deletions artifactory/commands/transferfiles/producerconsumer.go
@@ -0,0 +1,42 @@
package transferfiles

import (
"sync"

"github.com/jfrog/gofrog/parallel"
clientUtils "github.com/jfrog/jfrog-client-go/utils"
)

type producerConsumerWrapper struct {
// This Producer-Consumer is used to upload chunks, initialized in newProducerConsumerWrapper; each uploading thread waits to be given tasks from the queue.
chunkUploaderProducerConsumer parallel.Runner
// This Producer-Consumer is used to execute AQLs and build chunks from the AQLs' results. The chunks data is sent to the go routines that will upload them.
// Initialized in newProducerConsumerWrapper; each builder thread waits to be given tasks from the queue.
chunkBuilderProducerConsumer parallel.Runner
// Errors related to chunkUploaderProducerConsumer and chunkBuilderProducerConsumer are logged in this queue.
errorsQueue *clientUtils.ErrorsQueue
// This variable holds the total number of upload chunks that were sent to the source Artifactory instance to process.
// Together with the mutex below, it controls the load on the user plugin and couples it to the local number of threads.
totalProcessedUploadChunks int
processedUploadChunksMutex sync.Mutex
}

// Checks whether the total number of upload chunks sent is lower than the number of threads, and if so, increments it.
// Returns true if the total number was indeed incremented.
func (producerConsumerWrapper *producerConsumerWrapper) incProcessedChunksWhenPossible() bool {
producerConsumerWrapper.processedUploadChunksMutex.Lock()
defer producerConsumerWrapper.processedUploadChunksMutex.Unlock()
if producerConsumerWrapper.totalProcessedUploadChunks < GetChunkUploaderThreads() {
producerConsumerWrapper.totalProcessedUploadChunks++
return true
}
return false
}

// Reduces the current total number of upload chunks processed. Called when an upload chunk no longer requires polling for status -
// if it's done processing, or an error occurred when sending it.
func (producerConsumerWrapper *producerConsumerWrapper) decProcessedChunks() {
producerConsumerWrapper.processedUploadChunksMutex.Lock()
defer producerConsumerWrapper.processedUploadChunksMutex.Unlock()
producerConsumerWrapper.totalProcessedUploadChunks--
}
16 changes: 9 additions & 7 deletions artifactory/commands/transferfiles/transfer_test.go
@@ -209,18 +209,19 @@ func uploadChunkAndPollTwice(t *testing.T, phaseBase *phaseBase, fileSample api.
doneChan := make(chan bool, 1)
var runWaitGroup sync.WaitGroup

+ pcWrapper := newProducerConsumerWrapper()
chunk := api.UploadChunk{}
chunk.AppendUploadCandidateIfNeeded(fileSample, false)
- stopped := uploadChunkWhenPossible(phaseBase, chunk, uploadChunksChan, nil)
+ stopped := uploadChunkWhenPossible(&pcWrapper, phaseBase, chunk, uploadChunksChan, nil)
assert.False(t, stopped)
- stopped = uploadChunkWhenPossible(phaseBase, chunk, uploadChunksChan, nil)
+ stopped = uploadChunkWhenPossible(&pcWrapper, phaseBase, chunk, uploadChunksChan, nil)
assert.False(t, stopped)
- assert.Equal(t, 2, curProcessedUploadChunks)
+ assert.Equal(t, 2, pcWrapper.totalProcessedUploadChunks)

runWaitGroup.Add(1)
go func() {
defer runWaitGroup.Done()
- pollUploads(phaseBase, phaseBase.srcUpService, uploadChunksChan, doneChan, nil)
+ pollUploads(&pcWrapper, phaseBase, phaseBase.srcUpService, uploadChunksChan, doneChan, nil)
}()
// Let the whole process run for a few chunk status checks, then mark it as done.
time.Sleep(5 * waitTimeBetweenChunkStatusSeconds * time.Second)
@@ -433,14 +434,15 @@ func TestCheckChunkStatusSync(t *testing.T) {
manager.nodeToChunksMap[nodeIdForTest] = map[api.ChunkId]UploadedChunkData{}
manager.nodeToChunksMap[nodeIdForTest][firstUuidTokenForTest] = UploadedChunkData{}
manager.nodeToChunksMap[nodeIdForTest][secondUuidTokenForTest] = UploadedChunkData{}
+ pcWrapper := newProducerConsumerWrapper()
errChanMng := createErrorsChannelMng()
- checkChunkStatusSync(&chunkStatus, &manager, &errChanMng)
+ checkChunkStatusSync(&pcWrapper, &chunkStatus, &manager, &errChanMng)
assert.Len(t, manager.nodeToChunksMap[nodeIdForTest], 2)
chunkStatus.ChunksStatus = chunkStatus.ChunksStatus[:len(chunkStatus.ChunksStatus)-1]
- checkChunkStatusSync(&chunkStatus, &manager, &errChanMng)
+ checkChunkStatusSync(&pcWrapper, &chunkStatus, &manager, &errChanMng)
assert.Len(t, manager.nodeToChunksMap[nodeIdForTest], 1)
chunkStatus.ChunksStatus = chunkStatus.ChunksStatus[:len(chunkStatus.ChunksStatus)-1]
- checkChunkStatusSync(&chunkStatus, &manager, &errChanMng)
+ checkChunkStatusSync(&pcWrapper, &chunkStatus, &manager, &errChanMng)
assert.Len(t, manager.nodeToChunksMap[nodeIdForTest], 0)
}

