Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: incomplete generations w/ single-token generations and models that do not support chunking #2770

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions backends/v3/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub(crate) async fn batching_task(
};

// Try to get a new batch
if let Some((new_entries, new_batch, span)) = queue
if let Some((mut new_entries, new_batch, span)) = queue
.next_batch(min_size, max_size, prefill_token_budget, token_budget)
.await
{
Expand All @@ -209,11 +209,26 @@ pub(crate) async fn batching_task(
};
counter.increment(1);
}
let cached_batch = if support_chunking {
// Concat current batch to the new one
batches.pop()

let new_cached_batch = if support_chunking {
// Get cached batch
let cached_batch = batches.pop();
// Extend entries with the new entries since the batch will be
// concatenated during the prefill op server side
entries.extend(new_entries);
// Generate one token for both the cached batch and the new batch
let new_cached_batch =
prefill(&mut client, new_batch, cached_batch, &mut entries)
.instrument(span)
.await;
if new_cached_batch.is_none() {
// New cached batch is empty, no work left
break;
}
new_cached_batch
} else {
// Requests are waiting only if we don't support chunking
// Requests are waiting because we cannot concatenate the batches if the
// model/server does not support chunking
entries.iter_mut().for_each(|(_, entry)| {
// Create a new span to add the info that this entry is waiting
// because a new batch is being computed
Expand All @@ -224,23 +239,24 @@ pub(crate) async fn batching_task(
// Update entry
entry.temp_span = Some(entry_waiting_span);
});
None

// Generate one token for this new batch to have the attention past in cache
let new_cached_batch =
prefill(&mut client, new_batch, None, &mut new_entries)
.instrument(span)
.await;
if new_cached_batch.is_some() {
// Extend entries
entries.extend(new_entries);
}
new_cached_batch
};
entries.extend(new_entries);

// Generate one token for this new batch to have the attention past in cache
let new_cached_batch =
prefill(&mut client, new_batch, cached_batch, &mut entries)
.instrument(span)
.await;
// Reset waiting counter
waiting_tokens = 1;
// Extend current batch with the new batch
if let Some(new_cached_batch) = new_cached_batch {
batches.push(new_cached_batch);
} else if support_chunking {
// New cached batch is empty, no work left
break;
}
}

Expand Down
Loading