228 changes: 133 additions & 95 deletions x/blockdb/database.go
@@ -480,27 +480,35 @@ func (s *Database) ReadBlock(height BlockHeight) (BlockData, error) {

// Read the complete block data
blockData := make(BlockData, indexEntry.Size)
dataFile, localOffset, err := s.getDataFileAndOffset(indexEntry.Offset)
if err != nil {
s.log.Error("Failed to read block: failed to get data file",
zap.Uint64("height", height),
zap.Uint64("dataOffset", indexEntry.Offset),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get data file for block at height %d: %w", height, err)
}
_, err = dataFile.ReadAt(blockData, int64(localOffset+uint64(sizeOfBlockEntryHeader)))
if err != nil {
s.log.Error("Failed to read block: failed to read block data from file",
zap.Uint64("height", height),
zap.Uint64("localOffset", localOffset),
zap.Uint32("blockSize", indexEntry.Size),
zap.Error(err),
)
return nil, fmt.Errorf("failed to read block data from data file: %w", err)
}

return blockData, nil
// Retry fetching the data file if it was closed between the cache lookup and the read.
// If the handle is still open, read the block and return.
for {
dataFile, localOffset, fileIndex, err := s.getDataFileAndOffset(indexEntry.Offset)
if err != nil {
s.log.Error("Failed to read block: failed to get data file",
zap.Uint64("height", height),
zap.Uint64("dataOffset", indexEntry.Offset),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get data file for block at height %d: %w", height, err)
}
_, err = dataFile.ReadAt(blockData, int64(localOffset+uint64(sizeOfBlockEntryHeader)))
if err != nil {
if errors.Is(err, os.ErrClosed) {
s.fileCache.Evict(fileIndex)
continue
}
s.log.Error("Failed to read block: failed to read block data from file",
zap.Uint64("height", height),
zap.Uint64("localOffset", localOffset),
zap.Uint32("blockSize", indexEntry.Size),
zap.Error(err),
)
return nil, fmt.Errorf("failed to read block data from data file: %w", err)
}
return blockData, nil
}
}

// ReadHeader retrieves only the header portion of a block by its height.
@@ -529,29 +537,36 @@ func (s *Database) ReadHeader(height BlockHeight) (BlockData, error) {
return nil, fmt.Errorf("%w: invalid header size %d exceeds block size %d", ErrHeaderSizeTooLarge, indexEntry.HeaderSize, indexEntry.Size)
}

// Read only the header portion
headerData := make([]byte, indexEntry.HeaderSize)
dataFile, localOffset, err := s.getDataFileAndOffset(indexEntry.Offset)
if err != nil {
s.log.Error("Failed to read header: failed to get data file",
zap.Uint64("height", height),
zap.Uint64("dataOffset", indexEntry.Offset),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get data file for block header at height %d: %w", height, err)
}
_, err = dataFile.ReadAt(headerData, int64(localOffset+uint64(sizeOfBlockEntryHeader)))
if err != nil {
s.log.Error("Failed to read header: failed to read header data from file",
zap.Uint64("height", height),
zap.Uint64("localOffset", localOffset),
zap.Uint32("headerSize", indexEntry.HeaderSize),
zap.Error(err),
)
return nil, fmt.Errorf("failed to read block header data from data file: %w", err)
}

return headerData, nil
// Retry fetching the data file if it was closed between the cache lookup and the read.
// If the handle is still open, read the header and return.
for {
dataFile, localOffset, fileIndex, err := s.getDataFileAndOffset(indexEntry.Offset)
if err != nil {
s.log.Error("Failed to read header: failed to get data file",
zap.Uint64("height", height),
zap.Uint64("dataOffset", indexEntry.Offset),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get data file for block header at height %d: %w", height, err)
}
_, err = dataFile.ReadAt(headerData, int64(localOffset+uint64(sizeOfBlockEntryHeader)))
if err != nil {
if errors.Is(err, os.ErrClosed) {
s.fileCache.Evict(fileIndex)
continue
}
s.log.Error("Failed to read header: failed to read header data from file",
zap.Uint64("height", height),
zap.Uint64("localOffset", localOffset),
zap.Uint32("headerSize", indexEntry.HeaderSize),
zap.Error(err),
)
return nil, fmt.Errorf("failed to read block header data from data file: %w", err)
}
return headerData, nil
}
}

// ReadBody retrieves only the body portion (excluding header) of a block by its height.
@@ -567,46 +582,55 @@ func (s *Database) ReadBody(height BlockHeight) (BlockData, error) {

bodySize := indexEntry.Size - indexEntry.HeaderSize
bodyData := make([]byte, bodySize)
dataFile, localOffset, err := s.getDataFileAndOffset(indexEntry.Offset)
if err != nil {
s.log.Error("Failed to read body: failed to get data file",
zap.Uint64("height", height),
zap.Uint64("dataOffset", indexEntry.Offset),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get data file for block body at height %d: %w", height, err)
}
headerOffset, err := safemath.Add(localOffset, uint64(sizeOfBlockEntryHeader))
if err != nil {
s.log.Error("Failed to read body: header offset calculation overflow",
zap.Uint64("height", height),
zap.Uint64("localOffset", localOffset),
zap.Error(err),
)
return nil, fmt.Errorf("calculating header offset would overflow for block at height %d: %w", height, err)
}
bodyOffset, err := safemath.Add(headerOffset, uint64(indexEntry.HeaderSize))
if err != nil {
s.log.Error("Failed to read body: body offset calculation overflow",
zap.Uint64("height", height),
zap.Uint64("headerOffset", headerOffset),
zap.Uint32("headerSize", indexEntry.HeaderSize),
zap.Error(err),
)
return nil, fmt.Errorf("calculating body offset would overflow for block at height %d: %w", height, err)
}

_, err = dataFile.ReadAt(bodyData, int64(bodyOffset))
if err != nil {
s.log.Error("Failed to read body: failed to read body data from file",
zap.Uint64("height", height),
zap.Uint64("bodyOffset", bodyOffset),
zap.Uint32("bodySize", bodySize),
zap.Error(err),
)
return nil, fmt.Errorf("failed to read block body data from data file: %w", err)
// Retry fetching the data file if it was closed between the cache lookup and the read.
// If the handle is still open, read the body and return.
for {
PR author comment: the read body and header methods will be moved soon.

dataFile, localOffset, fileIndex, err := s.getDataFileAndOffset(indexEntry.Offset)
if err != nil {
s.log.Error("Failed to read body: failed to get data file",
zap.Uint64("height", height),
zap.Uint64("dataOffset", indexEntry.Offset),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get data file for block body at height %d: %w", height, err)
}
headerOffset, err := safemath.Add(localOffset, uint64(sizeOfBlockEntryHeader))
if err != nil {
s.log.Error("Failed to read body: header offset calculation overflow",
zap.Uint64("height", height),
zap.Uint64("localOffset", localOffset),
zap.Error(err),
)
return nil, fmt.Errorf("calculating header offset would overflow for block at height %d: %w", height, err)
}
bodyOffset, err := safemath.Add(headerOffset, uint64(indexEntry.HeaderSize))
if err != nil {
s.log.Error("Failed to read body: body offset calculation overflow",
zap.Uint64("height", height),
zap.Uint64("headerOffset", headerOffset),
zap.Uint32("headerSize", indexEntry.HeaderSize),
zap.Error(err),
)
return nil, fmt.Errorf("calculating body offset would overflow for block at height %d: %w", height, err)
}

_, err = dataFile.ReadAt(bodyData, int64(bodyOffset))
if err != nil {
if errors.Is(err, os.ErrClosed) {
s.fileCache.Evict(fileIndex)
continue
}
s.log.Error("Failed to read body: failed to read body data from file",
zap.Uint64("height", height),
zap.Uint64("bodyOffset", bodyOffset),
zap.Uint32("bodySize", bodySize),
zap.Error(err),
)
return nil, fmt.Errorf("failed to read block body data from data file: %w", err)
}
return bodyData, nil
}
return bodyData, nil
}

// HasBlock checks if a block exists at the given height.
@@ -879,7 +903,7 @@ func (s *Database) recoverBlockAtOffset(offset, totalDataSize uint64) (blockEntr
return bh, fmt.Errorf("%w: not enough data for block header at offset %d", ErrCorrupted, offset)
}

dataFile, localOffset, err := s.getDataFileAndOffset(offset)
dataFile, localOffset, _, err := s.getDataFileAndOffset(offset)
if err != nil {
return bh, fmt.Errorf("recovery: failed to get data file for offset %d: %w", offset, err)
}
@@ -989,7 +1013,7 @@ func (s *Database) initializeDataFiles() error {
// Pre-load the data file for the next write offset.
nextOffset := s.nextDataWriteOffset.Load()
if nextOffset > 0 {
_, _, err := s.getDataFileAndOffset(nextOffset)
_, _, _, err := s.getDataFileAndOffset(nextOffset)
if err != nil {
return fmt.Errorf("failed to pre-load data file for offset %d: %w", nextOffset, err)
}
@@ -1121,11 +1145,6 @@ func (s *Database) writeBlockAt(offset uint64, bh blockEntryHeader, block BlockD
return fmt.Errorf("failed to serialize block header: %w", err)
}

dataFile, localOffset, err := s.getDataFileAndOffset(offset)
if err != nil {
return fmt.Errorf("failed to get data file for writing block %d: %w", bh.Height, err)
}

// Allocate combined buffer for header and block data and write it to the data file
combinedBufSize, err := safemath.Add(uint64(sizeOfBlockEntryHeader), uint64(len(block)))
if err != nil {
@@ -1134,16 +1153,35 @@
combinedBuf := make([]byte, combinedBufSize)
copy(combinedBuf, headerBytes)
copy(combinedBuf[sizeOfBlockEntryHeader:], block)
if _, err := dataFile.WriteAt(combinedBuf, int64(localOffset)); err != nil {
return fmt.Errorf("failed to write block to data file at offset %d: %w", offset, err)
}

if s.config.SyncToDisk {
if err := dataFile.Sync(); err != nil {
return fmt.Errorf("failed to sync data file after writing block %d: %w", bh.Height, err)
// Retry fetching the data file if it was closed between the cache lookup and the write.
// If the handle is still open, write the block and return.
for {
dataFile, localOffset, fileIndex, err := s.getDataFileAndOffset(offset)
if err != nil {
return fmt.Errorf("failed to get data file for writing block %d: %w", bh.Height, err)
}

if _, err := dataFile.WriteAt(combinedBuf, int64(localOffset)); err != nil {
if errors.Is(err, os.ErrClosed) {
// ensure the file is evicted, otherwise we'll retry forever
s.fileCache.Evict(fileIndex)
continue
}
return fmt.Errorf("failed to write block to data file at offset %d: %w", offset, err)
}

if s.config.SyncToDisk {
if err := dataFile.Sync(); err != nil {
if errors.Is(err, os.ErrClosed) {
s.fileCache.Evict(fileIndex)
continue
}
return fmt.Errorf("failed to sync data file after writing block %d: %w", bh.Height, err)
}
}
return nil
}
return nil
}

func (s *Database) updateBlockHeights(writtenBlockHeight BlockHeight) error {
@@ -1335,10 +1373,10 @@ func (s *Database) allocateBlockSpace(totalSize uint32) (writeDataOffset uint64,
}
}

func (s *Database) getDataFileAndOffset(globalOffset uint64) (*os.File, uint64, error) {
func (s *Database) getDataFileAndOffset(globalOffset uint64) (*os.File, uint64, int, error) {
maxFileSize := s.header.MaxDataFileSize
fileIndex := int(globalOffset / maxFileSize)
localOffset := globalOffset % maxFileSize
handle, err := s.getOrOpenDataFile(fileIndex)
return handle, localOffset, err
return handle, localOffset, fileIndex, err
}
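
For context on the pattern this PR introduces in ReadBlock, ReadHeader, ReadBody, and writeBlockAt, here is a minimal, self-contained sketch of the evict-and-retry loop on os.ErrClosed. The cache type, file naming, and helper names below are hypothetical stand-ins for illustration, not the PR's actual fileCache/getOrOpenDataFile implementation.

package retrysketch

import (
	"errors"
	"fmt"
	"os"
)

// fileCache is a stand-in for the database's data-file handle cache.
type fileCache struct {
	files map[int]*os.File
}

func newFileCache() *fileCache {
	return &fileCache{files: make(map[int]*os.File)}
}

// get returns a cached handle for the given file index, opening it on a miss.
func (c *fileCache) get(index int) (*os.File, error) {
	if f, ok := c.files[index]; ok {
		return f, nil
	}
	f, err := os.Open(fmt.Sprintf("blockdb_%d.dat", index))
	if err != nil {
		return nil, err
	}
	c.files[index] = f
	return f, nil
}

// evict drops the cached handle so the next get opens a fresh one.
func (c *fileCache) evict(index int) {
	delete(c.files, index)
}

// readAt retries the read when the cached handle was closed (for example,
// evicted and closed by another goroutine) between the lookup and the read.
func readAt(c *fileCache, index int, buf []byte, off int64) error {
	for {
		f, err := c.get(index)
		if err != nil {
			return err
		}
		if _, err := f.ReadAt(buf, off); err != nil {
			if errors.Is(err, os.ErrClosed) {
				// Evict the stale handle before retrying; otherwise the
				// loop would spin on the same closed descriptor forever.
				c.evict(index)
				continue
			}
			return err
		}
		return nil
	}
}

The design point mirrored from the diff is that on os.ErrClosed the stale handle must be evicted before continuing, so the next iteration reopens the file instead of retrying the same closed descriptor.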