Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/fuse/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func (s *Server) Mount(onReady func()) error {

maxReadAhead := s.config.MaxReadAheadMB * 1024 * 1024
if maxReadAhead == 0 {
maxReadAhead = 128 * 1024 // 128KB default (matches rclone)
// 4MB default: Usenet segments are typically ~750KB decoded, so a 4MB
// readahead window lets the kernel pipeline multiple segment-sized reads,
// keeping the prefetch queue saturated without excessive memory use.
maxReadAhead = 4 * 1024 * 1024
}

opts := &fs.Options{
Expand Down
26 changes: 14 additions & 12 deletions internal/usenet/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"io"
"sync"
"sync/atomic"
)

type Segment struct {
Expand All @@ -24,7 +25,7 @@ type segmentRange struct {
start int64
end int64
segments []*segment
current int
current atomic.Int64 // Current segment index; updated atomically so GetCurrentIndex is lock-free
ctx context.Context
mu sync.RWMutex
}
Expand All @@ -33,11 +34,10 @@ func (r *segmentRange) HasSegments() bool {
return len(r.segments) > 0
}

// GetCurrentIndex returns the current segment index being read
// GetCurrentIndex returns the current segment index being read.
// Lock-free: reads the atomic counter directly.
func (r *segmentRange) GetCurrentIndex() int {
r.mu.RLock()
defer r.mu.RUnlock()
return r.current
return int(r.current.Load())
}

func (r *segmentRange) Len() int {
Expand All @@ -47,14 +47,15 @@ func (r *segmentRange) Len() int {
}

func (r *segmentRange) Get() (*segment, error) {
current := int(r.current.Load())
r.mu.RLock()
defer r.mu.RUnlock()

if r.current >= len(r.segments) {
if current >= len(r.segments) {
return nil, ErrSegmentLimit
}

return r.segments[r.current], nil
return r.segments[current], nil
}

func (r *segmentRange) GetSegment(index int) (*segment, error) {
Expand All @@ -70,18 +71,19 @@ func (r *segmentRange) GetSegment(index int) (*segment, error) {

func (r *segmentRange) Next() (*segment, error) {
r.mu.Lock()
if r.current >= len(r.segments) {
current := int(r.current.Load())
if current >= len(r.segments) {
r.mu.Unlock()
return nil, ErrSegmentLimit
}

// Release data from consumed segment to allow GC
r.segments[r.current].Release()
r.segments[r.current] = nil

r.current += 1
r.segments[current].Release()
r.segments[current] = nil
r.mu.Unlock()

r.current.Add(1)

return r.Get()
}

Expand Down
2 changes: 0 additions & 2 deletions internal/usenet/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ func TestSegmentRangeClear_ContinuesOnAllSegments(t *testing.T) {

sr := &segmentRange{
segments: segments,
current: 0,
}

_ = sr.Clear()
Expand Down Expand Up @@ -340,7 +339,6 @@ func TestSegmentRangeClear_AllSegmentsReleased(t *testing.T) {

sr := &segmentRange{
segments: segments,
current: 0,
}

err := sr.Clear()
Expand Down
169 changes: 92 additions & 77 deletions internal/usenet/usenet_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
Expand Down Expand Up @@ -51,24 +52,37 @@ func (e *DataCorruptionError) Unwrap() error {
return e.UnderlyingErr
}

// bufPool reuses download buffers to reduce GC pressure.
// Pre-sized for typical Usenet segments (~750KB decoded).
var bufPool = sync.Pool{
New: func() any {
return bytes.NewBuffer(make([]byte, 0, 750*1024))
},
}

type UsenetReader struct {
log *slog.Logger
wg sync.WaitGroup
ctx context.Context // Reader's context for cancellation
cancel context.CancelFunc
rg *segmentRange
maxPrefetch int // Maximum segments prefetched ahead of current read position
init chan any
initDownload sync.Once
closeOnce sync.Once
totalBytesRead int64
poolGetter func() (*nntppool.Client, error) // Dynamic pool getter
log *slog.Logger
wg sync.WaitGroup
ctx context.Context // Reader's context for cancellation
cancel context.CancelFunc
rg *segmentRange
maxPrefetch int // Maximum segments prefetched ahead of current read position
init chan any
initDownload sync.Once
closeOnce sync.Once
poolGetter func() (*nntppool.Client, error) // Dynamic pool getter
metricsTracker MetricsTracker
streamID string
segmentStore SegmentStore // optional, nil = no caching

// Prefetch-based download tracking
nextToDownload int // Index of next segment to schedule
// Prefetch-based download tracking (atomic: written by downloadManager, read from multiple goroutines)
nextToDownload atomic.Int64
totalBytesRead atomic.Int64

// segmentConsumed is signaled (non-blocking, capacity 1) whenever the reader
// consumes a segment via Next(), allowing downloadManager to immediately
// schedule the next download instead of waiting for the 50ms poll timer.
segmentConsumed chan struct{}

mu sync.Mutex
}
Expand All @@ -90,16 +104,17 @@ func NewUsenetReader(
}

ur := &UsenetReader{
log: log,
ctx: ctx,
cancel: cancel,
rg: rg,
init: make(chan any, 1),
maxPrefetch: maxPrefetch,
poolGetter: poolGetter,
metricsTracker: metricsTracker,
streamID: streamID,
segmentStore: segmentStore,
log: log,
ctx: ctx,
cancel: cancel,
rg: rg,
init: make(chan any, 1),
maxPrefetch: maxPrefetch,
poolGetter: poolGetter,
metricsTracker: metricsTracker,
streamID: streamID,
segmentStore: segmentStore,
segmentConsumed: make(chan struct{}, 1),
}

ur.wg.Go(func() {
Expand Down Expand Up @@ -183,21 +198,12 @@ func (b *UsenetReader) Read(p []byte) (int, error) {

s, err := rg.Get()
if err != nil {
b.mu.Lock()
totalRead := b.totalBytesRead
b.mu.Unlock()
totalRead := b.totalBytesRead.Load()

if b.isArticleNotFoundError(err) {
if totalRead > 0 {
return 0, &DataCorruptionError{
UnderlyingErr: err,
BytesRead: totalRead,
}
} else {
return 0, &DataCorruptionError{
UnderlyingErr: err,
BytesRead: 0,
}
return 0, &DataCorruptionError{
UnderlyingErr: err,
BytesRead: totalRead,
}
}
return 0, io.EOF
Expand All @@ -208,20 +214,15 @@ func (b *UsenetReader) Read(p []byte) (int, error) {
nn, err := s.GetReaderContext(b.ctx).Read(p[n:])
n += nn

b.mu.Lock()
b.totalBytesRead += int64(nn)
totalRead := b.totalBytesRead
b.mu.Unlock()
totalRead := b.totalBytesRead.Add(int64(nn))

if err != nil {
if errors.Is(err, io.EOF) {
// Segment fully read — move to next segment
b.mu.Lock()
rg := b.rg
b.mu.Unlock()

if rg == nil {
return n, io.ErrClosedPipe
// Segment fully read — signal download manager to schedule the next
// segment immediately rather than waiting for the polling interval.
select {
case b.segmentConsumed <- struct{}{}:
default:
}

s, err = rg.Next()
Expand Down Expand Up @@ -263,18 +264,20 @@ func (b *UsenetReader) isArticleNotFoundError(err error) bool {

func (b *UsenetReader) GetBufferedOffset() int64 {
b.mu.Lock()
defer b.mu.Unlock()
rg := b.rg
b.mu.Unlock()

if b.rg == nil {
if rg == nil {
return 0
}

if b.nextToDownload == 0 {
nextToDownload := int(b.nextToDownload.Load())
if nextToDownload == 0 {
return 0
}

idx := b.nextToDownload - 1
s, err := b.rg.GetSegment(idx)
idx := nextToDownload - 1
s, err := rg.GetSegment(idx)
if err != nil || s == nil {
return 0
}
Expand Down Expand Up @@ -307,9 +310,14 @@ func (b *UsenetReader) downloadSegmentWithRetry(ctx context.Context, seg *segmen
return err
}

buf := bytes.NewBuffer(make([]byte, 0, seg.SegmentSize))
// Reuse buffer from pool to reduce GC pressure.
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()

result, err := cp.BodyStream(attemptCtx, seg.Id, buf)
if err != nil {
bufPool.Put(buf)

if errors.Is(err, context.DeadlineExceeded) {
b.log.DebugContext(ctx, "Segment download attempt timed out after 30s", "segment_id", seg.Id)
}
Expand All @@ -329,7 +337,10 @@ func (b *UsenetReader) downloadSegmentWithRetry(ctx context.Context, seg *segmen
return err
}

resultBytes = buf.Bytes()
// Copy decoded data out before returning buffer to pool.
resultBytes = make([]byte, buf.Len())
copy(resultBytes, buf.Bytes())
bufPool.Put(buf)

if b.metricsTracker != nil {
b.metricsTracker.IncArticlesDownloaded()
Expand Down Expand Up @@ -372,47 +383,51 @@ func (b *UsenetReader) downloadManager(ctx context.Context) {
return
}

if b.rg.Len() == 0 {
// Cache the rg reference once. Safe because this goroutine is tracked in b.wg,
// and rg.Clear() is only called after wg.Wait() completes in Close().
b.mu.Lock()
rg := b.rg
b.mu.Unlock()

if rg == nil || rg.Len() == 0 {
return
}

// Cache total segment count — fixed after segmentRange is built.
totalSegments := rg.Len()

for ctx.Err() == nil {
b.mu.Lock()
if b.rg == nil {
b.mu.Unlock()
return
}
nextToDownload := int(b.nextToDownload.Load())

// Check if all segments have been scheduled
totalSegments := b.rg.Len()
if b.nextToDownload >= totalSegments {
b.mu.Unlock()
// All segments have been scheduled — exit the loop.
if nextToDownload >= totalSegments {
break
}

// Limit how far ahead we prefetch beyond the current read position
currentRead := b.rg.GetCurrentIndex()
if b.nextToDownload-currentRead >= b.maxPrefetch {
b.mu.Unlock()
// Wait briefly before re-checking
// GetCurrentIndex is now lock-free (atomic read).
// Limit how far ahead we prefetch beyond the current read position.
currentRead := rg.GetCurrentIndex()
if nextToDownload-currentRead >= b.maxPrefetch {
// Block until the reader consumes a segment (immediate wake-up)
// or the fallback 1s timeout fires (in case the signal was missed).
select {
case <-time.After(50 * time.Millisecond):
continue
case <-b.segmentConsumed:
case <-time.After(1 * time.Second):
case <-ctx.Done():
return
}
continue
}

// Schedule next segment for download
idx := b.nextToDownload
b.nextToDownload++
b.mu.Unlock()

seg, err := b.rg.GetSegment(idx)
seg, err := rg.GetSegment(nextToDownload)
if err != nil || seg == nil {
b.nextToDownload.Add(1)
continue
}

idx := nextToDownload
b.nextToDownload.Add(1)

go func(segIdx int, s *segment) {
defer func() {
if p := recover(); p != nil {
Expand Down
Loading