Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 44 additions & 11 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io, marker::PhantomData, mem, ops::Range, vec};
use std::{fmt::Debug, io, marker::PhantomData, mem, ops::Range, vec};

use itertools::Itertools;
use log::{debug, info, trace, warn};
Expand Down Expand Up @@ -333,7 +333,8 @@ pub fn committed_meta(repo: impl Repo) -> Result<Option<segment::Metadata>, erro
};

let mut storage = repo.open_segment_reader(last)?;
segment::Metadata::extract(last, &mut storage).map(Some)
let offset_index = repo.get_offset_index(last).ok();
segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some)
}

pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
Expand Down Expand Up @@ -464,29 +465,61 @@ fn reset_to_internal(repo: &impl Repo, segments: &[u64], offset: u64) -> io::Res
repo.remove_segment(segment)?;
} else {
// Read commit-wise until we find the byte offset.
let reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;
let mut reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;

let (index_file, mut byte_offset) = repo
.get_offset_index(segment)
.and_then(|index_file| {
let (key, byte_offset) = index_file.key_lookup(offset).map_err(|e| {
io::Error::new(io::ErrorKind::NotFound, format!("Offset index cannot be used: {e:?}"))
})?;

reader.seek_to_offset(&index_file, key).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Offset index is not used at offset {key}: {e}"),
)
})?;

Ok((Some(index_file), byte_offset))
})
.inspect_err(|e| {
warn!("commitlog offset index is not used: {e:?}");
})
.unwrap_or((None, segment::Header::LEN as u64));

let commits = reader.commits();

let mut bytes_read = 0;
for commit in commits {
let commit = commit?;
if commit.min_tx_offset > offset {
break;
}
bytes_read += Commit::from(commit).encoded_len() as u64;
byte_offset += Commit::from(commit).encoded_len() as u64;
}

if bytes_read == 0 {
if byte_offset == segment::Header::LEN as u64 {
// Segment is empty, just remove it.
repo.remove_segment(segment)?;
} else {
let byte_offset = segment::Header::LEN as u64 + bytes_read;
debug!("truncating segment {segment} to {offset} at {byte_offset}");
let mut file = repo.open_segment_writer(segment)?;
// Note: The offset index truncates equal or greater,
// inclusive. We'd like to retain `offset` in the index, as
// the commit is also retained in the log.
file.ftruncate(offset + 1, byte_offset)?;

if let Some(mut index_file) = index_file {
let index_file = index_file.as_mut();
// Note: The offset index truncates equal or greater,
// inclusive. We'd like to retain `offset` in the index, as
// the commit is also retained in the log.
index_file.ftruncate(offset + 1, byte_offset).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Failed to truncate offset index: {e}"),
)
})?;
index_file.async_flush()?;
}

file.ftruncate(offset, byte_offset)?;
// Some filesystems require fsync after ftruncate.
file.fsync()?;
break;
Expand Down
7 changes: 4 additions & 3 deletions crates/commitlog/src/index/indexfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
/// - `IndexError::OutOfMemory`: Append after index file is already full.
pub fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError> {
let key = key.into();
if self.last_key()? >= key {
return Err(IndexError::InvalidInput);
let last_key = self.last_key()?;
if last_key >= key {
return Err(IndexError::InvalidInput(last_key, key));
}

let start = self.num_entries * ENTRY_SIZE;
Expand All @@ -186,7 +187,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
}

/// Truncates the index file starting from the entry with a key greater than or equal to the given key.
pub fn truncate(&mut self, key: Key) -> Result<(), IndexError> {
pub(crate) fn truncate(&mut self, key: Key) -> Result<(), IndexError> {
let key = key.into();
let (found_key, index) = self.find_index(Key::from(key))?;

Expand Down
4 changes: 2 additions & 2 deletions crates/commitlog/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub enum IndexError {
#[error("Asked key is smaller than the first entry in the index")]
KeyNotFound,

#[error("Key should be monotnously increasing")]
InvalidInput,
#[error("Key should be monotonically increasing: input: {1}, last: {0}")]
InvalidInput(u64, u64),

#[error("index file is not readable")]
InvalidFormat,
Expand Down
17 changes: 16 additions & 1 deletion crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,20 @@ impl<T: Repo> Repo for &T {
fn existing_offsets(&self) -> io::Result<Vec<u64>> {
T::existing_offsets(self)
}

fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndexMut> {
T::create_offset_index(self, offset, cap)
}

/// Remove [`TxOffsetIndexMut`] named with `offset`.
fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> {
T::remove_offset_index(self, offset)
}

/// Get [`TxOffsetIndex`] for the given `offset`.
fn get_offset_index(&self, offset: TxOffset) -> io::Result<TxOffsetIndex> {
T::get_offset_index(self, offset)
}
}

impl<T: SegmentLen> SegmentLen for io::BufReader<T> {
Expand Down Expand Up @@ -223,12 +237,13 @@ pub fn resume_segment_writer<R: Repo>(
offset: u64,
) -> io::Result<Result<Writer<R::SegmentWriter>, Metadata>> {
let mut storage = repo.open_segment_writer(offset)?;
let offset_index = repo.get_offset_index(offset).ok();
let Metadata {
header,
tx_range,
size_in_bytes,
max_epoch,
} = match Metadata::extract(offset, &mut storage) {
} = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
warn!("invalid commit in segment {offset}: {source}");
debug!("sofar={sofar:?}");
Expand Down
138 changes: 119 additions & 19 deletions crates/commitlog/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use log::{debug, warn};
use crate::{
commit::{self, Commit, StoredCommit},
error,
index::IndexError,
index::{IndexError, IndexFileMut},
payload::Encode,
repo::{TxOffset, TxOffsetIndex, TxOffsetIndexMut},
Options,
Expand Down Expand Up @@ -332,11 +332,31 @@ impl FileLike for OffsetIndexWriter {

fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
self.reset();
let _ = self.head.truncate(tx_offset);
self.head
.truncate(tx_offset)
.inspect_err(|e| {
warn!("failed to truncate offset index at {tx_offset}: {e:?}");
})
.ok();
Ok(())
}
}

/// Lets a mutable offset index be driven through the [`FileLike`] interface.
impl FileLike for IndexFileMut<TxOffset> {
    /// Flushes the index by calling `async_flush` and returns its result.
    ///
    /// NOTE(review): judging by its name, `async_flush` may only schedule the
    /// flush rather than wait for it — confirm whether callers of `fsync`
    /// rely on the data being durable on return.
    fn fsync(&mut self) -> io::Result<()> {
        self.async_flush()
    }

    /// Truncates the index at `tx_offset` via `truncate`.
    ///
    /// The `_size` argument is ignored. A failure reported by `truncate` is
    /// converted into an [`io::Error`] of kind [`ErrorKind::Other`] whose
    /// message carries the offset and the debug rendering of the cause.
    fn ftruncate(&mut self, tx_offset: u64, _size: u64) -> io::Result<()> {
        match self.truncate(tx_offset) {
            Ok(()) => Ok(()),
            Err(e) => Err(io::Error::new(
                ErrorKind::Other,
                format!("failed to truncate offset index at {tx_offset}: {e:?}"),
            )),
        }
    }
}

#[derive(Debug)]
pub struct Reader<R> {
pub header: Header,
Expand Down Expand Up @@ -393,7 +413,7 @@ impl<R: io::BufRead + io::Seek> Reader<R> {

#[cfg(test)]
pub(crate) fn metadata(self) -> Result<Metadata, error::SegmentMetadata> {
Metadata::with_header(self.min_tx_offset, self.header, self.inner)
Metadata::with_header(self.min_tx_offset, self.header, self.inner, None)
}
}

Expand Down Expand Up @@ -513,30 +533,38 @@ pub struct Metadata {
}

impl Metadata {
/// Read and validate metadata from a segment.
/// Reads and validates metadata from a segment.
/// It will look for last commit index offset and then traverse the segment
///
/// This traverses the entire segment, consuming the `reader`.
/// Doing so is necessary to determine `max_tx_offset`, `size_in_bytes` and
/// `max_epoch`.
pub(crate) fn extract<R: io::Read>(min_tx_offset: u64, mut reader: R) -> Result<Self, error::SegmentMetadata> {
/// Determines `max_tx_offset`, `size_in_bytes`, and `max_epoch` from the segment.
pub(crate) fn extract<R: io::Read + io::Seek>(
min_tx_offset: TxOffset,
mut reader: R,
offset_index: Option<&TxOffsetIndex>,
) -> Result<Self, error::SegmentMetadata> {
let header = Header::decode(&mut reader)?;
Self::with_header(min_tx_offset, header, reader)
Self::with_header(min_tx_offset, header, reader, offset_index)
}

fn with_header<R: io::Read>(
fn with_header<R: io::Read + io::Seek>(
min_tx_offset: u64,
header: Header,
mut reader: R,
offset_index: Option<&TxOffsetIndex>,
) -> Result<Self, error::SegmentMetadata> {
let mut sofar = Self {
header,
tx_range: Range {
start: min_tx_offset,
end: min_tx_offset,
},
size_in_bytes: Header::LEN as u64,
max_epoch: Commit::DEFAULT_EPOCH,
};
let mut sofar = offset_index
.and_then(|index| Self::find_valid_indexed_commit(min_tx_offset, header, &mut reader, index).ok())
.unwrap_or_else(|| Self {
header,
tx_range: Range {
start: min_tx_offset,
end: min_tx_offset,
},
size_in_bytes: Header::LEN as u64,
max_epoch: u64::default(),
});

reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;

fn commit_meta<R: io::Read>(
reader: &mut R,
Expand Down Expand Up @@ -573,6 +601,78 @@ impl Metadata {

Ok(sofar)
}

/// Uses the offset index to locate the newest entry that points at a valid
/// commit in the segment.
///
/// The search starts with a lookup for `TxOffset::MAX`. Whenever the commit
/// at the returned position fails validation, the lookup is retried with the
/// returned key minus one. It stops as soon as a lookup fails or the next
/// probe would be `0`.
///
/// NOTE(review): this presumes `key_lookup` yields the closest entry at or
/// below the requested key, so that successive probes walk the index
/// backwards — confirm against the index implementation.
///
/// # Returns
/// * `Ok(Metadata)` - Built from the supplied `header`, a `tx_range` running
///   from `min_tx_offset` to the end of the validated commit's range, a
///   `size_in_bytes` of the entry's byte offset plus the commit's size, and
///   the commit's epoch as `max_epoch`.
/// * `Err` - With kind `InvalidData` if no index entry led to a valid commit
///   (including the case where the very first lookup fails).
fn find_valid_indexed_commit<R: io::Read + io::Seek>(
    min_tx_offset: u64,
    header: Header,
    reader: &mut R,
    offset_index: &TxOffsetIndex,
) -> io::Result<Metadata> {
    let mut probe = TxOffset::MAX;

    loop {
        let Ok((key, byte_offset)) = offset_index.key_lookup(probe) else {
            break;
        };

        if let Ok(commit) = Self::validate_commit_at_offset(reader, key, byte_offset) {
            return Ok(Metadata {
                header,
                tx_range: min_tx_offset..commit.tx_range.end,
                size_in_bytes: byte_offset + commit.size_in_bytes,
                max_epoch: commit.epoch,
            });
        }

        // The entry for `key` does not point at a valid commit, so fall back
        // to whatever the index holds below it.
        probe = key.saturating_sub(1);
        if probe == 0 {
            break;
        }
    }

    Err(io::Error::new(
        ErrorKind::InvalidData,
        format!("No valid commit found in index up to key: {}", probe),
    ))
}

/// Seeks `reader` to `byte_offset`, extracts the commit metadata found there
/// and checks that it belongs to `tx_offset`.
///
/// # Returns
/// * `Ok(commit::Metadata)` - The extracted metadata, if its `tx_range`
///   starts exactly at `tx_offset`.
/// * `Err` - If seeking or extraction fails, if extraction yields no commit,
///   or if the commit's starting offset differs from `tx_offset` (the latter
///   two are reported with kind `InvalidData`).
fn validate_commit_at_offset<R: io::Read + io::Seek>(
    reader: &mut R,
    tx_offset: TxOffset,
    byte_offset: u64,
) -> io::Result<commit::Metadata> {
    reader.seek(SeekFrom::Start(byte_offset))?;

    let Some(found) = commit::Metadata::extract(reader)? else {
        return Err(io::Error::new(ErrorKind::InvalidData, "failed to decode commit"));
    };

    // The index entry is only trustworthy if the commit stored at its byte
    // offset starts at the key the index recorded for it.
    if found.tx_range.start == tx_offset {
        Ok(found)
    } else {
        Err(io::Error::new(
            ErrorKind::InvalidData,
            format!(
                "mismatch key in index offset file: expected={} actual={}",
                tx_offset, found.tx_range.start
            ),
        ))
    }
}
}

#[cfg(test)]
Expand Down
12 changes: 7 additions & 5 deletions crates/commitlog/src/stream/reader.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::ops::RangeBounds;
use std::{io::SeekFrom, ops::RangeBounds};

use async_stream::try_stream;
use bytes::{Buf as _, Bytes};
use futures::Stream;
use log::{debug, info, trace, warn};
use tokio::io::{self, AsyncBufRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _, SeekFrom};
use tokio::task::spawn_blocking;
use log::{debug, trace, warn};
use tokio::{
io::{self, AsyncBufRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _},
task::spawn_blocking,
};
use tokio_util::io::SyncIoBridge;

use crate::{
Expand Down Expand Up @@ -64,7 +66,7 @@ fn read_segment(
range: RangeFromMaybeToInclusive,
) -> impl Stream<Item = io::Result<Bytes>> {
try_stream! {
info!("reading segment {segment_start}");
trace!("reading segment {segment_start}");
let (segment_header, segment_header_bytes) = {
let mut buf = [0u8; segment::Header::LEN];
segment.read_exact(&mut buf).await?;
Expand Down
Loading
Loading