Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer - Fix files count and size after stop #1013

Merged
merged 2 commits into from
Nov 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/fulltransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (m *fullTransferPhase) handleFoundFile(pcWrapper producerConsumerWrapper,
return
}
// Increment the files count in the directory's node in the snapshot manager, to track its progress.
err = node.IncrementFilesCount()
err = node.IncrementFilesCount(uint64(file.Size))
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func assertGetTransferStateAndSnapshot(t *testing.T, reset bool, expectedTransfe
func getRootAndAddSnapshotData(t *testing.T, stateManager *TransferStateManager) (root *reposnapshot.Node) {
root, err := stateManager.LookUpNode(".")
assert.NoError(t, err)
assert.NoError(t, root.IncrementFilesCount())
assert.NoError(t, root.IncrementFilesCount(10))
assert.NoError(t, root.AddChildNode("child", nil))
return
}
18 changes: 16 additions & 2 deletions artifactory/commands/transferfiles/state/statemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/jfrog/jfrog-cli-core/v2/utils/lock"
"github.com/jfrog/jfrog-client-go/utils/errorutils"
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"github.com/jfrog/jfrog-client-go/utils/log"
)

// The interval in which to save the state and run transfer files to the file system.
Expand Down Expand Up @@ -67,6 +68,8 @@ func (ts *TransferStateManager) UnlockTransferStateManager() error {
// buildInfoRepo - True if build info repository
// reset - Delete the repository's previous transfer info
func (ts *TransferStateManager) SetRepoState(repoKey string, totalSizeBytes, totalFiles int64, buildInfoRepo, reset bool) error {
var transferredFiles uint32 = 0
var transferredSizeBytes uint64 = 0
err := ts.Action(func(*TransferState) error {
transferState, repoTransferSnapshot, err := getTransferStateAndSnapshot(repoKey, reset)
if err != nil {
Expand All @@ -75,6 +78,17 @@ func (ts *TransferStateManager) SetRepoState(repoKey string, totalSizeBytes, tot
transferState.CurrentRepo.Phase1Info.TotalSizeBytes = totalSizeBytes
transferState.CurrentRepo.Phase1Info.TotalUnits = totalFiles

if repoTransferSnapshot != nil && repoTransferSnapshot.loadedFromSnapshot {
transferredFiles, transferredSizeBytes, err = repoTransferSnapshot.snapshotManager.CalculateTransferredFilesAndSize()
if err != nil {
return err
}
log.Info("Calculated transferred files from previous run:", transferredFiles)
log.Info("Calculated transferred bytes from previous run:", transferredSizeBytes)
transferState.CurrentRepo.Phase1Info.TransferredUnits = int64(transferredFiles)
transferState.CurrentRepo.Phase1Info.TransferredSizeBytes = int64(transferredSizeBytes)
}

ts.TransferState = transferState
ts.repoTransferSnapshot = repoTransferSnapshot
return nil
Expand All @@ -87,8 +101,8 @@ func (ts *TransferStateManager) SetRepoState(repoKey string, totalSizeBytes, tot
transferRunStatus.BuildInfoRepo = buildInfoRepo
transferRunStatus.VisitedFolders = 0

transferRunStatus.OverallTransfer.TransferredUnits += ts.CurrentRepo.Phase1Info.TransferredUnits
transferRunStatus.OverallTransfer.TransferredSizeBytes += ts.CurrentRepo.Phase1Info.TransferredSizeBytes
transferRunStatus.OverallTransfer.TransferredUnits += int64(transferredFiles)
transferRunStatus.OverallTransfer.TransferredSizeBytes += int64(transferredSizeBytes)
return nil
})
}
Expand Down
57 changes: 49 additions & 8 deletions utils/reposnapshot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ type Node struct {
// Mutex is on the Node level to allow modifying non-conflicting content on multiple nodes simultaneously.
mutex sync.Mutex
// The files count is used to identify when handling a node is completed. It is only used during runtime, and is not persisted to disk for future runs.
filesCount uint32
filesCount uint32
totalFilesCount uint32
totalFilesSize uint64
NodeStatus
}

Expand All @@ -34,9 +36,11 @@ const (
// The wrapper only contains fields that are used in future runs, hence not all fields from Node are persisted.
// In addition, it does not hold the parent pointer to avoid cyclic reference on export.
type NodeExportWrapper struct {
Name string `json:"name,omitempty"`
Children []*NodeExportWrapper `json:"children,omitempty"`
Completed bool `json:"completed,omitempty"`
Name string `json:"name,omitempty"`
Children []*NodeExportWrapper `json:"children,omitempty"`
Completed bool `json:"completed,omitempty"`
TotalFilesCount uint32 `json:"total_files_count,omitempty"`
TotalFilesSize uint64 `json:"total_files_size,omitempty"`
}

type ActionOnNodeFunc func(node *Node) error
Expand All @@ -55,8 +59,10 @@ func (node *Node) convertToWrapper() (wrapper *NodeExportWrapper, err error) {
var children []*Node
err = node.action(func(node *Node) error {
wrapper = &NodeExportWrapper{
Name: node.name,
Completed: node.NodeStatus == Completed,
Name: node.name,
Completed: node.NodeStatus == Completed,
TotalFilesCount: node.totalFilesCount,
TotalFilesSize: node.totalFilesSize,
}
children = node.children
return nil
Expand All @@ -78,7 +84,9 @@ func (node *Node) convertToWrapper() (wrapper *NodeExportWrapper, err error) {
// Convert the loaded node export wrapper to node.
func (wrapper *NodeExportWrapper) convertToNode() *Node {
node := &Node{
name: wrapper.Name,
name: wrapper.Name,
totalFilesCount: wrapper.TotalFilesCount,
totalFilesSize: wrapper.TotalFilesSize,
}
// If node wasn't previously completed, we will start exploring it from scratch.
if wrapper.Completed {
Expand Down Expand Up @@ -128,18 +136,49 @@ func (node *Node) setCompleted() (err error) {
return
}

// Sum up all subtree directories with status "completed"
func (node *Node) CalculateTransferredFilesAndSize() (totalFilesCount uint32, totalFilesSize uint64, err error) {
var children []*Node
err = node.action(func(node *Node) error {
children = node.children
if node.NodeStatus == Completed {
totalFilesCount = node.totalFilesCount
totalFilesSize = node.totalFilesSize
}
return nil
})
if err != nil {
return
}
for _, child := range children {
childFilesCount, childTotalFilesSize, childErr := child.CalculateTransferredFilesAndSize()
if childErr != nil {
return 0, 0, childErr
}
totalFilesCount += childFilesCount
totalFilesSize += childTotalFilesSize
}
return
}

// Check if node completed - if done exploring, done handling files, children are completed.
func (node *Node) CheckCompleted() error {
isCompleted := false
err := node.action(func(node *Node) error {
if node.NodeStatus == Exploring || node.filesCount > 0 {
return nil
}
var totalFilesCount uint32 = 0
var totalFilesSize uint64 = 0
for _, child := range node.children {
totalFilesCount += child.totalFilesCount
totalFilesSize += child.totalFilesSize
if child.NodeStatus < Completed {
return nil
}
}
node.totalFilesCount += totalFilesCount
node.totalFilesSize += totalFilesSize
isCompleted = true
return nil
})
Expand All @@ -150,9 +189,11 @@ func (node *Node) CheckCompleted() error {
return node.setCompleted()
}

func (node *Node) IncrementFilesCount() error {
func (node *Node) IncrementFilesCount(fileSize uint64) error {
return node.action(func(node *Node) error {
node.filesCount++
node.totalFilesCount++
node.totalFilesSize += fileSize
return nil
})
}
Expand Down
81 changes: 80 additions & 1 deletion utils/reposnapshot/node_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package reposnapshot

import (
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

// Convert node to wrapper and back to verify conversions.
Expand All @@ -20,3 +21,81 @@ func TestConversions(t *testing.T) {
assert.Equal(t, ".", node2.parent.name)
assert.Equal(t, Completed, node2converted.NodeStatus)
}

func TestCheckCompleted(t *testing.T) {
zero, one, two := createThreeNodesTree(t)

// Set completed and expect false
checkCompleted(t, false, zero, one, two)

// Mark done exploring and zero all file counts
markDoneExploring(t, zero, one, two)
decrementFilesCount(t, one, two, two)

// Run check completed one all nodes from down to top
checkCompleted(t, true, two, one, zero)
}

func TestCalculateTransferredFilesAndSize(t *testing.T) {
zero, one, two := createThreeNodesTree(t)

// Run calculate and expect that the total files count and size in "zero" are zero
totalFilesCount, totalFilesSize, err := zero.CalculateTransferredFilesAndSize()
assert.NoError(t, err)
assert.Zero(t, totalFilesSize)
assert.Zero(t, totalFilesCount)

// Mark done exploring
markDoneExploring(t, zero, one, two)

// Zero the files count of "two"
decrementFilesCount(t, two, two)
checkCompleted(t, true, two)

// Run calculate and expect that "zero" will contain the files count and size of "two"
totalFilesCount, totalFilesSize, err = zero.CalculateTransferredFilesAndSize()
assert.NoError(t, err)
assert.EqualValues(t, 1, totalFilesSize)
assert.EqualValues(t, 2, totalFilesCount)

// Zero the file count of "one"
decrementFilesCount(t, one)
checkCompleted(t, true, one, zero)

// Run calculate and expect that "zero" will contain the files count and size of "one" and "two"
totalFilesCount, totalFilesSize, err = zero.CalculateTransferredFilesAndSize()
assert.NoError(t, err)
assert.EqualValues(t, 1, totalFilesSize)
assert.EqualValues(t, 3, totalFilesCount)
}

// Create the following tree structure 0 --> 1 -- > 2
func createThreeNodesTree(t *testing.T) (zero, one, two *Node) {
zero = createNodeBase(t, "0", 0, nil)
one = createNodeBase(t, "1", 1, zero)
two = createNodeBase(t, "2", 2, one)
addChildren(zero, one)
addChildren(one, two)
return
}

func checkCompleted(t *testing.T, expected bool, nodes ...*Node) {
for _, node := range nodes {
assert.NoError(t, node.CheckCompleted())
actual, err := node.IsCompleted()
assert.NoError(t, err)
assert.Equal(t, expected, actual)
}
}

func markDoneExploring(t *testing.T, nodes ...*Node) {
for _, node := range nodes {
assert.NoError(t, node.MarkDoneExploring())
}
}

func decrementFilesCount(t *testing.T, nodes ...*Node) {
for _, node := range nodes {
assert.NoError(t, node.DecrementFilesCount())
}
}
9 changes: 8 additions & 1 deletion utils/reposnapshot/snapshotmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/jfrog/gofrog/lru"
"github.com/jfrog/jfrog-client-go/utils/errorutils"
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"strings"
)

// Represents a snapshot of a repository being traversed to do a certain action.
Expand Down Expand Up @@ -82,6 +83,12 @@ func (sm *RepoSnapshotManager) PersistRepoSnapshot() error {
return sm.root.convertAndSaveToFile(sm.snapshotFilePath)
}

// Return the count and size of files that have been successfully transferred and their respective directories are marked as complete,
// ensuring they won't be transferred again. This data helps in estimating the remaining files for transfer after stopping.
func (sm *RepoSnapshotManager) CalculateTransferredFilesAndSize() (totalFilesCount uint32, totalFilesSize uint64, err error) {
return sm.root.CalculateTransferredFilesAndSize()
}

// Returns the node corresponding to the directory in the provided relative path. Path should be provided without the repository name.
func (sm *RepoSnapshotManager) LookUpNode(relativePath string) (requestedNode *Node, err error) {
if relativePath == "" {
Expand Down
51 changes: 31 additions & 20 deletions utils/reposnapshot/snapshotmanager_test.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
package reposnapshot

import (
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"github.com/stretchr/testify/assert"
"encoding/json"
"os"
"path"
"path/filepath"
"testing"

clientutils "github.com/jfrog/jfrog-client-go/utils"
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"github.com/stretchr/testify/assert"
)

const dummyRepoKey = "dummy-repo-local"

var expectedFile = filepath.Join("testdata", dummyRepoKey)

func TestLoad(t *testing.T) {
t.Run("repo snapshot doesn't exist", func(t *testing.T) { testLoad(t, "/path/to/file", false, CreateNewNode(".", nil)) })
t.Run("repo snapshot exists", func(t *testing.T) { testLoad(t, expectedFile, true, createTestSnapshotTree(t)) })
func TestLoadDoesNotExist(t *testing.T) {
_, exists, err := LoadRepoSnapshotManager(dummyRepoKey, "/path/to/file")
assert.NoError(t, err)
assert.False(t, exists)
}

func testLoad(t *testing.T, snapshotPath string, expectedExists bool, expectedRoot *Node) {
sm, exists, err := LoadRepoSnapshotManager(dummyRepoKey, snapshotPath)
func TestLoad(t *testing.T) {
sm, exists, err := LoadRepoSnapshotManager(dummyRepoKey, expectedFile)
assert.NoError(t, err)
assert.Equal(t, expectedExists, exists)
if expectedExists {
// Convert to wrapper in order to compare.
expectedWrapper, err := expectedRoot.convertToWrapper()
assert.NoError(t, err)
rootWrapper, err := sm.root.convertToWrapper()
assert.NoError(t, err)
assert.Equal(t, expectedWrapper, rootWrapper)
assert.Equal(t, snapshotPath, sm.snapshotFilePath)
assert.Equal(t, dummyRepoKey, sm.repoKey)
}
assert.True(t, exists)
// Convert to wrapper in order to compare
expectedRoot := createTestSnapshotTree(t)
expectedWrapper, err := expectedRoot.convertToWrapper()
assert.NoError(t, err)
rootWrapper, err := sm.root.convertToWrapper()
assert.NoError(t, err)

// Marshal json to compare strings
expected, err := json.Marshal(expectedWrapper)
assert.NoError(t, err)
actual, err := json.Marshal(rootWrapper)
assert.NoError(t, err)

// Compare
assert.Equal(t, clientutils.IndentJson(expected), clientutils.IndentJson(actual))
assert.Equal(t, expectedFile, sm.snapshotFilePath)
assert.Equal(t, dummyRepoKey, sm.repoKey)
}

func TestSaveToFile(t *testing.T) {
Expand All @@ -43,7 +54,7 @@ func TestSaveToFile(t *testing.T) {
assert.NoError(t, err)
actual, err := os.ReadFile(manager.snapshotFilePath)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
assert.Equal(t, clientutils.IndentJson(expected), clientutils.IndentJson(actual))
}

func TestNodeCompletedAndTreeCollapsing(t *testing.T) {
Expand Down Expand Up @@ -179,7 +190,7 @@ func createNodeBase(t *testing.T, name string, filesCount int, parent *Node) *No
node := CreateNewNode(name, parent)
node.NodeStatus = DoneExploring
for i := 0; i < filesCount; i++ {
assert.NoError(t, node.IncrementFilesCount())
assert.NoError(t, node.IncrementFilesCount(uint64(i)))
}
return node
}
Expand Down
Loading
Loading