
Implement preemption via recomputation & Refactor scheduling logic #12


Merged 22 commits on Mar 30, 2023
3 changes: 2 additions & 1 deletion cacheflow/http_frontend/fastapi_frontend.py
@@ -84,8 +84,9 @@ async def generate(self, request_dict: Dict):
            seq = Sequence(seq_id, token_ids, block_size=self.block_size)
            seqs.append(seq)

+       arrival_time = time.time()
        group_id = next(self.seq_group_counter)
-       seq_group = SequenceGroup(group_id, seqs)
+       seq_group = SequenceGroup(group_id, seqs, arrival_time)
        group_event = asyncio.Event()
        self.sequence_group_events[group_id] = group_event
        await self.server.add_sequence_groups.remote([(seq_group, sampling_params)])
3 changes: 2 additions & 1 deletion cacheflow/master/block_manager.py
@@ -76,7 +76,8 @@ def __init__(
        self.block_tables: Dict[int, BlockTable] = {}

    def can_allocate(self, seq_group: SequenceGroup) -> bool:
-       # NOTE: Here we assume that all sequences in the group have the same prompt.
+       # FIXME(woosuk): Here we assume that all sequences in the group share
+       # the same prompt. This may not be true for preempted sequences.
Member: If I understand correctly, is this function only wrong when we use recomputation preemption for parallel decoding?

Collaborator Author: Yes, and for beam search as well.

        seq = seq_group.seqs[0]
        num_required_blocks = len(seq.logical_token_blocks)
        num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()
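For context, the availability check discussed in the comment thread above can be sketched roughly as follows. The `BlockAllocator` and sequence stubs, the block-count rounding, and the plain `>=` comparison are illustrative assumptions; the real cacheflow block manager tracks more state (block tables, reference counts) than shown here.

```python
# Rough, self-contained sketch of the can_allocate check above.
# Stubs and the plain >= comparison are illustrative assumptions.
from typing import List


class BlockAllocator:  # stub: hands out a fixed pool of GPU blocks
    def __init__(self, num_blocks: int) -> None:
        self.free_blocks = list(range(num_blocks))

    def get_num_free_blocks(self) -> int:
        return len(self.free_blocks)


class SequenceStub:
    def __init__(self, num_tokens: int, block_size: int) -> None:
        # One logical block per `block_size` tokens, rounded up.
        self.logical_token_blocks = [None] * (-(-num_tokens // block_size))


class SequenceGroupStub:
    def __init__(self, seqs: List[SequenceStub]) -> None:
        self.seqs = seqs


def can_allocate(gpu_allocator: BlockAllocator,
                 seq_group: SequenceGroupStub) -> bool:
    # As the FIXME notes, only the first sequence is inspected:
    # all sequences in the group are assumed to share one prompt.
    seq = seq_group.seqs[0]
    num_required_blocks = len(seq.logical_token_blocks)
    return gpu_allocator.get_num_free_blocks() >= num_required_blocks


allocator = BlockAllocator(num_blocks=4)
group = SequenceGroupStub([SequenceStub(num_tokens=50, block_size=16)])
print(can_allocate(allocator, group))  # 4 free >= ceil(50/16) = 4 -> True
```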
45 changes: 45 additions & 0 deletions cacheflow/master/policy.py
@@ -0,0 +1,45 @@
from typing import List

from cacheflow.sequence import SequenceGroup


class Policy:

    def get_priority(
        self,
        now: float,
        seq_group: SequenceGroup,
    ) -> float:
        raise NotImplementedError

    def sort_by_priority(
        self,
        now: float,
        seq_groups: List[SequenceGroup],
    ) -> List[SequenceGroup]:
        return sorted(
            seq_groups,
            key=lambda seq_group: self.get_priority(now, seq_group),
            reverse=True,
        )


class FCFS(Policy):

    def get_priority(
        self,
        now: float,
        seq_group: SequenceGroup,
    ) -> float:
        return now - seq_group.arrival_time


class PolicyFactory:

    _POLICY_REGISTRY = {
        'fcfs': FCFS,
    }

    @classmethod
    def get_policy(cls, policy_name: str, **kwargs) -> Policy:
        return cls._POLICY_REGISTRY[policy_name](**kwargs)

Member (on the `'fcfs': FCFS,` entry): Will we add SSF in another PR?

Collaborator Author: Yes. In this PR, I tried to make minimal changes.
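A minimal, self-contained sketch of how the FCFS policy above orders sequence groups. `SequenceGroup` is stubbed here with only the `arrival_time` field the policy touches; the real cacheflow class carries the sequences and more state.

```python
# Self-contained sketch of FCFS ordering; SequenceGroup is stubbed
# down to the single field the policy reads.
import time
from dataclasses import dataclass
from typing import List


@dataclass
class SequenceGroup:  # stub: only arrival_time matters for FCFS
    group_id: int
    arrival_time: float


class Policy:
    def get_priority(self, now: float, seq_group: SequenceGroup) -> float:
        raise NotImplementedError

    def sort_by_priority(
        self, now: float, seq_groups: List[SequenceGroup],
    ) -> List[SequenceGroup]:
        # Highest priority first, hence reverse=True.
        return sorted(seq_groups,
                      key=lambda g: self.get_priority(now, g),
                      reverse=True)


class FCFS(Policy):
    def get_priority(self, now: float, seq_group: SequenceGroup) -> float:
        # Older groups have waited longer, so they get higher priority.
        return now - seq_group.arrival_time


now = time.time()
groups = [SequenceGroup(0, now - 1.0),   # arrived 1 s ago
          SequenceGroup(1, now - 5.0),   # arrived 5 s ago (oldest)
          SequenceGroup(2, now - 3.0)]
ordered = FCFS().sort_by_priority(now, groups)
print([g.group_id for g in ordered])  # oldest first: [1, 2, 0]
```

Note the design choice the SSF question above hints at: any alternative policy only has to implement `get_priority` and register itself in `_POLICY_REGISTRY`.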