Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion rust/lance-table/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! any checks.
//!
//! When providing your own commit handler, most often you are implementing in
//! terms of a lock. The trait [CommitLock] can be implemented as a simpler

Check warning on line 22 in rust/lance-table/src/io/commit.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-table/src/io/commit.rs
//! alternative to [CommitHandler].

use std::io;
Expand All @@ -27,7 +27,7 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::{fmt::Debug, fs::DirEntry};

use chrono::{DateTime, Utc};
use super::manifest::write_manifest;
use futures::future::Either;
use futures::Stream;
Expand All @@ -39,7 +39,7 @@
use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
use lance_io::object_writer::{ObjectWriter, WriteResult};
use log::warn;
use object_store::PutOptions;

Check warning on line 42 in rust/lance-table/src/io/commit.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-table/src/io/commit.rs
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
use snafu::location;
use tracing::info;
Expand Down Expand Up @@ -224,6 +224,8 @@
/// if we detect a change in the e-tag, it means the manifest was tampered with.
/// This might happen if the dataset was deleted and then re-created.
pub e_tag: Option<String>,
/// Last modified time of the manifest file.
pub last_modified: DateTime<Utc>,
}

impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
Expand Down Expand Up @@ -251,6 +253,7 @@
size: Some(meta.size),
naming_scheme: scheme,
e_tag: meta.e_tag,
last_modified: meta.last_modified,
})
}
}
Expand Down Expand Up @@ -316,6 +319,7 @@
size: Some(meta.size),
naming_scheme: scheme,
e_tag: meta.e_tag,
last_modified: meta.last_modified,
})
}
// If the list is not lexically ordered, we need to iterate all manifests
Expand Down Expand Up @@ -352,6 +356,7 @@
size: Some(current_meta.size),
naming_scheme: scheme,
e_tag: current_meta.e_tag,
last_modified: current_meta.last_modified,
})
}
(None, _) => Err(Error::NotFound {
Expand Down Expand Up @@ -420,6 +425,7 @@
size: Some(metadata.len()),
naming_scheme: scheme.unwrap(),
e_tag: Some(get_etag(&metadata)),
last_modified: get_last_modified(&metadata)?,
}))
} else {
Ok(None)
Expand All @@ -443,6 +449,13 @@
format!("{inode:x}-{mtime:x}-{size:x}")
}

/// Get the last modified time from file metadata.
fn get_last_modified(metadata: &std::fs::Metadata) -> std::io::Result<DateTime<Utc>> {
let mtime = metadata.modified()?;
let dt = DateTime::<Utc>::from(mtime);
Ok(dt)
}

#[cfg(unix)]
/// We include the inode when available to yield an ETag more resistant to collisions
/// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
Expand Down Expand Up @@ -608,6 +621,7 @@
path: ManifestNamingScheme::V2.manifest_path(base_path, version),
size: None,
e_tag: None,
last_modified: Utc::now(),
});
}

Expand All @@ -621,6 +635,7 @@
size: Some(meta.size),
naming_scheme: scheme,
e_tag: meta.e_tag,
last_modified: meta.last_modified,
}),
Err(ObjectStoreError::NotFound { .. }) => {
// fallback to V1
Expand All @@ -631,6 +646,7 @@
size: None,
naming_scheme: scheme,
e_tag: None,
last_modified: Utc::now(),
})
}
Err(e) => Err(e.into()),
Expand Down Expand Up @@ -875,6 +891,7 @@
naming_scheme,
path: version_path,
e_tag: res.e_tag,
last_modified: Utc::now(),
})
}
}
Expand Down Expand Up @@ -958,6 +975,7 @@
naming_scheme,
path,
e_tag: res.e_tag,
last_modified: Utc::now(),
})
}
}
Expand Down Expand Up @@ -1026,6 +1044,7 @@
size: Some(res.size as u64),
naming_scheme,
e_tag: None, // Re-name can change e-tag.
last_modified: Utc::now(),
})
}
Err(ObjectStoreError::AlreadyExists { .. }) => {
Expand Down Expand Up @@ -1101,6 +1120,7 @@
size: Some(size),
naming_scheme,
e_tag: res.e_tag,
last_modified: Utc::now(),
})
}
}
Expand Down
7 changes: 7 additions & 0 deletions rust/lance-table/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use chrono::Utc;
use lance_core::utils::tracing::{
AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
};
Expand Down Expand Up @@ -57,6 +58,7 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
size: None,
naming_scheme,
e_tag: None,
last_modified: Utc::now(),
})
}

Expand Down Expand Up @@ -84,6 +86,7 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
size: None,
naming_scheme,
e_tag: None,
last_modified: Utc::now(),
})
})
.transpose()
Expand Down Expand Up @@ -194,6 +197,7 @@ impl ExternalManifestCommitHandler {
size: Some(size),
naming_scheme,
e_tag,
last_modified: Utc::now(),
};

if !copied {
Expand Down Expand Up @@ -242,6 +246,7 @@ impl CommitHandler for ExternalManifestCommitHandler {
size,
naming_scheme,
e_tag,
last_modified,
}) => {
// The path is finalized, no need to check object store
if path.extension() == Some(MANIFEST_EXTENSION) {
Expand All @@ -251,6 +256,7 @@ impl CommitHandler for ExternalManifestCommitHandler {
size,
naming_scheme,
e_tag,
last_modified,
});
}

Expand Down Expand Up @@ -339,6 +345,7 @@ impl CommitHandler for ExternalManifestCommitHandler {
size: Some(size),
naming_scheme,
e_tag,
last_modified: Utc::now(),
});
}
Err(ObjectStoreError::NotFound { .. }) => {
Expand Down
175 changes: 175 additions & 0 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,177 @@
}
}

pub struct Snapshot {
pub dataset: Arc<Dataset>,
pub manifest_location: ManifestLocation,
}

impl Snapshot {
pub fn new(dataset: Arc<Dataset>, manifest_location: ManifestLocation) -> Self {
Self {
dataset,
manifest_location,
}
}

pub fn version_number(&self) -> u64 {
self.manifest_location.version
}

pub fn timestamp(&self) -> DateTime<Utc> {
self.manifest_location.last_modified
}

pub async fn manifest(&self) -> Result<Manifest> {
read_manifest(
&self.dataset.object_store,
&self.manifest_location.path,
self.manifest_location.size,
)
.await
}

pub async fn version(&self) -> Result<Version> {
self.manifest().await.map(|m| Version::from(&m))
}

/// Read the transaction for this snapshot.
///
/// Uses the Dataset's metadata cache for efficient repeated access.
/// The cache is keyed by version, so repeated calls for the same version
/// are efficient.
pub async fn transaction(&self) -> Result<Option<Transaction>> {
self.dataset
.read_transaction_by_version(self.manifest_location.version)
.await
}

Check warning on line 259 in rust/lance/src/dataset.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset.rs
/// Checkout this snapshot as a new Dataset instance.
pub async fn checkout(&self) -> Result<Dataset> {
self.dataset.checkout_version(self.manifest_location.version).await
}
}

pub struct Snapshots {
dataset: Arc<Dataset>,
}

impl Snapshots {
pub fn new(dataset: Arc<Dataset>) -> Self {
Self { dataset }
}

/// List all snapshots in descending order (latest first).
async fn list_inner(&self) -> Result<Vec<ManifestLocation>> {
self.dataset
.commit_handler
.list_manifest_locations(&self.dataset.base, &self.dataset.object_store, true)
.try_collect()
.await
}

/// Get the latest snapshot.
pub async fn latest(&self) -> Result<Snapshot> {
let location = self
.dataset
.commit_handler
.resolve_latest_location(&self.dataset.base, &self.dataset.object_store)
.await?;
Ok(Snapshot::new(self.dataset.clone(), location))
}

/// Get all snapshots as a vector (latest first).
pub async fn list(&self) -> Result<Vec<Snapshot>> {
Ok(self
.list_inner()
.await?
.into_iter()
.map(|loc| Snapshot::new(self.dataset.clone(), loc))
.collect())
}

/// Get snapshots within the version range `[lower_version, upper_version)`.
/// The results are ordered from latest to earliest.
pub async fn within(&self, lower_version: u64, upper_version: u64) -> Result<Vec<Snapshot>> {
Ok(self
.list()
.await?
.into_iter()
.filter(|s| s.version_number() >= lower_version && s.version_number() < upper_version)
.collect())
}

/// Get snapshots within the timestamp range `[lower, upper)`.
/// The results are ordered from latest to earliest.
pub async fn within_timestamp(
&self,
lower: DateTime<Utc>,
upper: DateTime<Utc>,
) -> Result<Vec<Snapshot>> {
Ok(self
.list()
.await?
.into_iter()
.filter(|s| s.timestamp() >= lower && s.timestamp() < upper)
.collect())
}

/// Get snapshots earlier than the given version, returning at most `limit` results.
/// The results are ordered from latest to earliest.
pub async fn earlier_than(&self, version: u64, limit: u32) -> Result<Vec<Snapshot>> {
Ok(self
.list()
.await?
.into_iter()
.filter(|s| s.version_number() < version)
.take(limit as usize)
.collect())
}

/// Get snapshots earlier than the given timestamp, returning at most `limit` results.
/// The results are ordered from latest to earliest.
pub async fn earlier_than_timestamp(
&self,
timestamp: DateTime<Utc>,
limit: u32,
) -> Result<Vec<Snapshot>> {
Ok(self
.list()
.await?
.into_iter()
.filter(|s| s.timestamp() < timestamp)
.take(limit as usize)
.collect())
}

/// Get snapshots later than the given version, returning at most `limit` results.
/// The results are ordered from latest to earliest.
pub async fn later_than(&self, version: u64, limit: u32) -> Result<Vec<Snapshot>> {
Ok(self
.list()
.await?
.into_iter()
.filter(|s| s.version_number() >= version)
.take(limit as usize)
.collect())
}

/// Get snapshots later than the given timestamp, returning at most `limit` results.
/// The results are ordered from latest to earliest.
pub async fn later_than_timestamp(
&self,
timestamp: DateTime<Utc>,
limit: u32,
) -> Result<Vec<Snapshot>> {
Ok(self
.list()
.await?
.into_iter()
.filter(|s| s.timestamp() >= timestamp)
.take(limit as usize)
.collect())
}
}
/// Customize read behavior of a dataset.
#[derive(Clone, Debug)]
pub struct ReadParams {
Expand Down Expand Up @@ -439,6 +610,10 @@
self.refs.branches()
}

pub fn snapshots(&self) -> Snapshots {
Snapshots::new(Arc::new(self.clone()))
}

/// Check out the latest version of the dataset
pub async fn checkout_latest(&mut self) -> Result<()> {
let (manifest, manifest_location) = self.latest_manifest().await?;
Expand Down
Loading
Loading