Skip to content

Commit

Permalink
ut: add test for Snapshot (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
QuakeWang authored Aug 6, 2024
1 parent e86c828 commit bd2f887
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 1 deletion.
3 changes: 2 additions & 1 deletion crates/paimon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ version.workspace = true
bitflags = "2.6.0"
chrono = { version = "0.4.38", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_with = "3.8.3"
serde_bytes = "0.11.15"
serde_json = "1.0.120"
serde_with = "3.8.3"
snafu = "0.8.3"
typed-builder = "^0.19"
opendal = "0.48"
247 changes: 247 additions & 0 deletions crates/paimon/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use typed_builder::TypedBuilder;

/// Snapshot for paimon.
Expand All @@ -38,7 +39,21 @@ pub struct Snapshot {
/// a manifest recording all index files of this table
#[builder(default = None)]
index_manifest: Option<String>,
/// user who committed this snapshot
commit_user: String,
/// Mainly for snapshot deduplication.
///
/// If multiple snapshots have the same commitIdentifier, reading from any of these snapshots
/// must produce the same table.
///
/// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
/// committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
commit_identifier: i64,
/// timestamp of this snapshot
time_millis: u64,
/// log offsets of all changes that occurred in this snapshot
#[builder(default = None)]
log_offsets: Option<HashMap<i32, i64>>,
/// record count of all changes that occurred in this snapshot
#[builder(default = None)]
total_record_count: Option<i64>,
Expand Down Expand Up @@ -105,6 +120,24 @@ impl Snapshot {
&self.commit_user
}

/// Get the commit time of this snapshot.
///
/// Returns the stored `time_millis` field by value.
/// NOTE(review): the name suggests milliseconds, presumably since the Unix epoch — confirm.
#[inline]
pub fn time_millis(&self) -> u64 {
self.time_millis
}

/// Get the commit identifier of this snapshot.
///
/// The identifier is mainly used for snapshot deduplication: snapshots that share the same
/// identifier must produce the same table when read, and a snapshot with a smaller identifier
/// must have been committed before one with a larger identifier.
#[inline]
pub fn commit_identifier(&self) -> i64 {
self.commit_identifier
}

/// Get the log offsets of this snapshot.
///
/// Returns a borrowed view of the offsets map, or `None` when no log offsets are stored.
/// The map itself is not cloned.
#[inline]
pub fn log_offsets(&self) -> Option<&HashMap<i32, i64>> {
// `as_ref` turns `&Option<HashMap<..>>` into `Option<&HashMap<..>>` without copying.
self.log_offsets.as_ref()
}

/// Get the total record count of this snapshot.
#[inline]
pub fn total_record_count(&self) -> Option<i64> {
Expand Down Expand Up @@ -135,3 +168,217 @@ impl Snapshot {
self.statistics.as_deref()
}
}

#[cfg(test)]
mod tests {
    //! Unit tests for [`Snapshot`]: builder defaults, explicitly supplied optional fields,
    //! explicit `None` values, and a JSON round trip.

    use super::*;

    /// Building with only the mandatory fields must leave every optional field as `None`.
    #[test]
    fn test_snapshot_creation() {
        let snapshot = Snapshot::builder()
            .version(3)
            .id(1)
            .schema_id(0)
            .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
            .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
            .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
            .commit_identifier(9223372036854775807)
            .time_millis(1721287833568)
            .build();

        assert_eq!(snapshot.version(), 3);
        assert_eq!(snapshot.id(), 1);
        assert_eq!(snapshot.schema_id(), 0);
        assert_eq!(
            snapshot.base_manifest_list(),
            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
        );
        assert_eq!(
            snapshot.delta_manifest_list(),
            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
        );
        assert_eq!(
            snapshot.commit_user(),
            "cf568e07-05ad-4943-b4bd-37461bc58729"
        );
        assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
        assert_eq!(snapshot.time_millis(), 1721287833568);
        assert!(snapshot.changelog_manifest_list().is_none());
        assert!(snapshot.index_manifest().is_none());
        assert!(snapshot.log_offsets().is_none());
        assert!(snapshot.total_record_count().is_none());
        assert!(snapshot.delta_record_count().is_none());
        assert!(snapshot.changelog_record_count().is_none());
        assert!(snapshot.watermark().is_none());
        assert!(snapshot.statistics().is_none());
    }

    /// Every optional field, including `log_offsets`, must be returned as supplied.
    #[test]
    fn test_snapshot_with_optional_fields() {
        let log_offsets = HashMap::from([(0, 100)]);

        let snapshot = Snapshot::builder()
            .version(3)
            .id(1)
            .schema_id(0)
            .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
            .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
            .changelog_manifest_list(Some(
                "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2".to_string(),
            ))
            .index_manifest(Some(
                "index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0".to_string(),
            ))
            .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
            .commit_identifier(9223372036854775807)
            .time_millis(1721287833568)
            .log_offsets(Some(log_offsets.clone()))
            .total_record_count(Some(1))
            .delta_record_count(Some(1))
            .changelog_record_count(Some(0))
            .watermark(Some(-9223372036854775808))
            .statistics(Some("statistics_v2".to_string()))
            .build();

        assert_eq!(snapshot.version(), 3);
        assert_eq!(snapshot.id(), 1);
        assert_eq!(snapshot.schema_id(), 0);
        assert_eq!(
            snapshot.base_manifest_list(),
            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
        );
        assert_eq!(
            snapshot.delta_manifest_list(),
            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
        );
        assert_eq!(
            snapshot.changelog_manifest_list(),
            Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2")
        );
        assert_eq!(
            snapshot.index_manifest(),
            Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0")
        );
        assert_eq!(
            snapshot.commit_user(),
            "cf568e07-05ad-4943-b4bd-37461bc58729"
        );
        assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
        assert_eq!(snapshot.time_millis(), 1721287833568);
        // Previously a duplicated `changelog_manifest_list` assertion sat here and
        // `log_offsets` was never exercised with a value.
        assert_eq!(snapshot.log_offsets(), Some(&log_offsets));
        assert_eq!(snapshot.total_record_count(), Some(1));
        assert_eq!(snapshot.delta_record_count(), Some(1));
        assert_eq!(snapshot.changelog_record_count(), Some(0));
        assert_eq!(snapshot.watermark(), Some(-9223372036854775808));
        assert_eq!(snapshot.statistics(), Some("statistics_v2"));
    }

    /// Passing `None` explicitly must behave exactly like omitting the optional setters.
    #[test]
    fn test_snapshot_with_none_fields() {
        let snapshot = Snapshot::builder()
            .version(3)
            .id(1)
            .schema_id(0)
            .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
            .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
            .changelog_manifest_list(None)
            .index_manifest(None)
            .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
            .commit_identifier(9223372036854775807)
            .time_millis(1721287833568)
            .total_record_count(None)
            .delta_record_count(None)
            .changelog_record_count(None)
            .watermark(None)
            .statistics(None)
            .build();

        assert_eq!(snapshot.version(), 3);
        assert_eq!(snapshot.id(), 1);
        assert_eq!(snapshot.schema_id(), 0);
        assert_eq!(
            snapshot.base_manifest_list(),
            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
        );
        assert_eq!(
            snapshot.delta_manifest_list(),
            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
        );
        assert_eq!(
            snapshot.commit_user(),
            "cf568e07-05ad-4943-b4bd-37461bc58729"
        );
        assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
        assert_eq!(snapshot.time_millis(), 1721287833568);
        assert!(snapshot.changelog_manifest_list().is_none());
        assert!(snapshot.index_manifest().is_none());
        assert!(snapshot.log_offsets().is_none());
        assert!(snapshot.total_record_count().is_none());
        assert!(snapshot.delta_record_count().is_none());
        assert!(snapshot.changelog_record_count().is_none());
        assert!(snapshot.watermark().is_none());
        assert!(snapshot.statistics().is_none());
    }

    /// A snapshot JSON document must deserialize field by field and survive a
    /// serialize/deserialize round trip unchanged.
    #[test]
    fn test_snapshot_serialization_deserialization() {
        let data = r#"
        {
            "version" : 3,
            "id" : 1,
            "schemaId" : 0,
            "baseManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0",
            "deltaManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1",
            "changelogManifestList" : null,
            "indexManifest" : "index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0",
            "commitUser" : "cf568e07-05ad-4943-b4bd-37461bc58729",
            "commitIdentifier" : 9223372036854775807,
            "timeMillis" : 1721287833568,
            "logOffsets" : { },
            "totalRecordCount" : 1,
            "deltaRecordCount" : 1,
            "changelogRecordCount" : 0,
            "watermark" : -9223372036854775808
        }
        "#;

        let snapshot: Snapshot =
            serde_json::from_str(data).expect("Failed to deserialize Snapshot");

        assert_eq!(snapshot.version(), 3);
        assert_eq!(snapshot.id(), 1);
        assert_eq!(snapshot.schema_id(), 0);
        assert_eq!(
            snapshot.base_manifest_list(),
            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
        );
        assert_eq!(
            snapshot.delta_manifest_list(),
            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
        );
        assert_eq!(snapshot.changelog_manifest_list(), None);
        assert_eq!(
            snapshot.index_manifest(),
            Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0")
        );
        assert_eq!(
            snapshot.commit_user(),
            "cf568e07-05ad-4943-b4bd-37461bc58729"
        );
        assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
        assert_eq!(snapshot.time_millis(), 1721287833568);
        // `"logOffsets" : { }` is a present-but-empty object, so it must come back as
        // `Some(empty map)` rather than `None`.
        assert_eq!(snapshot.log_offsets(), Some(&HashMap::new()));
        assert_eq!(snapshot.total_record_count(), Some(1));
        assert_eq!(snapshot.delta_record_count(), Some(1));
        assert_eq!(snapshot.changelog_record_count(), Some(0));
        assert_eq!(snapshot.watermark(), Some(-9223372036854775808));
        // `statistics` does not appear in the document at all.
        assert!(snapshot.statistics().is_none());

        let serialized = serde_json::to_string(&snapshot).expect("Failed to serialize Snapshot");

        let deserialized: Snapshot =
            serde_json::from_str(&serialized).expect("Failed to deserialize serialized Snapshot");

        assert_eq!(snapshot, deserialized);
    }
}

0 comments on commit bd2f887

Please sign in to comment.