Commit

Rename some fields and functions and fix a misleading comment

gruuya committed Oct 2, 2024
1 parent e141a33 commit 22400e5

Showing 4 changed files with 31 additions and 27 deletions.
src/sync/planner.rs (2 changes: 1 addition & 1 deletion)

@@ -948,7 +948,7 @@ mod tests {
         )?;
 
         let sync_item = DataSyncItem {
-            squashed: true,
+            is_squashed: true,
             tx_ids: Default::default(),
             sync_schema,
             data: vec![batch],
src/sync/schema/mod.rs (4 changes: 2 additions & 2 deletions)

@@ -152,9 +152,9 @@ impl SyncSchema {
 
     // Check whether this and the other sync schemas are the same.
    //
-    // This is a narrower check than the equality check, since it checks for the sync column roles,
+    // This is a shallower check than the equality check, since it checks for the sync column roles,
     // names, data types and order (and doesn't inspect the underlying arrows field equality).
-    pub fn same(&self, other: &SyncSchema) -> bool {
+    pub fn is_compatible_with(&self, other: &SyncSchema) -> bool {
         self.columns()
             .iter()
             .zip(other.columns().iter())
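The rename from `same` to `is_compatible_with` matches what the method actually verifies. As a rough illustration, a shallow check of this kind only compares role, name, and data type column-by-column (the `ColumnRole` and `SyncColumn` shapes below are illustrative stand-ins, not Seafowl's actual types):

// Illustrative sketch only: two schemas are "compatible" when their columns
// agree on role, name, data type, and order, without descending into full
// Arrow `Field` equality (metadata, nullability, etc.).
#[derive(PartialEq)]
enum ColumnRole {
    OldPk,
    NewPk,
    Value,
}

struct SyncColumn {
    role: ColumnRole,
    name: String,
    data_type: String, // stand-in for `arrow::datatypes::DataType`
}

fn is_compatible_with(this: &[SyncColumn], other: &[SyncColumn]) -> bool {
    this.len() == other.len()
        && this
            .iter()
            .zip(other.iter())
            .all(|(a, b)| a.role == b.role && a.name == b.name && a.data_type == b.data_type)
}

The point of the rename is that two schemas can be compatible for appending batches even when their underlying Arrow fields are not strictly equal.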
src/sync/utils.rs (10 changes: 5 additions & 5 deletions)

@@ -14,7 +14,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
 use tracing::log::warn;
 
 // Returns the total number of bytes and rows in the slice of batches
-pub(super) fn size_and_rows(batches: &[RecordBatch]) -> (usize, usize) {
+pub(super) fn get_size_and_rows(batches: &[RecordBatch]) -> (usize, usize) {
     batches.iter().fold((0, 0), |(size, rows), batch| {
         (
             size + batch.get_array_memory_size(),
@@ -700,13 +700,13 @@ mod tests {
 
         let syncs = &[
             DataSyncItem {
-                squashed: true,
+                is_squashed: true,
                 tx_ids: vec![Uuid::new_v4()],
                 sync_schema: sync_schema.clone(),
                 data: vec![batch_1],
             },
             DataSyncItem {
-                squashed: true,
+                is_squashed: true,
                 tx_ids: vec![Uuid::new_v4()],
                 sync_schema,
                 data: vec![batch_2],
@@ -825,13 +825,13 @@
 
         let syncs = &[
             DataSyncItem {
-                squashed: true,
+                is_squashed: true,
                 tx_ids: vec![Uuid::new_v4()],
                 sync_schema: sync_schema.clone(),
                 data: vec![batch_1],
             },
             DataSyncItem {
-                squashed: true,
+                is_squashed: true,
                 tx_ids: vec![Uuid::new_v4()],
                 sync_schema,
                 data: vec![batch_2],
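The hunks above truncate the renamed helper's body; for reference, the fold it performs looks like this as a self-contained function (the byte half is visible above, the row half is inferred from the `rows` accumulator and the doc comment):

use arrow::record_batch::RecordBatch;

// Total estimated in-memory bytes and total row count across a slice of batches.
fn get_size_and_rows(batches: &[RecordBatch]) -> (usize, usize) {
    batches.iter().fold((0, 0), |(size, rows), batch| {
        (
            size + batch.get_array_memory_size(),
            rows + batch.num_rows(),
        )
    })
}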
src/sync/writer.rs (42 changes: 23 additions & 19 deletions)

@@ -20,7 +20,7 @@ use crate::context::SeafowlContext;
 use crate::sync::metrics::SyncWriterMetrics;
 use crate::sync::planner::SeafowlSyncPlanner;
 use crate::sync::schema::SyncSchema;
-use crate::sync::utils::{size_and_rows, squash_batches};
+use crate::sync::utils::{get_size_and_rows, squash_batches};
 use crate::sync::{Origin, SequenceNumber, SyncCommitInfo, SyncError, SyncResult};
 
 // Denotes the last sequence number that was fully committed
@@ -117,7 +117,7 @@ pub(super) struct DataSyncCollection {
 pub(super) struct DataSyncItem {
     // Flag denoting whether the batches have been physically squashed yet or not. If true there is
     // a single record batch in the `data` field, and this is strictly the case during flushing.
-    pub(super) squashed: bool,
+    pub(super) is_squashed: bool,
     // The (internal) id of the transaction that this change belongs to; for now it corresponds to
     // a random v4 UUID generated for the first message received in this transaction.
     pub(super) tx_ids: Vec<Uuid>,
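The comment on the renamed flag encodes an invariant that the `is_` prefix makes easier to read at call sites: a squashed item carries exactly one batch. A hypothetical assertion (not part of this commit) capturing it:

// Hypothetical helper: if `is_squashed` is true, `data` must hold exactly one
// record batch, per the field's comment above.
fn assert_squash_invariant(item: &DataSyncItem) {
    debug_assert!(!item.is_squashed || item.data.len() == 1);
}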
@@ -162,7 +162,7 @@ impl SeafowlDataSyncWriter {
     ) -> SyncResult<()> {
         let url = log_store.root_uri();
 
-        let (sync_size, sync_rows) = size_and_rows(&batches);
+        let (sync_size, sync_rows) = get_size_and_rows(&batches);
 
         // Upsert a sequence entry for this origin and sequence number
         let tx_id = if let Some((tx_id, tx)) = self.txs.last_mut()
@@ -225,18 +225,18 @@
             .entry(url)
             .and_modify(|entry| {
                 let prev_item = entry.syncs.last_mut().unwrap();
-                let (_, prev_rows) = size_and_rows(&prev_item.data);
-                if prev_item.sync_schema.same(&sync_schema)
+                let (_, prev_rows) = get_size_and_rows(&prev_item.data);
+                if prev_item.sync_schema.is_compatible_with(&sync_schema)
                     && prev_rows + sync_rows <= MAX_ROWS_PER_SYNC
                 {
                     // Just append to the last item if the sync schema matches and the row count
                     // is smaller than a predefined value
-                    prev_item.squashed = false;
+                    prev_item.is_squashed = false;
                     prev_item.tx_ids.push(tx_id);
                     prev_item.data.extend(batches.clone());
                 } else {
                     entry.syncs.push(DataSyncItem {
-                        squashed: false,
+                        is_squashed: false,
                         tx_ids: vec![tx_id],
                         sync_schema: sync_schema.clone(),
                         data: batches.clone(),
@@ -251,7 +251,7 @@ impl SeafowlDataSyncWriter {
                 insertion_time: now(),
                 log_store,
                 syncs: vec![DataSyncItem {
-                    squashed: false,
+                    is_squashed: false,
                     tx_ids: vec![tx_id],
                     sync_schema: sync_schema.clone(),
                     data: batches,
@@ -381,7 +381,12 @@
             }
         };
 
-        info!("Flushing {} syncs for url {url}", entry.syncs.len());
+        info!(
+            "Flushing {} bytes in {} rows across {} sync items for url {url}",
+            entry.size,
+            entry.rows,
+            entry.syncs.len()
+        );
 
         let start = Instant::now();
         let insertion_time = entry.insertion_time;
@@ -497,14 +502,12 @@
     // Perform physical squashing of change batches for a particular table
     fn physical_squashing(&mut self, url: &String) -> SyncResult<()> {
         if let Some(table_syncs) = self.syncs.get_mut(url) {
-            debug!(
-                "Total {} in-memory bytes before squashing syncs for {url}",
-                self.size
-            );
+            let old_size = self.size;
+            let start = Instant::now();
             // Squash the batches and measure the time it took and the reduction in rows/size
             // TODO: parallelize this
-            for item in table_syncs.syncs.iter_mut().filter(|i| !i.squashed) {
-                let (old_size, old_rows) = size_and_rows(&item.data);
+            for item in table_syncs.syncs.iter_mut().filter(|i| !i.is_squashed) {
+                let (old_size, old_rows) = get_size_and_rows(&item.data);
 
                 let start = Instant::now();
                 let batch = squash_batches(&item.sync_schema, &item.data)?;
@@ -537,13 +540,14 @@
                 table_syncs.rows -= old_rows;
                 table_syncs.rows += rows;
 
-                item.squashed = true;
+                item.is_squashed = true;
                 item.data = vec![batch];
             }
 
-            debug!(
-                "Total {} in-memory bytes after squashing syncs for {url}",
-                self.size
-            );
+            let duration = start.elapsed().as_millis();
+            info!(
+                "Squashed {} bytes in {duration} ms for {url}",
+                old_size - self.size,
+            );
         };
 
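Taken together, the writer hunks implement an append-or-start-new decision for incoming syncs. A hedged, free-standing sketch of that flow (`MAX_ROWS_PER_SYNC` and the types exist in the source; the standalone function shape and parameter list here are illustrative):

use arrow::record_batch::RecordBatch;
use uuid::Uuid;

// Illustrative sketch of the decision in the @@ -225,18 +225,18 @@ hunk above:
// append to the last DataSyncItem when the sync schemas are compatible and the
// combined row count stays under the cap; otherwise start a new item.
fn append_or_push(
    syncs: &mut Vec<DataSyncItem>,
    sync_schema: SyncSchema,
    tx_id: Uuid,
    batches: Vec<RecordBatch>,
    max_rows_per_sync: usize,
) {
    let (_, sync_rows) = get_size_and_rows(&batches);
    if let Some(prev_item) = syncs.last_mut() {
        let (_, prev_rows) = get_size_and_rows(&prev_item.data);
        if prev_item.sync_schema.is_compatible_with(&sync_schema)
            && prev_rows + sync_rows <= max_rows_per_sync
        {
            // Appending raw batches invalidates any earlier physical squash.
            prev_item.is_squashed = false;
            prev_item.tx_ids.push(tx_id);
            prev_item.data.extend(batches);
            return;
        }
    }
    syncs.push(DataSyncItem {
        is_squashed: false,
        tx_ids: vec![tx_id],
        sync_schema,
        data: batches,
    });
}

Resetting `is_squashed` to `false` on append is what later lets `physical_squashing` pick the item up again via its `.filter(|i| !i.is_squashed)` pass.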
