diff --git a/cmd/oras/internal/display/status/progress/manager.go b/cmd/oras/internal/display/status/progress/manager.go index 2b526e47e..d0a3186dd 100644 --- a/cmd/oras/internal/display/status/progress/manager.go +++ b/cmd/oras/internal/display/status/progress/manager.go @@ -45,19 +45,23 @@ type manager struct { status []*status statusLock sync.RWMutex console *console.Console + actionPrompt string + donePrompt string updating sync.WaitGroup renderDone chan struct{} renderClosed chan struct{} } // NewManager initialized a new progress manager. -func NewManager(f *os.File) (Manager, error) { - c, err := console.New(f) +func NewManager(actionPrompt string, donePrompt string, tty *os.File) (Manager, error) { + c, err := console.New(tty) if err != nil { return nil, err } m := &manager{ console: c, + actionPrompt: actionPrompt, + donePrompt: donePrompt, renderDone: make(chan struct{}), renderClosed: make(chan struct{}), } @@ -131,15 +135,15 @@ func (m *manager) SendAndStop(desc ocispec.Descriptor, prompt string) error { } func (m *manager) statusChan(s *status) *Messenger { - ch := make(chan *status, BufferSize) + messenger := NewMessenger(m.actionPrompt, m.donePrompt) m.updating.Add(1) go func() { defer m.updating.Done() - for newStatus := range ch { + for newStatus := range messenger.ch { s.update(newStatus) } }() - return &Messenger{ch: ch} + return messenger } // Close stops all status and waits for updating and rendering. diff --git a/cmd/oras/internal/display/status/progress/messenger.go b/cmd/oras/internal/display/status/progress/messenger.go index 9f0188b5a..86cc6a0f7 100644 --- a/cmd/oras/internal/display/status/progress/messenger.go +++ b/cmd/oras/internal/display/status/progress/messenger.go @@ -23,8 +23,19 @@ import ( // Messenger is progress message channel. type Messenger struct { - ch chan *status - closed bool + ch chan *status + actionPrompt string + donePrompt string + closed bool +} + +func NewMessenger(actionPrompt, donePrompt string) *Messenger { + ch := make(chan *status, BufferSize) + return &Messenger{ + ch: ch, + actionPrompt: actionPrompt, + donePrompt: donePrompt, + } } // Start initializes the messenger. @@ -50,6 +61,16 @@ func (sm *Messenger) Send(prompt string, descriptor ocispec.Descriptor, offset i } } +// SendAction send the action status message. +func (sm *Messenger) SendAction(descriptor ocispec.Descriptor, offset int64) { + sm.Send(sm.actionPrompt, descriptor, offset) +} + +// SendDone send the done status message. +func (sm *Messenger) SendDone(descriptor ocispec.Descriptor, offset int64) { + sm.Send(sm.donePrompt, descriptor, offset) +} + // Stop the messenger after sending a end message. func (sm *Messenger) Stop() { if sm.closed { diff --git a/cmd/oras/internal/display/status/progress/messenger_test.go b/cmd/oras/internal/display/status/progress/messenger_test.go index a8b782e55..0c8ecd93b 100644 --- a/cmd/oras/internal/display/status/progress/messenger_test.go +++ b/cmd/oras/internal/display/status/progress/messenger_test.go @@ -22,12 +22,11 @@ import ( func Test_Messenger(t *testing.T) { var msg *status - ch := make(chan *status, BufferSize) - messenger := &Messenger{ch: ch} + messenger := NewMessenger("Reading", "Read") messenger.Start() select { - case msg = <-ch: + case msg = <-messenger.ch: if msg.offset != -1 { t.Errorf("Expected start message with offset -1, got %d", msg.offset) } @@ -42,7 +41,7 @@ func Test_Messenger(t *testing.T) { expected := int64(50) messenger.Send("Reading", desc, expected) select { - case msg = <-ch: + case msg = <-messenger.ch: if msg.offset != expected { t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset) } @@ -56,7 +55,7 @@ func Test_Messenger(t *testing.T) { messenger.Send("Reading", desc, expected) messenger.Send("Read", desc, desc.Size) select { - case msg = <-ch: + case msg = <-messenger.ch: if msg.offset != desc.Size { t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset) } @@ -67,7 +66,7 @@ func Test_Messenger(t *testing.T) { t.Error("Expected status message") } select { - case msg = <-ch: + case msg = <-messenger.ch: t.Errorf("Unexpected status message %v", msg) default: } @@ -75,7 +74,7 @@ func Test_Messenger(t *testing.T) { expected = int64(-1) messenger.Stop() select { - case msg = <-ch: + case msg = <-messenger.ch: if msg.offset != expected { t.Errorf("Expected END status message with offset %d, got %d", expected, msg.offset) } @@ -85,7 +84,7 @@ func Test_Messenger(t *testing.T) { messenger.Stop() select { - case msg = <-ch: + case msg = <-messenger.ch: if msg != nil { t.Errorf("Unexpected status message %v", msg) } diff --git a/cmd/oras/internal/display/status/track/reader.go b/cmd/oras/internal/display/status/track/reader.go index 93919381f..1b36f4a70 100644 --- a/cmd/oras/internal/display/status/track/reader.go +++ b/cmd/oras/internal/display/status/track/reader.go @@ -17,56 +17,38 @@ package track import ( "io" - "os" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras/cmd/oras/internal/display/status/progress" ) -type reader struct { - base io.Reader - offset int64 - actionPrompt string - donePrompt string - descriptor ocispec.Descriptor - manager progress.Manager - messenger *progress.Messenger +type Reader interface { + io.Reader + Done() + Close() + Start() } -// NewReader returns a new reader with tracked progress. -func NewReader(r io.Reader, descriptor ocispec.Descriptor, actionPrompt string, donePrompt string, tty *os.File) (*reader, error) { - manager, err := progress.NewManager(tty) - if err != nil { - return nil, err - } - return managedReader(r, descriptor, manager, actionPrompt, donePrompt) +type reader struct { + base io.Reader + offset int64 + descriptor ocispec.Descriptor + messenger *progress.Messenger } -func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress.Manager, actionPrompt string, donePrompt string) (*reader, error) { - messenger, err := manager.Add() - if err != nil { - return nil, err +// NewReader returns a new reader with tracked progress. +func NewReader(r io.Reader, descriptor ocispec.Descriptor, messenger *progress.Messenger) Reader { + tr := reader{ + base: r, + descriptor: descriptor, + messenger: messenger, } - - return &reader{ - base: r, - descriptor: descriptor, - actionPrompt: actionPrompt, - donePrompt: donePrompt, - manager: manager, - messenger: messenger, - }, nil -} - -// StopManager stops the messenger channel and related manager. -func (r *reader) StopManager() { - r.Close() - _ = r.manager.Close() + return &tr } // Done sends message to mark the tracked progress as complete. func (r *reader) Done() { - r.messenger.Send(r.donePrompt, r.descriptor, r.descriptor.Size) + r.messenger.SendDone(r.descriptor, r.descriptor.Size) r.messenger.Stop() } @@ -93,6 +75,6 @@ func (r *reader) Read(p []byte) (int, error) { return n, io.ErrUnexpectedEOF } } - r.messenger.Send(r.actionPrompt, r.descriptor, r.offset) + r.messenger.SendAction(r.descriptor, r.offset) return n, err } diff --git a/cmd/oras/internal/display/status/track/target.go b/cmd/oras/internal/display/status/track/target.go index 5c704ebbc..e13c9a990 100644 --- a/cmd/oras/internal/display/status/track/target.go +++ b/cmd/oras/internal/display/status/track/target.go @@ -35,9 +35,7 @@ type GraphTarget interface { type graphTarget struct { oras.GraphTarget - manager progress.Manager - actionPrompt string - donePrompt string + manager progress.Manager } type referenceGraphTarget struct { @@ -46,15 +44,13 @@ type referenceGraphTarget struct { // NewTarget creates a new tracked Target. func NewTarget(t oras.GraphTarget, actionPrompt, donePrompt string, tty *os.File) (GraphTarget, error) { - manager, err := progress.NewManager(tty) + manager, err := progress.NewManager(actionPrompt, donePrompt, tty) if err != nil { return nil, err } gt := &graphTarget{ - GraphTarget: t, - manager: manager, - actionPrompt: actionPrompt, - donePrompt: donePrompt, + GraphTarget: t, + manager: manager, } if _, ok := t.(registry.ReferencePusher); ok { @@ -74,7 +70,13 @@ func (t *graphTarget) Mount(ctx context.Context, desc ocispec.Descriptor, fromRe // Push pushes the content to the base oras.GraphTarget with tracking. func (t *graphTarget) Push(ctx context.Context, expected ocispec.Descriptor, content io.Reader) error { - r, err := managedReader(content, expected, t.manager, t.actionPrompt, t.donePrompt) + messenger, err := t.manager.Add() + if err != nil { + return err + } + defer messenger.Stop() + + r := NewReader(content, expected, messenger) if err != nil { return err } @@ -89,7 +91,13 @@ func (t *graphTarget) Push(ctx context.Context, expected ocispec.Descriptor, con // PushReference pushes the content to the base oras.GraphTarget with tracking. func (rgt *referenceGraphTarget) PushReference(ctx context.Context, expected ocispec.Descriptor, content io.Reader, reference string) error { - r, err := managedReader(content, expected, rgt.manager, rgt.actionPrompt, rgt.donePrompt) + messenger, err := rgt.manager.Add() + if err != nil { + return err + } + defer messenger.Stop() + + r := NewReader(content, expected, messenger) if err != nil { return err } diff --git a/cmd/oras/root/blob/fetch.go b/cmd/oras/root/blob/fetch.go index 44694c428..ea6fccd7e 100644 --- a/cmd/oras/root/blob/fetch.go +++ b/cmd/oras/root/blob/fetch.go @@ -28,6 +28,7 @@ import ( "oras.land/oras-go/v2/registry/remote" "oras.land/oras/cmd/oras/internal/argument" "oras.land/oras/cmd/oras/internal/command" + "oras.land/oras/cmd/oras/internal/display/status/progress" "oras.land/oras/cmd/oras/internal/display/status/track" oerrors "oras.land/oras/cmd/oras/internal/errors" "oras.land/oras/cmd/oras/internal/option" @@ -170,12 +171,20 @@ func (opts *fetchBlobOptions) doFetch(ctx context.Context, src oras.ReadOnlyTarg return ocispec.Descriptor{}, err } } else { - // TTY output - trackedReader, err := track.NewReader(vr, desc, "Downloading", "Downloaded ", opts.TTY) + manager, err := progress.NewManager("Downloading", "Downloaded ", opts.TTY) if err != nil { - return ocispec.Descriptor{}, err + return desc, err } - defer trackedReader.StopManager() + defer manager.Close() + + messenger, err := manager.Add() + if err != nil { + return desc, err + } + defer messenger.Stop() + + // TTY output + trackedReader := track.NewReader(vr, desc, messenger) trackedReader.Start() if _, err = io.Copy(writer, trackedReader); err != nil { return ocispec.Descriptor{}, err diff --git a/cmd/oras/root/blob/push.go b/cmd/oras/root/blob/push.go index 6bb126357..f86f3055c 100644 --- a/cmd/oras/root/blob/push.go +++ b/cmd/oras/root/blob/push.go @@ -19,6 +19,7 @@ import ( "context" "errors" "io" + "oras.land/oras/cmd/oras/internal/display/status/progress" "os" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -154,12 +155,20 @@ func (opts *pushBlobOptions) doPush(ctx context.Context, printer *output.Printer return printer.PrintStatus(desc, "Uploaded ") } - // TTY output - trackedReader, err := track.NewReader(r, desc, "Uploading", "Uploaded ", opts.TTY) + manager, err := progress.NewManager("Uploading", "Uploaded ", opts.TTY) if err != nil { return err } - defer trackedReader.StopManager() + defer manager.Close() + + messenger, err := manager.Add() + if err != nil { + return err + } + defer messenger.Stop() + + // TTY output + trackedReader := track.NewReader(r, desc, messenger) trackedReader.Start() r = trackedReader if err := t.Push(ctx, desc, r); err != nil {