diff --git a/kvstore/src/io_utils.rs b/kvstore/src/io_utils.rs
index cd9f1175917cd7..0c59cdd18f8abc 100644
--- a/kvstore/src/io_utils.rs
+++ b/kvstore/src/io_utils.rs
@@ -43,6 +43,11 @@ pub struct CRCReader {
     chunk_size: usize,
 }
 
+/// Helper trait to make zeroing buffers easier
+pub trait Fill<T> {
+    fn fill(&mut self, v: T);
+}
+
 impl SharedWriter {
     pub fn new(buf: Arc<RwLock<Vec<u8>>>) -> SharedWriter {
         SharedWriter { buf, pos: 0 }
@@ -146,6 +151,17 @@ impl CRCReader {
     }
 }
 
+impl<T> Fill<T> for [T]
+where
+    T: Clone,
+{
+    fn fill(&mut self, v: T) {
+        for i in self {
+            *i = v.clone()
+        }
+    }
+}
+
 impl Deref for MemMap {
     type Target = [u8];
 
diff --git a/kvstore/src/sstable.rs b/kvstore/src/sstable.rs
index 3d9e4c700a2fcb..e319bd5eb9a90d 100644
--- a/kvstore/src/sstable.rs
+++ b/kvstore/src/sstable.rs
@@ -1,31 +1,21 @@
 use crate::error::Result;
-use crate::io_utils::{MemMap, Writer};
+use crate::io_utils::{Fill, MemMap};
 
-use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
+use byteorder::{BigEndian, ByteOrder};
 
 use std::borrow::Borrow;
-use std::collections::{BTreeMap, HashMap};
-use std::io::{prelude::*, Cursor, Seek, SeekFrom};
+use std::collections::BTreeMap;
+use std::fmt;
+use std::io::prelude::*;
+use std::mem;
 use std::ops::RangeInclusive;
 use std::sync::Arc;
 use std::u64;
 
-// ___________________________________________
-// | start_key | end_key | level | data_size |
-// -------------------------------------------
-const IDX_META_SIZE: usize = KEY_LEN + KEY_LEN + 1 + 8;
-
-const KEY_LEN: usize = 3 * 8;
-// _________________
-// | offset | size |
-// -----------------
-const PTR_SIZE: usize = 2 * 8;
-// __________________________________________
-// | key | timestamp | pointer OR tombstone |
-// ------------------------------------------
-const INDEX_ENTRY_SIZE: usize = KEY_LEN + 8 + PTR_SIZE;
-// Represented by zero offset and size
-const TOMBSTONE: [u8; PTR_SIZE] = [0u8; PTR_SIZE];
+const INDEX_META_SIZE: usize = mem::size_of::<IndexMeta>();
+const KEY_LEN: usize = mem::size_of::<Key>();
+const INDEX_ENTRY_SIZE: usize = mem::size_of::<IndexEntry>();
+const INDEX_RECORD_SIZE: usize = KEY_LEN + INDEX_ENTRY_SIZE;
 
 #[derive(Clone, Debug)]
 pub struct SSTable {
@@ -34,7 +24,7 @@ pub struct SSTable {
     meta: IndexMeta,
 }
 
-#[derive(Debug, PartialEq, Clone)]
+#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
 pub struct IndexMeta {
     pub level: u8,
     pub data_size: u64,
@@ -42,26 +32,30 @@ pub struct IndexMeta {
     pub end: Key,
 }
 
-#[derive(Debug, Default, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Hash)]
+#[derive(
+    Debug, Default, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Hash, Serialize, Deserialize,
+)]
 pub struct Key(pub [u8; 24]);
 
-#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Copy, Clone)]
-pub struct Index {
+#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)]
+pub struct IndexEntry {
+    pub timestamp: i64,
     pub offset: u64,
     pub size: u64,
 }
 
-#[derive(Debug, Clone, Eq, PartialEq)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub struct Value {
     pub ts: i64,
     pub val: Option<Vec<u8>>,
 }
 
-/// An iterator that produces logical view over a set of SSTables
+/// An iterator that produces a logical view over a set of SSTables.
+/// It implements [direct k-way merge](https://en.wikipedia.org/wiki/K-way_merge_algorithm#Heap)
+/// and reconciles out-of-date/deleted values in a lazy fashion.
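+/// For each key, the version with the greatest timestamp wins, and keys whose
+/// surviving value is a tombstone (`val == None`) are dropped from the output.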
 /// Inputs *MUST* be sorted
 pub struct Merged<I> {
     sources: Vec<I>,
     heads: BTreeMap<(Key, usize), Value>,
-    seen: HashMap<Key, i64>,
 }
 
 impl SSTable {
@@ -71,7 +65,7 @@ impl SSTable {
 
     #[allow(dead_code)]
     pub fn num_keys(&self) -> u64 {
-        ((self.index.len() - IDX_META_SIZE) / INDEX_ENTRY_SIZE) as u64
+        ((self.index.len() - INDEX_META_SIZE) / INDEX_RECORD_SIZE) as u64
     }
 
     pub fn get(&self, key: &Key) -> Result<Option<Value>> {
@@ -92,8 +86,8 @@ impl SSTable {
         rows: &mut I,
         level: u8,
         max_table_size: u64,
-        data_wtr: &mut Writer,
-        index_wtr: &mut Writer,
+        data_wtr: &mut dyn Write,
+        index_wtr: &mut dyn Write,
     ) where
         I: Iterator<Item = (K, V)>,
         K: Borrow<Key>,
         V: Borrow<Value>,
@@ -123,8 +117,12 @@ impl SSTable {
         index_wtr.flush().expect(INDEX_ERR);
     }
 
-    pub fn create<I, K, V>(rows: &mut I, level: u8, data_wtr: &mut Writer, index_wtr: &mut Writer)
-    where
+    pub fn create<I, K, V>(
+        rows: &mut I,
+        level: u8,
+        data_wtr: &mut dyn Write,
+        index_wtr: &mut dyn Write,
+    ) where
         I: Iterator<Item = (K, V)>,
         K: Borrow<Key>,
         V: Borrow<Value>,
     {
@@ -133,7 +131,14 @@ impl SSTable {
     }
 
     pub fn from_parts(data: Arc<MemMap>, index: Arc<MemMap>) -> Result<SSTable> {
-        sst_from_parts(data, index)
+        let len = index.len() as usize;
+
+        assert!(len > INDEX_META_SIZE);
+        assert_eq!((len - INDEX_META_SIZE) % INDEX_RECORD_SIZE, 0);
+
+        let meta = bincode::deserialize_from(&index[..INDEX_META_SIZE])?;
+
+        Ok(SSTable { data, index, meta })
     }
 
     pub fn could_contain(&self, key: &Key) -> bool {
@@ -165,8 +170,8 @@ impl SSTable {
 }
 
 impl Key {
-    pub const MIN: Key = Key([0u8; KEY_LEN as usize]);
-    pub const MAX: Key = Key([255u8; KEY_LEN as usize]);
+    pub const MIN: Key = Key([0u8; KEY_LEN]);
+    pub const MAX: Key = Key([255u8; KEY_LEN]);
     pub const ALL_INCLUSIVE: RangeInclusive<Key> = RangeInclusive::new(Key::MIN, Key::MAX);
 
     pub fn write<W: Write>(&self, wtr: &mut W) -> Result<()> {
@@ -181,6 +186,15 @@ impl Key {
     }
 }
 
+impl Value {
+    pub fn new(commit: i64, data: Option<Vec<u8>>) -> Value {
+        Value {
+            ts: commit,
+            val: data,
+        }
+    }
+}
+
 struct Scan {
     bounds: RangeInclusive<Key>,
     data: Arc<MemMap>,
     index: Arc<MemMap>,
@@ -194,18 +208,19 @@ impl Scan {
             bounds,
             data,
             index,
-            index_pos: IDX_META_SIZE as usize,
+            index_pos: INDEX_META_SIZE as usize,
         }
     }
 
     fn step(&mut self) -> Result<Option<(Key, Value)>> {
         while self.index_pos < self.index.len() {
             let pos = self.index_pos as usize;
-            let end = pos + INDEX_ENTRY_SIZE;
-            let (key, ts, idx) = read_index_rec(&self.index[pos..end]);
+            let end = pos + INDEX_RECORD_SIZE;
+
+            let (key, entry): (Key, IndexEntry) = bincode::deserialize_from(&self.index[pos..end])?;
+            self.index_pos = end;
 
             if key < *self.bounds.start() {
-                self.index_pos = end;
                 continue;
             }
@@ -214,22 +229,29 @@
                 return Ok(None);
             }
 
-            let bytes_opt = idx.map(|ptr| get_val(&self.data, ptr).to_vec());
-
-            let val = Value { ts, val: bytes_opt };
+            let record_range = entry.offset as usize..(entry.offset + entry.size) as usize;
+            let (data_key, value) = bincode::deserialize_from(&self.data[record_range])?;
+            assert_eq!(data_key, key);
 
-            self.index_pos = end;
-
-            return Ok(Some((key, val)));
+            return Ok(Some((data_key, value)));
         }
 
         Ok(None)
     }
 }
 
+impl fmt::Display for Key {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let k0 = BigEndian::read_u64(&self.0[..8]);
+        let k1 = BigEndian::read_u64(&self.0[8..16]);
+        let k2 = BigEndian::read_u64(&self.0[16..]);
+        write!(f, "Key({}, {}, {})", k0, k1, k2)
+    }
+}
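+// e.g. `Key::from((1, 2, 3))` displays as "Key(1, 2, 3)".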
+
 impl From<(u64, u64, u64)> for Key {
     fn from((k0, k1, k2): (u64, u64, u64)) -> Self {
-        let mut buf = [0u8; KEY_LEN as usize];
+        let mut buf = [0u8; KEY_LEN];
 
         BigEndian::write_u64(&mut buf[..8], k0);
         BigEndian::write_u64(&mut buf[8..16], k1);
@@ -239,46 +261,6 @@ impl From<(u64, u64, u64)> for Key {
     }
 }
 
-impl Index {
-    fn write<W: Write>(&self, wtr: &mut W) -> Result<()> {
-        wtr.write_u64::<BigEndian>(self.offset)?;
-        wtr.write_u64::<BigEndian>(self.size)?;
-        Ok(())
-    }
-
-    #[inline]
-    fn read(bytes: &[u8]) -> Index {
-        let offset = BigEndian::read_u64(&bytes[..8]);
-        let size = BigEndian::read_u64(&bytes[8..16]);
-
-        Index { offset, size }
-    }
-}
-
-impl IndexMeta {
-    fn write<W: Write>(&self, wtr: &mut W) -> Result<()> {
-        self.start.write(wtr)?;
-        self.end.write(wtr)?;
-        wtr.write_u8(self.level)?;
-        wtr.write_u64::<BigEndian>(self.data_size)?;
-        Ok(())
-    }
-
-    fn read(data: &[u8]) -> Self {
-        let start = Key::read(&data[..24]);
-        let end = Key::read(&data[24..48]);
-        let level = data[48];
-        let data_size = BigEndian::read_u64(&data[49..57]);
-
-        IndexMeta {
-            start,
-            end,
-            level,
-            data_size,
-        }
-    }
-}
-
 impl<I> Merged<I>
 where
     I: Iterator<Item = (Key, Value)>,
 {
@@ -292,11 +274,7 @@
             }
         }
 
-        Merged {
-            sources,
-            heads,
-            seen: HashMap::new(),
-        }
+        Merged { sources, heads }
     }
 }
@@ -308,30 +286,38 @@
     fn next(&mut self) -> Option<Self::Item> {
         while !self.heads.is_empty() {
+            // get new key
             let (key, source_idx) = *self.heads.keys().next().unwrap();
-            let val = self.heads.remove(&(key, source_idx)).unwrap();
+            let mut val = self.heads.remove(&(key, source_idx)).unwrap();
 
             // replace
             if let Some((k, v)) = self.sources[source_idx].next() {
                 self.heads.insert((k, source_idx), v);
             }
 
-            // merge logic
-            // if deleted, remember
-            let (deleted, stale) = match self.seen.get(&key) {
-                Some(&seen_ts) if seen_ts < val.ts => {
-                    // fresh val
-                    self.seen.insert(key, val.ts);
-                    (val.val.is_none(), false)
-                }
-                Some(_) => (val.val.is_none(), true),
-                None => {
-                    self.seen.insert(key, val.ts);
-                    (val.val.is_none(), false)
+            // check for other versions of this record
+            while !self.heads.is_empty() {
+                let (next_key, source_idx) = *self.heads.keys().next().unwrap();
+
+                // Found another version of the record
+                if key == next_key {
+                    // pop this version, check if it's newer
+                    let other_version = self.heads.remove(&(next_key, source_idx)).unwrap();
+                    if other_version.ts > val.ts {
+                        val = other_version;
+                    }
+
+                    // replace
+                    if let Some((k, v)) = self.sources[source_idx].next() {
+                        self.heads.insert((k, source_idx), v);
+                    }
+                } else {
+                    break;
                 }
-            };
+            }
 
-            if !(stale || deleted) {
+            // Don't produce deleted records
+            if val.val.is_some() {
                 return Some((key, val));
            }
         }
 
         None
     }
 }
@@ -358,66 +344,64 @@ impl Iterator for Scan {
     }
 }
 
-fn sst_from_parts(data: Arc<MemMap>, index: Arc<MemMap>) -> Result<SSTable> {
-    let len = index.len() as usize;
-
-    assert!(len > IDX_META_SIZE);
-    assert_eq!((len - IDX_META_SIZE) % INDEX_ENTRY_SIZE, 0);
-
-    let mut rdr = Cursor::new(&**index);
-    let mut idx_buf = [0; IDX_META_SIZE];
-    rdr.read_exact(&mut idx_buf)?;
-
-    let meta = IndexMeta::read(&idx_buf);
-
-    Ok(SSTable { data, index, meta })
-}
 
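+/// Writes `meta` followed by one record per index entry. Each record is
+/// serialized into a zeroed, fixed-width buffer, so record `i` always starts
+/// at byte `INDEX_META_SIZE + i * INDEX_RECORD_SIZE` of the index stream.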
 fn flush_index(
-    index: &BTreeMap<Key, (i64, Option<Index>)>,
+    index: &BTreeMap<Key, IndexEntry>,
     meta: &IndexMeta,
-    wtr: &mut Writer,
+    writer: &mut dyn Write,
 ) -> Result<()> {
-    meta.write(wtr)?;
+    let mut entry_buffer = [0u8; INDEX_RECORD_SIZE];
+    let mut meta_buffer = [0u8; INDEX_META_SIZE];
+
+    bincode::serialize_into(&mut meta_buffer[..], meta)?;
+    writer.write_all(&meta_buffer)?;
+
+    for (key, entry) in index.iter() {
+        let rec = (key, entry);
+        entry_buffer.fill(0);
 
-    for (&key, &(ts, idx)) in index.iter() {
-        write_index_rec(wtr, (key, ts, idx))?;
+        bincode::serialize_into(&mut entry_buffer[..], &rec)?;
+        writer.write_all(&entry_buffer)?;
     }
 
     Ok(())
 }
 
-#[allow(clippy::type_complexity)]
+
 fn flush_mem_table_capped<I, K, V>(
     rows: &mut I,
-    wtr: &mut Writer,
+    mut wtr: &mut dyn Write,
     max_table_size: u64,
-) -> Result<(u64, BTreeMap<Key, (i64, Option<Index>)>)>
+) -> Result<(u64, BTreeMap<Key, IndexEntry>)>
 where
     I: Iterator<Item = (K, V)>,
     K: Borrow<Key>,
     V: Borrow<Value>,
 {
-    let mut ssi = BTreeMap::new();
+    let mut index = BTreeMap::new();
     let mut size = 0;
+    let bincode_config = bincode::config();
 
     for (key, val) in rows {
-        let (key, val) = (key.borrow(), val.borrow());
-        let ts = val.ts;
+        let record = (key.borrow(), val.borrow());
+
+        let serialized_size = bincode_config.serialized_size(&record)?;
+        bincode::serialize_into(&mut wtr, &record)?;
 
-        let (index, item_size) = match val.val {
-            Some(ref bytes) => (Some(write_val(wtr, bytes)?), bytes.len()),
-            None => (None, 0),
+        let entry = IndexEntry {
+            timestamp: record.1.ts,
+            offset: size,
+            size: serialized_size,
         };
 
-        size += item_size as u64;
-        ssi.insert(*key, (ts, index));
+        size += serialized_size;
+
+        index.insert(*record.0, entry);
 
         if size >= max_table_size {
             break;
         }
     }
 
-    Ok((size, ssi))
+    Ok((size, index))
 }
 
 #[inline]
@@ -425,52 +409,165 @@ fn overlapping<T: Ord>(r1: &RangeInclusive<T>, r2: &RangeInclusive<T>) -> b
     r1.start() <= r2.end() && r2.start() <= r1.end()
 }
 
-#[inline]
-fn write_val<W: Write + Seek>(wtr: &mut W, val: &[u8]) -> Result<Index> {
-    let offset = wtr.seek(SeekFrom::Current(0))?;
-    let size = val.len() as u64;
+#[cfg(test)]
+mod test {
+    use super::*;
+    use rand::{thread_rng, Rng};
+    use std::sync::{Arc, RwLock};
 
-    wtr.write_all(val)?;
-    Ok(Index { offset, size })
-}
+    #[test]
+    fn test_dump_data() {
+        let mut data_buffer = vec![];
+        let records: BTreeMap<_, _> = gen_records().take(512).collect();
 
-#[inline]
-fn get_val(mmap: &MemMap, idx: Index) -> &[u8] {
-    let row = &mmap[idx.offset as usize..(idx.offset + idx.size) as usize];
-    assert_eq!(row.len(), idx.size as usize);
-    row
-}
+        let (_, index) =
+            flush_mem_table_capped(&mut records.iter(), &mut data_buffer, u64::MAX).unwrap();
 
-#[inline]
-fn write_index_rec<W: Write>(wtr: &mut W, (key, ts, ptr): (Key, i64, Option<Index>)) -> Result<()> {
-    key.write(wtr)?;
+        assert_eq!(index.len(), records.len());
+        assert!(index.keys().eq(records.keys()));
 
-    wtr.write_i64::<BigEndian>(ts)?;
+        let mut retrieved = BTreeMap::new();
 
-    match ptr {
-        Some(idx) => idx.write(wtr)?,
-        None => wtr.write_all(&TOMBSTONE)?,
-    };
+        for (key, entry) in index.iter() {
+            let range = entry.offset as usize..(entry.offset + entry.size) as usize;
+            let (data_key, value) = bincode::deserialize_from(&data_buffer[range]).unwrap();
+            assert_eq!(&data_key, key);
+            retrieved.insert(data_key, value);
+        }
 
-    Ok(())
-}
+        assert_eq!(records, retrieved);
+    }
 
-#[inline]
-fn read_index_rec(bytes: &[u8]) -> (Key, i64, Option<Index>) {
-    assert_eq!(bytes.len(), INDEX_ENTRY_SIZE);
-    const TS_END: usize = KEY_LEN + 8;
+    #[test]
+    fn test_dump_indexes() {
+        let mut data_buffer = vec![];
+        let mut index_buffer = vec![];
+        let records: BTreeMap<_, _> = gen_records().take(512).collect();
 
-    let mut key_buf = [0; KEY_LEN as usize];
-    key_buf.copy_from_slice(&bytes[..KEY_LEN as usize]);
-    let key = Key(key_buf);
-    let ts = BigEndian::read_i64(&bytes[KEY_LEN..TS_END]);
+        let (data_size, index) =
+            flush_mem_table_capped(&mut records.iter(), &mut data_buffer, u64::MAX).unwrap();
 
-    let idx_slice = &bytes[TS_END..INDEX_ENTRY_SIZE];
-    let idx = if idx_slice == TOMBSTONE {
-        None
-    } else {
-        Some(Index::read(idx_slice))
-    };
+        let (&start, &end) = (
+            index.keys().next().unwrap(),
+            index.keys().next_back().unwrap(),
+        );
+
+        let meta = IndexMeta {
+            start,
+            end,
+            data_size,
+            level: 0,
+        };
+
+        flush_index(&index, &meta, &mut index_buffer).unwrap();
+
+        let retrieved_meta = bincode::deserialize_from(&index_buffer[..INDEX_META_SIZE]).unwrap();
+        assert_eq!(meta, retrieved_meta);
+
+        // By iterating over the BTreeMap we also check the order of index entries as written
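+        // (thanks to the fixed-width framing, record i spans the byte range
+        //  INDEX_META_SIZE + i * INDEX_RECORD_SIZE .. INDEX_META_SIZE + (i + 1) * INDEX_RECORD_SIZE)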
+        for (i, (key, entry)) in index.iter().enumerate() {
+            let start = i * INDEX_RECORD_SIZE + INDEX_META_SIZE;
+            let end = start + INDEX_RECORD_SIZE;
+
+            let (retrieved_key, retrieved_entry) =
+                bincode::deserialize_from(&index_buffer[start..end]).unwrap();
+
+            assert_eq!(key, &retrieved_key);
+            assert_eq!(entry, &retrieved_entry);
+        }
+    }
+
+    #[test]
+    fn test_sstable_scan() {
+        let mut data_buffer = vec![];
+        let mut index_buffer = vec![];
+        let records: BTreeMap<_, _> = gen_records().take(512).collect();
+
+        SSTable::create(&mut records.iter(), 0, &mut data_buffer, &mut index_buffer);
+
+        let data = MemMap::Mem(Arc::new(RwLock::new(data_buffer)));
+        let index = MemMap::Mem(Arc::new(RwLock::new(index_buffer)));
+
+        let sst = SSTable::from_parts(Arc::new(data), Arc::new(index)).unwrap();
+
+        let output_iter = Scan::new(
+            Key::ALL_INCLUSIVE,
+            Arc::clone(&sst.data),
+            Arc::clone(&sst.index),
+        );
+
+        assert!(output_iter.eq(records.into_iter()));
+    }
+
+    #[test]
+    fn test_merge_2way() {
+        let records: BTreeMap<_, _> = gen_records().take(512).collect();
+        let updates: BTreeMap<_, _> = records
+            .iter()
+            .map(|(k, v)| (*k, Value::new(v.ts + 1, Some(vec![]))))
+            .collect();
+        let deletes: BTreeMap<_, _> = records
+            .iter()
+            .map(|(k, v)| (*k, Value::new(v.ts + 1, None)))
+            .collect();
+
+        let owned = |(k, v): (&Key, &Value)| (*k, v.clone());
+
+        let sources = vec![records.iter().map(owned), updates.iter().map(owned)];
+        let merged: Vec<_> = Merged::new(sources).collect();
+        assert!(merged.into_iter().eq(updates.into_iter()));
+
+        let sources = vec![records.into_iter(), deletes.into_iter()];
+        let merged: Vec<_> = Merged::new(sources).collect();
+        assert_eq!(merged.len(), 0);
+    }
+
+    #[test]
+    fn test_merge_4way() {
+        // delete last half, then update first half, then delete last half of first half
+        let start: BTreeMap<_, _> = gen_records().take(512).collect();
+        let deletes: BTreeMap<_, _> = start
+            .iter()
+            .skip(256)
+            .map(|(k, v)| (*k, Value::new(v.ts + 1, None)))
+            .collect();
+        let updates: BTreeMap<_, _> = start
+            .iter()
+            .take(256)
+            .map(|(k, v)| (*k, Value::new(v.ts + 2, Some(vec![]))))
+            .collect();
+        let more_deletes: BTreeMap<_, _> = updates
+            .iter()
+            .skip(128)
+            .map(|(k, v)| (*k, Value::new(v.ts + 3, None)))
+            .collect();
+
+        let sources = vec![
+            more_deletes.into_iter(),
+            updates.clone().into_iter(),
+            start.into_iter(),
+            deletes.into_iter(),
+        ];
+
+        let merged: Vec<_> = Merged::new(sources).collect();
+        let expected: Vec<_> = updates.into_iter().take(128).collect();
+
+        assert_eq!(merged.len(), expected.len());
+        assert_eq!(merged, expected);
+    }
+
+    fn gen_records() -> impl Iterator<Item = (Key, Value)> {
+        let mut rng = thread_rng();
+        let commit = rng.gen();
+
+        std::iter::repeat_with(move || {
+            let buf: [u8; KEY_LEN] = rng.gen();
+            let data_size: u8 = buf[0];
+
+            let val = Some(vec![0; data_size as usize]);
+
+            (Key(buf), Value::new(commit, val))
+        })
+    }
 
-    (key, ts, idx)
 }
diff --git a/kvstore/src/writelog.rs b/kvstore/src/writelog.rs
index 1c5987408c0f49..12fe68589e37f9 100644
--- a/kvstore/src/writelog.rs
+++ b/kvstore/src/writelog.rs
@@ -2,11 +2,10 @@ use crate::error::Result;
 use crate::io_utils::{CRCReader, CRCWriter};
 use crate::sstable::Value;
 use crate::Key;
-use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use memmap::Mmap;
 use std::collections::BTreeMap;
 use std::fs::{self, File};
-use std::io::{Read, Write};
+use std::io::Write;
 use std::path::{Path, PathBuf};
 
 // RocksDb's log uses this size.
@@ -162,7 +161,8 @@ impl<W: Write> LogWriter for CRCWriter<W> {
 
 fn log(logger: &mut Logger, key: &Key, commit: i64, data: Option<&[u8]>) -> Result<()> {
     let writer = &mut logger.writer;
-    write_value(writer, key, commit, data)?;
+
+    bincode::serialize_into(writer, &(key, commit, data))?;
 
     Ok(())
 }
@@ -184,90 +184,33 @@ fn file_opts() -> fs::OpenOptions {
 
 fn read_log(log_buf: &[u8]) -> Result<BTreeMap<Key, Value>> {
     let mut map = BTreeMap::new();
 
-    if log_buf.len() <= 8 + 24 + 8 + 1 {
-        return Ok(map);
-    }
-
     let mut reader = CRCReader::new(log_buf, BLOCK_SIZE);
 
-    while let Ok((key, val)) = read_value(&mut reader) {
-        map.insert(key, val);
+    while let Ok((key, commit, opt_bytes)) = bincode::deserialize_from(&mut reader) {
+        map.insert(key, Value::new(commit, opt_bytes));
     }
 
     Ok(map)
 }
 
-#[inline]
-fn write_value<W: Write>(
-    writer: &mut W,
-    key: &Key,
-    commit: i64,
-    data: Option<&[u8]>,
-) -> Result<()> {
-    let len = 24 + 8 + 1 + data.map(<[u8]>::len).unwrap_or(0);
-
-    writer.write_u64::<BigEndian>(len as u64)?;
-    writer.write_all(&key.0)?;
-    writer.write_i64::<BigEndian>(commit)?;
-
-    match data {
-        Some(data) => {
-            writer.write_u8(1)?;
-            writer.write_all(data)?;
-        }
-        None => {
-            writer.write_u8(0)?;
-        }
-    }
-
-    Ok(())
-}
-
-#[inline]
-fn read_value<R: Read>(reader: &mut R) -> Result<(Key, Value)> {
-    let len = reader.read_u64::<BigEndian>()?;
-    let data_len = len as usize - (24 + 8 + 1);
-
-    let mut reader = reader.by_ref().take(len);
-
-    let mut key_buf = [0; 24];
-    reader.read_exact(&mut key_buf)?;
-    let key = Key(key_buf);
-
-    let commit = reader.read_i64::<BigEndian>()?;
-    let exists = reader.read_u8()? != 0;
-
-    let data = if exists {
-        let mut buf = Vec::with_capacity(data_len);
-        reader.read_to_end(&mut buf)?;
-        Some(buf)
-    } else {
-        None
-    };
-
-    let val = Value {
-        ts: commit,
-        val: data,
-    };
-
-    Ok((key, val))
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
 
     #[test]
     fn test_log_serialization() {
-        let (key, commit, data) = (&Key::from((1, 2, 3)), 4, vec![0; 1024]);
+        let (key, commit, data) = (Key::from((1, 2, 3)), 4, Some(vec![0; 1024]));
 
         let mut buf = vec![];
-        write_value(&mut buf, key, commit, Some(&data)).unwrap();
+        bincode::serialize_into(&mut buf, &(&key, commit, &data)).unwrap();
+        buf.extend(std::iter::repeat(0).take(buf.len()));
 
-        let (stored_key, stored_val) = read_value(&mut &buf[..]).unwrap();
-        assert_eq!(&stored_key, key);
-        assert_eq!(stored_val.val.as_ref().unwrap(), &data);
-        assert_eq!(stored_val.ts, commit);
+        let log_record: (Key, i64, Option<Vec<u8>>) = bincode::deserialize_from(&buf[..]).unwrap();
+        assert_eq!(log_record.0, key);
+        assert_eq!(log_record.1, commit);
+        assert_eq!(log_record.2, data);
     }
 
     #[test]