Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use real time range to filter memtable #1233

Merged
merged 4 commits into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ impl FlushTask {
let sst_meta = MetaData {
min_key,
max_key,
time_range: memtable_state.time_range,
time_range: memtable_state.aligned_time_range,
max_sequence,
schema: self.table_data.schema(),
};
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl Instance {

for memtable in read_view.memtables {
let aligned_ts = memtable
.time_range
.aligned_time_range
.inclusive_start()
.truncate_by(segment_duration);
let entry = read_view_by_time
Expand Down
8 changes: 7 additions & 1 deletion analytic_engine/src/memtable/columnar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use std::{
use arena::MonoIncArena;
use bytes_ext::Bytes;
use common_types::{
column::Column, column_schema::ColumnId, datum::Datum, row::Row, schema::Schema, SequenceNumber,
column::Column, column_schema::ColumnId, datum::Datum, row::Row, schema::Schema,
time::TimeRange, SequenceNumber,
};
use generic_error::BoxError;
use log::debug;
Expand Down Expand Up @@ -230,6 +231,11 @@ impl MemTable for ColumnarMemTable {
self.last_sequence.load(Ordering::Relaxed)
}

// TODO: implement this.
fn time_range(&self) -> Option<TimeRange> {
None
}

fn metrics(&self) -> MemtableMetrics {
let row_raw_size = self.metrics.row_raw_size.load(Ordering::Relaxed);
let row_count = self.metrics.row_count.load(Ordering::Relaxed);
Expand Down
7 changes: 7 additions & 0 deletions analytic_engine/src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_types::{
record_batch::RecordBatchWithKey,
row::Row,
schema::{IndexInWriterSchema, Schema},
time::TimeRange,
SequenceNumber,
};
use generic_error::GenericError;
Expand Down Expand Up @@ -134,6 +135,9 @@ pub enum Error {

#[snafu(display("msg:{msg}"))]
InternalNoCause { msg: String },

#[snafu(display("Timestamp is not found in row.\nBacktrace:\n{backtrace}"))]
TimestampNotFound { backtrace: Backtrace },
}

define_result!(Error);
Expand Down Expand Up @@ -255,6 +259,9 @@ pub trait MemTable {
/// If the memtable is empty, then the last sequence is 0.
fn last_sequence(&self) -> SequenceNumber;

/// Time range of written rows.
fn time_range(&self) -> Option<TimeRange>;

/// Metrics of inner state.
fn metrics(&self) -> Metrics;
}
Expand Down
9 changes: 4 additions & 5 deletions analytic_engine/src/memtable/skiplist/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ impl Factory for SkiplistMemTableFactory {
fn create_memtable(&self, opts: Options) -> Result<MemTableRef> {
let arena = MonoIncArena::with_collector(opts.arena_block_size as usize, opts.collector);
let skiplist = Skiplist::with_arena(BytewiseComparator, arena);
let memtable = Arc::new(SkiplistMemTable {
schema: opts.schema,
let memtable = Arc::new(SkiplistMemTable::new(
opts.schema,
skiplist,
last_sequence: AtomicU64::new(opts.creation_sequence),
metrics: Default::default(),
});
AtomicU64::new(opts.creation_sequence),
));

Ok(memtable)
}
Expand Down
57 changes: 53 additions & 4 deletions analytic_engine/src/memtable/skiplist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,28 @@
pub mod factory;
pub mod iter;

use std::sync::atomic::{self, AtomicU64, AtomicUsize};
use std::sync::atomic::{self, AtomicI64, AtomicU64, AtomicUsize};

use arena::{Arena, BasicStats};
use bytes_ext::Bytes;
use codec::Encoder;
use common_types::{
row::{contiguous::ContiguousRowWriter, Row},
schema::Schema,
time::TimeRange,
SequenceNumber,
};
use generic_error::BoxError;
use log::{debug, trace};
use skiplist::{BytewiseComparator, Skiplist};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};

use crate::memtable::{
key::{ComparableInternalKey, KeySequence},
reversed_iter::ReversedColumnarIterator,
skiplist::iter::ColumnarIterImpl,
ColumnarIterPtr, EncodeInternalKey, InvalidPutSequence, InvalidRow, MemTable,
Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest,
Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest, TimestampNotFound,
};

#[derive(Default, Debug)]
Expand All @@ -48,7 +49,7 @@ struct Metrics {
}

/// MemTable implementation based on skiplist
pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone + Sync + Send> {
pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone> {
/// Schema of this memtable, is immutable.
schema: Schema,
skiplist: Skiplist<BytewiseComparator, A>,
Expand All @@ -57,6 +58,27 @@ pub struct SkiplistMemTable<A: Arena<Stats = BasicStats> + Clone + Sync + Send>
last_sequence: AtomicU64,

metrics: Metrics,
min_time: AtomicI64,
max_time: AtomicI64,
}

impl<A: Arena<Stats = BasicStats> + Clone> SkiplistMemTable<A> {
fn new(
schema: Schema,
skiplist: Skiplist<BytewiseComparator, A>,
last_sequence: AtomicU64,
) -> Self {
Self {
schema,
skiplist,
last_sequence,
metrics: Metrics::default(),
// Init to max value first, so we can use `min(min_time, row.time)` to get real min
// time.
min_time: AtomicI64::new(i64::MAX),
max_time: AtomicI64::new(i64::MIN),
}
}
}

impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
Expand Down Expand Up @@ -117,6 +139,27 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
let encoded_size = internal_key.len() + row_value.len();
self.skiplist.put(internal_key, row_value);

// Update min/max time
let timestamp = row.timestamp(schema).context(TimestampNotFound)?.as_i64();
_ = self
.min_time
.fetch_update(atomic::Ordering::Relaxed, atomic::Ordering::Relaxed, |v| {
if timestamp < v {
Some(timestamp)
} else {
None
}
});
_ = self
.max_time
.fetch_update(atomic::Ordering::Relaxed, atomic::Ordering::Relaxed, |v| {
if timestamp > v {
Some(timestamp)
} else {
None
}
});

// Update metrics
self.metrics
.row_raw_size
Expand Down Expand Up @@ -180,6 +223,12 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
self.last_sequence.load(atomic::Ordering::Relaxed)
}

fn time_range(&self) -> Option<TimeRange> {
let min_time = self.min_time.load(atomic::Ordering::Relaxed);
let max_time = self.max_time.load(atomic::Ordering::Relaxed);
TimeRange::new(min_time.into(), (max_time + 1).into())
}

fn metrics(&self) -> MemtableMetrics {
let row_raw_size = self.metrics.row_raw_size.load(atomic::Ordering::Relaxed);
let row_encoded_size = self
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl TableData {
)?;
let mem_state = MemTableState {
mem,
time_range,
aligned_time_range: time_range,
id: self.alloc_memtable_id(),
};

Expand Down Expand Up @@ -931,7 +931,7 @@ pub mod tests {
assert_eq!(2, mem_state.id);
let time_range =
TimeRange::bucket_of(now_ts, table_options::DEFAULT_SEGMENT_DURATION).unwrap();
assert_eq!(time_range, mem_state.time_range);
assert_eq!(time_range, mem_state.aligned_time_range);
}

#[test]
Expand Down
47 changes: 27 additions & 20 deletions analytic_engine/src/table/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub struct MemTableState {
pub mem: MemTableRef,
/// The `time_range` is estimated via the time range of the first row group
/// write to this memtable and is aligned to segment size
pub time_range: TimeRange,
pub aligned_time_range: TimeRange,
/// Id of the memtable, newer memtable has greater id
pub id: MemTableId,
}
Expand All @@ -135,12 +135,17 @@ impl MemTableState {
pub fn last_sequence(&self) -> SequenceNumber {
self.mem.last_sequence()
}

pub fn real_time_range(&self) -> TimeRange {
self.mem.time_range().unwrap_or(self.aligned_time_range)
}
}

impl fmt::Debug for MemTableState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MemTableState")
.field("time_range", &self.time_range)
.field("aligned_time_range", &self.aligned_time_range)
.field("real_time_range", &self.real_time_range())
.field("id", &self.id)
.field("mem", &self.mem.approximate_memory_usage())
.field("metrics", &self.mem.metrics())
Expand All @@ -166,7 +171,7 @@ impl MemTableForWrite {
pub fn accept_timestamp(&self, timestamp: Timestamp) -> bool {
match self {
MemTableForWrite::Sampling(_) => true,
MemTableForWrite::Normal(v) => v.time_range.contains(timestamp),
MemTableForWrite::Normal(v) => v.aligned_time_range.contains(timestamp),
}
}

Expand Down Expand Up @@ -407,7 +412,7 @@ impl MutableMemTableSet {
.range((Bound::Excluded(timestamp), Bound::Unbounded))
.next()
{
if memtable.time_range.contains(timestamp) {
if memtable.aligned_time_range.contains(timestamp) {
return Some(memtable);
}
}
Expand All @@ -419,7 +424,7 @@ impl MutableMemTableSet {
/// present.
fn insert(&mut self, memtable: MemTableState) -> Option<MemTableState> {
// Use end time of time range as key
let end = memtable.time_range.exclusive_end();
let end = memtable.aligned_time_range.exclusive_end();
self.0.insert(end, memtable)
}

Expand Down Expand Up @@ -453,10 +458,11 @@ impl MutableMemTableSet {
let iter = self
.0
.range((Bound::Excluded(inclusive_start), Bound::Unbounded));
for (_end_ts, mem) in iter {
for (_end_ts, mem_state) in iter {
// We need to iterate all candidate memtables as their start time is unspecific
if mem.time_range.intersect_with(time_range) {
mems.push(mem.clone());
let memtable_time_range = mem_state.real_time_range();
if memtable_time_range.intersect_with(time_range) {
mems.push(mem_state.clone());
}
}
}
Expand All @@ -482,9 +488,10 @@ impl ImmutableMemTableSet {
}

fn memtables_for_read(&self, time_range: TimeRange, mems: &mut MemTableVec) {
for mem in self.0.values() {
if mem.time_range.intersect_with(time_range) {
mems.push(mem.clone());
for mem_state in self.0.values() {
let memtable_time_range = mem_state.real_time_range();
if memtable_time_range.intersect_with(time_range) {
mems.push(mem_state.clone());
}
}
}
Expand Down Expand Up @@ -1054,11 +1061,11 @@ mod tests {
let flushable_mems = version.pick_memtables_to_flush(last_sequence);
assert!(flushable_mems.sampling_mem.unwrap().freezed);

let time_range =
let aligned_time_range =
TimeRange::bucket_of(now, table_options::DEFAULT_SEGMENT_DURATION).unwrap();

// Sampling memtable still readable after freezed.
let read_view = version.pick_read_view(time_range);
let read_view = version.pick_read_view(aligned_time_range);
assert!(read_view.contains_sampling());
assert_eq!(memtable_id1, read_view.sampling_mem.as_ref().unwrap().id);
assert!(read_view.sampling_mem.unwrap().freezed);
Expand All @@ -1067,7 +1074,7 @@ mod tests {
let memtable_id2 = 2;
let mem_state = MemTableState {
mem: memtable,
time_range,
aligned_time_range,
id: memtable_id2,
};
// Insert a mutable memtable.
Expand All @@ -1079,11 +1086,11 @@ mod tests {
.unwrap()
.unwrap();
let mutable = mutable.as_normal();
assert_eq!(time_range, mutable.time_range);
assert_eq!(aligned_time_range, mutable.aligned_time_range);
assert_eq!(memtable_id2, mutable.id);

// Need to read sampling memtable and mutable memtable.
let read_view = version.pick_read_view(time_range);
let read_view = version.pick_read_view(aligned_time_range);
assert_eq!(memtable_id1, read_view.sampling_mem.as_ref().unwrap().id);
assert_eq!(1, read_view.memtables.len());
assert_eq!(memtable_id2, read_view.memtables[0].id);
Expand Down Expand Up @@ -1120,15 +1127,15 @@ mod tests {
version.freeze_sampling_memtable();

let now = Timestamp::now();
let time_range =
let aligned_time_range =
TimeRange::bucket_of(now, table_options::DEFAULT_SEGMENT_DURATION).unwrap();

// Prepare mutable memtable.
let memtable = MemTableMocker.build();
let memtable_id2 = 2;
let mem_state = MemTableState {
mem: memtable,
time_range,
aligned_time_range,
id: memtable_id2,
};
// Insert a mutable memtable.
Expand All @@ -1140,7 +1147,7 @@ mod tests {
let max_sequence = 120;
let file_id = 13;
let add_file = AddFileMocker::new(file_id)
.time_range(time_range)
.time_range(aligned_time_range)
.max_seq(max_sequence)
.build();
let edit = VersionEdit {
Expand All @@ -1153,7 +1160,7 @@ mod tests {
version.apply_edit(edit);

// Only pick ssts after flushed.
let read_view = version.pick_read_view(time_range);
let read_view = version.pick_read_view(aligned_time_range);
assert!(!read_view.contains_sampling());
assert!(read_view.sampling_mem.is_none());
assert!(read_view.memtables.is_empty());
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/merge_memtable_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl MergeMemTableBench {

memtables.push(MemTableState {
mem: memtable,
time_range: TimeRange::min_to_max(),
aligned_time_range: TimeRange::min_to_max(),
id: *id,
});
}
Expand Down
Loading
Loading