Commit
reuse chunks for fp16 training.
feifeibear committed May 20, 2021
1 parent a22fa1a commit 6849adc
Showing 14 changed files with 583 additions and 621 deletions.
26 changes: 17 additions & 9 deletions client/chunk_list.py
@@ -220,9 +220,18 @@ def delete_free_chunks(self):
Try to delete chunks that are currently unused, i.e. chunks whose tensors are all in the FREE state
"""
# Release all free chunks on the CPU and GPU, and count how much space is freed on the target device

for chunk_id, chunk in self.chunk_id_to_chunk_dict.items():
if chunk.get_status() == PSChunkStatus.FREE:
self._delete_chunk(chunk)
pass
# if self._time_profile:
# sub_start_time = time.time()
# # TODO(jiaruifang) time-consuming
# status = chunk.get_status()
# if self._time_profile:
# global_timer.memory_delete_elapse += time.time() - sub_start_time

# if status == PSChunkStatus.FREE:
# self._delete_chunk(chunk)

def get_next_access_moment(self, chunk: Chunk,
target_device: torch.device):
Expand All @@ -246,11 +255,9 @@ def get_next_access_moment(self, chunk: Chunk,
return mom
return self.moments_cnt_of_iteration + chunk.access_moments[0]

def _chunk_to_move_out_for_room_making(
self,
size_in_bytes: int,
target_device: torch.device,
) -> List:
def _chunk_to_move_out_for_room_making(self, size_in_bytes: int,
target_device: torch.device
) -> List:
"""
Make size_in_bytes of room on the target device and figure out which chunks need to be moved out.
First release all free chunks on the CPU and GPU.
@@ -271,13 +278,14 @@
for chunk_id, chunk in self.chunk_id_to_chunk_dict.items():
if chunk.get_device() is not None and chunk.get_device(
).type == target_device.type and chunk.get_status(
) == PSChunkStatus.HOLD:
) != PSChunkStatus.COMPUTE:
# When is this chunk next needed on this device? When is it next not needed?
# If target_device is cuda,
next_mom = 0 #self.get_next_access_moment(chunk, target_device)
# Sort by next_mom in descending order; ties are broken by chunk_id (this only happens during warm-up)
Q.put((-next_mom, chunk_id))
assert chunk.get_status() != PSChunkStatus.FREE
# TODO(jiaruifang) do not release FREE chunks immediately; let them be reused instead
# assert chunk.get_status() != PSChunkStatus.FREE

while Q:
next_mom, chunk_id = Q.get()
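For reference, a self-contained sketch (hypothetical names, not this repository's classes) of the eviction ordering this hunk sets up: every chunk on the target device that is not in the COMPUTE state is pushed into a priority queue keyed by the negated next-access moment, so the chunk needed furthest in the future is moved out first.

import queue
from dataclasses import dataclass

# Stand-ins for Chunk and PSChunkStatus, used only for this illustration.
COMPUTE, HOLD, FREE = "COMPUTE", "HOLD", "FREE"

@dataclass
class FakeChunk:
    chunk_id: int
    status: str
    next_mom: int       # next moment this chunk will be accessed
    size_in_bytes: int

def pick_chunks_to_evict(chunks, need_bytes):
    """Return chunk ids to move out, preferring chunks accessed furthest in the future."""
    q = queue.PriorityQueue()
    for c in chunks:
        if c.status != COMPUTE:
            # Negate next_mom so the largest next-access moment is popped first.
            q.put((-c.next_mom, c.chunk_id, c.size_in_bytes))
    moved_bytes, victims = 0, []
    # PriorityQueue objects are always truthy, so emptiness must be checked explicitly.
    while not q.empty() and moved_bytes < need_bytes:
        _, chunk_id, size = q.get()
        victims.append(chunk_id)
        moved_bytes += size
    return victims

chunks = [FakeChunk(0, HOLD, 5, 4096), FakeChunk(1, COMPUTE, 1, 4096), FakeChunk(2, FREE, 9, 4096)]
print(pick_chunks_to_evict(chunks, 6000))  # -> [2, 0]

Relaxing the filter from == PSChunkStatus.HOLD to != PSChunkStatus.COMPUTE, as the hunk above does, lets FREE chunks enter this queue and be reused rather than deleted outright.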
36 changes: 17 additions & 19 deletions client/chunk_schema_scheduler.py
@@ -15,9 +15,10 @@
from .chunk_list import ChunkList
from .chunk_tensor_index import ChunkTensorIndex
from .const import AccessType, PSTensorStatus
from .parameter import PSParameter
from .parameter import PSParameter, register_param
import torch
import logging
import sys


class ChunkShemaScheduler(object):
@@ -29,26 +30,22 @@ def __init__(self, default_chunk_size, module, optimizer, chunk_list,
self.chunk_list = chunk_list
self.chunk_tensor_index = chunk_tensor_index

def register_param(self, param, name=None):
param.ps_attr = PSParameter(param, name)

def schedule(self):
"""
Assign a chunk schema to the parameters of the module and the optimizer
The scheduling pass converts all parameters into PS form
The scheduling pass registers all parameters as ps_tensor
"""
# The model parameters' data and grad are laid out alternately.
# The optimizer's M and V are laid out alternately
acc_cnt = 0
chunk_id = 0

# Should registering the params of the model and optimizer be coupled with the chunk layout schedule?
# Both FP16 and FP32 need to be registered
max_param_size = 0
# TODO(jiaruifang) use one FWD pass to obtain the correct execution order
# FP16 and FP32 are different
for name, param in self.module.named_parameters(recurse=True):
self.register_param(param, name)
register_param(param, name)
numel = param.numel()
max_param_size = max(max_param_size, numel)
data_type = param.dtype
self.chunk_tensor_index.add_tensor(chunk_id,
param.ps_attr.data_id(),
Expand All @@ -61,27 +58,25 @@ def schedule(self):
acc_cnt += numel * 2
if acc_cnt >= self.default_chunk_size:
self.chunk_list.new_chunk(chunk_id, acc_cnt, data_type)
self.chunk_tensor_index.add_chunk(chunk_id, acc_cnt, data_type)
chunk_id += 1
acc_cnt = 0

self.optimizer.max_param_size = max_param_size

# Wrap up: the remaining tensors cannot fill a chunk of at least the default size
if acc_cnt > 0:
self.chunk_list.new_chunk(chunk_id, acc_cnt, data_type)
self.chunk_tensor_index.add_chunk(chunk_id, acc_cnt, data_type)
chunk_id += 1
acc_cnt = 0

# fp32 data and grad need only one chunk. Find the largest fp16 chunk
if hasattr(self.optimizer, "fp32_from_fp16_groups"):
# Allocate a chunk
logging.info(
f'schedule for fp16 fp32_from_fp16_groups, max_param_size {max_param_size}'
)
logging.info(f'schedule for fp16 fp32_from_fp16_groups')
for param_group in self.optimizer.fp32_from_fp16_groups:
for param in param_group:
# TODO, the name cannot be obtained yet
self.register_param(param, 'master')
register_param(param, 'master')
numel = param.ps_attr.ps_numel
data_type = param.dtype
self.chunk_tensor_index.add_tensor(chunk_id,
Expand All @@ -91,11 +86,14 @@ def schedule(self):
acc_cnt += numel
if acc_cnt > self.default_chunk_size:
self.chunk_list.new_chunk(chunk_id, acc_cnt, data_type)
self.chunk_tensor_index.add_chunk(
chunk_id, acc_cnt, data_type)
chunk_id += 1
acc_cnt = 0
# Wrap up
if acc_cnt > 0:
self.chunk_list.new_chunk(chunk_id, acc_cnt, data_type)
self.chunk_tensor_index.add_chunk(chunk_id, acc_cnt, data_type)
chunk_id += 1
acc_cnt = 0

@@ -138,10 +136,10 @@ def schedule(self):
requires_grad=False)

ps_name_prefix = p.ps_attr.ps_name
self.register_param(state['exp_avg'],
f'{ps_name_prefix}.exp_avg')
self.register_param(state['exp_avg_sq'],
f'{ps_name_prefix}.exp_avg_sq')
register_param(state['exp_avg'],
f'{ps_name_prefix}.exp_avg')
register_param(state['exp_avg_sq'],
f'{ps_name_prefix}.exp_avg_sq')

numel = p.ps_attr.ps_numel
self.chunk_tensor_index.add_tensor(
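For reference, a minimal sketch (hypothetical helper, not part of this diff) of the packing pattern schedule() uses: parameter sizes accumulate into acc_cnt and a chunk is flushed once the accumulated count reaches default_chunk_size, with a trailing partial chunk flushed at the end. In the fp16 branch above, each parameter contributes numel * 2 because data and grad are laid out alternately.

def pack_params_into_chunks(param_numels, default_chunk_size):
    """Return a list of (chunk_id, chunk_numel) describing how params are grouped."""
    chunks = []
    chunk_id, acc_cnt = 0, 0
    for numel in param_numels:
        acc_cnt += numel
        if acc_cnt >= default_chunk_size:
            chunks.append((chunk_id, acc_cnt))
            chunk_id += 1
            acc_cnt = 0
    # Wrap up: leftover tensors that do not fill a default-size chunk.
    if acc_cnt > 0:
        chunks.append((chunk_id, acc_cnt))
    return chunks

print(pack_params_into_chunks([3, 5, 2, 4, 1], default_chunk_size=8))  # -> [(0, 8), (1, 7)]

The chunk_tensor_index.add_chunk calls added in this commit mirror each chunk_list.new_chunk call, so the index can later report a chunk's size and dtype (used by find_gap in the next file).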
51 changes: 48 additions & 3 deletions client/chunk_tensor_index.py
@@ -63,17 +63,59 @@ def __init__(self):
self.dict_tensor_id_info: dict[int, TensorInfo] = {}
# 1-N dict, chunk_id -> List(tensor_id) in order of start_offset
self.dict_chunk_id_tensor_id: dict[int, List[int]] = {}
self.dict_chunk_id_chunk_info: dict[int, tuple] = {}

def add_chunk(self, chunk_id, chunk_size, data_type):
self.dict_chunk_id_chunk_info[chunk_id] = (chunk_size, data_type)

def find_gap(self, numel, data_type):
"""
Search the chunk list for a gap of numel elements whose data type is data_type
TODO(jiaruifang) gaps on the same device should be preferred
The actual use case is very specific: during fp16 BWD, grad is allocated inside data's chunk.
"""
for chunk_id, tensor_info_list in self.dict_chunk_id_tensor_id.items():
chunk_size, chunk_data_type = self.dict_chunk_id_chunk_info[
chunk_id]
if chunk_data_type != data_type or chunk_size < numel:
continue
prev_end = 0
for tensor_id in tensor_info_list:
info = self.dict_tensor_id_info[tensor_id]
status = info.status()
if status == PSTensorStatus.FREE:
continue
start = info.start_offset
gap = start - prev_end
if gap >= numel:
return chunk_id, prev_end
prev_end = start + info.numel

if chunk_size - prev_end >= numel:
return chunk_id, prev_end

return None, None

def reset(self):
self.dict_tensor_id_info.clear()
self.dict_chunk_id_tensor_id.clear()

def generate_grad_tensor_param(self):
"""
Yield, in chunk-internal layout order, the params whose grad tensors are not currently free
"""
for chunk_id, tensor_id_list in self.dict_chunk_id_tensor_id.items():
for tensor_id in tensor_id_list:
info = self.dict_tensor_id_info[tensor_id]
if info.access_type == AccessType.GRAD and info.status(
) != PSTensorStatus.FREE:
yield info.param

def generate_all_tensor_info(self):
"""
Show the status of the tensors in each chunk
"""
for chunk_id, tensor_info_list in self.dict_chunk_id_tensor_id.items(
):
for chunk_id, tensor_info_list in self.dict_tensor_id_info.items():
for tensor_id in tensor_info_list:
yield self.dict_tensor_id_info[tensor_id]

@@ -192,7 +234,10 @@ def tensor_id_to_chunk_id(self, tensor_id) -> int:

def get_chunk_id(self, param: PSParameter, access_type: AccessType) -> int:
tensor_id = param.ps_attr.get_tensor_id(access_type)
return self.dict_tensor_id_info[tensor_id].chunk_id
info = self.dict_tensor_id_info.get(tensor_id)
if info is None:
return None
return info.chunk_id

def visit_chunks(self, chunk_list: ChunkList):
total_bytes = 0
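A self-contained sketch (simplified data model, not the repository's classes) of the first-fit search that find_gap performs: walk a chunk's tensors in offset order, treat FREE tensors as empty space, and return the first hole large enough for numel.

def find_gap_in_chunk(chunk_size, tensors, numel):
    """tensors: list of (start_offset, numel, is_free) sorted by start_offset.
    Returns the start offset of a gap that fits numel elements, or None."""
    prev_end = 0
    for start, length, is_free in tensors:
        if is_free:
            continue  # a FREE tensor's space counts as part of the gap
        if start - prev_end >= numel:
            return prev_end
        prev_end = start + length
    if chunk_size - prev_end >= numel:
        return prev_end
    return None

# Example: a 100-element chunk with live tensors at [0, 40) and [70, 90);
# a 25-element grad fits into the hole starting at offset 40.
print(find_gap_in_chunk(100, [(0, 40, False), (70, 20, False)], 25))  # -> 40

Skipping FREE tensors means their space is immediately reusable, which is what lets the fp16 backward pass place grad tensors inside an existing data chunk instead of allocating a new one.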