Sync batch coalescing #690

Merged
merged 4 commits on Oct 3, 2024

Changes from 2 commits
2 changes: 1 addition & 1 deletion src/config/schema.rs
@@ -357,7 +357,7 @@ impl Default for DataSyncConfig {
Self {
max_in_memory_bytes: 3 * 1024 * 1024 * 1024,
max_replication_lag_s: 600,
max_syncs_per_url: 50,
max_syncs_per_url: 100,
write_lock_timeout_s: 3,
flush_task_interval_s: 900,
}
14 changes: 9 additions & 5 deletions src/sync/planner.rs
@@ -133,8 +133,11 @@ impl SeafowlSyncPlanner {
) -> SyncResult<(SyncSchema, DataFrame)> {
let first_sync = syncs.first().unwrap();
let mut sync_schema = first_sync.sync_schema.clone();
let first_batch = first_sync.batch.clone();
let provider = MemTable::try_new(first_batch.schema(), vec![vec![first_batch]])?;
let first_sync_data = first_sync.data.clone();
let provider = MemTable::try_new(
first_sync_data.first().unwrap().schema(),
vec![first_sync_data],
)?;
let mut sync_plan = LogicalPlanBuilder::scan(
LOWER_REL,
provider_as_source(Arc::new(provider)),
@@ -148,7 +151,7 @@ impl SeafowlSyncPlanner {
&sync_schema,
sync_plan,
&sync.sync_schema,
sync.batch.clone(),
sync.data.first().unwrap().clone(),
)?;
}

@@ -945,9 +948,10 @@ mod tests {
)?;

let sync_item = DataSyncItem {
tx_id: Default::default(),
squashed: true,
tx_ids: Default::default(),
sync_schema,
batch,
data: vec![batch],
};

ctx.plan_query("CREATE TABLE test_table(c1 INT, c2 TEXT)")
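For context, DataFusion's MemTable::try_new takes the table schema plus a Vec of partitions, each itself a Vec<RecordBatch>, so passing vec![first_sync_data] keeps all of the coalesced batches in a single ordered partition. A minimal standalone sketch of that pattern, using plain Arrow types rather than Seafowl's internals:

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;

fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
    let batch_1 =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))])?;
    let batch_2 =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![3]))])?;

    // A single partition holding both batches preserves their order, mirroring
    // the `vec![first_sync_data]` call in the planner change above.
    let _provider = MemTable::try_new(schema, vec![vec![batch_1, batch_2]])?;
    Ok(())
}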
15 changes: 15 additions & 0 deletions src/sync/schema/mod.rs
@@ -149,6 +149,21 @@ impl SyncSchema {
})
.unwrap_or_default()
}

// Check whether this and the other sync schema are the same.
//
// This is a narrower check than full equality, since it only compares the sync column roles,
// names, data types and order (it doesn't inspect the underlying Arrow fields for equality).
pub fn same(&self, other: &SyncSchema) -> bool {
self.columns().len() == other.columns().len()
&& self.columns()
.iter()
.zip(other.columns().iter())
.all(|(this, other)| {
this.name == other.name
&& this.role == other.role
&& this.field().data_type() == other.field().data_type()
})
}
}

impl Display for SyncSchema {
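As a rough illustration of what same compares, here is a self-contained sketch with hypothetical stand-ins for SyncColumn and ColumnRole (the real types live in src/sync and may differ):

use arrow::datatypes::{DataType, Field};

// Hypothetical stand-ins, purely to illustrate the structural comparison.
#[derive(PartialEq)]
enum ColumnRole {
    NewPk,
}

struct SyncColumn {
    name: String,
    role: ColumnRole,
    field: Field,
}

// Mirrors the shape of SyncSchema::same: equal length, then a pairwise check
// of name, role and data type, without full Arrow Field equality
// (nullability, metadata).
fn same(a: &[SyncColumn], b: &[SyncColumn]) -> bool {
    a.len() == b.len()
        && a.iter().zip(b).all(|(x, y)| {
            x.name == y.name && x.role == y.role && x.field.data_type() == y.field.data_type()
        })
}

fn main() {
    let a = SyncColumn {
        name: "id".to_string(),
        role: ColumnRole::NewPk,
        field: Field::new("id", DataType::Int32, false),
    };
    let b = SyncColumn {
        name: "id".to_string(),
        role: ColumnRole::NewPk,
        field: Field::new("id", DataType::Int32, true),
    };
    // Nullability differs between the two fields, but `same` ignores it.
    assert!(same(&[a], &[b]));
}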
50 changes: 35 additions & 15 deletions src/sync/utils.rs
@@ -13,18 +13,28 @@ use datafusion_expr::{col, lit, Accumulator, Expr};
use std::collections::{HashMap, HashSet, VecDeque};
use tracing::log::warn;

// Returns the total number of bytes and rows in the slice of batches
pub(super) fn size_and_rows(batches: &[RecordBatch]) -> (usize, usize) {
batches.iter().fold((0, 0), |(size, rows), batch| {
(
size + batch.get_array_memory_size(),
rows + batch.num_rows(),
)
})
}

// Compact a set of record batches into a single one, squashing any chain of changes to a given row
// into a single row in the output batch.
// This means that if a row is changed multiple times, only the last change is reflected in the
// output batch (i.e. the last NewPk and Value role columns, and the last Value column whose
// accompanying Changed field was `true`).
pub(super) fn squash_batches(
sync_schema: &SyncSchema,
data: Vec<RecordBatch>,
data: &[RecordBatch],
) -> Result<RecordBatch> {
// Concatenate all the record batches into a single one
let schema = data.first().unwrap().schema();
let batch = concat_batches(&schema, &data)?;
let batch = concat_batches(&schema, data)?;

// Get columns, sort fields and null arrays for a particular role
let columns = |role: ColumnRole| -> (Vec<ArrayRef>, (Vec<SortField>, Vec<ArrayRef>)) {
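Because squash_batches now borrows its input, the first concatenation step can feed the slice straight into Arrow's concat_batches, which accepts any iterator of borrowed batches. A small self-contained sketch of just that step, with toy data in place of the sync schema:

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("pk", DataType::Int32, false)]));
    let batches = vec![
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))])?,
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![2, 3]))])?,
    ];

    // concat_batches takes the shared schema and an iterator of &RecordBatch,
    // which is why a borrowed &[RecordBatch] argument suffices.
    let combined = concat_batches(&schema, &batches)?;
    assert_eq!(combined.num_rows(), 4);
    Ok(())
}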
@@ -252,7 +262,9 @@ pub(super) fn construct_qualifier(
.try_for_each(|(pk_col, (min_value, max_value))| {
let field = sync.sync_schema.column(pk_col, role).unwrap().field();

if let Some(pk_array) = sync.batch.column_by_name(field.name()) {
if let Some(pk_array) =
sync.data.first().unwrap().column_by_name(field.name())
{
min_value.update_batch(&[pk_array.clone()])?;
max_value.update_batch(&[pk_array.clone()])?;
}
@@ -347,8 +359,12 @@ pub(super) fn get_prune_map(
.iter()
.filter(|col| col.role() == role)
{
let array =
sync.batch.column_by_name(pk_col.field().name()).unwrap();
let array = sync
.data
.first()
.unwrap()
.column_by_name(pk_col.field().name())
.unwrap();

// Scope out any NULL values, which only denote no-PKs when inserting/deleting.
// We re-use the same non-null map since there can't be a scenario where
@@ -486,7 +502,7 @@ mod tests {
],
)?;

let squashed = squash_batches(&sync_schema, vec![batch.clone()])?;
let squashed = squash_batches(&sync_schema, &[batch.clone()])?;

let expected = [
"+-----------+-----------+----------+------------+----------+",
@@ -635,7 +651,7 @@
],
)?;

let squashed = squash_batches(&sync_schema, vec![batch.clone()])?;
let squashed = squash_batches(&sync_schema, &[batch.clone()])?;
println!(
"Squashed PKs from {row_count} to {} rows",
squashed.num_rows()
@@ -684,14 +700,16 @@

let syncs = &[
DataSyncItem {
tx_id: Uuid::new_v4(),
squashed: true,
tx_ids: vec![Uuid::new_v4()],
sync_schema: sync_schema.clone(),
batch: batch_1,
data: vec![batch_1],
},
DataSyncItem {
tx_id: Uuid::new_v4(),
squashed: true,
tx_ids: vec![Uuid::new_v4()],
sync_schema,
batch: batch_2,
data: vec![batch_2],
},
];

@@ -807,14 +825,16 @@

let syncs = &[
DataSyncItem {
tx_id: Uuid::new_v4(),
squashed: true,
tx_ids: vec![Uuid::new_v4()],
sync_schema: sync_schema.clone(),
batch: batch_1,
data: vec![batch_1],
},
DataSyncItem {
tx_id: Uuid::new_v4(),
squashed: true,
tx_ids: vec![Uuid::new_v4()],
sync_schema,
batch: batch_2,
data: vec![batch_2],
},
];

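Pulling the test changes together: a coalesced sync item now records every transaction it absorbed and can carry multiple batches. The shape below is inferred from the updated tests alone and is only an approximation of the real definition in src/sync:

use arrow::record_batch::RecordBatch;
use uuid::Uuid;

// Placeholder; the real SyncSchema lives in src/sync/schema/mod.rs.
struct SyncSchema;

// Approximate post-PR shape: `tx_id: Uuid` became `tx_ids: Vec<Uuid>`,
// `batch: RecordBatch` became `data: Vec<RecordBatch>`, and `squashed`
// flags whether `data` has already been through `squash_batches`.
struct DataSyncItem {
    squashed: bool,
    tx_ids: Vec<Uuid>,
    sync_schema: SyncSchema,
    data: Vec<RecordBatch>,
}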