Update piecedirectory.go
Full update with Triton patches for Solana indexing
cryptowhizzard authored Oct 21, 2024
1 parent 37ee98d commit 941a6a1
Showing 1 changed file with 196 additions and 54 deletions.
250 changes: 196 additions & 54 deletions piecedirectory/piecedirectory.go
@@ -40,6 +40,7 @@ import (
	"go.opentelemetry.io/otel/attribute"
	"golang.org/x/sync/errgroup"
	"golang.org/x/xerrors"

)

var log = logging.Logger("piecedirectory")

@@ -81,6 +82,33 @@ type PieceDirectory struct {
	addIdxOpByCid sync.Map
}

func (ps *PieceDirectory) StartJobLogger() {
	ticker := time.NewTicker(time.Minute)
	retryTicker := time.NewTicker(30 * time.Second)
	go func() {
		for {
			if ps.ctx != nil {
				for {
					select {
					case <-ticker.C:
						ps.logOpenJobs()
					case <-ps.ctx.Done():
						ticker.Stop()
						retryTicker.Stop()
						return
					}
				}
			} else {
				log.Error("Context is nil in StartJobLogger, retrying in 30 seconds")
				select {
				case <-retryTicker.C:
					// Retry after 30 seconds
				}
			}
		}
	}()
}
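
For reference, a minimal standalone sketch of the pattern StartJobLogger builds on: run a callback on a fixed interval until an owning context is cancelled. The runEvery helper and the surrounding main are illustrative only and are not part of this commit.

package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery invokes fn once per interval until ctx is cancelled.
func runEvery(ctx context.Context, interval time.Duration, fn func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fn()
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	// Print a line every second for roughly three seconds, then stop.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	runEvery(ctx, time.Second, func() { fmt.Println("tick") })
}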

func NewPieceDirectory(store *bdclient.Store, pr types.PieceReader, addIndexThrottleSize int, opts ...Option) *PieceDirectory {
	prCache := ttlcache.NewCache()
	_ = prCache.SetTTL(30 * time.Second)
@@ -127,6 +155,7 @@ func NewPieceDirectory(store *bdclient.Store, pr types.PieceReader, addIndexThro

	prCache.SetExpirationReasonCallback(expireCallback)

	pd.StartJobLogger()
	return pd
}

@@ -258,57 +287,104 @@ func (ps *PieceDirectory) AddDealForPiece(ctx context.Context, pieceCid cid.Cid,
type addIndexOperation struct {
	done        chan struct{}
	err         error
	startTime   time.Time
	isRunningMu sync.Mutex // Mutex for accessing isRunning
	isQueued    bool       // Indicates if the job is queued
	isRunning   bool       // Track if the operation is currently running
	closeOnce   sync.Once  // Ensures the channel is closed only once
}

func (ps *PieceDirectory) addIndexForPieceThrottled(ctx context.Context, pieceCid cid.Cid, dealInfo model.DealInfo) error {
	// Check if there is already an add index operation in progress for the
	// given piece cid. If not, create a new one.
	opi, loaded := ps.addIdxOpByCid.LoadOrStore(pieceCid, &addIndexOperation{
		done: make(chan struct{}),
	})
	op := opi.(*addIndexOperation)
	if loaded {
		log.Debugw("add index: operation in progress, waiting for completion", "pieceCid", pieceCid)
		defer func() {
			log.Debugw("add index: in progress operation completed", "pieceCid", pieceCid)
		}()

		// There is an add index operation in progress, so wait for it to
		// complete
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-op.done:
			return op.err
		}
	}

	// A new operation was added to the map, so clean it up when it's done
	defer ps.addIdxOpByCid.Delete(pieceCid)
	defer close(op.done)

	// Wait for the throttle to yield an open spot
	log.Debugw("add index: wait for open throttle position",
		"pieceCid", pieceCid, "queued", len(ps.addIdxThrottle), "queue-limit", ps.addIdxThrottleSize)
	select {
	case <-ctx.Done():
		op.err = ctx.Err()
		return ctx.Err()
	case ps.addIdxThrottle <- struct{}{}:
	}
	defer func() { <-ps.addIdxThrottle }()

	// Perform the add index operation.
	// Note: Once we start the add index operation we don't want to cancel it
	// if one of the waiting threads cancels its context. So instead we use the
	// PieceDirectory's context.
	op.err = ps.addIndexForPiece(ps.ctx, pieceCid, dealInfo)
	opi, loaded := ps.addIdxOpByCid.LoadOrStore(pieceCid, &addIndexOperation{
		done:      make(chan struct{}),
		startTime: time.Now(),
		isRunning: false,
		closeOnce: sync.Once{},
	})
	op := opi.(*addIndexOperation)

	if loaded {
		log.Debugw("add index: operation already in queue, waiting for completion", "pieceCid", pieceCid)
		defer func() {
			log.Debugw("add index: queued operation completed", "pieceCid", pieceCid)
		}()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-op.done:
			return op.err
		}
	}

	if !loaded {
		// Job is newly added to the queue
		op.isQueued = true
	}

	defer ps.addIdxOpByCid.Delete(pieceCid)
	defer op.closeOnce.Do(func() {
		op.isRunningMu.Lock()
		op.isRunning = false // Ensure isRunning is set to false when done
		op.isRunningMu.Unlock()
		close(op.done)
	})

	// Wait for the throttle to yield an open spot
	log.Debugw("add index: waiting for throttle position", "pieceCid", pieceCid, "queued", len(ps.addIdxThrottle), "queue-limit", ps.addIdxThrottleSize)
	select {
	case <-ctx.Done():
		op.err = ctx.Err()
		return ctx.Err()
	case ps.addIdxThrottle <- struct{}{}:
		// Release the throttle spot when the function returns
		defer func() { <-ps.addIdxThrottle }()
	}

	op.isQueued = false
	op.isRunningMu.Lock()
	op.isRunning = true
	op.isRunningMu.Unlock()

	// Start the timeout context here, after the job has been dequeued
	jobCtx, cancel := context.WithTimeout(ctx, time.Minute*600) // 600-minute timeout for the job
	defer cancel()

	// Perform the add index operation
	op.err = ps.addIndexForPiece(jobCtx, pieceCid, dealInfo)

	if op.err != nil {
		log.Errorw("add index: error running job", "pieceCid", pieceCid, "error", op.err)
	} else {
		log.Infow("add index: job completed successfully", "pieceCid", pieceCid, "duration", time.Since(op.startTime))
	}

	return op.err
}

	// Return the result
	log.Debugw("add index: completed", "pieceCid", pieceCid)
	return op.err
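
As an aside, the throttled add-index path above combines two standard Go mechanisms: sync.Map.LoadOrStore for single-flight deduplication per piece CID, and a buffered channel acting as a semaphore that caps concurrent jobs. A condensed, self-contained sketch of that combination follows; the dedup and op names are hypothetical and this is not the commit's code.

package main

import (
	"context"
	"sync"
)

type op struct {
	done chan struct{}
	err  error
	once sync.Once
}

type dedup struct {
	ops      sync.Map      // key -> *op, one entry per in-flight job
	throttle chan struct{} // buffered; capacity = max concurrent jobs
}

// do runs fn at most once per key at a time; concurrent callers with the
// same key wait for the first caller's result.
func (d *dedup) do(ctx context.Context, key string, fn func() error) error {
	v, loaded := d.ops.LoadOrStore(key, &op{done: make(chan struct{})})
	o := v.(*op)
	if loaded {
		// Another caller already owns this key; wait for its result.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-o.done:
			return o.err
		}
	}
	defer d.ops.Delete(key)
	defer o.once.Do(func() { close(o.done) })

	// Take a throttle slot (blocks while all slots are in use).
	select {
	case <-ctx.Done():
		o.err = ctx.Err()
		return o.err
	case d.throttle <- struct{}{}:
	}
	defer func() { <-d.throttle }()

	o.err = fn()
	return o.err
}

func main() {
	d := &dedup{throttle: make(chan struct{}, 4)}
	_ = d.do(context.Background(), "piece-1", func() error { return nil })
}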
// Updated logOpenJobs function
func (ps *PieceDirectory) logOpenJobs() {
	var runningCount, queuedCount int
	ps.addIdxOpByCid.Range(func(key, value interface{}) bool {
		op := value.(*addIndexOperation)

		// Read isRunning under its mutex to avoid racing with the worker goroutine
		op.isRunningMu.Lock()
		running := op.isRunning
		op.isRunningMu.Unlock()

		var status string
		if running {
			status = "running"
			runningCount++
		} else if op.isQueued {
			status = "queued"
			queuedCount++
		}

		log.Infow("add index: job status", "pieceCid", key, "status", status)
		return true
	})

	log.Infow("Job Summary", "runningJobs", runningCount, "queuedJobs", queuedCount)
}

/*
func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo model.DealInfo) error {
	// Get a reader over the piece data
	log.Debugw("add index: get index", "pieceCid", pieceCid)
@@ -372,6 +448,81 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid
	return eg.Wait()
}
*/

func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo model.DealInfo) error {
	// Get a reader over the piece data
	log.Debugw("add index: get index", "pieceCid", pieceCid)
	reader, err := ps.pieceReader.GetReader(ctx, dealInfo.MinerAddr, dealInfo.SectorID, dealInfo.PieceOffset, dealInfo.PieceLength)
	log.Debugf("got the piece reader for piece %s and deal %s", pieceCid, dealInfo.DealUuid)
	if err != nil {
		return fmt.Errorf("getting reader over piece %s: %w", pieceCid, err)
	}

	defer reader.Close() //nolint:errcheck

	// Try to parse data as containing a data segment index
	log.Debugw("add index: read index", "pieceCid", pieceCid)
	recs, err := parsePieceWithDataSegmentIndex(pieceCid, int64(dealInfo.PieceLength.Unpadded()), reader)
	if err != nil {
		log.Infow("add index: data segment check failed. falling back to car", "pieceCid", pieceCid, "err", err)
		// Iterate over all the blocks in the piece to extract the index records
		if _, err := reader.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("seek to start for piece %s: %w", pieceCid, err)
		}
		// Creating a buffered reader specifically for CAR parsing
		bufferSize := 20 * 1024 * 1024 // 20 MB buffer size
		bufferedReader := bufio.NewReaderSize(reader, bufferSize)

		recs, err = parseRecordsFromCar(bufferedReader)
		if err != nil {
			return fmt.Errorf("parse car for piece %s: %w", pieceCid, err)
		}
	}

	if len(recs) == 0 {
		log.Warnw("add index: generated index with 0 recs", "pieceCid", pieceCid)
		return nil
	}

	// Total number of records
	totalRecs := len(recs)
	var processedRecs int64

	// Transferring a large number of records over the wire can take a significant amount of time.
	// Split the transfer into multiple concurrent parts to speed it up.
	concurrency := ps.settings.addIndexConcurrency
	if concurrency > totalRecs {
		concurrency = totalRecs
	}

	rangeLen := totalRecs / concurrency
	eg, ctx := errgroup.WithContext(ctx)

	for i := 0; i < concurrency; i++ {
		i := i
		eg.Go(func() error {
			start := i * rangeLen
			end := start + rangeLen
			if i == concurrency-1 {
				end = totalRecs
			}

			if err := ps.store.AddIndex(ctx, pieceCid, recs[start:end], true); err != nil {
				return fmt.Errorf("adding CAR index for piece %s: %w", pieceCid, err)
			}

			// Update processed records count and log progress
			atomic.AddInt64(&processedRecs, int64(end-start))
			currentPercentage := (float64(atomic.LoadInt64(&processedRecs)) / float64(totalRecs)) * 100
			log.Infof("Progress for piece %s: %.2f%%", pieceCid, currentPercentage)

			return nil
		})
	}

	return eg.Wait()
}
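
The concurrency split above divides the records into contiguous ranges of roughly equal size, with the last goroutine absorbing the remainder of the integer division. A small standalone sketch of that arithmetic (splitRanges is a hypothetical helper, not part of the commit):

package main

import "fmt"

// splitRanges divides total items into parts contiguous [start, end) ranges;
// the final range absorbs any remainder from the integer division.
func splitRanges(total, parts int) [][2]int {
	if parts > total {
		parts = total
	}
	if parts < 1 {
		parts = 1 // guard; the code above assumes a positive concurrency setting
	}
	rangeLen := total / parts
	out := make([][2]int, 0, parts)
	for i := 0; i < parts; i++ {
		start := i * rangeLen
		end := start + rangeLen
		if i == parts-1 {
			end = total
		}
		out = append(out, [2]int{start, end})
	}
	return out
}

func main() {
	// 10 records across 3 workers -> [[0 3] [3 6] [6 10]]
	fmt.Println(splitRanges(10, 3))
}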

func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) {
	// Iterate over all the blocks in the piece to extract the index records
@@ -625,15 +776,6 @@ func (ps *PieceDirectory) RemoveDealForPiece(ctx context.Context, pieceCid cid.C
	return nil
}

//func (ps *piecedirectory) deleteIndexForPiece(pieceCid cid.Cid) interface{} {
// TODO: Maybe mark for GC instead of deleting immediately

// Delete mh => offset index from store
//err := ps.carIndex.Delete(pieceCid)
//if err != nil {
//err = fmt.Errorf("deleting CAR index for piece %s: %w", pieceCid, err)
//}

//// Delete mh => piece index from store
//if mherr := ps.mhToPieceIndex.Delete(pieceCid); mherr != nil {
//err = multierror.Append(fmt.Errorf("deleting cid index for piece %s: %w", pieceCid, mherr))
