Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions fastdeploy/cache_manager/cache_transfer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ def __init__(self, args):
def _init_gpu_cache(self, args):

if not args.create_cache_tensor:
logger.info("Waiting for runners to create kv cache.")
logger.info(f"[rank {self.rank}/{self.n_ranks}] Waiting for runners to create kv cache.")
while self.cache_ready_signal.value[self.rank] != 1:
time.sleep(1)
logger.info("OK! Stop waiting.")
time.sleep(0.1)
logger.info(f"[rank {self.rank}/{self.n_ranks}] OK! Stop waiting.")

logger.info("Initializing kv cache for all layers.")
logger.info(f"[rank {self.rank}/{self.n_ranks}] Initializing kv cache for all layers.")
paddle.set_device(f"gpu:{self.device}")
for i in range(args.num_layers + self.num_extra_layers):
num_gpu_blocks = args.num_gpu_blocks if i < args.num_layers else self.num_extra_layer_gpu_blocks
Expand All @@ -215,13 +215,13 @@ def _init_gpu_cache(self, args):
val_name = f"value_caches_{i}_rank{self.rank}.device{self.device}"

if args.create_cache_tensor:
logger.info(f"..creating kv cache for layer {i}: {cache_shape}")
logger.info(f"[rank {self.rank}/{self.n_ranks}] ..creating kv cache for layer {i}: {cache_shape}")
key_cache = paddle.full(shape=cache_shape, fill_value=0, dtype=args.cache_dtype)
val_cache = paddle.full(shape=cache_shape, fill_value=0, dtype=args.cache_dtype)
set_data_ipc(key_cache, key_name)
set_data_ipc(val_cache, val_name)
else:
logger.info(f"..attaching kv cache for layer {i}: {cache_shape}")
logger.info(f"[rank {self.rank}/{self.n_ranks}] ..attaching kv cache for layer {i}: {cache_shape}")
key_cache = paddle.empty(shape=[], dtype=args.cache_dtype)
val_cache = paddle.empty(shape=[], dtype=args.cache_dtype)
key_cache = share_external_data(key_cache, key_name, cache_shape)
Expand All @@ -233,33 +233,37 @@ def _init_gpu_cache(self, args):
self.gpu_cache_v_tensors.append(self.gpu_cache_kvs[val_name])

if args.create_cache_tensor:
logger.info("✅ kv cache is ready!")
logger.info("[rank {self.rank}/{self.n_ranks}] ✅ kv cache is ready!")
self.cache_ready_signal.value[self.rank] = 1

cache_kv_size_byte = sum([tmp.numel() * 1 for key, tmp in self.gpu_cache_kvs.items()])
logger.info(f"device :{self.device}")
logger.info(f"cache_kv_size_byte : {cache_kv_size_byte}")
logger.info(f"done init cache (full) gmem alloc : {paddle.device.cuda.memory_allocated()}")
logger.info(f"[rank {self.rank}/{self.n_ranks}] device :{self.device}")
logger.info(f"[rank {self.rank}/{self.n_ranks}] cache_kv_size_byte : {cache_kv_size_byte}")
logger.info(
f"[rank {self.rank}/{self.n_ranks}] done init cache (full) gmem alloc : {paddle.device.cuda.memory_allocated()}"
)

def _init_cpu_cache(self, args):
if args.num_cpu_blocks == 0:
logger.info("💡 no swap space (cpu cache) is specified.")
logger.info(f"[rank {self.rank}/{self.n_ranks}] 💡 no swap space (cpu cache) is specified.")
self.swap_space_ready_signal.value[self.rank] = 1
return
logger.info("Initializing swap space (cpu cache) for all layers.")
logger.info(f"[rank {self.rank}/{self.n_ranks}] Initializing swap space (cpu cache) for all layers.")
paddle.set_device("cpu")
self.k_dst_ptrs = []
self.v_dst_ptrs = []
for i in range(args.num_layers + self.num_extra_layers):
key_name = f"key_caches_{i}_rank{self.rank}"
val_name = f"value_caches_{i}_rank{self.rank}"
need_to_allocate_bytes = args.num_cpu_blocks * args.bytes_per_layer_per_block
logger.info(f"..creating cpu cache for layer {i}: {2 * need_to_allocate_bytes / 1024 ** 3:.2f}GB")
logger.info(
f"[rank {self.rank}/{self.n_ranks}] ..creating cpu cache for layer {i}: {2 * need_to_allocate_bytes / 1024 ** 3:.2f}GB"
)
self.cpu_cache_kvs[key_name] = cuda_host_alloc(need_to_allocate_bytes)
self.k_dst_ptrs.append(self.cpu_cache_kvs[key_name])
self.cpu_cache_kvs[val_name] = cuda_host_alloc(need_to_allocate_bytes)
self.v_dst_ptrs.append(self.cpu_cache_kvs[val_name])
logger.info("✅ swap space (cpu cache) is ready!")
logger.info(f"[rank {self.rank}/{self.n_ranks}] ✅ swap space (cpu cache) is ready!")
self.swap_space_ready_signal.value[self.rank] = 1

def _do_swap_to_cpu_task(
Expand Down Expand Up @@ -473,6 +477,10 @@ def clear_or_update_caches(self, args):
while True:
if kv_cache_status_signal.value[0] == KVCacheStatus.CLEARING:
try:
logger.info(
f"[rank {self.rank}/{self.n_ranks}] Start clearing caches {self.cache_ready_signal.value}"
)
# clear cpu caches
if envs.FD_ENABLE_SWAP_SPACE_CLEARING:
paddle.set_device("cpu")
for ptrs in self.k_dst_ptrs + self.v_dst_ptrs:
Expand All @@ -486,37 +494,58 @@ def clear_or_update_caches(self, args):
while np.sum(self.swap_space_ready_signal.value) != 0:
time.sleep(0.1)

# clear gpu caches
paddle.set_device(f"gpu:{self.device}")
for name, tensor in self.gpu_cache_kvs.items():
unset_data_ipc(tensor, name, True, False)
self.gpu_cache_kvs.clear()
self.gpu_cache_k_tensors.clear()
self.gpu_cache_v_tensors.clear()

# reset cache_ready_signal
self.cache_ready_signal.value[self.rank] = 0
if np.sum(self.cache_ready_signal.value) == 0:
logger.info(
f"[rank {self.rank}/{self.n_ranks}] Finish clearing caches {self.cache_ready_signal.value}"
)

# wait for all ranks caches to be cleared
if np.sum(self.cache_ready_signal.value) != 0:
time.sleep(0.1)

# reset kv_cache_status_signal
kv_cache_status_signal.value[0] = KVCacheStatus.CLEARED
logger.info("All ranks finish clearing caches")

except Exception as e:
logger.error(f"Failed to clear caches: {e}")
logger.error(f"[rank {self.rank}/{self.n_ranks}] Failed to clear caches: {e}")

elif kv_cache_status_signal.value[0] == KVCacheStatus.UPDATING:
try:
logger.info(
f"[rank {self.rank}/{self.n_ranks}] Start restoring caches {self.cache_ready_signal.value}"
)
# restore cpu cache
if envs.FD_ENABLE_SWAP_SPACE_CLEARING:
self._init_cpu_cache(args)
while np.sum(self.swap_space_ready_signal.value) != args.mp_num:
time.sleep(0.1)

# restore gpu cache and set cache_ready_signal
self._init_gpu_cache(args)
logger.info(
f"[rank {self.rank}/{self.n_ranks}] Finish restoring caches {self.cache_ready_signal.value}"
)

# wait for all ranks caches to be ready
while np.sum(self.cache_ready_signal.value) != args.mp_num:
time.sleep(0.1)

# set kv_cache_status_signal
logger.info("All ranks finish restoring caches")
kv_cache_status_signal.value[0] = KVCacheStatus.NORMAL

except Exception as e:
logger.error(f"Failed to restore caches: {e}")
logger.error(f"[rank {self.rank}/{self.n_ranks}] Failed to restore caches: {e}")

time.sleep(0.1)

Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/rl/dynamic_weight_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def clear_parameters(self, pid: int = 0) -> None:
self._verify_parameters("clearance")
if self.parallel_config.tensor_parallel_size > 1:
paddle.distributed.barrier(self.parallel_config.tp_group)
paddle.distributed.shutdown_process_group(self.parallel_config.tp_group)
paddle.distributed.shutdown_process_group(self.parallel_config.tp_group)
if self.parallel_config.enable_expert_parallel:
paddle.distributed.barrier(self.parallel_config.ep_group)
paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
Expand Down
8 changes: 4 additions & 4 deletions fastdeploy/worker/gpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1028,12 +1028,12 @@ def initialize_kv_cache(self, profile: bool = False) -> None:
create_cache_tensor = profile or self.parallel_config.splitwise_role == "mixed"

if not create_cache_tensor:
logger.info("Waiting for cache managers to create kv cache..")
logger.info(f"Waiting for cache managers to create kv cache.. {cache_ready_signal.value}")
while cache_ready_signal.value[self.local_rank] != 1:
time.sleep(1)
logger.info("OK! Stop waiting.")
logger.info(f"OK! Stop waiting. {cache_ready_signal.value}")

logger.info("Initializing kv cache for all layers.")
logger.info(f"Initializing kv cache for all layers. {cache_ready_signal.value}")
cache_kvs_list = []
for i in range(self.model_config.num_hidden_layers):
key_cache_name = f"key_caches_{i}_rank{local_rank}.device{self.device_id}"
Expand All @@ -1054,8 +1054,8 @@ def initialize_kv_cache(self, profile: bool = False) -> None:
self.share_inputs["caches"] = cache_kvs_list

if not profile and create_cache_tensor:
logger.info("✅ kv cache is ready!")
cache_ready_signal.value[self.local_rank] = 1
logger.info(f"✅ kv cache is ready! {cache_ready_signal.value}")

paddle.device.cuda.empty_cache()

Expand Down
Loading