Skip to content

Commit

Permalink
refactor: simplify reader with manager and messenger
Browse files Browse the repository at this point in the history
Signed-off-by: Terry Howe <terrylhowe@gmail.com>
  • Loading branch information
TerryHowe committed Aug 14, 2024
1 parent 8986d6d commit 83dc8ec
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 69 deletions.
14 changes: 9 additions & 5 deletions cmd/oras/internal/display/status/progress/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,23 @@ type manager struct {
status []*status
statusLock sync.RWMutex
console *console.Console
actionPrompt string
donePrompt string
updating sync.WaitGroup
renderDone chan struct{}
renderClosed chan struct{}
}

// NewManager initialized a new progress manager.
func NewManager(f *os.File) (Manager, error) {
c, err := console.New(f)
func NewManager(actionPrompt string, donePrompt string, tty *os.File) (Manager, error) {
c, err := console.New(tty)

Check warning on line 57 in cmd/oras/internal/display/status/progress/manager.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/internal/display/status/progress/manager.go#L56-L57

Added lines #L56 - L57 were not covered by tests
if err != nil {
return nil, err
}
m := &manager{
console: c,
actionPrompt: actionPrompt,
donePrompt: donePrompt,
renderDone: make(chan struct{}),
renderClosed: make(chan struct{}),

Check warning on line 66 in cmd/oras/internal/display/status/progress/manager.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/internal/display/status/progress/manager.go#L63-L66

Added lines #L63 - L66 were not covered by tests
}
Expand Down Expand Up @@ -131,15 +135,15 @@ func (m *manager) SendAndStop(desc ocispec.Descriptor, prompt string) error {
}

func (m *manager) statusChan(s *status) *Messenger {
ch := make(chan *status, BufferSize)
messenger := NewMessenger(m.actionPrompt, m.donePrompt)
m.updating.Add(1)
go func() {
defer m.updating.Done()
for newStatus := range ch {
for newStatus := range messenger.ch {
s.update(newStatus)
}
}()
return &Messenger{ch: ch}
return messenger
}

// Close stops all status and waits for updating and rendering.
Expand Down
25 changes: 23 additions & 2 deletions cmd/oras/internal/display/status/progress/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,19 @@ import (

// Messenger is progress message channel.
type Messenger struct {
ch chan *status
closed bool
ch chan *status
actionPrompt string
donePrompt string
closed bool
}

func NewMessenger(actionPrompt, donePrompt string) *Messenger {
ch := make(chan *status, BufferSize)
return &Messenger{
ch: ch,
actionPrompt: actionPrompt,
donePrompt: donePrompt,
}
}

// Start initializes the messenger.
Expand All @@ -50,6 +61,16 @@ func (sm *Messenger) Send(prompt string, descriptor ocispec.Descriptor, offset i
}
}

// SendAction send the action status message.
func (sm *Messenger) SendAction(descriptor ocispec.Descriptor, offset int64) {
sm.Send(sm.actionPrompt, descriptor, offset)
}

// SendDone send the done status message.
func (sm *Messenger) SendDone(descriptor ocispec.Descriptor, offset int64) {
sm.Send(sm.donePrompt, descriptor, offset)

Check warning on line 71 in cmd/oras/internal/display/status/progress/messenger.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/internal/display/status/progress/messenger.go#L70-L71

Added lines #L70 - L71 were not covered by tests
}

// Stop the messenger after sending a end message.
func (sm *Messenger) Stop() {
if sm.closed {
Expand Down
15 changes: 7 additions & 8 deletions cmd/oras/internal/display/status/progress/messenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ import (

func Test_Messenger(t *testing.T) {
var msg *status
ch := make(chan *status, BufferSize)
messenger := &Messenger{ch: ch}
messenger := NewMessenger("Reading", "Read")

messenger.Start()
select {
case msg = <-ch:
case msg = <-messenger.ch:
if msg.offset != -1 {
t.Errorf("Expected start message with offset -1, got %d", msg.offset)
}
Expand All @@ -42,7 +41,7 @@ func Test_Messenger(t *testing.T) {
expected := int64(50)
messenger.Send("Reading", desc, expected)
select {
case msg = <-ch:
case msg = <-messenger.ch:
if msg.offset != expected {
t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset)
}
Expand All @@ -56,7 +55,7 @@ func Test_Messenger(t *testing.T) {
messenger.Send("Reading", desc, expected)
messenger.Send("Read", desc, desc.Size)
select {
case msg = <-ch:
case msg = <-messenger.ch:
if msg.offset != desc.Size {
t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset)
}
Expand All @@ -67,15 +66,15 @@ func Test_Messenger(t *testing.T) {
t.Error("Expected status message")
}
select {
case msg = <-ch:
case msg = <-messenger.ch:
t.Errorf("Unexpected status message %v", msg)
default:
}

expected = int64(-1)
messenger.Stop()
select {
case msg = <-ch:
case msg = <-messenger.ch:
if msg.offset != expected {
t.Errorf("Expected END status message with offset %d, got %d", expected, msg.offset)
}
Expand All @@ -85,7 +84,7 @@ func Test_Messenger(t *testing.T) {

messenger.Stop()
select {
case msg = <-ch:
case msg = <-messenger.ch:
if msg != nil {
t.Errorf("Unexpected status message %v", msg)
}
Expand Down
56 changes: 19 additions & 37 deletions cmd/oras/internal/display/status/track/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,38 @@ package track

import (
"io"
"os"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras/cmd/oras/internal/display/status/progress"
)

type reader struct {
base io.Reader
offset int64
actionPrompt string
donePrompt string
descriptor ocispec.Descriptor
manager progress.Manager
messenger *progress.Messenger
type Reader interface {
io.Reader
Done()
Close()
Start()
}

// NewReader returns a new reader with tracked progress.
func NewReader(r io.Reader, descriptor ocispec.Descriptor, actionPrompt string, donePrompt string, tty *os.File) (*reader, error) {
manager, err := progress.NewManager(tty)
if err != nil {
return nil, err
}
return managedReader(r, descriptor, manager, actionPrompt, donePrompt)
type reader struct {
base io.Reader
offset int64
descriptor ocispec.Descriptor
messenger *progress.Messenger
}

func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress.Manager, actionPrompt string, donePrompt string) (*reader, error) {
messenger, err := manager.Add()
if err != nil {
return nil, err
// NewReader returns a new reader with tracked progress.
func NewReader(r io.Reader, descriptor ocispec.Descriptor, messenger *progress.Messenger) Reader {
tr := reader{
base: r,
descriptor: descriptor,
messenger: messenger,
}

return &reader{
base: r,
descriptor: descriptor,
actionPrompt: actionPrompt,
donePrompt: donePrompt,
manager: manager,
messenger: messenger,
}, nil
}

// StopManager stops the messenger channel and related manager.
func (r *reader) StopManager() {
r.Close()
_ = r.manager.Close()
return &tr
}

// Done sends message to mark the tracked progress as complete.
func (r *reader) Done() {
r.messenger.Send(r.donePrompt, r.descriptor, r.descriptor.Size)
r.messenger.SendDone(r.descriptor, r.descriptor.Size)
r.messenger.Stop()
}

Expand All @@ -93,6 +75,6 @@ func (r *reader) Read(p []byte) (int, error) {
return n, io.ErrUnexpectedEOF
}
}
r.messenger.Send(r.actionPrompt, r.descriptor, r.offset)
r.messenger.SendAction(r.descriptor, r.offset)
return n, err
}
28 changes: 18 additions & 10 deletions cmd/oras/internal/display/status/track/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ type GraphTarget interface {

type graphTarget struct {
oras.GraphTarget
manager progress.Manager
actionPrompt string
donePrompt string
manager progress.Manager
}

type referenceGraphTarget struct {
Expand All @@ -46,15 +44,13 @@ type referenceGraphTarget struct {

// NewTarget creates a new tracked Target.
func NewTarget(t oras.GraphTarget, actionPrompt, donePrompt string, tty *os.File) (GraphTarget, error) {
manager, err := progress.NewManager(tty)
manager, err := progress.NewManager(actionPrompt, donePrompt, tty)
if err != nil {
return nil, err
}
gt := &graphTarget{
GraphTarget: t,
manager: manager,
actionPrompt: actionPrompt,
donePrompt: donePrompt,
GraphTarget: t,
manager: manager,
}

if _, ok := t.(registry.ReferencePusher); ok {
Expand All @@ -74,7 +70,13 @@ func (t *graphTarget) Mount(ctx context.Context, desc ocispec.Descriptor, fromRe

// Push pushes the content to the base oras.GraphTarget with tracking.
func (t *graphTarget) Push(ctx context.Context, expected ocispec.Descriptor, content io.Reader) error {
r, err := managedReader(content, expected, t.manager, t.actionPrompt, t.donePrompt)
messenger, err := t.manager.Add()
if err != nil {
return err

Check warning on line 75 in cmd/oras/internal/display/status/track/target.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/internal/display/status/track/target.go#L73-L75

Added lines #L73 - L75 were not covered by tests
}
defer messenger.Stop()

Check warning on line 77 in cmd/oras/internal/display/status/track/target.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/internal/display/status/track/target.go#L77

Added line #L77 was not covered by tests

r := NewReader(content, expected, messenger)

Check warning on line 79 in cmd/oras/internal/display/status/track/target.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/internal/display/status/track/target.go#L79

Added line #L79 was not covered by tests
if err != nil {
return err
}
Expand All @@ -89,7 +91,13 @@ func (t *graphTarget) Push(ctx context.Context, expected ocispec.Descriptor, con

// PushReference pushes the content to the base oras.GraphTarget with tracking.
func (rgt *referenceGraphTarget) PushReference(ctx context.Context, expected ocispec.Descriptor, content io.Reader, reference string) error {
r, err := managedReader(content, expected, rgt.manager, rgt.actionPrompt, rgt.donePrompt)
messenger, err := rgt.manager.Add()
if err != nil {
return err

Check warning on line 96 in cmd/oras/internal/display/status/track/target.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/internal/display/status/track/target.go#L96

Added line #L96 was not covered by tests
}
defer messenger.Stop()

r := NewReader(content, expected, messenger)
if err != nil {
return err
}
Expand Down
17 changes: 13 additions & 4 deletions cmd/oras/root/blob/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras/cmd/oras/internal/argument"
"oras.land/oras/cmd/oras/internal/command"
"oras.land/oras/cmd/oras/internal/display/status/progress"
"oras.land/oras/cmd/oras/internal/display/status/track"
oerrors "oras.land/oras/cmd/oras/internal/errors"
"oras.land/oras/cmd/oras/internal/option"
Expand Down Expand Up @@ -170,12 +171,20 @@ func (opts *fetchBlobOptions) doFetch(ctx context.Context, src oras.ReadOnlyTarg
return ocispec.Descriptor{}, err
}
} else {
// TTY output
trackedReader, err := track.NewReader(vr, desc, "Downloading", "Downloaded ", opts.TTY)
manager, err := progress.NewManager("Downloading", "Downloaded ", opts.TTY)
if err != nil {
return ocispec.Descriptor{}, err
return desc, err

Check warning on line 176 in cmd/oras/root/blob/fetch.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/root/blob/fetch.go#L176

Added line #L176 was not covered by tests
}
defer trackedReader.StopManager()
defer manager.Close()

messenger, err := manager.Add()
if err != nil {
return desc, err

Check warning on line 182 in cmd/oras/root/blob/fetch.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/root/blob/fetch.go#L182

Added line #L182 was not covered by tests
}
defer messenger.Stop()

// TTY output
trackedReader := track.NewReader(vr, desc, messenger)
trackedReader.Start()
if _, err = io.Copy(writer, trackedReader); err != nil {
return ocispec.Descriptor{}, err
Expand Down
15 changes: 12 additions & 3 deletions cmd/oras/root/blob/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"errors"
"io"
"oras.land/oras/cmd/oras/internal/display/status/progress"
"os"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -154,12 +155,20 @@ func (opts *pushBlobOptions) doPush(ctx context.Context, printer *output.Printer
return printer.PrintStatus(desc, "Uploaded ")
}

// TTY output
trackedReader, err := track.NewReader(r, desc, "Uploading", "Uploaded ", opts.TTY)
manager, err := progress.NewManager("Uploading", "Uploaded ", opts.TTY)
if err != nil {
return err
}
defer trackedReader.StopManager()
defer manager.Close()

messenger, err := manager.Add()
if err != nil {
return err

Check warning on line 166 in cmd/oras/root/blob/push.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/root/blob/push.go#L166

Added line #L166 was not covered by tests
}
defer messenger.Stop()

// TTY output
trackedReader := track.NewReader(r, desc, messenger)
trackedReader.Start()
r = trackedReader
if err := t.Push(ctx, desc, r); err != nil {
Expand Down

0 comments on commit 83dc8ec

Please sign in to comment.