Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add max_input_sstable_size #483

Merged
merged 5 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions analytic_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct SizeTieredCompactionOptions {
pub min_sstable_size: ReadableSize,
pub min_threshold: usize,
pub max_threshold: usize,
pub max_input_sstable_size: ReadableSize,
}

#[derive(Debug, Clone, Copy, Deserialize, PartialEq)]
Expand All @@ -88,6 +89,7 @@ impl Default for SizeTieredCompactionOptions {
min_sstable_size: ReadableSize::mb(50),
min_threshold: 4,
max_threshold: 16,
max_input_sstable_size: ReadableSize::mb(1200),
}
}
}
Expand All @@ -113,6 +115,7 @@ const MIN_THRESHOLD_KEY: &str = "compaction_min_threshold";
const MAX_THRESHOLD_KEY: &str = "compaction_max_threshold";
const MIN_SSTABLE_SIZE_KEY: &str = "compaction_min_sstable_size";
const TIMESTAMP_RESOLUTION_KEY: &str = "compaction_timestamp_resolution";
const MAX_INPUT_SSTABLE_SIZE_KEY: &str = "compaction_max_input_sstable_size";
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
const DEFAULT_STRATEGY: &str = "default";
const STC_STRATEGY: &str = "size_tiered";
const TWC_STRATEGY: &str = "time_window";
Expand Down Expand Up @@ -187,6 +190,10 @@ impl SizeTieredCompactionOptions {
MIN_THRESHOLD_KEY.to_string(),
format!("{}", self.min_threshold),
);
m.insert(
MAX_INPUT_SSTABLE_SIZE_KEY.to_string(),
format!("{}", self.max_input_sstable_size.0),
);
}

pub(crate) fn parse_from(
Expand Down Expand Up @@ -226,6 +233,16 @@ impl SizeTieredCompactionOptions {
})?;
}

// Optional override for the maximum total input size of one compaction run.
if let Some(v) = options.get(MAX_INPUT_SSTABLE_SIZE_KEY) {
    opts.max_input_sstable_size =
        v.parse::<ReadableSize>().map_err(|err| Error::ParseSize {
            // Report the key that actually failed to parse; the original
            // copy-pasted MIN_SSTABLE_SIZE_KEY here, blaming the wrong option.
            key: MAX_INPUT_SSTABLE_SIZE_KEY.to_string(),
            value: v.to_string(),
            error: err,
            backtrace: Backtrace::generate(),
        })?;
}

opts.validate()?;

Ok(opts)
Expand Down Expand Up @@ -460,13 +477,14 @@ mod tests {
let c = CompactionStrategy::SizeTiered(opts);
let mut m = HashMap::new();
c.fill_raw_map(&mut m);
assert_eq!(6, m.len());
assert_eq!(7, m.len());
assert_eq!(m[COMPACTION_STRATEGY], "size_tiered");
assert_eq!(m[BUCKET_LOW_KEY], "0.1");
assert_eq!(m[BUCKET_HIGH_KEY], "1.5");
assert_eq!(m[MIN_SSTABLE_SIZE_KEY], "1024");
assert_eq!(m[MIN_THRESHOLD_KEY], "4");
assert_eq!(m[MAX_THRESHOLD_KEY], "10");
assert_eq!(m[MAX_INPUT_SSTABLE_SIZE_KEY], "1258291200");
assert_eq!(
c,
CompactionStrategy::parse_from("size_tiered", &m).unwrap()
Expand All @@ -480,14 +498,16 @@ mod tests {
let mut m = HashMap::new();
c.fill_raw_map(&mut m);

assert_eq!(7, m.len());
assert_eq!(8, m.len());
assert_eq!(m[COMPACTION_STRATEGY], "time_window");
assert_eq!(m[BUCKET_LOW_KEY], "0.1");
assert_eq!(m[BUCKET_HIGH_KEY], "1.5");
assert_eq!(m[MIN_SSTABLE_SIZE_KEY], "1024");
assert_eq!(m[MIN_THRESHOLD_KEY], "4");
assert_eq!(m[MAX_THRESHOLD_KEY], "10");
assert_eq!(m[TIMESTAMP_RESOLUTION_KEY], "milliseconds");
assert_eq!(m[MAX_INPUT_SSTABLE_SIZE_KEY], "1258291200");

assert_eq!(
c,
CompactionStrategy::parse_from("time_window", &m).unwrap()
Expand Down
94 changes: 83 additions & 11 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ fn find_uncompact_files(
pub struct SizeTieredPicker {}

/// Similar size files group
#[derive(Debug)]
#[derive(Debug, Clone)]
struct Bucket {
pub avg_size: usize,
pub files: Vec<FileHandle>,
Expand Down Expand Up @@ -206,10 +206,8 @@ impl Bucket {

#[inline]
fn hotness(f: &FileHandle) -> f64 {
    // Clamp the row count to at least 1 so a file reporting zero rows
    // cannot produce a NaN/inf hotness from the division below.
    // (The scraped diff kept both the old `match` form and this `.max(1)`
    // form; only the new form should remain — the old binding is dead.)
    let row_num = f.row_num().max(1);
    f.read_meter().h2_rate() / (row_num as f64)
}
}
Expand Down Expand Up @@ -242,8 +240,12 @@ impl LevelPicker for SizeTieredPicker {
opts.min_sstable_size.as_bytes() as f32,
);

let files =
Self::most_interesting_bucket(buckets, opts.min_threshold, opts.max_threshold);
let files = Self::most_interesting_bucket(
buckets,
opts.min_threshold,
opts.max_threshold,
opts.max_input_sstable_size.as_bytes(),
);

if files.is_some() {
info!(
Expand Down Expand Up @@ -311,6 +313,7 @@ impl SizeTieredPicker {
buckets: Vec<Bucket>,
min_threshold: usize,
max_threshold: usize,
max_input_sstable_size: u64,
) -> Option<Vec<FileHandle>> {
debug!(
"Find most_interesting_bucket buckets:{:?}, min:{}, max:{}",
Expand All @@ -321,7 +324,8 @@ impl SizeTieredPicker {
// skip buckets containing less than min_threshold sstables,
// and limit other buckets to max_threshold sstables
for bucket in buckets {
let (bucket, hotness) = Self::trim_to_threshold_with_hotness(bucket, max_threshold);
let (bucket, hotness) =
Self::trim_to_threshold_with_hotness(bucket, max_threshold, max_input_sstable_size);
if bucket.files.len() >= min_threshold {
pruned_bucket_and_hotness.push((bucket, hotness));
}
Expand Down Expand Up @@ -375,7 +379,11 @@ impl SizeTieredPicker {
files_by_segment
}

fn trim_to_threshold_with_hotness(bucket: Bucket, max_threshold: usize) -> (Bucket, f64) {
fn trim_to_threshold_with_hotness(
bucket: Bucket,
max_threshold: usize,
max_input_sstable_size: u64,
) -> (Bucket, f64) {
let hotness_snapshot = bucket.get_hotness_map();

// Sort by sstable hotness (descending).
Expand All @@ -389,9 +397,14 @@ impl SizeTieredPicker {

// and then trim the coldest sstables off the end to meet the max_threshold
let len = sorted_files.len();
let mut input_size = 0;
let pruned_bucket: Vec<FileHandle> = sorted_files
.into_iter()
.take(std::cmp::min(max_threshold, len))
.take_while(|f| {
input_size += f.size();
input_size <= max_input_sstable_size
})
.collect();

// bucket hotness is the sum of the hotness of all sstable members
Expand Down Expand Up @@ -488,6 +501,7 @@ impl TimeWindowPicker {
buckets,
size_tiered_opts.min_threshold,
size_tiered_opts.max_threshold,
size_tiered_opts.max_input_sstable_size.as_bytes(),
);

if files.is_some() {
Expand Down Expand Up @@ -592,11 +606,13 @@ mod tests {
tests::build_schema,
time::{TimeRange, Timestamp},
};
use tokio::sync::mpsc;

use super::*;
use crate::{
compaction::{picker::PickerContext, CompactionStrategy, PickerManager},
compaction::PickerManager,
sst::{
file::SstMetaData,
file::{FileMeta, FilePurgeQueue, SstMetaData},
manager::{tests::LevelsControllerMockBuilder, LevelsController},
},
};
Expand Down Expand Up @@ -744,4 +760,60 @@ mod tests {
assert!(task.expired[0].files.is_empty());
}
}

/// Test helper: build one in-memory `FileHandle` per requested file size.
fn build_file_handles(sizes: Vec<u64>) -> Vec<FileHandle> {
    // The receiver is dropped immediately; the purge queue is never drained
    // in these tests, only used to satisfy the constructor.
    let (tx, _rx) = mpsc::unbounded_channel();

    let mut handles = Vec::with_capacity(sizes.len());
    for size in sizes {
        let meta = FileMeta {
            id: 1,
            meta: build_sst_meta_data(TimeRange::empty(), size),
        };
        let purge_queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
        handles.push(FileHandle::new(meta, purge_queue));
    }
    handles
}

#[test]
fn test_size_tiered_picker() {
    // Collect the sizes of the files that survived trimming, in order.
    fn kept_sizes(b: &Bucket) -> Vec<u64> {
        b.files.iter().map(|f| f.size()).collect()
    }

    let bucket = Bucket::with_files(build_file_handles(vec![100, 110, 200]));

    // Trimmed by the max input size: 100 + 110 fits in 300, adding 200 does not.
    let (trimmed, _) =
        SizeTieredPicker::trim_to_threshold_with_hotness(bucket.clone(), 10, 300);
    assert_eq!(vec![100, 110], kept_sizes(&trimmed));

    // Neither limit is hit, so every file is kept.
    let (trimmed, _) =
        SizeTieredPicker::trim_to_threshold_with_hotness(bucket.clone(), 10, 3000);
    assert_eq!(vec![100, 110, 200], kept_sizes(&trimmed));

    // Trimmed by max_threshold: only two files are allowed through.
    let (trimmed, _) = SizeTieredPicker::trim_to_threshold_with_hotness(bucket, 2, 3000);
    assert_eq!(vec![100, 110], kept_sizes(&trimmed));
}
}
3 changes: 3 additions & 0 deletions analytic_engine/src/table_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ impl From<SizeTieredCompactionOptions> for common_pb::CompactionOptions {
max_threshold: opts.max_threshold as u32,
// FIXME: Is it ok to use the default timestamp resolution here?
timestamp_resolution: common_pb::TimeUnit::Nanoseconds as i32,
max_input_sstable_size: opts.max_input_sstable_size.as_bytes() as u32,
}
}
}
Expand All @@ -471,6 +472,7 @@ impl From<common_pb::CompactionOptions> for SizeTieredCompactionOptions {
min_sstable_size: ReadableSize(opts.min_sstable_size as u64),
min_threshold: opts.min_threshold as usize,
max_threshold: opts.max_threshold as usize,
max_input_sstable_size: ReadableSize(opts.max_input_sstable_size as u64),
}
}
}
Expand All @@ -484,6 +486,7 @@ impl From<TimeWindowCompactionOptions> for common_pb::CompactionOptions {
min_threshold: v.size_tiered.min_threshold as u32,
max_threshold: v.size_tiered.max_threshold as u32,
timestamp_resolution: common_pb::TimeUnit::from(v.timestamp_resolution) as i32,
max_input_sstable_size: v.size_tiered.max_input_sstable_size.as_bytes() as u32,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions proto/protos/analytic_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ message CompactionOptions {
uint32 max_threshold = 5;
// Options for TWCS
TimeUnit timestamp_resolution = 6;
uint32 max_input_sstable_size = 7;
}

enum TimeUnit {
Expand Down