Skip to content

Commit

Permalink
Merge branch 'dev' into fix-precheck
Browse files Browse the repository at this point in the history
  • Loading branch information
yahavi committed Nov 26, 2023
2 parents 067a0ce + 2c2ac13 commit 5878f10
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 233 deletions.
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/longpropertycheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (lpc *LongPropertyCheck) longPropertiesTaskProducer(progress *progressbar.T
if long := isLongProperty(property); long {
log.Debug(fmt.Sprintf(`Found long property ('@%s':'%s')`, property.Key, property.Value))
if lpc.producerConsumer != nil {
lpc.producerConsumer.AddTaskWithError(lpc.createSearchPropertyTask(property, args, progress), lpc.errorsQueue.AddError)
_, _ = lpc.producerConsumer.AddTaskWithError(lpc.createSearchPropertyTask(property, args, progress), lpc.errorsQueue.AddError)
}
if progress != nil {
progress.IncGeneralProgressTotalBy(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func testSearchPropertiesInFiles(t *testing.T, properties []Property, specificRe
testServer, serverDetails, _ := getLongPropertyCheckStubServer(t, properties, propertiesFiles, false)
defer testServer.Close()

longPropertyCheck := NewLongPropertyCheck(specificRepos, true)
longPropertyCheck := NewLongPropertyCheck(specificRepos, false)
longPropertyCheck.producerConsumer = parallel.NewRunner(threadCount, maxThreadCapacity, false)
longPropertyCheck.filesChan = make(chan FileWithLongProperty, threadCount)
longPropertyCheck.errorsQueue = clientutils.NewErrorsQueue(1)
Expand Down
2 changes: 2 additions & 0 deletions artifactory/commands/transferfiles/state/runstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type ActionOnStatusFunc func(transferRunStatus *TransferRunStatus) error
// This state is used to allow showing the current run status by the 'jf rt transfer-files --status' command.
// It is also used for the time estimation and more.
type TransferRunStatus struct {
// Timestamp since the beginning of the current transfer execution
startTimestamp time.Time
lastSaveTimestamp time.Time
// This variable holds the total/transferred number of repositories (not their files).
OverallTransfer ProgressState `json:"overall_transfer,omitempty"`
Expand Down
42 changes: 34 additions & 8 deletions artifactory/commands/transferfiles/state/statemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ func (ts *TransferStateManager) UnlockTransferStateManager() error {
return ts.unlockStateManager()
}

func (ts *TransferStateManager) SetStartTimestamp(startTimestamp time.Time) {
ts.startTimestamp = startTimestamp
}

func (ts *TransferStateManager) GetStartTimestamp() time.Time {
return ts.startTimestamp
}

// Set the repository state.
// repoKey - Repository key
// totalSizeBytes - Repository size in bytes
Expand Down Expand Up @@ -394,21 +402,39 @@ func (ts *TransferStateManager) tryLockStateManager() error {
return nil
}

func getStartTimestamp() (int64, error) {
func (ts *TransferStateManager) Running() (running bool, err error) {
lockDirPath, err := coreutils.GetJfrogTransferLockDir()
if err != nil {
return 0, err
return false, err
}
return lock.GetLastLockTimestamp(lockDirPath)
var startTimestamp int64
startTimestamp, err = lock.GetLastLockTimestamp(lockDirPath)
return err == nil && startTimestamp != 0, err
}

func GetRunningTime() (runningTime string, isRunning bool, err error) {
startTimestamp, err := getStartTimestamp()
func (ts *TransferStateManager) InitStartTimestamp() (running bool, err error) {
if !ts.startTimestamp.IsZero() {
return true, nil
}
lockDirPath, err := coreutils.GetJfrogTransferLockDir()
if err != nil {
return false, err
}
var startTimestamp int64
startTimestamp, err = lock.GetLastLockTimestamp(lockDirPath)
if err != nil || startTimestamp == 0 {
return
return false, err
}
ts.startTimestamp = time.Unix(0, startTimestamp)
return true, nil
}

func (ts *TransferStateManager) GetRunningTimeString() (runningTime string) {
if ts.startTimestamp.IsZero() {
return ""
}
runningSecs := int64(time.Since(time.Unix(0, startTimestamp)).Seconds())
return SecondsToLiteralTime(runningSecs, ""), true, nil
runningSecs := int64(time.Since(ts.startTimestamp).Seconds())
return SecondsToLiteralTime(runningSecs, "")
}

func UpdateChunkInState(stateManager *TransferStateManager, chunk *api.ChunkStatus) (err error) {
Expand Down
75 changes: 75 additions & 0 deletions artifactory/commands/transferfiles/state/statemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,78 @@ func TestTryLockStateManager(t *testing.T) {
assert.NoError(t, stateManager.tryLockStateManager())
assert.ErrorIs(t, new(AlreadyLockedError), stateManager.tryLockStateManager())
}

func TestRunning(t *testing.T) {
stateManager, cleanUp := InitStateTest(t)
defer cleanUp()

// Assert no running=false
running, err := stateManager.Running()
assert.NoError(t, err)
assert.False(t, running)

// Lock to simulate transfer
assert.NoError(t, stateManager.TryLockTransferStateManager())

// Assert running=true
running, err = stateManager.Running()
assert.NoError(t, err)
assert.True(t, running)
}

func TestInitStartTimestamp(t *testing.T) {
stateManager, cleanUp := InitStateTest(t)
defer cleanUp()

// Init start timestamp and expect timestamp zero
running, err := stateManager.InitStartTimestamp()
assert.NoError(t, err)
assert.False(t, running)
assert.True(t, stateManager.startTimestamp.IsZero())

// Lock to simulate transfer
assert.NoError(t, stateManager.TryLockTransferStateManager())

// Init start timestamp and expect timestamp non-zero
running, err = stateManager.InitStartTimestamp()
assert.NoError(t, err)
assert.True(t, running)
assert.False(t, stateManager.startTimestamp.IsZero())
}

var getRunningTimeStringCases = []struct {
startTimestamp time.Time
expectedString string
}{
{time.Now(), "Less than a minute"},
{time.Now().Add(-time.Second), "Less than a minute"},
{time.Now().Add(-time.Minute), "1 minute"},
{time.Now().Add(-time.Hour), "1 hour"},
{time.Now().Add(-time.Hour).Add(time.Minute), "59 minutes"},
{time.Now().Add(-time.Hour).Add(time.Minute).Add(10 * time.Second), "58 minutes"},
}

func TestGetRunningTimeString(t *testing.T) {
stateManager, cleanUp := InitStateTest(t)
defer cleanUp()

runningTime := stateManager.GetRunningTimeString()
assert.Empty(t, runningTime)

// Lock and init start timestamp to simulate transfer
assert.NoError(t, stateManager.TryLockTransferStateManager())
running, err := stateManager.InitStartTimestamp()
assert.NoError(t, err)
assert.True(t, running)

// Run test cases
for _, testCase := range getRunningTimeStringCases {
t.Run(testCase.startTimestamp.String(), func(t *testing.T) {
// Set start timestamp
stateManager.startTimestamp = testCase.startTimestamp

// Assert running time string
assert.Equal(t, testCase.expectedString, stateManager.GetRunningTimeString())
})
}
}
130 changes: 35 additions & 95 deletions artifactory/commands/transferfiles/state/timeestimation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@ package state

import (
"fmt"
"time"

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/utils"

"github.com/jfrog/jfrog-client-go/utils/log"
)

const (
milliSecsInSecond = 1000
bytesInMB = 1024 * 1024
bytesPerMilliSecToMBPerSec = float64(milliSecsInSecond) / float64(bytesInMB)
// Precalculated average index time per build info, in seconds. This constant is used to estimate the processing time of all
// the build info files about to be transferred. Since the build info indexing time is not related to its file size,
// the estimation approach we use with data is irrelevant.
buildInfoAverageIndexTimeSec = 1.25
milliSecsInSecond = 1000
bytesInMB = 1024 * 1024
bytesPerMilliSecToMBPerSec = float64(milliSecsInSecond) / float64(bytesInMB)
minTransferTimeToShowEstimation = time.Minute * 5
)

type timeTypeSingular string
Expand All @@ -36,15 +33,14 @@ type TimeEstimationManager struct {
LastSpeedsSum float64 `json:"last_speeds_sum,omitempty"`
// The last calculated sum of speeds, in bytes/ms
SpeedsAverage float64 `json:"speeds_average,omitempty"`
// Data estimated remaining time is saved so that it can be used when handling a build-info repository and speed cannot be calculated.
DataEstimatedRemainingTime int64 `json:"data_estimated_remaining_time,omitempty"`
// Total transferred bytes since the beginning of the current transfer execution
CurrentTotalTransferredBytes uint64 `json:"current_total_transferred_bytes,omitempty"`
// The state manager
stateManager *TransferStateManager
}

func (tem *TimeEstimationManager) AddChunkStatus(chunkStatus api.ChunkStatus, durationMillis int64) {
// Build info repository requires no action here (transferred counter is updated in the state manager and no other calculation is needed).
if durationMillis == 0 || tem.stateManager.BuildInfoRepo {
if durationMillis == 0 {
return
}

Expand All @@ -54,7 +50,10 @@ func (tem *TimeEstimationManager) AddChunkStatus(chunkStatus api.ChunkStatus, du
func (tem *TimeEstimationManager) addDataChunkStatus(chunkStatus api.ChunkStatus, durationMillis int64) {
var chunkSizeBytes int64
for _, file := range chunkStatus.Files {
if file.Status == api.Success && !file.ChecksumDeployed {
if file.Status != api.Fail {
tem.CurrentTotalTransferredBytes += uint64(file.SizeBytes)
}
if (file.Status == api.Success || file.Status == api.SkippedLargeProps) && !file.ChecksumDeployed {
chunkSizeBytes += file.SizeBytes
}
}
Expand Down Expand Up @@ -96,103 +95,44 @@ func (tem *TimeEstimationManager) getSpeed() float64 {
return tem.SpeedsAverage * bytesPerMilliSecToMBPerSec
}

// GetSpeedString gets the transfer speed in an easy-to-read string.
// GetSpeedString gets the transfer speed as an easy-to-read string.
func (tem *TimeEstimationManager) GetSpeedString() string {
if tem.stateManager.BuildInfoRepo {
return "Not available while transferring a build-info repository"
}
if len(tem.LastSpeeds) == 0 {
return "Not available yet"
}
return fmt.Sprintf("%.3f MB/s", tem.getSpeed())
}

// getEstimatedRemainingTime gets the estimated remaining time in seconds.
// The estimated remaining time is the sum of:
// 1. Data estimated remaining time, derived by the average speed and remaining data size.
// 2. Build info estimated remaining time, derived by a precalculated average time per build info.
func (tem *TimeEstimationManager) getEstimatedRemainingTime() (int64, error) {
err := tem.calculateDataEstimatedRemainingTime()
if err != nil {
return 0, err
}
return tem.DataEstimatedRemainingTime + tem.getBuildInfoEstimatedRemainingTime(), nil
}

// calculateDataEstimatedRemainingTime calculates the data estimated remaining time in seconds, and sets it to the corresponding
// variable in the estimation manager.
func (tem *TimeEstimationManager) calculateDataEstimatedRemainingTime() error {
// If a build info repository is currently being handled, use the data estimated time previously calculated.
// Else, start calculating when the speeds average is set.
if tem.stateManager.BuildInfoRepo || tem.SpeedsAverage == 0 {
return nil
}
transferredSizeBytes, err := tem.stateManager.GetTransferredSizeBytes()
if err != nil {
return err
}

// In case we reach a situation where we transfer more data than expected, we cannot estimate how long transferring the remaining data will take.
if tem.stateManager.OverallTransfer.TotalSizeBytes <= transferredSizeBytes {
tem.DataEstimatedRemainingTime = 0
return nil
// GetEstimatedRemainingTimeString gets the estimated remaining time as an easy-to-read string.
// Return "Not available yet" in the following cases:
// 1. 5 minutes not passed since the beginning of the transfer
// 2. No files transferred
// 3. The transfer speed is less than 1 byte per second
func (tem *TimeEstimationManager) GetEstimatedRemainingTimeString() string {
remainingTimeSec := tem.getEstimatedRemainingSeconds()
if remainingTimeSec == 0 {
return "Not available yet"
}

// We only convert to int64 at the end to avoid a scenario where the conversion of SpeedsAverage returns zero.
remainingTime := float64(tem.stateManager.OverallTransfer.TotalSizeBytes-transferredSizeBytes) / tem.SpeedsAverage
// Convert from milliseconds to seconds.
tem.DataEstimatedRemainingTime = int64(remainingTime) / milliSecsInSecond
return nil
return SecondsToLiteralTime(int64(remainingTimeSec), "About ")
}

func (tem *TimeEstimationManager) getBuildInfoEstimatedRemainingTime() int64 {
if tem.stateManager.OverallBiFiles.TotalUnits <= tem.stateManager.OverallBiFiles.TransferredUnits {
func (tem *TimeEstimationManager) getEstimatedRemainingSeconds() uint64 {
if tem.CurrentTotalTransferredBytes == 0 {
// No files transferred
return 0
}

workingThreads, err := tem.getWorkingThreadsForBuildInfoEstimation()
if err != nil {
log.Error("Couldn't calculate time estimation:", err.Error())
duration := time.Since(tem.stateManager.startTimestamp)
if duration < minTransferTimeToShowEstimation {
// 5 minutes not yet passed
return 0
}

remainingBiFiles := float64(tem.stateManager.OverallBiFiles.TotalUnits - tem.stateManager.OverallBiFiles.TransferredUnits)
remainingTime := remainingBiFiles * buildInfoAverageIndexTimeSec / float64(workingThreads)
return int64(remainingTime)
}

func (tem *TimeEstimationManager) getWorkingThreadsForBuildInfoEstimation() (int, error) {
workingThreads, err := tem.stateManager.GetWorkingThreads()
if err != nil {
return 0, err
}
// If the uploader didn't start working, temporarily display estimation according to one thread.
if workingThreads == 0 {
return 1, nil
}
// If currently handling a data repository and the number of threads is high, show build info estimation according to the build info threads limit.
if workingThreads > utils.MaxBuildInfoThreads {
return utils.MaxBuildInfoThreads, nil
}
return workingThreads, nil
}

// GetEstimatedRemainingTimeString gets the estimated remaining time in an easy-to-read string.
func (tem *TimeEstimationManager) GetEstimatedRemainingTimeString() string {
if !tem.isTimeEstimationAvailable() {
return "Not available in this phase"
}
if !tem.stateManager.BuildInfoRepo && len(tem.LastSpeeds) == 0 {
return "Not available yet"
}
remainingTimeSec, err := tem.getEstimatedRemainingTime()
if err != nil {
return err.Error()
transferredBytesInSeconds := tem.CurrentTotalTransferredBytes / uint64(duration.Seconds())
if transferredBytesInSeconds == 0 {
// Less than 1 byte per second
return 0
}

return SecondsToLiteralTime(remainingTimeSec, "About ")
}

func (tem *TimeEstimationManager) isTimeEstimationAvailable() bool {
return tem.stateManager.CurrentRepoPhase == api.Phase1 || tem.stateManager.CurrentRepoPhase == api.Phase3
remainingBytes := tem.stateManager.OverallTransfer.TotalSizeBytes - tem.stateManager.OverallTransfer.TransferredSizeBytes
return uint64(remainingBytes) / transferredBytesInSeconds
}
Loading

0 comments on commit 5878f10

Please sign in to comment.