Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/lance-table/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub use fragment::*;
pub use index::IndexMetadata;

pub use manifest::{
BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, SelfDescribingFileReader,
WriterVersion, is_detached_version,
BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, ManifestSummary,
SelfDescribingFileReader, WriterVersion, is_detached_version,
};
pub use transaction::Transaction;

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
.collect()
}

#[derive(Default)]
#[derive(Default, Debug, Clone, PartialEq)]
pub struct ManifestSummary {
pub total_fragments: u64,
pub total_data_files: u64,
Expand Down
44 changes: 43 additions & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ use std::sync::Arc;
use take::row_offsets_to_row_addresses;
use tracing::{info, instrument};

pub use checkpoint::{CheckpointConfig, VersionCheckpoint, VersionSummary};
pub(crate) mod blob;
mod branch_location;
pub mod builder;
pub mod checkpoint;
pub mod cleanup;
pub mod delta;
pub mod fragment;
Expand Down Expand Up @@ -212,6 +214,16 @@ impl From<&Manifest> for Version {
}
}

impl From<&VersionSummary> for Version {
fn from(s: &VersionSummary) -> Self {
Self {
version: s.version,
timestamp: DateTime::from_timestamp_millis(s.timestamp_millis).unwrap_or_else(Utc::now),
metadata: s.manifest_summary.clone().into(),
}
}
}

/// Customize read behavior of a dataset.
#[derive(Clone, Debug)]
pub struct ReadParams {
Expand Down Expand Up @@ -1757,11 +1769,40 @@ impl Dataset {
}

/// Get all versions.
///
/// This method efficiently retrieves version history by:
/// 1. Reading historical versions from VersionCheckpoint when available
/// 2. Reading only incremental manifests for versions newer than the checkpoint
///
/// If the checkpoint is unavailable or disabled, falls back to reading all manifests.
pub async fn versions(&self) -> Result<Vec<Version>> {
let mut versions: Vec<Version> = self
let config = CheckpointConfig::from_config(&self.manifest.config);
let (checkpointed_latest_version, mut versions): (u64, Vec<Version>) = if config.enabled {
VersionCheckpoint::load_latest(self.base.clone(), self.object_store.clone(), config)
.await?
.map(|version_checkpoint| {
(
version_checkpoint.latest_version(),
version_checkpoint
.versions
.iter()
.filter(|version| !version.is_cleaned_up)
.map(|s| s.into())
.collect(),
)
})
.unwrap_or_default()
} else {
(0, Vec::new())
};

let inc_versions: Vec<Version> = self
.commit_handler
.list_manifest_locations(&self.base, &self.object_store, false)
.try_filter_map(|location| async move {
if location.version <= checkpointed_latest_version {
return Ok(None);
}
match read_manifest(&self.object_store, &location.path, location.size).await {
Ok(manifest) => Ok(Some(Version::from(&manifest))),
Err(e) => Err(e),
Expand All @@ -1770,6 +1811,7 @@ impl Dataset {
.try_collect()
.await?;

versions.extend(inc_versions);
// TODO: this API should support pagination
versions.sort_by_key(|v| v.version);

Expand Down
Loading
Loading