From 686e2698eaba2a673a3cbf5c14d72f470c0f1777 Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Wed, 2 Oct 2024 08:59:08 +0200
Subject: [PATCH] Rename some fields and functions and fix a misleading comment

---
 src/sync/planner.rs    |  2 +-
 src/sync/schema/mod.rs |  4 ++--
 src/sync/utils.rs      | 10 +++++-----
 src/sync/writer.rs     | 42 +++++++++++++++++++++++-------------------
 4 files changed, 31 insertions(+), 27 deletions(-)

diff --git a/src/sync/planner.rs b/src/sync/planner.rs
index fa74457c..ee975656 100644
--- a/src/sync/planner.rs
+++ b/src/sync/planner.rs
@@ -948,7 +948,7 @@ mod tests {
         )?;
 
         let sync_item = DataSyncItem {
-            squashed: true,
+            is_squashed: true,
             tx_ids: Default::default(),
             sync_schema,
             data: vec![batch],
diff --git a/src/sync/schema/mod.rs b/src/sync/schema/mod.rs
index c98c9764..adce80b6 100644
--- a/src/sync/schema/mod.rs
+++ b/src/sync/schema/mod.rs
@@ -152,9 +152,9 @@ impl SyncSchema {
 
     // Check whether this and the other sync schemas are the same.
     //
-    // This is a narrower check than the equality check, since it checks for the sync column roles,
+    // This is a shallower check than the equality check, since it checks for the sync column roles,
     // names, data types and order (and doesn't inspect the underlying arrow field equality).
-    pub fn same(&self, other: &SyncSchema) -> bool {
+    pub fn is_compatible_with(&self, other: &SyncSchema) -> bool {
         self.columns()
             .iter()
             .zip(other.columns().iter())
diff --git a/src/sync/utils.rs b/src/sync/utils.rs
index f269b36a..d5e0908f 100644
--- a/src/sync/utils.rs
+++ b/src/sync/utils.rs
@@ -14,7 +14,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
 use tracing::log::warn;
 
 // Returns the total number of bytes and rows in the slice of batches
-pub(super) fn size_and_rows(batches: &[RecordBatch]) -> (usize, usize) {
+pub(super) fn get_size_and_rows(batches: &[RecordBatch]) -> (usize, usize) {
     batches.iter().fold((0, 0), |(size, rows), batch| {
         (
             size + batch.get_array_memory_size(),
@@ -700,13 +700,13 @@ mod tests {
 
         let syncs = &[
             DataSyncItem {
-                squashed: true,
+                is_squashed: true,
                 tx_ids: vec![Uuid::new_v4()],
                 sync_schema: sync_schema.clone(),
                 data: vec![batch_1],
             },
             DataSyncItem {
-                squashed: true,
+                is_squashed: true,
                 tx_ids: vec![Uuid::new_v4()],
                 sync_schema,
                 data: vec![batch_2],
@@ -825,13 +825,13 @@ mod tests {
 
         let syncs = &[
             DataSyncItem {
-                squashed: true,
+                is_squashed: true,
                 tx_ids: vec![Uuid::new_v4()],
                 sync_schema: sync_schema.clone(),
                 data: vec![batch_1],
             },
             DataSyncItem {
-                squashed: true,
+                is_squashed: true,
                 tx_ids: vec![Uuid::new_v4()],
                 sync_schema,
                 data: vec![batch_2],
diff --git a/src/sync/writer.rs b/src/sync/writer.rs
index 1e68222f..da506ade 100644
--- a/src/sync/writer.rs
+++ b/src/sync/writer.rs
@@ -20,7 +20,7 @@ use crate::context::SeafowlContext;
 use crate::sync::metrics::SyncWriterMetrics;
 use crate::sync::planner::SeafowlSyncPlanner;
 use crate::sync::schema::SyncSchema;
-use crate::sync::utils::{size_and_rows, squash_batches};
+use crate::sync::utils::{get_size_and_rows, squash_batches};
 use crate::sync::{Origin, SequenceNumber, SyncCommitInfo, SyncError, SyncResult};
 
 // Denotes the last sequence number that was fully committed
@@ -117,7 +117,7 @@ pub(super) struct DataSyncCollection {
 pub(super) struct DataSyncItem {
     // Flag denoting whether the batches have been physically squashed yet or not. If true there is
     // a single record batch in the `data` field, and this is strictly the case during flushing.
-    pub(super) squashed: bool,
+    pub(super) is_squashed: bool,
     // The (internal) id of the transaction that this change belongs to; for now it corresponds to
     // a random v4 UUID generated for the first message received in this transaction.
     pub(super) tx_ids: Vec<Uuid>,
@@ -162,7 +162,7 @@ impl SeafowlDataSyncWriter {
     ) -> SyncResult<()> {
         let url = log_store.root_uri();
 
-        let (sync_size, sync_rows) = size_and_rows(&batches);
+        let (sync_size, sync_rows) = get_size_and_rows(&batches);
 
         // Upsert a sequence entry for this origin and sequence number
         let tx_id = if let Some((tx_id, tx)) = self.txs.last_mut()
@@ -225,18 +225,18 @@ impl SeafowlDataSyncWriter {
             .entry(url)
             .and_modify(|entry| {
                 let prev_item = entry.syncs.last_mut().unwrap();
-                let (_, prev_rows) = size_and_rows(&prev_item.data);
-                if prev_item.sync_schema.same(&sync_schema)
+                let (_, prev_rows) = get_size_and_rows(&prev_item.data);
+                if prev_item.sync_schema.is_compatible_with(&sync_schema)
                     && prev_rows + sync_rows <= MAX_ROWS_PER_SYNC
                 {
                     // Just append to the last item if the sync schema matches and the row count
                     // is smaller than a predefined value
-                    prev_item.squashed = false;
+                    prev_item.is_squashed = false;
                     prev_item.tx_ids.push(tx_id);
                     prev_item.data.extend(batches.clone());
                 } else {
                     entry.syncs.push(DataSyncItem {
-                        squashed: false,
+                        is_squashed: false,
                         tx_ids: vec![tx_id],
                         sync_schema: sync_schema.clone(),
                         data: batches.clone(),
@@ -251,7 +251,7 @@ impl SeafowlDataSyncWriter {
                 insertion_time: now(),
                 log_store,
                 syncs: vec![DataSyncItem {
-                    squashed: false,
+                    is_squashed: false,
                     tx_ids: vec![tx_id],
                     sync_schema: sync_schema.clone(),
                     data: batches,
@@ -381,7 +381,12 @@ impl SeafowlDataSyncWriter {
             }
         };
 
-        info!("Flushing {} syncs for url {url}", entry.syncs.len());
+        info!(
+            "Flushing {} bytes in {} rows across {} sync items for url {url}",
+            entry.size,
+            entry.rows,
+            entry.syncs.len()
+        );
 
         let start = Instant::now();
         let insertion_time = entry.insertion_time;
@@ -497,14 +502,12 @@ impl SeafowlDataSyncWriter {
     // Perform physical squashing of change batches for a particular table
     fn physical_squashing(&mut self, url: &String) -> SyncResult<()> {
         if let Some(table_syncs) = self.syncs.get_mut(url) {
-            debug!(
-                "Total {} in-memory bytes before squashing syncs for {url}",
-                self.size
-            );
+            let old_size = self.size;
+            let start = Instant::now();
             // Squash the batches and measure the time it took and the reduction in rows/size
             // TODO: parallelize this
-            for item in table_syncs.syncs.iter_mut().filter(|i| !i.squashed) {
-                let (old_size, old_rows) = size_and_rows(&item.data);
+            for item in table_syncs.syncs.iter_mut().filter(|i| !i.is_squashed) {
+                let (old_size, old_rows) = get_size_and_rows(&item.data);
                 let start = Instant::now();
 
                 let batch = squash_batches(&item.sync_schema, &item.data)?;
@@ -537,13 +540,14 @@
                 table_syncs.rows -= old_rows;
                 table_syncs.rows += rows;
 
-                item.squashed = true;
+                item.is_squashed = true;
                 item.data = vec![batch];
             }
 
-            debug!(
-                "Total {} in-memory bytes after squashing syncs for {url}",
-                self.size
+            let duration = start.elapsed().as_millis();
+            info!(
+                "Squashed {} bytes in {duration} ms for {url}",
+                old_size - self.size,
             );
         };
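
Note on the rename: the writer appends incoming batches to the previous DataSyncItem only
when is_compatible_with passes and the combined row count stays within MAX_ROWS_PER_SYNC,
so the new name makes clear that only compatibility, not full equality, is asserted. Below
is a minimal, self-contained sketch of the column-wise check the updated comment describes
(matching role, name and data type at each position, without inspecting any further arrow
field metadata). The SyncColumn, ColumnRole and DataType definitions are simplified
stand-ins for illustration only, not Seafowl's actual types from src/sync/schema:

    // Simplified stand-ins: the real SyncColumn/ColumnRole live in
    // src/sync/schema, and the real data type is arrow's DataType.
    #[derive(Clone, Debug, PartialEq)]
    enum DataType {
        Int64,
        Utf8,
    }

    #[derive(Clone, Debug, PartialEq)]
    enum ColumnRole {
        OldPk,
        Value,
    }

    #[derive(Clone, Debug)]
    struct SyncColumn {
        role: ColumnRole,
        name: String,
        data_type: DataType,
    }

    // The shallow check: zip the two column lists and require matching role,
    // name and data type at every position (so column order matters), without
    // comparing the underlying arrow fields for full equality.
    fn is_compatible_with(this: &[SyncColumn], other: &[SyncColumn]) -> bool {
        this.len() == other.len()
            && this.iter().zip(other.iter()).all(|(a, b)| {
                a.role == b.role && a.name == b.name && a.data_type == b.data_type
            })
    }

    fn main() {
        let a = vec![
            SyncColumn {
                role: ColumnRole::OldPk,
                name: "id".into(),
                data_type: DataType::Int64,
            },
            SyncColumn {
                role: ColumnRole::Value,
                name: "body".into(),
                data_type: DataType::Utf8,
            },
        ];
        let mut b = a.clone();
        assert!(is_compatible_with(&a, &b));

        // Same shape but a renamed column at position 1: not compatible, so the
        // writer would push a new DataSyncItem rather than append to the last one.
        b[1].name = "text".into();
        assert!(!is_compatible_with(&a, &b));
    }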