Skip to content

Commit 984449b

Browse files
committed
cloud: add random access support to ResumingReader
This commit adds ReadAt and Seek implementations to ResumingReader and its wrapper types (limitedReader, metricsReader), enabling random access to cloud-stored files.

Implementation details:
- ReadAt creates a new reader at the specified offset using the Opener
- Seek tracks position logically without seeking the underlying reader
- Both methods implement ioctx.ReaderAtCtx and ioctx.SeekerCtx interfaces
- Thread-safe: ReadAt can be called concurrently from multiple goroutines
- Works with all cloud storage backends that support range requests

This enables efficient reading of formats like Parquet that require random access, without buffering entire files to disk.

Release note: none
Epic: CRDB-23802
1 parent f37e9c2 commit 984449b

File tree

4 files changed

+278
-0
lines changed

4 files changed

+278
-0
lines changed

pkg/cloud/cloud_io.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ type ResumingReader struct {
252252
}
253253

254254
var _ ioctx.ReadCloserCtx = &ResumingReader{}
255+
var _ ioctx.ReaderAtCtx = &ResumingReader{}
256+
var _ ioctx.SeekerCtx = &ResumingReader{}
255257

256258
// NewResumingReader returns a ResumingReader instance. Reader does not have to
257259
// be provided, and will be created with the opener if it's not provided. Size
@@ -378,6 +380,57 @@ func (r *ResumingReader) Close(ctx context.Context) error {
378380
return err
379381
}
380382

383+
// ReadAt implements ioctx.ReaderAtCtx using the Opener to create a reader at the specified offset.
384+
// This allows formats like Parquet that require random access to work efficiently with cloud storage.
385+
func (r *ResumingReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
386+
if r.Opener == nil {
387+
return 0, errors.New("ReadAt not supported: no Opener available")
388+
}
389+
390+
reader, _, err := r.Opener(ctx, off)
391+
if err != nil {
392+
return 0, err
393+
}
394+
defer reader.Close()
395+
396+
return io.ReadFull(reader, p)
397+
}
398+
399+
// Seek implements ioctx.SeekerCtx by tracking position without actually seeking the underlying reader.
400+
// Actual repositioning happens via the Opener when Read is called.
401+
func (r *ResumingReader) Seek(ctx context.Context, offset int64, whence int) (int64, error) {
402+
var newPos int64
403+
switch whence {
404+
case io.SeekStart:
405+
newPos = offset
406+
case io.SeekCurrent:
407+
newPos = r.Pos + offset
408+
case io.SeekEnd:
409+
if r.Size == 0 {
410+
return 0, errors.New("Seek from end not supported: size unknown")
411+
}
412+
newPos = r.Size + offset
413+
default:
414+
return 0, errors.Newf("invalid whence: %d", whence)
415+
}
416+
417+
if newPos < 0 {
418+
return 0, errors.New("negative position")
419+
}
420+
421+
// Close current reader if position changed
422+
if r.Reader != nil && newPos != r.Pos {
423+
r.Reader.Close()
424+
if r.ReaderSpan != nil {
425+
r.ReaderSpan.Finish()
426+
}
427+
r.Reader = nil
428+
}
429+
430+
r.Pos = newPos
431+
return newPos, nil
432+
}
433+
381434
// CheckHTTPContentRangeHeader parses Content-Range header and ensures that
382435
// range start offset is the same as the expected 'pos'. It returns the total
383436
// size of the remote object as extracted from the header.

pkg/cloud/cloud_io_test.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,3 +253,191 @@ func (p *nErrorsProducer) maybeProduceErr() error {
253253
}
254254
return nil
255255
}
256+
257+
// TestResumingReaderReadAt tests the ReadAt functionality for random access
258+
func TestResumingReaderReadAt(t *testing.T) {
259+
ctx := context.Background()
260+
rf := &fakeReaderFactory{
261+
data: "hello world",
262+
}
263+
264+
t.Run("basic-read-at", func(t *testing.T) {
265+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
266+
267+
// Read at various offsets
268+
buf := make([]byte, 5)
269+
270+
// Read "hello" at offset 0
271+
n, err := reader.ReadAt(ctx, buf, 0)
272+
require.NoError(t, err)
273+
require.Equal(t, 5, n)
274+
require.Equal(t, "hello", string(buf))
275+
276+
// Read "world" at offset 6
277+
n, err = reader.ReadAt(ctx, buf, 6)
278+
require.NoError(t, err)
279+
require.Equal(t, 5, n)
280+
require.Equal(t, "world", string(buf))
281+
282+
// Read "o wor" at offset 4
283+
n, err = reader.ReadAt(ctx, buf, 4)
284+
require.NoError(t, err)
285+
require.Equal(t, 5, n)
286+
require.Equal(t, "o wor", string(buf))
287+
})
288+
289+
t.Run("read-at-eof", func(t *testing.T) {
290+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
291+
292+
// Try to read at the end of the data
293+
buf := make([]byte, 5)
294+
n, err := reader.ReadAt(ctx, buf, int64(len(rf.data)))
295+
require.Error(t, err)
296+
require.Equal(t, 0, n)
297+
})
298+
299+
t.Run("read-at-partial", func(t *testing.T) {
300+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
301+
302+
// Try to read more bytes than available
303+
buf := make([]byte, 10)
304+
n, err := reader.ReadAt(ctx, buf, 7)
305+
require.Equal(t, io.ErrUnexpectedEOF, err)
306+
require.Equal(t, 4, n) // Should read "orld"
307+
require.Equal(t, "orld", string(buf[:n]))
308+
})
309+
310+
t.Run("read-at-no-opener", func(t *testing.T) {
311+
// Create reader without opener
312+
reader := &ResumingReader{
313+
Opener: nil,
314+
}
315+
316+
buf := make([]byte, 5)
317+
_, err := reader.ReadAt(ctx, buf, 0)
318+
require.Error(t, err)
319+
require.Contains(t, err.Error(), "ReadAt not supported: no Opener available")
320+
})
321+
322+
t.Run("read-at-with-error", func(t *testing.T) {
323+
injectedErr := errors.New("injected read error")
324+
rfWithErr := &fakeReaderFactory{
325+
data: "hello world",
326+
newReaderAtKnob: func() error {
327+
return injectedErr
328+
},
329+
}
330+
331+
reader := NewResumingReader(ctx, rfWithErr.newReaderAt, nil, 0, int64(len(rfWithErr.data)), "", nil, nil)
332+
333+
buf := make([]byte, 5)
334+
_, err := reader.ReadAt(ctx, buf, 0)
335+
require.Error(t, err)
336+
require.ErrorIs(t, err, injectedErr)
337+
})
338+
}
339+
340+
// TestResumingReaderSeek tests the Seek functionality for repositioning
341+
func TestResumingReaderSeek(t *testing.T) {
342+
ctx := context.Background()
343+
rf := &fakeReaderFactory{
344+
data: "hello world",
345+
}
346+
347+
t.Run("seek-from-start", func(t *testing.T) {
348+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
349+
350+
pos, err := reader.Seek(ctx, 6, io.SeekStart)
351+
require.NoError(t, err)
352+
require.Equal(t, int64(6), pos)
353+
require.Equal(t, int64(6), reader.Pos)
354+
})
355+
356+
t.Run("seek-from-current", func(t *testing.T) {
357+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 5, int64(len(rf.data)), "", nil, nil)
358+
359+
pos, err := reader.Seek(ctx, 3, io.SeekCurrent)
360+
require.NoError(t, err)
361+
require.Equal(t, int64(8), pos)
362+
require.Equal(t, int64(8), reader.Pos)
363+
})
364+
365+
t.Run("seek-from-end", func(t *testing.T) {
366+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
367+
368+
pos, err := reader.Seek(ctx, -5, io.SeekEnd)
369+
require.NoError(t, err)
370+
require.Equal(t, int64(len(rf.data)-5), pos)
371+
require.Equal(t, int64(len(rf.data)-5), reader.Pos)
372+
})
373+
374+
t.Run("seek-from-end-no-size", func(t *testing.T) {
375+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, 0, "", nil, nil)
376+
377+
_, err := reader.Seek(ctx, -5, io.SeekEnd)
378+
require.Error(t, err)
379+
require.Contains(t, err.Error(), "Seek from end not supported: size unknown")
380+
})
381+
382+
t.Run("seek-negative-position", func(t *testing.T) {
383+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 5, int64(len(rf.data)), "", nil, nil)
384+
385+
_, err := reader.Seek(ctx, -10, io.SeekCurrent)
386+
require.Error(t, err)
387+
require.Contains(t, err.Error(), "negative position")
388+
})
389+
390+
t.Run("seek-invalid-whence", func(t *testing.T) {
391+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
392+
393+
_, err := reader.Seek(ctx, 0, 99)
394+
require.Error(t, err)
395+
require.Contains(t, err.Error(), "invalid whence")
396+
})
397+
398+
t.Run("seek-closes-reader", func(t *testing.T) {
399+
// Open a reader first
400+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
401+
require.NoError(t, reader.Open(ctx))
402+
require.NotNil(t, reader.Reader)
403+
404+
// Seek to a different position
405+
_, err := reader.Seek(ctx, 5, io.SeekStart)
406+
require.NoError(t, err)
407+
408+
// Reader should be closed and nil
409+
require.Nil(t, reader.Reader)
410+
require.Equal(t, int64(5), reader.Pos)
411+
})
412+
413+
t.Run("seek-same-position-no-close", func(t *testing.T) {
414+
// Open a reader first
415+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
416+
require.NoError(t, reader.Open(ctx))
417+
originalReader := reader.Reader
418+
require.NotNil(t, originalReader)
419+
420+
// Seek to the same position
421+
_, err := reader.Seek(ctx, 0, io.SeekStart)
422+
require.NoError(t, err)
423+
424+
// Reader should NOT be closed (same position)
425+
require.Equal(t, originalReader, reader.Reader)
426+
require.Equal(t, int64(0), reader.Pos)
427+
})
428+
429+
t.Run("seek-then-read", func(t *testing.T) {
430+
reader := NewResumingReader(ctx, rf.newReaderAt, nil, 0, int64(len(rf.data)), "", nil, nil)
431+
432+
// Seek to position 6
433+
_, err := reader.Seek(ctx, 6, io.SeekStart)
434+
require.NoError(t, err)
435+
436+
// Read from new position
437+
buf := make([]byte, 5)
438+
n, err := reader.Read(ctx, buf)
439+
require.NoError(t, err)
440+
require.Equal(t, 5, n)
441+
require.Equal(t, "world", string(buf))
442+
})
443+
}

pkg/cloud/impl_registry.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,24 @@ func (l *limitedReader) Close(ctx context.Context) error {
463463
return l.r.Close(ctx)
464464
}
465465

466+
// ReadAt implements ioctx.ReaderAtCtx by delegating to the underlying reader if it supports it.
467+
// This is needed for Parquet and other formats that require random access.
468+
func (l *limitedReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
469+
if readerAt, ok := l.r.(ioctx.ReaderAtCtx); ok {
470+
return readerAt.ReadAt(ctx, p, off)
471+
}
472+
return 0, errors.New("ReadAt not supported by underlying reader")
473+
}
474+
475+
// Seek implements ioctx.SeekerCtx by delegating to the underlying reader if it supports it.
476+
// This is needed for Parquet and other formats that require random access.
477+
func (l *limitedReader) Seek(ctx context.Context, offset int64, whence int) (int64, error) {
478+
if seeker, ok := l.r.(ioctx.SeekerCtx); ok {
479+
return seeker.Seek(ctx, offset, whence)
480+
}
481+
return 0, errors.New("Seek not supported by underlying reader")
482+
}
483+
466484
type limitedWriter struct {
467485
w io.WriteCloser
468486
ctx context.Context

pkg/cloud/metrics.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/cockroach/pkg/util/cidr"
1313
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
1414
"github.com/cockroachdb/cockroach/pkg/util/metric"
15+
"github.com/cockroachdb/errors"
1516
io_prometheus_client "github.com/prometheus/client_model/go"
1617
)
1718

@@ -186,6 +187,24 @@ func (mr *metricsReader) Close(ctx context.Context) error {
186187
return mr.ReadCloserCtx.Close(ctx)
187188
}
188189

190+
// ReadAt implements ioctx.ReaderAtCtx by delegating to the underlying reader if it supports it.
191+
// This allows Parquet and other formats that require random access to work with
192+
// cloud storage backends without buffering the entire file to disk.
193+
func (mr *metricsReader) ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) {
194+
if readerAt, ok := mr.ReadCloserCtx.(ioctx.ReaderAtCtx); ok {
195+
return readerAt.ReadAt(ctx, p, off)
196+
}
197+
return 0, errors.New("ReadAt not supported by underlying reader")
198+
}
199+
200+
// Seek implements ioctx.SeekerCtx by delegating to the underlying reader if it supports it.
201+
func (mr *metricsReader) Seek(ctx context.Context, offset int64, whence int) (int64, error) {
202+
if seeker, ok := mr.ReadCloserCtx.(ioctx.SeekerCtx); ok {
203+
return seeker.Seek(ctx, offset, whence)
204+
}
205+
return 0, errors.New("Seek not supported by underlying reader")
206+
}
207+
189208
type metricsWriter struct {
190209
io.WriteCloser
191210
m *Metrics

0 commit comments

Comments
 (0)