Skip to content

Commit

Permalink
[chore] Move test code under _test (#11621)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Nov 7, 2024
1 parent e690ac5 commit d39dd7a
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 160 deletions.
File renamed without changes.
160 changes: 0 additions & 160 deletions exporter/internal/queue/mock_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"syscall"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -97,161 +95,3 @@ func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) e
func (m *mockStorageClient) isClosed() bool {
return m.closed.Load()
}

func newFakeBoundedStorageClient(maxSizeInBytes int) *fakeBoundedStorageClient {
return &fakeBoundedStorageClient{
st: map[string][]byte{},
MaxSizeInBytes: maxSizeInBytes,
}
}

// this storage client mimics the behavior of actual storage engines with limited storage space available
// in general, real storage engines often have a per-write-transaction storage overhead, needing to keep
// both the old and the new value stored until the transaction is committed
// this is useful for testing the persistent queue queue behavior with a full disk
type fakeBoundedStorageClient struct {
MaxSizeInBytes int
st map[string][]byte
sizeInBytes int
mux sync.Mutex
}

func (m *fakeBoundedStorageClient) Get(ctx context.Context, key string) ([]byte, error) {
op := storage.GetOperation(key)
if err := m.Batch(ctx, op); err != nil {
return nil, err
}

return op.Value, nil
}

func (m *fakeBoundedStorageClient) Set(ctx context.Context, key string, value []byte) error {
return m.Batch(ctx, storage.SetOperation(key, value))
}

func (m *fakeBoundedStorageClient) Delete(ctx context.Context, key string) error {
return m.Batch(ctx, storage.DeleteOperation(key))
}

func (m *fakeBoundedStorageClient) Close(context.Context) error {
return nil
}

func (m *fakeBoundedStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

totalAdded, totalRemoved := m.getTotalSizeChange(ops)

// the assumption here is that the new data needs to coexist with the old data on disk
// for the transaction to succeed
// this seems to be true for the file storage extension at least
if m.sizeInBytes+totalAdded > m.MaxSizeInBytes {
return fmt.Errorf("insufficient space available: %w", syscall.ENOSPC)
}

for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value = m.st[op.Key]
case storage.Set:
m.st[op.Key] = op.Value
case storage.Delete:
delete(m.st, op.Key)
default:
return errors.New("wrong operation type")
}
}

m.sizeInBytes += totalAdded - totalRemoved

return nil
}

func (m *fakeBoundedStorageClient) SetMaxSizeInBytes(newMaxSize int) {
m.mux.Lock()
defer m.mux.Unlock()
m.MaxSizeInBytes = newMaxSize
}

func (m *fakeBoundedStorageClient) GetSizeInBytes() int {
m.mux.Lock()
defer m.mux.Unlock()
return m.sizeInBytes
}

func (m *fakeBoundedStorageClient) getTotalSizeChange(ops []storage.Operation) (totalAdded int, totalRemoved int) {
totalAdded, totalRemoved = 0, 0
for _, op := range ops {
switch op.Type {
case storage.Set:
if oldValue, ok := m.st[op.Key]; ok {
totalRemoved += len(oldValue)
} else {
totalAdded += len(op.Key)
}
totalAdded += len(op.Value)
case storage.Delete:
if value, ok := m.st[op.Key]; ok {
totalRemoved += len(op.Key)
totalRemoved += len(value)
}
default:
}
}
return totalAdded, totalRemoved
}

func newFakeStorageClientWithErrors(errors []error) *fakeStorageClientWithErrors {
return &fakeStorageClientWithErrors{
errors: errors,
}
}

// this storage client just returns errors from a list in order
// used for testing error handling
type fakeStorageClientWithErrors struct {
errors []error
nextErrorIndex int
mux sync.Mutex
}

func (m *fakeStorageClientWithErrors) Get(ctx context.Context, key string) ([]byte, error) {
op := storage.GetOperation(key)
err := m.Batch(ctx, op)
if err != nil {
return nil, err
}

return op.Value, nil
}

func (m *fakeStorageClientWithErrors) Set(ctx context.Context, key string, value []byte) error {
return m.Batch(ctx, storage.SetOperation(key, value))
}

func (m *fakeStorageClientWithErrors) Delete(ctx context.Context, key string) error {
return m.Batch(ctx, storage.DeleteOperation(key))
}

func (m *fakeStorageClientWithErrors) Close(context.Context) error {
return nil
}

func (m *fakeStorageClientWithErrors) Batch(context.Context, ...storage.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

if m.nextErrorIndex >= len(m.errors) {
return nil
}

m.nextErrorIndex++
return m.errors[m.nextErrorIndex-1]
}

func (m *fakeStorageClientWithErrors) Reset() {
m.mux.Lock()
defer m.mux.Unlock()
m.nextErrorIndex = 0
}
158 changes: 158 additions & 0 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,164 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
return nh.ext
}

func newFakeBoundedStorageClient(maxSizeInBytes int) *fakeBoundedStorageClient {
return &fakeBoundedStorageClient{
st: map[string][]byte{},
MaxSizeInBytes: maxSizeInBytes,
}
}

// this storage client mimics the behavior of actual storage engines with limited storage space available
// in general, real storage engines often have a per-write-transaction storage overhead, needing to keep
// both the old and the new value stored until the transaction is committed
// this is useful for testing the persistent queue queue behavior with a full disk
type fakeBoundedStorageClient struct {
MaxSizeInBytes int
st map[string][]byte
sizeInBytes int
mux sync.Mutex
}

func (m *fakeBoundedStorageClient) Get(ctx context.Context, key string) ([]byte, error) {
op := storage.GetOperation(key)
if err := m.Batch(ctx, op); err != nil {
return nil, err
}

return op.Value, nil
}

func (m *fakeBoundedStorageClient) Set(ctx context.Context, key string, value []byte) error {
return m.Batch(ctx, storage.SetOperation(key, value))
}

func (m *fakeBoundedStorageClient) Delete(ctx context.Context, key string) error {
return m.Batch(ctx, storage.DeleteOperation(key))
}

func (m *fakeBoundedStorageClient) Close(context.Context) error {
return nil
}

func (m *fakeBoundedStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

totalAdded, totalRemoved := m.getTotalSizeChange(ops)

// the assumption here is that the new data needs to coexist with the old data on disk
// for the transaction to succeed
// this seems to be true for the file storage extension at least
if m.sizeInBytes+totalAdded > m.MaxSizeInBytes {
return fmt.Errorf("insufficient space available: %w", syscall.ENOSPC)
}

for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value = m.st[op.Key]
case storage.Set:
m.st[op.Key] = op.Value
case storage.Delete:
delete(m.st, op.Key)
default:
return errors.New("wrong operation type")
}
}

m.sizeInBytes += totalAdded - totalRemoved

return nil
}

func (m *fakeBoundedStorageClient) SetMaxSizeInBytes(newMaxSize int) {
m.mux.Lock()
defer m.mux.Unlock()
m.MaxSizeInBytes = newMaxSize
}

func (m *fakeBoundedStorageClient) GetSizeInBytes() int {
m.mux.Lock()
defer m.mux.Unlock()
return m.sizeInBytes
}

func (m *fakeBoundedStorageClient) getTotalSizeChange(ops []storage.Operation) (totalAdded int, totalRemoved int) {
totalAdded, totalRemoved = 0, 0
for _, op := range ops {
switch op.Type {
case storage.Set:
if oldValue, ok := m.st[op.Key]; ok {
totalRemoved += len(oldValue)
} else {
totalAdded += len(op.Key)
}
totalAdded += len(op.Value)
case storage.Delete:
if value, ok := m.st[op.Key]; ok {
totalRemoved += len(op.Key)
totalRemoved += len(value)
}
default:
}
}
return totalAdded, totalRemoved
}

func newFakeStorageClientWithErrors(errors []error) *fakeStorageClientWithErrors {
return &fakeStorageClientWithErrors{
errors: errors,
}
}

// this storage client just returns errors from a list in order
// used for testing error handling
type fakeStorageClientWithErrors struct {
errors []error
nextErrorIndex int
mux sync.Mutex
}

func (m *fakeStorageClientWithErrors) Get(ctx context.Context, key string) ([]byte, error) {
op := storage.GetOperation(key)
err := m.Batch(ctx, op)
if err != nil {
return nil, err
}

return op.Value, nil
}

func (m *fakeStorageClientWithErrors) Set(ctx context.Context, key string, value []byte) error {
return m.Batch(ctx, storage.SetOperation(key, value))
}

func (m *fakeStorageClientWithErrors) Delete(ctx context.Context, key string) error {
return m.Batch(ctx, storage.DeleteOperation(key))
}

func (m *fakeStorageClientWithErrors) Close(context.Context) error {
return nil
}

func (m *fakeStorageClientWithErrors) Batch(context.Context, ...storage.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

if m.nextErrorIndex >= len(m.errors) {
return nil
}

m.nextErrorIndex++
return m.errors[m.nextErrorIndex-1]
}

func (m *fakeStorageClientWithErrors) Reset() {
m.mux.Lock()
defer m.mux.Unlock()
m.nextErrorIndex = 0
}

// createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers.
func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int64, numConsumers int,
consumeFunc func(_ context.Context, item tracesRequest) error) Queue[tracesRequest] {
Expand Down

0 comments on commit d39dd7a

Please sign in to comment.