Skip to content

Commit

Permalink
writer
Browse files Browse the repository at this point in the history
display o

NewWriter

display buckets only that have started

mock

output conc num

err handle

go mod

log number buckets

space

writer rm unnecessary methods
  • Loading branch information
go-to-k committed Feb 3, 2025
1 parent 5175d3a commit d4a3763
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 213 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/aws/smithy-go v1.22.1
github.com/charmbracelet/bubbletea v1.1.1
github.com/fatih/color v1.18.0
github.com/gosuri/uilive v0.0.4
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.8.0
github.com/urfave/cli/v2 v2.27.4
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY=
github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down
6 changes: 1 addition & 5 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,7 @@ func (a *App) initBucketProcessor() error {
ForceMode: a.ForceMode,
OldVersionsOnly: a.OldVersionsOnly,
}
processor, err := NewBucketProcessor(processorConfig, a.s3Wrapper)
if err != nil {
return err
}
a.bucketProcessor = processor
a.bucketProcessor = NewBucketProcessor(processorConfig, a.s3Wrapper)
}
return nil
}
Expand Down
59 changes: 32 additions & 27 deletions internal/app/bucket_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package app
import (
"context"

"github.com/go-to-k/cls3/internal/io"
"github.com/go-to-k/cls3/internal/wrapper"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -37,11 +38,8 @@ type BucketProcessor struct {
func NewBucketProcessor(
config BucketProcessorConfig,
s3Wrapper wrapper.IWrapper,
) (*BucketProcessor, error) {
state, err := NewClearingState(config.TargetBuckets, s3Wrapper, config.ForceMode)
if err != nil {
return nil, err
}
) *BucketProcessor {
state := NewClearingState(config.TargetBuckets, s3Wrapper, config.ForceMode)

display := NewDisplayManager(state, config.QuietMode)

Expand All @@ -50,25 +48,48 @@ func NewBucketProcessor(
s3Wrapper: s3Wrapper,
state: state,
display: display,
}, nil
}
}

// Process executes the bucket processing workflow
func (p *BucketProcessor) Process(ctx context.Context) error {
if err := p.display.Start(p.config.TargetBuckets); err != nil {
return err
concurrencyNumber := p.determineConcurrencyNumber()
io.Logger.Info().Msgf("Number of buckets: %v", len(p.config.TargetBuckets))
io.Logger.Info().Msgf("Concurrency number: %v", concurrencyNumber)

for _, bucket := range p.config.TargetBuckets {
if err := p.s3Wrapper.OutputCheckingMessage(bucket); err != nil {
return err
}
}

if err := p.clearBuckets(ctx); err != nil {
p.display.Start(p.config.TargetBuckets)

if err := p.clearBuckets(ctx, concurrencyNumber); err != nil {
return err
}

return p.display.Finish(p.config.TargetBuckets)
}

// determineConcurrencyNumber calculates the appropriate concurrency number
func (p *BucketProcessor) determineConcurrencyNumber() int {
// Series when ConcurrentMode is off.
if !p.config.ConcurrentMode {
return 1
}

// Cases where ConcurrencyNumber is unspecified.
if p.config.ConcurrencyNumber == UnspecifiedConcurrencyNumber {
return len(p.config.TargetBuckets)
}

// Cases where ConcurrencyNumber is specified.
return p.config.ConcurrencyNumber
}

// clearBuckets processes all buckets with the specified concurrency
func (p *BucketProcessor) clearBuckets(ctx context.Context) error {
concurrencyNumber := p.determineConcurrencyNumber()
func (p *BucketProcessor) clearBuckets(ctx context.Context, concurrencyNumber int) error {
sem := semaphore.NewWeighted(int64(concurrencyNumber))
clearEg := errgroup.Group{}

Expand All @@ -87,22 +108,6 @@ func (p *BucketProcessor) clearBuckets(ctx context.Context) error {
return clearEg.Wait()
}

// determineConcurrencyNumber calculates the appropriate concurrency number
func (p *BucketProcessor) determineConcurrencyNumber() int {
// Series when ConcurrentMode is off.
if !p.config.ConcurrentMode {
return 1
}

// Cases where ConcurrencyNumber is unspecified.
if p.config.ConcurrencyNumber == UnspecifiedConcurrencyNumber {
return len(p.config.TargetBuckets)
}

// Cases where ConcurrencyNumber is specified.
return p.config.ConcurrencyNumber
}

// clearSingleBucket processes a single bucket
func (p *BucketProcessor) clearSingleBucket(ctx context.Context, bucket string) error {
clearingCountCh, clearingCompletedCh := p.state.GetChannelsForBucket(bucket)
Expand Down
140 changes: 116 additions & 24 deletions internal/app/bucket_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,28 +81,49 @@ func TestBucketProcessor_Process(t *testing.T) {
{
name: "successfully process buckets",
prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState, md *MockIDisplayManager) {
md.EXPECT().Start([]string{"bucket1"}).Return(nil)
md.EXPECT().Finish([]string{"bucket1"}).Return(nil)
countCh := make(chan int64)
completedCh := make(chan bool)
mc.EXPECT().GetChannelsForBucket("bucket1").Return(countCh, completedCh)
m.EXPECT().OutputCheckingMessage("bucket1").Return(nil)
m.EXPECT().OutputCheckingMessage("bucket2").Return(nil)
md.EXPECT().Start([]string{"bucket1", "bucket2"})
md.EXPECT().Finish([]string{"bucket1", "bucket2"}).Return(nil)

countCh1 := make(chan int64)
completedCh1 := make(chan bool)
countCh2 := make(chan int64)
completedCh2 := make(chan bool)

mc.EXPECT().GetChannelsForBucket("bucket1").Return(countCh1, completedCh1)
mc.EXPECT().GetChannelsForBucket("bucket2").Return(countCh2, completedCh2)
m.EXPECT().ClearBucket(
gomock.Any(),
wrapper.ClearBucketInput{
TargetBucket: "bucket1",
ForceMode: false,
OldVersionsOnly: false,
QuietMode: false,
ClearingCountCh: countCh,
ClearingCountCh: countCh1,
},
).Return(nil)
m.EXPECT().ClearBucket(
gomock.Any(),
wrapper.ClearBucketInput{
TargetBucket: "bucket2",
ForceMode: false,
OldVersionsOnly: false,
QuietMode: false,
ClearingCountCh: countCh2,
},
).Return(nil)
go func() {
completed := <-completedCh
completed := <-completedCh1
assert.True(t, completed, "value from completedCh should be true")
}()
go func() {
completed := <-completedCh2
assert.True(t, completed, "value from completedCh should be true")
}()
},
config: BucketProcessorConfig{
TargetBuckets: []string{"bucket1"},
TargetBuckets: []string{"bucket1", "bucket2"},
QuietMode: false,
ConcurrentMode: false,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
Expand All @@ -112,9 +133,9 @@ func TestBucketProcessor_Process(t *testing.T) {
wantErr: false,
},
{
name: "error when display start fails",
name: "error when output checking message fails",
prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState, md *MockIDisplayManager) {
md.EXPECT().Start([]string{"bucket1"}).Return(fmt.Errorf("StartError"))
m.EXPECT().OutputCheckingMessage("bucket1").Return(fmt.Errorf("OutputCheckingMessageError"))
},
config: BucketProcessorConfig{
TargetBuckets: []string{"bucket1"},
Expand All @@ -123,12 +144,13 @@ func TestBucketProcessor_Process(t *testing.T) {
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
},
wantErr: true,
expectedErr: "StartError",
expectedErr: "OutputCheckingMessageError",
},
{
name: "error when clear bucket fails",
prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState, md *MockIDisplayManager) {
md.EXPECT().Start([]string{"bucket1"}).Return(nil)
m.EXPECT().OutputCheckingMessage("bucket1").Return(nil)
md.EXPECT().Start([]string{"bucket1"})
countCh := make(chan int64)
completedCh := make(chan bool)
mc.EXPECT().GetChannelsForBucket("bucket1").Return(countCh, completedCh)
Expand Down Expand Up @@ -159,7 +181,8 @@ func TestBucketProcessor_Process(t *testing.T) {
{
name: "error when display finish fails",
prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState, md *MockIDisplayManager) {
md.EXPECT().Start([]string{"bucket1"}).Return(nil)
m.EXPECT().OutputCheckingMessage("bucket1").Return(nil)
md.EXPECT().Start([]string{"bucket1"})
countCh := make(chan int64)
completedCh := make(chan bool)
mc.EXPECT().GetChannelsForBucket("bucket1").Return(countCh, completedCh)
Expand Down Expand Up @@ -221,11 +244,12 @@ func TestBucketProcessor_Process(t *testing.T) {

func TestBucketProcessor_clearBuckets(t *testing.T) {
tests := []struct {
name string
prepareMockFn func(m *wrapper.MockIWrapper, mc *MockIClearingState)
config BucketProcessorConfig
wantErr bool
expectedErr string
name string
prepareMockFn func(m *wrapper.MockIWrapper, mc *MockIClearingState)
config BucketProcessorConfig
concurrencyNumber int
wantErr bool
expectedErr string
}{
{
name: "successfully clear single bucket",
Expand Down Expand Up @@ -254,7 +278,40 @@ func TestBucketProcessor_clearBuckets(t *testing.T) {
ConcurrentMode: false,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
},
wantErr: false,
concurrencyNumber: 1,
wantErr: false,
},
{
name: "successfully clear multiple buckets",
prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState) {
for _, bucket := range []string{"bucket1", "bucket2"} {
countCh := make(chan int64)
completedCh := make(chan bool)
mc.EXPECT().GetChannelsForBucket(bucket).Return(countCh, completedCh)
m.EXPECT().ClearBucket(
gomock.Any(),
wrapper.ClearBucketInput{
TargetBucket: bucket,
ForceMode: false,
OldVersionsOnly: false,
QuietMode: false,
ClearingCountCh: countCh,
},
).Return(nil)
go func() {
completed := <-completedCh
assert.True(t, completed, "value from completedCh should be true")
}()
}
},
config: BucketProcessorConfig{
TargetBuckets: []string{"bucket1", "bucket2"},
QuietMode: false,
ConcurrentMode: true,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
},
concurrencyNumber: 1,
wantErr: false,
},
{
name: "successfully clear multiple buckets concurrently",
Expand Down Expand Up @@ -285,7 +342,40 @@ func TestBucketProcessor_clearBuckets(t *testing.T) {
ConcurrentMode: true,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
},
wantErr: false,
concurrencyNumber: 2,
wantErr: false,
},
{
name: "successfully clear many buckets concurrently",
prepareMockFn: func(m *wrapper.MockIWrapper, mc *MockIClearingState) {
for _, bucket := range []string{"bucket1", "bucket2", "bucket3", "bucket4", "bucket5"} {
countCh := make(chan int64)
completedCh := make(chan bool)
mc.EXPECT().GetChannelsForBucket(bucket).Return(countCh, completedCh)
m.EXPECT().ClearBucket(
gomock.Any(),
wrapper.ClearBucketInput{
TargetBucket: bucket,
ForceMode: false,
OldVersionsOnly: false,
QuietMode: false,
ClearingCountCh: countCh,
},
).Return(nil)
go func() {
completed := <-completedCh
assert.True(t, completed, "value from completedCh should be true")
}()
}
},
config: BucketProcessorConfig{
TargetBuckets: []string{"bucket1", "bucket2", "bucket3", "bucket4", "bucket5"},
QuietMode: false,
ConcurrentMode: true,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
},
concurrencyNumber: 2,
wantErr: false,
},
{
name: "successfully clear single bucket with quiet mode",
Expand All @@ -310,7 +400,8 @@ func TestBucketProcessor_clearBuckets(t *testing.T) {
ConcurrentMode: false,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
},
wantErr: false,
concurrencyNumber: 1,
wantErr: false,
},
{
name: "error when clear bucket fails",
Expand Down Expand Up @@ -339,8 +430,9 @@ func TestBucketProcessor_clearBuckets(t *testing.T) {
ConcurrentMode: false,
ConcurrencyNumber: UnspecifiedConcurrencyNumber,
},
wantErr: true,
expectedErr: "ClearBucketError",
concurrencyNumber: 1,
wantErr: true,
expectedErr: "ClearBucketError",
},
}

Expand All @@ -357,7 +449,7 @@ func TestBucketProcessor_clearBuckets(t *testing.T) {
state: mockClearingState,
}

err := processor.clearBuckets(context.Background())
err := processor.clearBuckets(context.Background(), tt.concurrencyNumber)
if (err != nil) != tt.wantErr {
t.Errorf("error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
Loading

0 comments on commit d4a3763

Please sign in to comment.