Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/benchmark_single_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class BenchmarkConfig:
cache_ratio: float
clear_cpu_cache: bool

def run_tp_client(dp_client_id, tp_rank, server_recv_port, model_config, cache_config):
def run_tp_client(dp_client_id, tp_rank, gpu_register_port, model_config, cache_config):
"""Run tp_client process"""
device_id = tp_rank + dp_client_id * model_config.tp_size
tp_client = KVTPClient(server_recv_port, dp_client_id, device_id)
tp_client = KVTPClient(gpu_register_port, dp_client_id, device_id)

num_gpu_blocks = cache_config.num_gpu_blocks

Expand Down
4 changes: 4 additions & 0 deletions benchmarks/benchmark_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def create_cpu_gpu_worker(
tokens_per_block=cache_config.tokens_per_block,
num_head=model_config.num_kv_heads,
head_size=model_config.head_size,
is_mla=model_config.use_mla,
)
gpu_layout = KVCacheLayout(
type=KVCacheLayoutType.LAYERFIRST,
Expand All @@ -71,6 +72,7 @@ def create_cpu_gpu_worker(
tokens_per_block=cache_config.tokens_per_block,
num_head=model_config.num_kv_heads,
head_size=model_config.head_size,
is_mla=model_config.use_mla,
)
gpu_layout = gpu_layout.div_head(model_config.tp_size) if not model_config.use_mla else gpu_layout
cpu_handle = CPUAllocator.allocate(
Expand Down Expand Up @@ -140,6 +142,7 @@ def create_cpu_ssd_worker(
tokens_per_block=cache_config.tokens_per_block,
num_head=model_config.num_kv_heads,
head_size=model_config.head_size,
is_mla=model_config.use_mla
)
ssd_layout = KVCacheLayout(
type=GLOBAL_CONFIG_FROM_ENV.ssd_layout_type,
Expand All @@ -148,6 +151,7 @@ def create_cpu_ssd_worker(
tokens_per_block=cache_config.tokens_per_block,
num_head=model_config.num_kv_heads,
head_size=model_config.head_size,
is_mla=model_config.use_mla
)
cpu_handle = CPUAllocator.allocate(
layout=cpu_layout,
Expand Down
82 changes: 53 additions & 29 deletions csrc/radix_tree.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <errno.h>
#include <torch/extension.h>
#include <algorithm>
#include <deque>
#include <errno.h>
#include <memory>
#include <torch/extension.h>
#include <type_traits>
#include <algorithm>

#include "cache_utils.h"
#include "radix_tree.h"
Expand Down Expand Up @@ -49,11 +49,16 @@ CRadixNode *CRadixNode::split(int prefix_length) {
auto &new_block_hashes = new_node->get_block_hashes();
auto &new_physical_blocks = new_node->get_physical_blocks();

new_block_hashes.insert(new_block_hashes.end(), block_hashes.cbegin(), block_hashes.cbegin() + prefix_length);
new_physical_blocks.insert(new_physical_blocks.end(), physical_blocks.cbegin(), physical_blocks.cbegin() + prefix_length);
new_block_hashes.insert(new_block_hashes.end(), block_hashes.cbegin(),
block_hashes.cbegin() + prefix_length);
new_physical_blocks.insert(new_physical_blocks.end(),
physical_blocks.cbegin(),
physical_blocks.cbegin() + prefix_length);

block_hashes.erase(block_hashes.begin(), block_hashes.begin() + prefix_length);
physical_blocks.erase(physical_blocks.begin(), physical_blocks.begin() + prefix_length);
block_hashes.erase(block_hashes.begin(),
block_hashes.begin() + prefix_length);
physical_blocks.erase(physical_blocks.begin(),
physical_blocks.begin() + prefix_length);

parent->set_child(new_node->get_head_hash(), new_node);
new_node->set_parent(parent);
Expand All @@ -70,9 +75,10 @@ void CRadixNode::merge_child() {
assert(child->is_leaf());

block_hashes.insert(block_hashes.end(), child->get_block_hashes().cbegin(),
child->get_block_hashes().cend());
physical_blocks.insert(physical_blocks.end(), child->get_physical_blocks().cbegin(),
child->get_physical_blocks().cend());
child->get_block_hashes().cend());
physical_blocks.insert(physical_blocks.end(),
child->get_physical_blocks().cbegin(),
child->get_physical_blocks().cend());

set_time(std::max(get_time(), child->get_time()));
children.clear();
Expand All @@ -91,17 +97,24 @@ std::deque<int64_t> *CRadixNode::shrink(int length) {
auto remaining_length = size() - length;
auto shrink_blocks = new std::deque<int64_t>();

shrink_blocks->insert(shrink_blocks->end(), physical_blocks.begin() + remaining_length, physical_blocks.end());
shrink_blocks->insert(shrink_blocks->end(),
physical_blocks.begin() + remaining_length,
physical_blocks.end());

block_hashes.erase(block_hashes.begin() + remaining_length, block_hashes.end());
physical_blocks.erase(physical_blocks.begin() + remaining_length, physical_blocks.end());
block_hashes.erase(block_hashes.begin() + remaining_length,
block_hashes.end());
physical_blocks.erase(physical_blocks.begin() + remaining_length,
physical_blocks.end());

return shrink_blocks;
}

CRadixNode *CRadixTreeIndex::insert(torch::Tensor &physical_block_ids,
torch::Tensor &block_hashes, int num_blocks, int num_insert_blocks, bool ready,
CRadixNode *last_node, int num_matched_blocks, int last_node_matched_length) {
torch::Tensor &block_hashes, int num_blocks,
int num_insert_blocks, bool ready,
CRadixNode *last_node,
int num_matched_blocks,
int last_node_matched_length) {
if (num_insert_blocks == -1) {
num_insert_blocks = num_blocks;
}
Expand Down Expand Up @@ -131,8 +144,10 @@ CRadixNode *CRadixTreeIndex::insert(torch::Tensor &physical_block_ids,
auto block_hashes_ptr = block_hashes.data_ptr<int64_t>();
auto physical_block_ids_ptr = physical_block_ids.data_ptr<int64_t>();
for (auto i = 0; i + num_matched_blocks < num_insert_blocks; i++) {
new_block_hashes.insert(new_block_hashes.end(), block_hashes_ptr[i+num_matched_blocks]);
new_physical_blocks.insert(new_physical_blocks.end(), physical_block_ids_ptr[i]);
new_block_hashes.insert(new_block_hashes.end(),
block_hashes_ptr[i + num_matched_blocks]);
new_physical_blocks.insert(new_physical_blocks.end(),
physical_block_ids_ptr[i]);
}

if (last_node_matched_length < last_node->size()) {
Expand All @@ -156,7 +171,9 @@ CRadixNode *CRadixTreeIndex::insert(torch::Tensor &physical_block_ids,
int CRadixTreeIndex::evict(torch::Tensor &evicted_blocks, int num_evicted) {
int64_t *evicted_blocks_ptr = evicted_blocks.data_ptr<int64_t>();
int has_evicted = 0;
std::priority_queue<CRadixNode*, std::vector<CRadixNode*>, CRadixNode::Compare> candidate;
std::priority_queue<CRadixNode *, std::vector<CRadixNode *>,
CRadixNode::Compare>
candidate;

for (auto it = leaf_list.begin(); it != leaf_list.end(); it++) {
if ((*it)->evictable()) {
Expand Down Expand Up @@ -202,8 +219,9 @@ int CRadixTreeIndex::evict(torch::Tensor &evicted_blocks, int num_evicted) {
return has_evicted;
}

std::shared_ptr<CMatchResult> CRadixTreeIndex::match_prefix(
torch::Tensor &block_hashes, int num_blocks, bool update_cache_info) {
std::shared_ptr<CMatchResult>
CRadixTreeIndex::match_prefix(torch::Tensor &block_hashes, int num_blocks,
bool update_cache_info) {
auto current_node = root;
auto last_ready_node = root;
auto prefix_blocks_num = 0;
Expand All @@ -218,34 +236,39 @@ std::shared_ptr<CMatchResult> CRadixTreeIndex::match_prefix(
current_node->update_time(hit_reward_seconds);
}

child_hash = HashType(block_hashes_ptr[prefix_blocks_num + current_node->size()]);
child_hash =
HashType(block_hashes_ptr[prefix_blocks_num + current_node->size()]);
if (current_node->lookup_child(child_hash)) {
if (current_node->is_ready()) {
last_ready_node = current_node;
ready_prefix_blocks_num += current_node->size();
}
prefix_blocks_num += current_node->size();
physical_blocks->insert(physical_blocks->end(), current_node->get_physical_blocks().begin(),
current_node->get_physical_blocks().end());
physical_blocks->insert(physical_blocks->end(),
current_node->get_physical_blocks().begin(),
current_node->get_physical_blocks().end());
current_node = current_node->get_child(child_hash);
} else {
auto matched_length = 0;
if (is_root(current_node) == false) {
auto cmp_length = std::min(current_node->size(), num_blocks - prefix_blocks_num);
auto cmp_length =
std::min(current_node->size(), num_blocks - prefix_blocks_num);
auto left = 0;
auto right = cmp_length;

while (left < right) {
auto mid = (left + right) / 2;
if (current_node->get_hash(mid) == HashType(block_hashes_ptr[prefix_blocks_num+mid])) {
if (current_node->get_hash(mid) ==
HashType(block_hashes_ptr[prefix_blocks_num + mid])) {
left = mid + 1;
} else {
right = mid;
}
}
matched_length = left;
physical_blocks->insert(physical_blocks->end(), current_node->get_physical_blocks().begin(),
current_node->get_physical_blocks().begin() + matched_length);
physical_blocks->insert(
physical_blocks->end(), current_node->get_physical_blocks().begin(),
current_node->get_physical_blocks().begin() + matched_length);
} else {
matched_length = 0;
}
Expand All @@ -261,8 +284,9 @@ std::shared_ptr<CMatchResult> CRadixTreeIndex::match_prefix(
}
}

return std::make_shared<CMatchResult>(prefix_blocks_num, ready_prefix_blocks_num, last_node_matched_length,
last_ready_node, current_node, physical_blocks);
return std::make_shared<CMatchResult>(
ready_prefix_blocks_num, prefix_blocks_num, last_node_matched_length,
last_ready_node, current_node, physical_blocks);
}

} // namespace flexkv
8 changes: 4 additions & 4 deletions csrc/transfer_ssd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ static void _transfer_iouring_impl(
ssd_block_id /= num_files_per_device; // block id in single file

if (enable_block_first_transfer) {
int layers_chunk_size_in_bytes =
int64_t layers_chunk_size_in_bytes =
cpu_layer_stride_in_bytes * (end_layer - start_layer);
int cpu_layers_chunk_offset = start_layer * cpu_layer_stride_in_bytes;
int ssd_layers_chunk_offset = start_layer * ssd_layer_stride_in_bytes;
int64_t cpu_layers_chunk_offset = start_layer * cpu_layer_stride_in_bytes;
int64_t ssd_layers_chunk_offset = start_layer * ssd_layer_stride_in_bytes;
void *cpu_block_ptr = reinterpret_cast<char *>(cpu_tensor_ptr) +
block_stride_in_bytes * cpu_block_id +
cpu_layers_chunk_offset;
int ssd_block_offset =
int64_t ssd_block_offset =
ssd_block_id * block_stride_in_bytes + ssd_layers_chunk_offset;

ssize_t bytes_transfer = 0;
Expand Down
4 changes: 2 additions & 2 deletions docs/flexkv_config_reference/README_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ Some configurations can only be set through environment variables.
| Environment Variable | Type | Default | Description |
|---------------------|------|---------|-------------|
| `FLEXKV_MAX_FILE_SIZE_GB` | float | -1 | Maximum size of a single SSD file, -1 means unlimited |
| `FLEXKV_IORING_ENTRIES` | int | 512 | io_uring queue depth. Recommended to set to `512` to improve concurrent I/O performance |
| `FLEXKV_IORING_FLAGS` | int | 0 | io_uring flags, default is 0 |
| `FLEXKV_IOURING_ENTRIES` | int | 512 | io_uring queue depth. Recommended to set to `512` to improve concurrent I/O performance |
| `FLEXKV_IOURING_FLAGS` | int | 0 | io_uring flags, default is 0 |



Expand Down
4 changes: 2 additions & 2 deletions docs/flexkv_config_reference/README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ enable_gds: false
| 环境变量 | 类型 | 默认值 | 说明 |
|--------|------|--------|------|
| `FLEXKV_MAX_FILE_SIZE_GB` | float | -1 | 单个 SSD 文件的最大大小,-1表示不限 |
| `FLEXKV_IORING_ENTRIES` | int | 512 | io_uring 队列深度,推荐设为 `512` 以提升并发 IO 性能 |
| `FLEXKV_IORING_FLAGS` | int | 0 | io_uring 标志位,默认为 0|
| `FLEXKV_IOURING_ENTRIES` | int | 512 | io_uring 队列深度,推荐设为 `512` 以提升并发 IO 性能 |
| `FLEXKV_IOURING_FLAGS` | int | 0 | io_uring 标志位,默认为 0|



Expand Down
6 changes: 0 additions & 6 deletions examples/vllm_adaption/flexkv_config.json

This file was deleted.

4 changes: 4 additions & 0 deletions examples/vllm_adaption/flexkv_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cpu_cache_gb: 32
ssd_cache_gb: 64
ssd_cache_dir: ./ssd_cache/
enable_gds: false
31 changes: 16 additions & 15 deletions flexkv/cache/cache_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from flexkv.cache.transfer_pattern import add_virtal_op_for_mutiple_finished_ops
from flexkv.common.block import SequenceMeta
from flexkv.common.config import CacheConfig, ModelConfig, GLOBAL_CONFIG_FROM_ENV
from flexkv.common.exceptions import InvalidConfigError, NotEnoughSpaceError
from flexkv.common.transfer import (
DeviceType, TransferOpGraph, TransferOp, TransferType
)
Expand All @@ -55,11 +54,11 @@ def __init__(self,
evict_ratio: float,
hit_reward_seconds: int = 0):
if not isinstance(device_type, DeviceType):
raise InvalidConfigError(f"Unknown device type: {device_type}")
raise ValueError(f"Unknown device type: {device_type}")
if num_total_blocks <= 0:
raise InvalidConfigError(f"Invalid num_total_blocks: {num_total_blocks}")
raise ValueError(f"Invalid num_total_blocks: {num_total_blocks}")
if tokens_per_block <= 0 or (tokens_per_block & (tokens_per_block - 1)) != 0:
raise InvalidConfigError(f"Invalid tokens_per_block: {tokens_per_block}, "
raise ValueError(f"Invalid tokens_per_block: {tokens_per_block}, "
f"tokens_per_block must be a power of 2")

self.device_type = device_type
Expand Down Expand Up @@ -137,9 +136,9 @@ def take(self,
if protected_node is not None:
self.index.unlock(protected_node)
if strict and num_required_blocks > self.mempool.num_free_blocks:
raise NotEnoughSpaceError("Not enough free blocks to take, ",
required=num_required_blocks,
available=self.mempool.num_free_blocks)
raise RuntimeError(f"Not enough free blocks to take, "
f"required: {num_required_blocks}, "
f"available: {self.mempool.num_free_blocks}")
num_allocated_blocks = min(num_required_blocks, self.mempool.num_free_blocks)
return self.mempool.allocate_blocks(num_allocated_blocks)

Expand All @@ -154,11 +153,11 @@ def __init__(self,
evict_ratio: float,
hit_reward_seconds: int = 0):
if not isinstance(device_type, DeviceType):
raise InvalidConfigError(f"Unknown device type: {device_type}")
raise ValueError(f"Unknown device type: {device_type}")
if num_total_blocks <= 0:
raise InvalidConfigError(f"Invalid num_total_blocks: {num_total_blocks}")
raise ValueError(f"Invalid num_total_blocks: {num_total_blocks}")
if tokens_per_block <= 0 or (tokens_per_block & (tokens_per_block - 1)) != 0:
raise InvalidConfigError(f"Invalid tokens_per_block: {tokens_per_block}, "
raise ValueError(f"Invalid tokens_per_block: {tokens_per_block}, "
f"tokens_per_block must be a power of 2")

self.device_type = device_type
Expand Down Expand Up @@ -218,9 +217,9 @@ def take(self,
if protected_node is not None:
self.index.unlock(protected_node)
if strict and num_required_blocks > self.mempool.num_free_blocks:
raise NotEnoughSpaceError("Not enough free blocks to take, ",
required=num_required_blocks,
available=self.mempool.num_free_blocks)
raise RuntimeError("Not enough free blocks to take, ",
f"required: {num_required_blocks}, "
f"available: {self.mempool.num_free_blocks}")
num_allocated_blocks = min(num_required_blocks, self.mempool.num_free_blocks)
return self.mempool.allocate_blocks(num_allocated_blocks)

Expand Down Expand Up @@ -626,7 +625,9 @@ def _get_impl_local(self,
)
transfer_graph.add_transfer_op(op_gds_transfer)
finished_ops_ids.append(op_gds_transfer.op_id)
op_node_to_ready[op_gds_transfer.op_id] = (DeviceType.SSD, ssd_node_to_unlock, ssd_node_to_unlock.size())
op_node_to_ready[op_gds_transfer.op_id] = (DeviceType.SSD,
ssd_node_to_unlock,
ssd_node_to_unlock.size())
else:
fragment2_cpu_blocks = self.cpu_cache_engine.take(
num_required_blocks=fragment2_num_blocks,
Expand Down Expand Up @@ -970,7 +971,7 @@ def _put_impl_local(self,
protected_node = cpu_matched_result.last_node,
strict=False
)

if self.cache_config.enable_ssd:
fragment2_ssd_blocks = self.ssd_cache_engine.take(
num_required_blocks=fragment2_num_blocks,
Expand Down
4 changes: 1 addition & 3 deletions flexkv/cache/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

import numpy as np

from flexkv.common.exceptions import NotEnoughSpaceError


class Mempool:
def __init__(
Expand All @@ -29,7 +27,7 @@ def allocate_blocks(self, num: int) -> np.ndarray:
if num < 0:
raise ValueError(f"num must be greater than 0, but got {num}")
if num > self._num_free:
raise NotEnoughSpaceError("Not enough free blocks", required=num, available=self._num_free)
raise ValueError(f"Not enough free blocks, required: {num}, available: {self._num_free}")

if num > len(self._free_ids) - self._free_ids_offset:
self._update_free_ids()
Expand Down
Loading