Fix reverse iterator in RocksDB (#2398)
## Linked Issues/PRs
Closes #2044

## Description

### Problem analysis

Our database columns use the prefix extractor configuration, which allows
RocksDB to optimize storage based on key prefixes. However, this
breaks the lexicographic order between prefixes, so we need to pass a
special option to the read iterator to bypass the "prefix sharding"
(source:
https://github.com/facebook/rocksdb/wiki/Prefix-Seek/3a5e28faf6c2d0ec1bdb99763043e1e3322007e9#how-to-ignore-prefix-bloom-filters-in-read).

### Current solution

I took the same approach as before and started the iteration from the
next prefix, using the bypass read option.

However, I changed the behavior to create only one iterator, which
simplifies and optimizes the solution.
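The "next prefix" anchor is the lexicographic successor of the prefix: increment the last byte, carrying past any `0xFF` bytes, and give up when the prefix is all `0xFF` (the overflow case exercised by the new tests). A sketch of such a helper follows; the actual `next_prefix` in the codebase may differ in details:

```rust
/// Hypothetical sketch of a `next_prefix` helper: the smallest byte
/// string that sorts after every key starting with `prefix`, or `None`
/// if the prefix is all `0xFF` bytes and no such prefix exists.
fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
    // Drop trailing 0xFF bytes: they cannot be incremented in place.
    while prefix.last() == Some(&u8::MAX) {
        prefix.pop();
    }
    // Increment the last remaining byte, if any; an empty prefix here
    // means the original was all 0xFF, so there is no next prefix.
    let last = prefix.last_mut()?;
    *last += 1;
    Some(prefix)
}
```

For example, the successor of `[2]` is `[3]`, the successor of `[1, 0xFF]` is `[2]`, and `[0xFF, 0xFF]` has no successor.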

### Better solution for the future

I think that breaking RocksDB's "prefix sharding" to iterate can make
the iterator significantly more costly.

There is a way to avoid it, because we can iterate in reverse order
inside a prefix (source:
https://github.com/facebook/rocksdb/wiki/SeekForPrev), but we would need
the maximum value of a key in a given prefix to use as the starting
point of our iteration, which is not possible in the current code
because we don't have any information about the key.

Happy to hear your thoughts @FuelLabs/client :)

EDIT: Not done: #2405

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them
here

---------

Co-authored-by: green <xgreenx9999@gmail.com>
AurelienFT and xgreenx authored Nov 14, 2024
1 parent 552ba75 commit 524b7f0
Showing 4 changed files with 198 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2366](https://github.com/FuelLabs/fuel-core/pull/2366): The `importer_gas_price_for_block` metric is properly collected.
- [2369](https://github.com/FuelLabs/fuel-core/pull/2369): The `transaction_insertion_time_in_thread_pool_milliseconds` metric is properly collected.
- [2413](https://github.com/FuelLabs/fuel-core/issues/2413): block production immediately errors if unable to lock the mutex.
- [2389](https://github.com/FuelLabs/fuel-core/pull/2389): Fix construction of reverse iterator in RocksDB.

### Changed
- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.
@@ -51,7 +51,13 @@ where
fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
let read_history = &self.read_db;
let height_key = height_key(key, &self.height);
let options = ReadOptions::default();
let mut options = ReadOptions::default();
        // We need this option because our iterator will try to start in the `height_key` prefix section,
        // but if there is no data in this section, we expect the iterator to fetch data from another prefix section.
        // Without this option, it's not guaranteed that we fetch the correct next prefix section.
        // Source: https://github.com/facebook/rocksdb/wiki/Prefix-Seek#how-to-ignore-prefix-bloom-filters-in-read
        // and https://github.com/facebook/rocksdb/wiki/Prefix-Seek#general-prefix-seek-api
options.set_total_order_seek(true);
let nearest_modification = read_history
.iterator::<KeyAndValue>(
Column::HistoricalDuplicateColumn(column),
169 changes: 140 additions & 29 deletions crates/fuel-core/src/state/rocks_db.rs
@@ -443,12 +443,9 @@ where
}

/// RocksDB prefix iteration doesn't support reverse order,
/// but seeking the start key and iterating in reverse order works.
/// So we can create a workaround. We need to find the next available
/// element and use it as an anchor for reverse iteration,
/// but skip the first element to jump on the previous prefix.
/// If we can't find the next element, we are at the end of the list,
/// so we can use `IteratorMode::End` to start reverse iteration.
/// so we need to force the RocksDB iterator to order
/// all the prefixes so that we can take the next prefix
/// as the start of the iterator and iterate in reverse.
fn reverse_prefix_iter<T>(
&self,
prefix: &[u8],
@@ -457,28 +454,24 @@ where
where
T: ExtractItem,
{
let maybe_next_item = next_prefix(prefix.to_vec())
.and_then(|next_prefix| {
self.iter_store(
column,
Some(next_prefix.as_slice()),
None,
IterDirection::Forward,
)
.next()
})
.and_then(|res| res.ok());

if let Some((next_start_key, _)) = maybe_next_item {
let iter_mode = IteratorMode::From(
next_start_key.as_slice(),
rocksdb::Direction::Reverse,
);
let reverse_iterator = next_prefix(prefix.to_vec()).map(|next_prefix| {
let mut opts = self.read_options();
            // We need this option because our iterator starts in the `next_prefix` prefix section
            // and continues into the `prefix` section. Without this option, correct
            // iteration across prefix sections isn't guaranteed.
            // Source: https://github.com/facebook/rocksdb/wiki/Prefix-Seek#how-to-ignore-prefix-bloom-filters-in-read
            // and https://github.com/facebook/rocksdb/wiki/Prefix-Seek#general-prefix-seek-api
opts.set_total_order_seek(true);
self.iterator::<T>(
column,
opts,
IteratorMode::From(next_prefix.as_slice(), rocksdb::Direction::Reverse),
)
});

if let Some(iterator) = reverse_iterator {
let prefix = prefix.to_vec();
self
.iterator::<T>(column, self.read_options(), iter_mode)
// Skip the element under the `next_start_key` key.
.skip(1)
iterator
.take_while(move |item| {
if let Ok(item) = item {
T::starts_with(item, prefix.as_slice())
@@ -612,8 +605,14 @@ where
// start iterating in a certain direction from the start key
let iter_mode =
IteratorMode::From(start, convert_to_rocksdb_direction(direction));
self.iterator::<T>(column, self.read_options(), iter_mode)
.into_boxed()
let mut opts = self.read_options();
                // We need this option because our iterator starts in the `start` prefix section
                // and continues into the next sections. Without this option, correct
                // iteration across prefix sections isn't guaranteed.
                // Source: https://github.com/facebook/rocksdb/wiki/Prefix-Seek#how-to-ignore-prefix-bloom-filters-in-read
                // and https://github.com/facebook/rocksdb/wiki/Prefix-Seek#general-prefix-seek-api
opts.set_total_order_seek(true);
self.iterator::<T>(column, opts, iter_mode).into_boxed()
}
(Some(prefix), Some(start)) => {
// TODO: Maybe we want to allow the `start` to be without a `prefix` in the future.
@@ -1218,4 +1217,116 @@ mod tests {
let _ = open_with_part_of_columns
.expect("Should open the database with shorter number of columns");
}

#[test]
fn iter_store__reverse_iterator__no_target_prefix() {
// Given
let (mut db, _tmp) = create_db();
let value = Arc::new(Vec::new());
let key_1 = [1, 1];
let key_2 = [2, 2];
let key_3 = [9, 3];
let key_4 = [10, 0];
db.put(&key_1, Column::Metadata, value.clone()).unwrap();
db.put(&key_2, Column::Metadata, value.clone()).unwrap();
db.put(&key_3, Column::Metadata, value.clone()).unwrap();
db.put(&key_4, Column::Metadata, value.clone()).unwrap();

// When
let db_iter = db
.iter_store(
Column::Metadata,
Some(vec![5].as_slice()),
None,
IterDirection::Reverse,
)
.map(|item| item.map(|(key, _)| key))
.collect::<Vec<_>>();

// Then
assert_eq!(db_iter, vec![]);
}

#[test]
fn iter_store__reverse_iterator__target_prefix_at_the_middle() {
// Given
let (mut db, _tmp) = create_db();
let value = Arc::new(Vec::new());
let key_1 = [1, 1];
let key_2 = [2, 2];
let key_3 = [2, 3];
let key_4 = [10, 0];
db.put(&key_1, Column::Metadata, value.clone()).unwrap();
db.put(&key_2, Column::Metadata, value.clone()).unwrap();
db.put(&key_3, Column::Metadata, value.clone()).unwrap();
db.put(&key_4, Column::Metadata, value.clone()).unwrap();

// When
let db_iter = db
.iter_store(
Column::Metadata,
Some(vec![2].as_slice()),
None,
IterDirection::Reverse,
)
.map(|item| item.map(|(key, _)| key))
.collect::<Vec<_>>();

// Then
assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
}

#[test]
fn iter_store__reverse_iterator__target_prefix_at_the_end() {
// Given
let (mut db, _tmp) = create_db();
let value = Arc::new(Vec::new());
let key_1 = [1, 1];
let key_2 = [2, 2];
let key_3 = [2, 3];
db.put(&key_1, Column::Metadata, value.clone()).unwrap();
db.put(&key_2, Column::Metadata, value.clone()).unwrap();
db.put(&key_3, Column::Metadata, value.clone()).unwrap();

// When
let db_iter = db
.iter_store(
Column::Metadata,
Some(vec![2].as_slice()),
None,
IterDirection::Reverse,
)
.map(|item| item.map(|(key, _)| key))
.collect::<Vec<_>>();

// Then
assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
}

#[test]
fn iter_store__reverse_iterator__target_prefix_at_the_end__overflow() {
// Given
let (mut db, _tmp) = create_db();
let value = Arc::new(Vec::new());
let key_1 = [1, 1];
let key_2 = [255, 254];
let key_3 = [255, 255];
db.put(&key_1, Column::Metadata, value.clone()).unwrap();
db.put(&key_2, Column::Metadata, value.clone()).unwrap();
db.put(&key_3, Column::Metadata, value.clone()).unwrap();

// When
let db_iter = db
.iter_store(
Column::Metadata,
Some(vec![255].as_slice()),
None,
IterDirection::Reverse,
)
.map(|item| item.map(|(key, _)| key))
.collect::<Vec<_>>();

// Then
assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
}
}
50 changes: 50 additions & 0 deletions tests/tests/tx.rs
@@ -751,6 +751,56 @@ async fn get_transactions_by_owner_returns_correct_number_of_results(
assert_eq!(transactions_forward.len(), 5);
}

#[test_case::test_case(PageDirection::Forward; "forward")]
#[test_case::test_case(PageDirection::Backward; "backward")]
#[tokio::test]
async fn get_transactions_by_owners_multiple_owners_returns_correct_number_of_results(
direction: PageDirection,
) {
let alice = Address::from([1; 32]);
let bob = Address::from([2; 32]);
let charlie = Address::from([3; 32]);

// Given
let mut context = TestContext::new(100).await;
let _ = context.transfer(bob, alice, 1).await.unwrap();
let _ = context.transfer(bob, alice, 2).await.unwrap();
let _ = context.transfer(bob, charlie, 1).await.unwrap();
let _ = context.transfer(alice, bob, 3).await.unwrap();
let _ = context.transfer(alice, bob, 4).await.unwrap();
let _ = context.transfer(alice, bob, 5).await.unwrap();
let _ = context.transfer(charlie, alice, 1).await.unwrap();
let _ = context.transfer(charlie, bob, 2).await.unwrap();
let _ = context.transfer(charlie, bob, 3).await.unwrap();
let _ = context.transfer(charlie, bob, 4).await.unwrap();
let _ = context.transfer(charlie, bob, 5).await.unwrap();

let client = context.client;
let all_transactions_with_direction = PaginationRequest {
cursor: None,
results: 10,
direction,
};

// When
let response = client
.transactions_by_owner(&bob, all_transactions_with_direction)
.await
.unwrap();

let transactions = response
.results
.into_iter()
.map(|tx| {
assert!(matches!(tx.status, TransactionStatus::Success { .. }));
tx.transaction
})
.collect_vec();

// Then
assert_eq!(transactions.len(), 10);
}

#[test_case::test_case(PageDirection::Forward; "forward")]
#[test_case::test_case(PageDirection::Backward; "backward")]
#[tokio::test]
