perf: speedup flat fts #6054

Merged
westonpace merged 8 commits into lance-format:main from westonpace:perf/speedup-flat-fts
Mar 3, 2026

Conversation

@westonpace
Member

This adds various performance improvements to the flat FTS search. The most significant improvement is that it parallelizes the search.

This does have some impact on accuracy. To calculate BM25 we typically need to make two passes through the data: the first to count token frequencies and the second to compute scores. The current implementation avoids this by using the "token frequency so far" when calculating the BM25 score. This is generally accurate when there is a lot of indexed data and a small amount of unindexed data, because the "token frequency so far" gets bootstrapped by the frequencies from the index, so the effect of the unindexed frequencies is minimal.
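As an illustration of this bootstrapping effect (a sketch, not the actual Lance code; `idf` here is just the standard BM25 IDF formula), compare how much one additional document moves a well-seeded IDF versus a cold one:

```rust
// Sketch: how a "token frequency so far" scorer drifts when there is no
// indexed data to bootstrap it. All names here are illustrative.

fn idf(num_docs: usize, docs_with_token: usize) -> f32 {
    // Standard BM25 IDF
    (((num_docs as f32 - docs_with_token as f32) + 0.5)
        / (docs_with_token as f32 + 0.5)
        + 1.0)
        .ln()
}

fn main() {
    // Bootstrapped: an index already saw 1000 docs, 100 containing the
    // token. The first unindexed doc barely moves the IDF.
    assert!((idf(1000, 100) - idf(1001, 101)).abs() < 0.02);

    // Cold start: with only a handful of docs seen so far, the "so far"
    // IDF swings substantially between the first few documents.
    assert!((idf(1, 1) - idf(10, 5)).abs() > 0.1);
}
```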

However, this can be more significant when there is no index or the unindexed data makes up a significant portion of the data. In that case the "token frequency so far" can be quite inaccurate for the first few documents.

In parallelizing this search we make this problem worse since each thread is calculating its own independent "token frequency so far" and it will take longer for each one to arrive at a more accurate result.

The most accurate approach would probably be to just accumulate all data in memory, tokenize (in parallel), count token frequencies (back to serial), then calculate scores (in parallel). This does run the risk of accumulating too much data however.

Another alternative could be to accumulate up to some amount (e.g. 100MB), calculate initial token frequencies, and then parallelize the rest of the search using those initial token frequencies. I'm open to suggestions. In the meantime we could probably proceed with this PR as-is.

In addition to the parallelization, this PR also makes various changes to the algorithm itself to avoid string copies. This cuts CPU time by about 5x on my system.
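A minimal sketch (not the PR's actual code) of the kind of change that avoids per-token string copies:

```rust
// Illustrative only: borrowing token slices from the source text instead
// of allocating a String for every token.
fn tokenize_owned(text: &str) -> Vec<String> {
    // One heap allocation per token.
    text.split_whitespace().map(|t| t.to_string()).collect()
}

fn tokenize_borrowed(text: &str) -> Vec<&str> {
    // Zero-copy: each token is a slice into `text`.
    text.split_whitespace().collect()
}

fn main() {
    let text = "the quick brown fox";
    assert_eq!(tokenize_owned(text).len(), tokenize_borrowed(text).len());
}
```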

@github-actions
Contributor

PR Review

P0: Bug — MemBM25Scorer::update silently drops tokens when index is None

In the None (no index) path of flat_bm25_search_stream, the scorer is initialized with an empty HashMap:

None => MemBM25Scorer::new(0, 0, HashMap::new()),

But MemBM25Scorer::update was changed to only update existing keys:

if let Some(old_count) = self.token_docs.get_mut(token) {
    *old_count += *count;
} else {
    log::warn!("Token {} not found in token_docs", token);
}

The old code used .entry(token.clone()).or_insert(0) which would create new entries. The new code never populates token_docs, so num_docs_containing_token() always returns 0, producing incorrect (inflated) IDF values for all queries when there is no index.

The fix applied to the index_bm25_scorer.num_docs() == 0 branch (pre-populating token_docs from query tokens) should also be applied to the None branch.
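A sketch of that suggested fix (struct and method shapes assumed from the review excerpts, not the actual Lance code): seed `token_docs` with the query tokens so the `get_mut`-only `update` has entries to increment.

```rust
use std::collections::HashMap;

// Sketch of the P0 fix: pre-populate token_docs from the query tokens
// instead of starting from HashMap::new(), mirroring the fix already
// applied to the index_bm25_scorer.num_docs() == 0 branch.
struct MemBM25Scorer {
    token_docs: HashMap<String, usize>,
}

impl MemBM25Scorer {
    fn new_for_query(query_tokens: &[&str]) -> Self {
        // Seed every query token with a zero count.
        let token_docs = query_tokens.iter().map(|t| (t.to_string(), 0)).collect();
        Self { token_docs }
    }

    fn update(&mut self, token: &str, count: usize) {
        // Unchanged from the PR: only existing keys are updated.
        if let Some(old_count) = self.token_docs.get_mut(token) {
            *old_count += count;
        }
    }

    fn num_docs_containing_token(&self, token: &str) -> usize {
        self.token_docs.get(token).copied().unwrap_or(0)
    }
}

fn main() {
    let mut scorer = MemBM25Scorer::new_for_query(&["flat", "fts"]);
    scorer.update("flat", 1);
    // With pre-population the count is tracked; starting from
    // HashMap::new() it would have stayed 0 and inflated the IDF.
    assert_eq!(scorer.num_docs_containing_token("flat"), 1);
}
```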

P1: doc_norm uses matching-token count instead of total document length

let doc_norm = K1 * (1.0 - B + B * num_matching_tokens as f32 / scorer.avg_doc_length());

In BM25, dl should be the total number of tokens in the document (num_tokens), not just the tokens matching the query (num_matching_tokens). avg_doc_length() correctly uses all tokens, so the numerator should too. This was also wrong in the old code (used doc_tokens.len() after filtering), but since the code is being rewritten here, this would be a good time to fix it.
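A sketch of the corrected normalization (K1 and B are typical BM25 defaults, assumed here): the length term uses the document's total token count, matching the basis of `avg_doc_length()`.

```rust
// Sketch of the suggested P1 fix: dl = total tokens in the document,
// not the count of query-matching tokens.
const K1: f32 = 1.2;
const B: f32 = 0.75;

fn doc_norm(num_tokens: usize, avg_doc_length: f32) -> f32 {
    K1 * (1.0 - B + B * num_tokens as f32 / avg_doc_length)
}

fn main() {
    // A document of exactly average length normalizes to K1.
    assert!((doc_norm(10, 10.0) - K1).abs() < 1e-6);
}
```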

Minor

  • ESTIMATED_MAX_TOKENS_PER_ROW is defined but never used — should be removed or used.
  • num_docs_containing_token(&self, token: &String) changed from &str to &String — prefer keeping &str as it's more general.
  • There's a review question left as a code comment in do_flat_full_text_search (// What is this assertion for? ...). This should be resolved or removed before merge.

@wjones127
Contributor

The most accurate approach would probably be to just accumulate all data in memory, tokenize (in parallel), count token frequencies (back to serial), then calculate scores (in parallel). This does run the risk of accumulating too much data however.

I feel like a very efficient way would be to tokenize in parallel, and then collect all tokenized data and compute in the end.

IIRC tokenization is usually the bottleneck, and I'd imagine the tokenized data is smaller than the original text, especially if you are able to filter out tokens that aren't relevant to the query.

@codecov

codecov bot commented Feb 28, 2026

@westonpace
Member Author

I feel like a very efficient way would be to tokenize in parallel, and then collect all tokenized data and compute in the end.

IIRC tokenization is usually the bottleneck, and I'd imagine the tokenized data is smaller than the original text, especially if you are able to filter out tokens that aren't relevant to the query.

This should be doable! We can tokenize and discard all tokens that aren't in the query and then accumulate. We might also need to accumulate token counts for each row but that should be small too. Maybe for a first pass, if we exceed some limit (e.g. 1GB) then we log a warning and just keep going (eventually OOM). The warning could be something like...

Accumulated more than 1GB of tokenized flat search data. This means there is a lot of unindexed text data being searched. The search is going to be very RAM intensive and may eventually OOM. Create (or update) an index on the text column to avoid this issue.

@westonpace
Member Author

Both of claude's suggestions are valid. I will revisit this weekend / Monday when I have time to add some regression tests for these cases.

@westonpace westonpace force-pushed the perf/speedup-flat-fts branch from 81dbfbf to 902b77e on March 2, 2026 15:59
@github-actions github-actions bot added the python label Mar 2, 2026
@westonpace
Member Author

This should be doable! We can tokenize and discard all tokens that aren't in the query and then accumulate. We might also need to accumulate token counts for each row but that should be small too. Maybe for a first pass, if we exceed some limit (e.g. 1GB) then we log a warning and just keep going (eventually OOM). The warning could be something like...

Ok, I've implemented this approach. We now only accumulate (2 + N) * u64 per row. The first two u64s are the row id and the total number of tokens in the doc. The N u64s are the count of each query token in the doc. Since query strings are relatively short, this stays modest even in the millions of rows. Once we get to hundreds of millions or billions of rows this will start to trigger OOMs, so it is important to create an FTS index before that.

if has_query_token(doc, &mut tokenizer, &query_tokens) {
results.push(row_id_array.value(i));
// What is this assertion for? Why would doc contain query? Don't we reach
// here only if they share at least one token? Why is it not debug_assert?
Contributor

Ah, I can't remember why I added this assertion, but it doesn't look reasonable. Feel free to remove it.

Member Author

Removed

Comment on lines +2708 to +2710
if score > 0.0 {
row_ids_builder.append_value(row_id);
scores_builder.append_value(score);
Contributor

It seems this appends the row_id multiple times, so won't we get duplicated results if we have multiple tokens in the query?

Member Author

Ouch, good catch. I moved the append outside the for loop.

for token in query_tokens {
let freq = doc_token_count.get(token).copied().unwrap_or_default() as f32;

let freq = query_token_counts_iter.next().expect_ok()? as f32;
Contributor

Should we consume this before the continue at line 2700?

Member Author

Yes, good catch.

Contributor

@wjones127 left a comment

I like the approach. Had some minor questions plus I think Yang has some good questions about correctness within flat_bm25_score

}

- pub fn num_docs_containing_token(&self, token: &str) -> usize {
+ pub fn num_docs_containing_token(&self, token: &String) -> usize {
Contributor

Why this change?

Member Author

Reverted. I swear at one point it was complaining but it seems fine now.

state.accumulated.pop().unwrap()
} else {
let b =
arrow_select::concat::concat_batches(&state.input_schema, &state.accumulated)
Collaborator

The logic here appears to be quite sensitive to max_bytes. Could our data in state.accumulated be repeatedly concatenated and sliced?

For example, if max_bytes is 1MiB and we get 64MiB of data, the data inside state.accumulated will go 64 -> 63 -> 62 ... -> 2 -> 1. Should we maintain an offset into the sliced data and make sure we only concat once on the raw input?

Member Author

Good catch. I've modified the code so it only concatenates if the first slice is not large enough (smaller than min_bytes). Since min_bytes should be sufficiently less than max_bytes I think we should be good in most cases. In the event we have some really large outlier row and, as a result, slice inappropriately, we may still concatenate but I think that's enough of an outlier for the moment.
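A stand-in sketch of that draining rule (plain byte buffers instead of Arrow batches; names assumed): only concatenate when the first accumulated buffer is below min_bytes, so data is not repeatedly concatenated and re-sliced.

```rust
// Sketch: hand out the first buffer as-is when it is already large
// enough; fall back to a single concatenation otherwise.
fn next_chunk(accumulated: &mut Vec<Vec<u8>>, min_bytes: usize) -> Option<Vec<u8>> {
    if accumulated.is_empty() {
        return None;
    }
    let first = accumulated.remove(0);
    if first.len() >= min_bytes {
        // Large enough on its own: no concatenation needed.
        return Some(first);
    }
    // Fallback: concatenate the remainder once.
    let mut combined = first;
    while !accumulated.is_empty() {
        let next = accumulated.remove(0);
        combined.extend_from_slice(&next);
    }
    Some(combined)
}

fn main() {
    let mut bufs = vec![vec![0u8; 8], vec![1u8; 8]];
    // First buffer already meets min_bytes: returned without concatenation.
    assert_eq!(next_chunk(&mut bufs, 4).unwrap().len(), 8);
    assert_eq!(bufs.len(), 1);
}
```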

@westonpace westonpace force-pushed the perf/speedup-flat-fts branch from d8121cf to 2bdcdce on March 3, 2026 14:16
@westonpace westonpace merged commit 669d6b6 into lance-format:main Mar 3, 2026
28 checks passed
wjones127 pushed a commit to wjones127/lance that referenced this pull request Mar 4, 2026