Conversation
PR ReviewP0: Bug —
|
I feel like a very efficient way would be to tokenize in parallel, and then collect all tokenized data and compute in the end. IIRC tokenization is usually the bottleneck, and I'd imagine the tokenized data is smaller than the original text, especially if you are able to filter out tokens that aren't relevant to the query. |
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
This should be doable! We can tokenize and discard all tokens that aren't in the query and then accumulate. We might also need to accumulate token counts for each row but that should be small too. Maybe for a first pass, if we exceed some limit (e.g. 1GB) then we log a warning and just keep going (eventually OOM). The warning could be something like... |
|
Both of claude's suggestions are valid. I will revisit this weekend / Monday when I have time to add some regression tests for these cases. |
81dbfbf to
902b77e
Compare
Ok, I've implemented this approach. We now only accumulate (2 + N) * u64. The first two u64 are row id and total tokens in doc. The N u64s are the token count for each token in the query. Since query strings are relatively bounded this should be relatively modest in the M's of rows. Once we get to 100's of M or B's this will start to trigger OOMs and so it is important to create an FTS index before that. |
| if has_query_token(doc, &mut tokenizer, &query_tokens) { | ||
| results.push(row_id_array.value(i)); | ||
| // What is this assertion for? Why would doc contain query? Don't we reach | ||
| // here only if they share at least one token? Why is it not debug_assert? |
There was a problem hiding this comment.
ah i can't remember why i added this assertion but it looks not reasonable, free to remove it
| if score > 0.0 { | ||
| row_ids_builder.append_value(row_id); | ||
| scores_builder.append_value(score); |
There was a problem hiding this comment.
it seems to append the row_id multiple times then we will get duplicated results if we have multiple tokens in the query?
There was a problem hiding this comment.
Ouch, good catch. I moved the append outside the for loop.
| for token in query_tokens { | ||
| let freq = doc_token_count.get(token).copied().unwrap_or_default() as f32; | ||
|
|
||
| let freq = query_token_counts_iter.next().expect_ok()? as f32; |
There was a problem hiding this comment.
should we consume this before continue at line 2700?
wjones127
left a comment
There was a problem hiding this comment.
I like the approach. Had some minor questions plus I think Yang has some good questions about correctness within flat_bm25_score
| } | ||
|
|
||
| pub fn num_docs_containing_token(&self, token: &str) -> usize { | ||
| pub fn num_docs_containing_token(&self, token: &String) -> usize { |
There was a problem hiding this comment.
Reverted. I swear at one point it was complaining but it seems fine now.
| state.accumulated.pop().unwrap() | ||
| } else { | ||
| let b = | ||
| arrow_select::concat::concat_batches(&state.input_schema, &state.accumulated) |
There was a problem hiding this comment.
The logic here appears to be quite sensitive to max_bytes. Could our data in state.accumulated be repeatedly concatenated and sliced?
For example, if max_bytes is 1MiB and we got 64MiB data, the data inside state.accumulated with be 64 -> 63 -> 62 ... -> 2 -> 1. Should we maintain an offset about the sliced data and make sure we only concat once on the raw input?
There was a problem hiding this comment.
Good catch. I've modified the code so it only concatenates if the first slice is not large enough (smaller than min_bytes). Since min_bytes should be sufficiently less than max_bytes I think we should be good in most cases. In the event we have some really large outlier row and, as a result, slice inappropriately, we may still concatenate but I think that's enough of an outlier for the moment.
d8121cf to
2bdcdce
Compare
This adds various performance improvements to the flat FTS search. The most significant improvement is that it parallelizes the search. This does have some impact to accuracy. To calculate bm25 we typically need to make two passes through the data. The first to count token frequencies and the second to count token scores. The current implementation avoids this by using "token frequency so far" when calculating the bm25 score. This is generally accurate when there is a lot of indexed data and a small amount of unindexed data because the "token frequency so far" gets bootstrapped by the frequencies from the index and so the effect of the unindexed frequencies are minimal. However, this can be more significant when there is no index or the unindexed data makes up a significant portion of the data. In that case the "token frequency so far" can be quite inaccurate for the first few documents. In parallelizing this search we make this problem worse since each thread is calculating its own independent "token frequency so far" and it will take longer for each one to arrive at a more accurate result. The most accurate approach would probably be to just accumulate all data in memory, tokenize (in parallel), count token frequencies (back to serial), then calculate scores (in parallel). This does run the risk of accumulating too much data however. Another alternative could be to accumulate up to some amount (e.g. 100MB), calculate initial token frequencies, and then parallelize the rest of the search using those initial token frequencies. I'm open to suggestions. In the meantime we could probably proceed with this PR as-is. In addition to the parallelization this PR also makes various changes to the algorithm itself to avoid string copies. This cuts down on the CPU time by 5x on my system.
This adds various performance improvements to the flat FTS search. The most significant improvement is that it parallelizes the search.
This does have some impact to accuracy. To calculate bm25 we typically need to make two passes through the data. The first to count token frequencies and the second to count token scores. The current implementation avoids this by using "token frequency so far" when calculating the bm25 score. This is generally accurate when there is a lot of indexed data and a small amount of unindexed data because the "token frequency so far" gets bootstrapped by the frequencies from the index and so the effect of the unindexed frequencies are minimal.
However, this can be more significant when there is no index or the unindexed data makes up a significant portion of the data. In that case the "token frequency so far" can be quite inaccurate for the first few documents.
In parallelizing this search we make this problem worse since each thread is calculating its own independent "token frequency so far" and it will take longer for each one to arrive at a more accurate result.
The most accurate approach would probably be to just accumulate all data in memory, tokenize (in parallel), count token frequencies (back to serial), then calculate scores (in parallel). This does run the risk of accumulating too much data however.
Another alternative could be to accumulate up to some amount (e.g. 100MB), calculate initial token frequencies, and then parallelize the rest of the search using those initial token frequencies. I'm open to suggestions. In the meantime we could probably proceed with this PR as-is.
In addition to the parallelization this PR also makes various changes to the algorithm itself to avoid string copies. This cuts down on the CPU time by 5x on my system.