Skip to content

Commit 540a61c

Browse files
committed
Add error handling for ranged seeks on non-range compat indices (IndexCannotSeekRange).
Use point scans in query execution (`IxScanEq`). Refactor table index code with macro `same_for_all_types`.
1 parent c98532d commit 540a61c

File tree

12 files changed

+279
-248
lines changed

12 files changed

+279
-248
lines changed

crates/datastore/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ pub enum IndexError {
129129
NotUnique(IndexId),
130130
#[error("Key {1:?} was not found in index {0:?}")]
131131
KeyNotFound(IndexId, AlgebraicValue),
132+
#[error("IndexId {0:?} does not support seeking for a range")]
133+
IndexCannotSeekRange(IndexId),
132134
}
133135

134136
#[derive(Error, Debug, PartialEq, Eq)]

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use spacetimedb_table::{
5050
indexes::{RowPointer, SquashedOffset},
5151
page_pool::PagePool,
5252
table::{IndexScanPointIter, IndexScanRangeIter, InsertError, RowRef, Table, TableAndIndex, TableScanIter},
53+
table_index::IndexSeekRangeResult,
5354
};
5455
use std::collections::BTreeMap;
5556
use std::sync::Arc;
@@ -213,8 +214,8 @@ impl StateView for CommittedState {
213214
range: R,
214215
) -> Result<Self::IterByColRange<'_, R>> {
215216
match self.index_seek_range(table_id, &cols, &range) {
216-
Some(iter) => Ok(ScanOrIndex::Index(iter)),
217-
None => Ok(ScanOrIndex::Scan(ApplyFilter::new(
217+
Some(Ok(iter)) => Ok(ScanOrIndex::Index(iter)),
218+
None | Some(Err(_)) => Ok(ScanOrIndex::Scan(ApplyFilter::new(
218219
RangeOnColumn { cols, range },
219220
self.iter(table_id)?,
220221
))),
@@ -948,7 +949,7 @@ impl CommittedState {
948949
table_id: TableId,
949950
cols: &ColList,
950951
range: &impl RangeBounds<AlgebraicValue>,
951-
) -> Option<IndexScanRangeIter<'a>> {
952+
) -> Option<IndexSeekRangeResult<IndexScanRangeIter<'a>>> {
952953
self.tables
953954
.get(&table_id)?
954955
.get_index_by_cols_with_table(&self.blob_store, cols)

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ use spacetimedb_table::{
6767
BlobNumBytes, DuplicateError, IndexScanPointIter, IndexScanRangeIter, InsertError, RowRef, Table,
6868
TableAndIndex, UniqueConstraintViolation,
6969
},
70-
table_index::TableIndex,
70+
table_index::{IndexCannotSeekRange, IndexSeekRangeResult, TableIndex},
7171
};
7272
use std::{
7373
sync::Arc,
@@ -429,7 +429,8 @@ impl Datastore for MutTxId {
429429
.get_table_and_index(index_id)
430430
.ok_or_else(|| IndexError::NotFound(index_id))?;
431431

432-
Ok(self.index_scan_range_inner(table_id, tx_index, commit_index, range))
432+
self.index_scan_range_inner(table_id, tx_index, commit_index, range)
433+
.map_err(|IndexCannotSeekRange| IndexError::IndexCannotSeekRange(index_id).into())
433434
}
434435

435436
fn index_scan_point<'a>(
@@ -1373,27 +1374,36 @@ impl MutTxId {
13731374
let bounds =
13741375
Self::range_scan_decode_bounds(index_ty, prefix, prefix_elems, rstart, rend).map_err(IndexError::Decode)?;
13751376

1376-
let iter = self.index_scan_range_inner(table_id, tx_index, commit_index, &bounds);
1377+
let iter = self
1378+
.index_scan_range_inner(table_id, tx_index, commit_index, &bounds)
1379+
.map_err(|IndexCannotSeekRange| IndexError::IndexCannotSeekRange(index_id))?;
13771380

13781381
let (lower, upper) = bounds;
13791382
Ok((table_id, lower, upper, iter))
13801383
}
13811384

13821385
/// See [`MutTxId::index_scan_range`].
1386+
#[inline(always)]
13831387
fn index_scan_range_inner<'a>(
13841388
&'a self,
13851389
table_id: TableId,
13861390
tx_index: Option<TableAndIndex<'a>>,
13871391
commit_index: TableAndIndex<'a>,
13881392
bounds: &impl RangeBounds<AlgebraicValue>,
1389-
) -> IndexScanRanged<'a> {
1393+
) -> IndexSeekRangeResult<IndexScanRanged<'a>> {
13901394
// Get an index seek iterator for the tx and committed state.
1391-
let tx_iter = tx_index.map(|i| i.seek_range(bounds));
1395+
let tx_iter = tx_index.map(|i| i.seek_range(bounds)).transpose();
13921396
let commit_iter = commit_index.seek_range(bounds);
13931397

1398+
// Deal with not having ranged indices.
1399+
let (tx_iter, commit_iter) = match (tx_iter, commit_iter) {
1400+
(Ok(t), Ok(c)) => (t, c),
1401+
(Err(e), _) | (_, Err(e)) => return Err(e),
1402+
};
1403+
13941404
// Combine it all.
13951405
let dt = self.tx_state.get_delete_table(table_id);
1396-
ScanMutTx::combine(dt, tx_iter, commit_iter)
1406+
Ok(ScanMutTx::combine(dt, tx_iter, commit_iter))
13971407
}
13981408

13991409
/// Translate `index_id` to the table id, and commit/tx indices.
@@ -3176,8 +3186,10 @@ fn iter_by_col_range<'a, R: RangeBounds<AlgebraicValue>>(
31763186
// If there's an index, use that.
31773187
// It's sufficient to check that the committed state has an index
31783188
// as index schema changes are applied immediately.
3179-
if let Some(commit_iter) = committed_state.index_seek_range(table_id, &cols, &range) {
3180-
let tx_iter = tx_state.index_seek_range_by_cols(table_id, &cols, &range);
3189+
if let Some(Ok(commit_iter)) = committed_state.index_seek_range(table_id, &cols, &range) {
3190+
let tx_iter = tx_state
3191+
.index_seek_range_by_cols(table_id, &cols, &range)
3192+
.map(|r| r.expect("got a commit index so we should have a compatible tx index"));
31813193
let delete_table = tx_state.get_delete_table(table_id);
31823194
let iter = ScanMutTx::combine(delete_table, tx_iter, commit_iter);
31833195
Ok(ScanOrIndex::Index(iter))

crates/datastore/src/locking_tx_datastore/tx.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@ use super::{
44
state_view::{IterByColRangeTx, StateView},
55
IterByColEqTx, SharedReadGuard,
66
};
7-
use crate::execution_context::ExecutionContext;
7+
use crate::{error::IndexError, execution_context::ExecutionContext};
88
use spacetimedb_durability::TxOffset;
99
use spacetimedb_execution::Datastore;
1010
use spacetimedb_lib::metrics::ExecutionMetrics;
1111
use spacetimedb_primitives::{ColList, IndexId, TableId};
1212
use spacetimedb_sats::AlgebraicValue;
1313
use spacetimedb_schema::schema::TableSchema;
14-
use spacetimedb_table::table::{IndexScanPointIter, IndexScanRangeIter, TableAndIndex, TableScanIter};
14+
use spacetimedb_table::{
15+
table::{IndexScanPointIter, IndexScanRangeIter, TableAndIndex, TableScanIter},
16+
table_index::IndexCannotSeekRange,
17+
};
1518
use std::sync::Arc;
1619
use std::{future, num::NonZeroU64};
1720
use std::{
@@ -64,7 +67,8 @@ impl Datastore for TxId {
6467
index_id: IndexId,
6568
range: &impl RangeBounds<AlgebraicValue>,
6669
) -> anyhow::Result<Self::RangeIndexIter<'a>> {
67-
self.with_index(table_id, index_id, |i| i.seek_range(range))
70+
self.with_index(table_id, index_id, |i| i.seek_range(range))?
71+
.map_err(|IndexCannotSeekRange| IndexError::IndexCannotSeekRange(index_id).into())
6872
}
6973

7074
fn index_scan_point<'a>(

crates/datastore/src/locking_tx_datastore/tx_state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use spacetimedb_table::{
1111
pointer_map::PointerMap,
1212
static_assert_size,
1313
table::{IndexScanPointIter, IndexScanRangeIter, RowRef, Table, TableAndIndex},
14-
table_index::TableIndex,
14+
table_index::{IndexSeekRangeResult, TableIndex},
1515
};
1616
use std::collections::{btree_map, BTreeMap};
1717
use thin_vec::ThinVec;
@@ -142,7 +142,7 @@ impl TxState {
142142
table_id: TableId,
143143
cols: &ColList,
144144
range: &impl RangeBounds<AlgebraicValue>,
145-
) -> Option<IndexScanRangeIter<'a>> {
145+
) -> Option<IndexSeekRangeResult<IndexScanRangeIter<'a>>> {
146146
self.insert_tables
147147
.get(&table_id)?
148148
.get_index_by_cols_with_table(&self.blob_store, cols)

crates/execution/src/pipelined.rs

Lines changed: 111 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,8 @@ impl PipelinedProject {
284284
#[derive(Debug)]
285285
pub enum PipelinedExecutor {
286286
TableScan(PipelinedScan),
287-
IxScan(PipelinedIxScan),
287+
IxScanEq(PipelinedIxScanEq),
288+
IxScanRange(PipelinedIxScanRange),
288289
IxJoin(PipelinedIxJoin),
289290
IxDeltaScan(PipelinedIxDeltaScan),
290291
IxDeltaJoin(PipelinedIxDeltaJoin),
@@ -302,7 +303,22 @@ impl From<PhysicalPlan> for PipelinedExecutor {
302303
limit,
303304
delta,
304305
}),
305-
PhysicalPlan::IxScan(scan @ IxScan { delta: None, .. }, _) => Self::IxScan(scan.into()),
306+
PhysicalPlan::IxScan(
307+
scan @ IxScan {
308+
delta: None,
309+
arg: Sarg::Eq(..),
310+
..
311+
},
312+
_,
313+
) => Self::IxScanEq(scan.into()),
314+
PhysicalPlan::IxScan(
315+
scan @ IxScan {
316+
delta: None,
317+
arg: Sarg::Range(..),
318+
..
319+
},
320+
_,
321+
) => Self::IxScanRange(scan.into()),
306322
PhysicalPlan::IxScan(scan, _) => Self::IxDeltaScan(scan.into()),
307323
PhysicalPlan::IxJoin(
308324
IxJoin {
@@ -391,15 +407,16 @@ impl PipelinedExecutor {
391407
lhs.visit(f);
392408
rhs.visit(f);
393409
}
394-
Self::TableScan(..) | Self::IxScan(..) | Self::IxDeltaScan(..) => {}
410+
Self::TableScan(..) | Self::IxScanEq(..) | Self::IxScanRange(..) | Self::IxDeltaScan(..) => {}
395411
}
396412
}
397413

398414
/// Does this operation contain an empty delta scan?
399415
pub fn is_empty(&self, tx: &impl DeltaStore) -> bool {
400416
match self {
401417
Self::TableScan(scan) => scan.is_empty(tx),
402-
Self::IxScan(scan) => scan.is_empty(tx),
418+
Self::IxScanEq(scan) => scan.is_empty(tx),
419+
Self::IxScanRange(scan) => scan.is_empty(tx),
403420
Self::IxDeltaScan(scan) => scan.is_empty(tx),
404421
Self::IxJoin(join) => join.is_empty(tx),
405422
Self::IxDeltaJoin(join) => join.is_empty(tx),
@@ -418,7 +435,8 @@ impl PipelinedExecutor {
418435
) -> Result<()> {
419436
match self {
420437
Self::TableScan(scan) => scan.execute(tx, metrics, f),
421-
Self::IxScan(scan) => scan.execute(tx, metrics, f),
438+
Self::IxScanEq(scan) => scan.execute(tx, metrics, f),
439+
Self::IxScanRange(scan) => scan.execute(tx, metrics, f),
422440
Self::IxDeltaScan(scan) => scan.execute(tx, metrics, f),
423441
Self::IxJoin(join) => join.execute(tx, metrics, f),
424442
Self::IxDeltaJoin(join) => join.execute(tx, metrics, f),
@@ -506,7 +524,7 @@ impl PipelinedScan {
506524

507525
/// An index scan executor for a delta table.
508526
///
509-
/// TODO: There is much overlap between this executor and [PipelinedIxScan].
527+
/// TODO: There is much overlap between this executor and [PipelinedIxScanRange].
510528
/// But merging them requires merging the [Datastore] and [DeltaStore] traits,
511529
/// since the index scan interface is right now split between both.
512530
#[derive(Debug)]
@@ -659,9 +677,9 @@ impl PipelinedIxDeltaScan {
659677
}
660678
}
661679

662-
/// A pipelined executor for scanning an index
680+
/// A pipelined executor for range scanning an index
663681
#[derive(Debug)]
664-
pub struct PipelinedIxScan {
682+
pub struct PipelinedIxScanRange {
665683
/// The table id
666684
pub table_id: TableId,
667685
/// The index id
@@ -675,7 +693,7 @@ pub struct PipelinedIxScan {
675693
pub upper: Bound<AlgebraicValue>,
676694
}
677695

678-
impl From<IxScan> for PipelinedIxScan {
696+
impl From<IxScan> for PipelinedIxScanRange {
679697
fn from(scan: IxScan) -> Self {
680698
match scan {
681699
IxScan {
@@ -712,7 +730,7 @@ impl From<IxScan> for PipelinedIxScan {
712730
}
713731
}
714732

715-
impl PipelinedIxScan {
733+
impl PipelinedIxScanRange {
716734
/// We don't know statically if an index scan will return rows
717735
pub fn is_empty(&self, _: &impl DeltaStore) -> bool {
718736
false
@@ -794,6 +812,89 @@ impl PipelinedIxScan {
794812
}
795813
}
796814

815+
/// A pipelined executor for equality scanning an index
816+
#[derive(Debug)]
817+
pub struct PipelinedIxScanEq {
818+
/// The table id
819+
pub table_id: TableId,
820+
/// The index id
821+
pub index_id: IndexId,
822+
pub limit: Option<u64>,
823+
/// The point to scan the index for.
824+
pub point: AlgebraicValue,
825+
}
826+
827+
impl From<IxScan> for PipelinedIxScanEq {
828+
fn from(scan: IxScan) -> Self {
829+
match scan {
830+
IxScan {
831+
schema,
832+
limit,
833+
delta: _,
834+
index_id,
835+
prefix,
836+
arg: Sarg::Eq(_, v),
837+
} => {
838+
let point = if prefix.is_empty() {
839+
let mut elems = Vec::with_capacity(prefix.len() + 1);
840+
elems.extend(prefix.into_iter().map(|(_, v)| v));
841+
elems.push(v);
842+
AlgebraicValue::product(elems)
843+
} else {
844+
v
845+
};
846+
847+
Self {
848+
table_id: schema.table_id,
849+
index_id,
850+
limit,
851+
point,
852+
}
853+
}
854+
IxScan {
855+
arg: Sarg::Range(..), ..
856+
} => unreachable!(),
857+
}
858+
}
859+
}
860+
861+
impl PipelinedIxScanEq {
862+
/// We don't know statically if an index scan will return rows
863+
pub fn is_empty(&self, _: &impl DeltaStore) -> bool {
864+
false
865+
}
866+
867+
pub fn execute<'a, Tx: Datastore + DeltaStore>(
868+
&self,
869+
tx: &'a Tx,
870+
metrics: &mut ExecutionMetrics,
871+
f: &mut dyn FnMut(Tuple<'a>) -> Result<()>,
872+
) -> Result<()> {
873+
// Scan without a row limit.
874+
let scan = || tx.index_scan_point(self.table_id, self.index_id, &self.point);
875+
// Scan with an optional row limit.
876+
let scan_opt_limit = |limit| match limit {
877+
None => scan().map(Either::Left),
878+
Some(n) => scan().map(|iter| iter.take(n)).map(Either::Right),
879+
};
880+
let mut n = 0;
881+
let mut f = |t| {
882+
n += 1;
883+
f(t)
884+
};
885+
for ptr in scan_opt_limit(self.limit.map(|n| n as usize))?
886+
.map(Row::Ptr)
887+
.map(Tuple::Row)
888+
{
889+
f(ptr)?;
890+
}
891+
892+
metrics.index_seeks += 1;
893+
metrics.rows_scanned += n;
894+
Ok(())
895+
}
896+
}
897+
797898
/// A pipelined index join executor
798899
#[derive(Debug)]
799900
pub struct PipelinedIxJoin {

crates/physical-plan/src/plan.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,6 +1185,17 @@ pub struct IxScan {
11851185
#[derive(Debug, Clone, PartialEq, Eq)]
11861186
pub enum Sarg {
11871187
Eq(ColId, AlgebraicValue),
1188+
/// NOTE(centril): We currently never construct this variant.
1189+
/// We do have non-ranged hash indices.
1190+
/// This means that when we get around to using this variant,
1191+
/// we must change the rewrite rules such that we do not emit
1192+
/// [`IxScan`] on a hash index.
1193+
///
1194+
/// Moreover, an equality scan (the variant above)
1195+
/// `(a0, b0)` on an index `(a, b, c)` is actually a ranged scan.
1196+
/// We also currently do not emit such `IxScan`s where the number
1197+
/// of equalities provided are fewer than the number of columns in the index.
1198+
/// When we do, we must also account for hash indices in the rewrite rules.
11881199
Range(ColId, Bound<AlgebraicValue>, Bound<AlgebraicValue>),
11891200
}
11901201

crates/subscription/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use anyhow::{bail, Result};
22
use spacetimedb_execution::{
33
pipelined::{
4-
PipelinedExecutor, PipelinedIxDeltaJoin, PipelinedIxDeltaScan, PipelinedIxJoin, PipelinedIxScan,
5-
PipelinedProject,
4+
PipelinedExecutor, PipelinedIxDeltaJoin, PipelinedIxDeltaScan, PipelinedIxJoin, PipelinedIxScanEq,
5+
PipelinedIxScanRange, PipelinedProject,
66
},
77
Datastore, DeltaStore, Row,
88
};
@@ -36,7 +36,8 @@ impl Fragments {
3636
let mut index_ids = HashSet::new();
3737
for plan in self.insert_plans.iter().chain(self.delete_plans.iter()) {
3838
plan.visit(&mut |plan| match plan {
39-
PipelinedExecutor::IxScan(PipelinedIxScan { table_id, index_id, .. })
39+
PipelinedExecutor::IxScanEq(PipelinedIxScanEq { table_id, index_id, .. })
40+
| PipelinedExecutor::IxScanRange(PipelinedIxScanRange { table_id, index_id, .. })
4041
| PipelinedExecutor::IxDeltaScan(PipelinedIxDeltaScan { table_id, index_id, .. })
4142
| PipelinedExecutor::IxJoin(PipelinedIxJoin {
4243
rhs_table: table_id,

crates/table/benches/page_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ fn index_seek(c: &mut Criterion) {
917917
let mut elapsed = WallTime.zero();
918918
for _ in 0..num_iters {
919919
let (row, none) = time(&mut elapsed, || {
920-
let mut iter = index.seek_range(&col_to_seek);
920+
let mut iter = index.seek_range(&col_to_seek).unwrap();
921921
(iter.next(), iter.next())
922922
});
923923
assert!(

0 commit comments

Comments
 (0)