Skip to content

Commit

Permalink
fix: limit input sst size when compact for old bucket (#910)
Browse files Browse the repository at this point in the history
## Related Issues
Closes #

## Detailed Changes

- Adjust logs to make them more readable.
- `trim_to_threshold` respect `max_input_sstable_size`.

## Test Plan 
Manually
  • Loading branch information
jiacai2050 authored May 23, 2023
1 parent 12f8656 commit d628a52
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 35 deletions.
28 changes: 26 additions & 2 deletions analytic_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

//! Compaction.

use std::{collections::HashMap, str::FromStr, sync::Arc};
use std::{collections::HashMap, fmt, str::FromStr, sync::Arc};

use common_util::config::{ReadableSize, TimeUnit};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -316,7 +316,7 @@ pub struct ExpiredFiles {
pub files: Vec<FileHandle>,
}

#[derive(Debug, Default, Clone)]
#[derive(Default, Clone)]
pub struct CompactionTask {
pub compaction_inputs: Vec<CompactionInputFiles>,
pub expired: Vec<ExpiredFiles>,
Expand Down Expand Up @@ -352,6 +352,30 @@ impl CompactionTask {
}
}

impl fmt::Debug for CompactionTask {
    /// Compact debug representation of a compaction task.
    ///
    /// Only the first 10 expired entries are rendered to keep log lines
    /// bounded; each entry shows its level and the ids of its files.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CompactionTask")
            .field("inputs", &self.compaction_inputs)
            .field(
                "expired",
                &self
                    .expired
                    .iter()
                    // Only print first 10 expired entries; taking before
                    // mapping avoids formatting entries that are dropped.
                    .take(10)
                    .map(|expired| {
                        format!(
                            "level:{}, files:{:?}",
                            expired.level,
                            // Collect into a Vec so `{:?}` prints the actual
                            // ids. Formatting the bare `Map` adapter would
                            // print the iterator struct, not its elements.
                            expired.files.iter().map(|f| f.id()).collect::<Vec<_>>()
                        )
                    })
                    .collect::<Vec<_>>(),
            )
            .finish()
    }
}

pub struct PickerManager {
default_picker: CompactionPickerRef,
time_window_picker: CompactionPickerRef,
Expand Down
66 changes: 35 additions & 31 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,24 @@ fn find_uncompact_files(
.collect()
}

// Trim the largest sstables off the end to meet the `max_threshold` and
// `max_input_sstable_size`
// Prune the candidate list so at most `max_threshold` files are kept and
// their cumulative size stays within `max_input_sstable_size`. Files are
// taken from the front in the order given; the first file that pushes the
// running total over the size limit (and everything after it) is dropped.
fn trim_to_threshold(
    input_files: Vec<FileHandle>,
    max_threshold: usize,
    max_input_sstable_size: u64,
) -> Vec<FileHandle> {
    let mut picked = Vec::new();
    let mut total_size: u64 = 0;
    for file in input_files.into_iter().take(max_threshold) {
        total_size += file.size();
        // Stop as soon as the running total exceeds the budget; the
        // offending file itself is excluded, same as the rest.
        if total_size > max_input_sstable_size {
            break;
        }
        picked.push(file);
    }
    picked
}

/// Size tiered compaction strategy
/// See https://github.com/jeffjirsa/twcs/blob/master/src/main/java/com/jeffjirsa/cassandra/db/compaction/SizeTieredCompactionStrategy.java
#[derive(Default)]
Expand Down Expand Up @@ -394,18 +412,7 @@ impl SizeTieredPicker {
.reverse()
});

// and then trim the coldest sstables off the end to meet the max_threshold
let len = sorted_files.len();
let mut input_size = 0;
let pruned_bucket: Vec<FileHandle> = sorted_files
.into_iter()
.take(std::cmp::min(max_threshold, len))
.take_while(|f| {
input_size += f.size();
input_size <= max_input_sstable_size
})
.collect();

let pruned_bucket = trim_to_threshold(sorted_files, max_threshold, max_input_sstable_size);
// bucket hotness is the sum of the hotness of all sstable members
let bucket_hotness = pruned_bucket.iter().map(Bucket::hotness).sum();

Expand Down Expand Up @@ -484,10 +491,12 @@ impl TimeWindowPicker {

let all_keys: BTreeSet<_> = buckets.keys().collect();

// First compact latest buckets
for key in all_keys.into_iter().rev() {
if let Some(bucket) = buckets.get(key) {
debug!("Key {}, now {}", key, now);

let max_input_sstable_size = size_tiered_opts.max_input_sstable_size.as_byte();
if bucket.len() >= size_tiered_opts.min_threshold && *key >= now {
// If we're in the newest bucket, we'll use STCS to prioritize sstables
let buckets = SizeTieredPicker::get_buckets(
Expand All @@ -500,17 +509,22 @@ impl TimeWindowPicker {
buckets,
size_tiered_opts.min_threshold,
size_tiered_opts.max_threshold,
size_tiered_opts.max_input_sstable_size.as_byte(),
max_input_sstable_size,
);

if files.is_some() {
return files;
}
} else if bucket.len() >= 2 && *key < now {
debug!("Bucket size {} >= 2 and not in current bucket, compacting what's here: {:?}", bucket.len(), bucket);
return Some(Self::trim_to_threshold(
bucket,
// Sort by sstable file size
let mut sorted_files = bucket.to_vec();
sorted_files.sort_unstable_by_key(FileHandle::size);

return Some(trim_to_threshold(
sorted_files,
size_tiered_opts.max_threshold,
max_input_sstable_size,
));
} else {
debug!(
Expand All @@ -526,19 +540,6 @@ impl TimeWindowPicker {
None
}

fn trim_to_threshold(files: &[FileHandle], max_threshold: usize) -> Vec<FileHandle> {
// Sort by sstable file size
let mut sorted_files = files.to_vec();
sorted_files.sort_unstable_by_key(FileHandle::size);

// Trim the largest sstables off the end to meet the maxThreshold
let len = sorted_files.len();
sorted_files
.into_iter()
.take(std::cmp::min(max_threshold, len))
.collect()
}

/// Get current window timestamp, the caller MUST ensure the level has ssts,
/// panic otherwise.
fn get_current_window(
Expand Down Expand Up @@ -577,7 +578,7 @@ impl LevelPicker for TimeWindowPicker {

debug!("TWCS compaction options: {:?}", opts);

let (buckets, ts) = Self::get_buckets(
let (buckets, max_bucket_ts) = Self::get_buckets(
&uncompact_files,
&ctx.segment_duration,
opts.timestamp_resolution,
Expand All @@ -589,8 +590,11 @@ impl LevelPicker for TimeWindowPicker {
&ctx.segment_duration,
opts.timestamp_resolution,
);
debug!("now {}, max_ts: {}", now, ts);
assert!(now >= ts);
debug!(
"TWCS current window is {}, max_bucket_ts: {}",
now, max_bucket_ts
);
assert!(now >= max_bucket_ts);

Self::newest_bucket(buckets, opts.size_tiered, now)
}
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ impl SpaceStore {
}

info!(
"try do compaction for table:{}#{}, estimated input files size:{}, input files number:{}",
"Try do compaction for table:{}#{}, estimated input files size:{}, input files number:{}",
table_data.name,
table_data.id,
task.estimated_total_input_file_size(),
Expand Down
1 change: 0 additions & 1 deletion analytic_engine/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ impl fmt::Debug for FileHandle {
f.debug_struct("FileHandle")
.field("meta", &self.inner.meta)
.field("being_compacted", &self.being_compacted())
.field("metrics", &self.inner.metrics)
.finish()
}
}
Expand Down

0 comments on commit d628a52

Please sign in to comment.