Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions pkg/cloud/cloud_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ type ResumingReader struct {
}

var _ ioctx.ReadCloserCtx = &ResumingReader{}
var _ ioctx.ReaderAtCtx = &ResumingReader{}
var _ ioctx.SeekerCtx = &ResumingReader{}

// NewResumingReader returns a ResumingReader instance. Reader does not have to
// be provided, and will be created with the opener if it's not provided. Size
Expand Down Expand Up @@ -378,6 +380,57 @@ func (r *ResumingReader) Close(ctx context.Context) error {
return err
}

// ReadAt implements ioctx.ReaderAtCtx using the Opener to create a reader at the specified offset.
// This allows formats like Parquet that require random access to work efficiently with cloud storage.
func (r *ResumingReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
if r.Opener == nil {
return 0, errors.New("ReadAt not supported: no Opener available")
}

reader, _, err := r.Opener(ctx, off)
if err != nil {
return 0, err
}
defer reader.Close()

return io.ReadFull(reader, p)
}

// Seek implements ioctx.SeekerCtx by tracking position without actually seeking the underlying reader.
// Actual repositioning happens via the Opener when Read is called.
func (r *ResumingReader) Seek(ctx context.Context, offset int64, whence int) (int64, error) {
var newPos int64
switch whence {
case io.SeekStart:
newPos = offset
case io.SeekCurrent:
newPos = r.Pos + offset
case io.SeekEnd:
if r.Size == 0 {
return 0, errors.New("Seek from end not supported: size unknown")
}
newPos = r.Size + offset
default:
return 0, errors.Newf("invalid whence: %d", whence)
}

if newPos < 0 {
return 0, errors.New("negative position")
}

// Close current reader if position changed
if r.Reader != nil && newPos != r.Pos {
r.Reader.Close()
if r.ReaderSpan != nil {
r.ReaderSpan.Finish()
}
r.Reader = nil
}

r.Pos = newPos
return newPos, nil
}

// CheckHTTPContentRangeHeader parses Content-Range header and ensures that
// range start offset is the same as the expected 'pos'. It returns the total
// size of the remote object as extracted from the header.
Expand Down
188 changes: 188 additions & 0 deletions pkg/cloud/cloud_io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,191 @@ func (p *nErrorsProducer) maybeProduceErr() error {
}
return nil
}

// TestResumingReaderReadAt tests the ReadAt functionality for random access
func TestResumingReaderReadAt(t *testing.T) {
ctx := context.Background()
rf := &fakeReaderFactory{
data: "hello world",
}

t.Run("basic-read-at", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)

// Read at various offsets
buf := make([]byte, 5)

// Read "hello" at offset 0
n, err := reader.ReadAt(ctx, buf, 0)
require.NoError(t, err)
require.Equal(t, 5, n)
require.Equal(t, "hello", string(buf))

// Read "world" at offset 6
n, err = reader.ReadAt(ctx, buf, 6)
require.NoError(t, err)
require.Equal(t, 5, n)
require.Equal(t, "world", string(buf))

// Read "o wor" at offset 4
n, err = reader.ReadAt(ctx, buf, 4)
require.NoError(t, err)
require.Equal(t, 5, n)
require.Equal(t, "o wor", string(buf))
})

t.Run("read-at-eof", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)

// Try to read at the end of the data
buf := make([]byte, 5)
n, err := reader.ReadAt(ctx, buf, int64(len(rf.data)))
require.Error(t, err)
require.Equal(t, 0, n)
})

t.Run("read-at-partial", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)

// Try to read more bytes than available
buf := make([]byte, 10)
n, err := reader.ReadAt(ctx, buf, 7)
require.Equal(t, io.ErrUnexpectedEOF, err)
require.Equal(t, 4, n) // Should read "orld"
require.Equal(t, "orld", string(buf[:n]))
})

t.Run("read-at-no-opener", func(t *testing.T) {
// Create reader without opener
reader := &ResumingReader{
Opener: nil,
}

buf := make([]byte, 5)
_, err := reader.ReadAt(ctx, buf, 0)
require.Error(t, err)
require.Contains(t, err.Error(), "ReadAt not supported: no Opener available")
})

t.Run("read-at-with-error", func(t *testing.T) {
injectedErr := errors.New("injected read error")
rfWithErr := &fakeReaderFactory{
data: "hello world",
newReaderAtKnob: func() error {
return injectedErr
},
}

reader := NewResumingReader(ctx, rfWithErr.newReaderAt, nil, 0, int64(len(rfWithErr.data)), "", nil, nil)

buf := make([]byte, 5)
_, err := reader.ReadAt(ctx, buf, 0)
require.Error(t, err)
require.ErrorIs(t, err, injectedErr)
})
}

// TestResumingReaderSeek tests the Seek functionality for repositioning
func TestResumingReaderSeek(t *testing.T) {
ctx := context.Background()
rf := &fakeReaderFactory{
data: "hello world",
}

t.Run("seek-from-start", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)

pos, err := reader.Seek(ctx, 6, io.SeekStart)
require.NoError(t, err)
require.Equal(t, int64(6), pos)
require.Equal(t, int64(6), reader.Pos)
})

t.Run("seek-from-current", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 5, int64(len(rf.data)), "", nil, nil)

pos, err := reader.Seek(ctx, 3, io.SeekCurrent)
require.NoError(t, err)
require.Equal(t, int64(8), pos)
require.Equal(t, int64(8), reader.Pos)
})

t.Run("seek-from-end", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)

pos, err := reader.Seek(ctx, -5, io.SeekEnd)
require.NoError(t, err)
require.Equal(t, int64(len(rf.data)-5), pos)
require.Equal(t, int64(len(rf.data)-5), reader.Pos)
})

t.Run("seek-from-end-no-size", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, 0, "", nil, nil)

_, err := reader.Seek(ctx, -5, io.SeekEnd)
require.Error(t, err)
require.Contains(t, err.Error(), "Seek from end not supported: size unknown")
})

t.Run("seek-negative-position", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 5, int64(len(rf.data)), "", nil, nil)

_, err := reader.Seek(ctx, -10, io.SeekCurrent)
require.Error(t, err)
require.Contains(t, err.Error(), "negative position")
})

t.Run("seek-invalid-whence", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)

_, err := reader.Seek(ctx, 0, 99)
require.Error(t, err)
require.Contains(t, err.Error(), "invalid whence")
})

t.Run("seek-closes-reader", func(t *testing.T) {
// Open a reader first
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
require.NoError(t, reader.Open(ctx))
require.NotNil(t, reader.Reader)

// Seek to a different position
_, err := reader.Seek(ctx, 5, io.SeekStart)
require.NoError(t, err)

// Reader should be closed and nil
require.Nil(t, reader.Reader)
require.Equal(t, int64(5), reader.Pos)
})

t.Run("seek-same-position-no-close", func(t *testing.T) {
// Open a reader first
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
require.NoError(t, reader.Open(ctx))
originalReader := reader.Reader
require.NotNil(t, originalReader)

// Seek to the same position
_, err := reader.Seek(ctx, 0, io.SeekStart)
require.NoError(t, err)

// Reader should NOT be closed (same position)
require.Equal(t, originalReader, reader.Reader)
require.Equal(t, int64(0), reader.Pos)
})

t.Run("seek-then-read", func(t *testing.T) {
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)

// Seek to position 6
_, err := reader.Seek(ctx, 6, io.SeekStart)
require.NoError(t, err)

// Read from new position
buf := make([]byte, 5)
n, err := reader.Read(ctx, buf)
require.NoError(t, err)
require.Equal(t, 5, n)
require.Equal(t, "world", string(buf))
})
}
26 changes: 26 additions & 0 deletions pkg/cloud/impl_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,32 @@ func (l *limitedReader) Close(ctx context.Context) error {
return l.r.Close(ctx)
}

// ReadAt implements ioctx.ReaderAtCtx by delegating to the underlying reader if it supports it.
// This is needed for Parquet and other formats that require random access.
func (l *limitedReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
if readerAt, ok := l.r.(ioctx.ReaderAtCtx); ok {
n, err = readerAt.ReadAt(ctx, p, off)
// ReadAt can be called concurrently, so apply rate limiting immediately
// without batching (unlike Read which batches via l.pool).
if n > 0 {
if err := l.lim.WaitN(ctx, int64(n)); err != nil {
log.Dev.Warningf(ctx, "failed to throttle read: %+v", err)
}
}
return n, err
}
return 0, errors.New("ReadAt not supported by underlying reader")
}

// Seek implements ioctx.SeekerCtx by delegating to the underlying reader if it supports it.
// This is needed for Parquet and other formats that require random access.
func (l *limitedReader) Seek(ctx context.Context, offset int64, whence int) (int64, error) {
if seeker, ok := l.r.(ioctx.SeekerCtx); ok {
return seeker.Seek(ctx, offset, whence)
}
return 0, errors.New("Seek not supported by underlying reader")
}

type limitedWriter struct {
w io.WriteCloser
ctx context.Context
Expand Down
19 changes: 19 additions & 0 deletions pkg/cloud/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/cidr"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/errors"
io_prometheus_client "github.com/prometheus/client_model/go"
)

Expand Down Expand Up @@ -186,6 +187,24 @@ func (mr *metricsReader) Close(ctx context.Context) error {
return mr.ReadCloserCtx.Close(ctx)
}

// ReadAt implements ioctx.ReaderAtCtx by delegating to the underlying reader if it supports it.
// This allows Parquet and other formats that require random access to work with
// cloud storage backends without buffering the entire file to disk.
func (mr *metricsReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
if readerAt, ok := mr.ReadCloserCtx.(ioctx.ReaderAtCtx); ok {
return readerAt.ReadAt(ctx, p, off)
}
return 0, errors.New("ReadAt not supported by underlying reader")
}

// Seek implements ioctx.SeekerCtx by delegating to the underlying reader if it supports it.
func (mr *metricsReader) Seek(ctx context.Context, offset int64, whence int) (int64, error) {
if seeker, ok := mr.ReadCloserCtx.(ioctx.SeekerCtx); ok {
return seeker.Seek(ctx, offset, whence)
}
return 0, errors.New("Seek not supported by underlying reader")
}

type metricsWriter struct {
io.WriteCloser
m *Metrics
Expand Down