Skip to content

fix shared memory usage increasing bug and disable drop_remainder i… #202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion configs/rec/crnn/crnn_icdar15.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ eval:
image_shape: [32, 100] # H, W
infer_mode: *infer_mode
character_dict_path: *character_dict_path
padding: True # aspect ratio will be preserved if true.
padding: False # aspect ratio will be preserved if true.
- NormalizeImage: # different from paddle (paddle wrongly normalize BGR image with RGB mean/std from ImageNet for det, and simple rescale to [-1, 1] in rec.
bgr_to_rgb: True
is_hwc: True
Expand Down
2 changes: 1 addition & 1 deletion configs/rec/crnn/crnn_resnet34.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@ eval:
loader:
shuffle: False # TODO: tbc
batch_size: 64
drop_remainder: True
drop_remainder: False
max_rowsize: 12
num_workers: 8
2 changes: 1 addition & 1 deletion configs/rec/crnn/crnn_vgg7.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@ eval:
loader:
shuffle: False # TODO: tbc
batch_size: 16
drop_remainder: True
drop_remainder: False
max_rowsize: 12
num_workers: 8
29 changes: 11 additions & 18 deletions mindocr/data/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,18 @@ def build_dataset(

# Set default multiprocessing params for data pipeline
## num_parallel_workers: Number of subprocesses used to fetch the dataset, transform data, or load batch in parallel
num_workers = loader_config.get("num_workers", 8)
cores = multiprocessing.cpu_count()
num_devices = 1 if num_shards is None else num_shards
cores = multiprocessing.cpu_count()
NUM_WORKERS_BATCH = 2
NUM_WORKERS_MAP = int(cores / num_devices - NUM_WORKERS_BATCH) # optimal num workers assuming all cpu cores are used in this job
num_workers = loader_config.get("num_workers", NUM_WORKERS_MAP)
if num_workers > int(cores / num_devices):
print(f'WARNING: num_workers is adjusted to {int(cores / num_devices)} since {num_workers}x{num_devices} exceeds the number of CPU cores {cores}')
num_workers = int(cores / num_devices)
## prefetch_size: the length of the cache queue in the data pipeline for each worker, used to reduce waiting time. Larger value leads to more memory consumption. Default: 16
prefetch_size = loader_config.get("prefetch_size", 16) #
ms.dataset.config.set_prefetch_size(prefetch_size)
## max_rowsize: MB of shared memory between processes to copy data
## max_rowsize: MB of shared memory between processes to copy data. Only used when python_multiprocessing is True.
max_rowsize = loader_config.get("max_rowsize", 64)
# auto tune num_workers, prefetch. (This conflicts the profiler)
#ms.dataset.config.set_autotune_interval(5)
Expand All @@ -111,20 +113,6 @@ def build_dataset(
dataset_column_names = dataset.get_output_columns()
print('==> Dataset output columns: \n\t', dataset_column_names)

# TODO: find optimal setting automatically according to num of CPU cores
num_workers = loader_config.get("num_workers", 8) # Number of subprocesses used to fetch the dataset/map data row/gen batch in parallel
cores = multiprocessing.cpu_count()
num_devices = 1 if num_shards is None else num_shards
if num_workers > int(cores / num_devices):
num_workers = int(cores / num_devices)
print('WARNING: num_workers is adjusted to {num_workers}, to fit {cores} CPU cores shared for {num_devices} devices')

prefetch_size = loader_config.get("prefetch_size", 16) # the length of the cache queue in the data pipeline for each worker, used to reduce waiting time. Larger value leads to more memory consumption. Default: 16
max_rowsize = loader_config.get("max_rowsize", 64) # MB of shared memory between processes to copy data

ms.dataset.config.set_prefetch_size(prefetch_size)
#print('Prefetch size: ', ms.dataset.config.get_prefetch_size())

## Generate source dataset (source w.r.t. the dataset.map pipeline) based on python callable numpy dataset in parallel
ds = ms.dataset.GeneratorDataset(
dataset,
Expand All @@ -150,7 +138,12 @@ def build_dataset(

drop_remainder = loader_config.get('drop_remainder', is_train)
if is_train and drop_remainder == False:
print('WARNING: drop_remainder should be True for training, otherwise the last batch may lead to training fail.')
print('WARNING: drop_remainder should be True for training, otherwise the last batch may lead to training fail in Graph mode')
if not is_train:
if drop_remainder:
print("WARNING: drop_remainder is forced to be False for evaluation to include the last batch for accurate evaluation." )
drop_remainder = False

dataloader = ds.batch(
batch_size,
drop_remainder=drop_remainder,
Expand Down
19 changes: 11 additions & 8 deletions mindocr/utils/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ class Evaluator:
metric:
"""

def __init__(self, network, loss_fn=None, postprocessor=None, metrics=None, visualize=False, verbose=False,
def __init__(self, network, dataloader, loss_fn=None, postprocessor=None, metrics=None, num_epochs_train=-1, visualize=False, verbose=False,
**kwargs):
self.net = network
self.postprocessor = postprocessor
# FIXME: process when metrics is not None
self.metrics = metrics if isinstance(metrics, List) else [metrics]
self.metric_names = []
for m in metrics:
Expand All @@ -41,24 +40,29 @@ def __init__(self, network, loss_fn=None, postprocessor=None, metrics=None, visu
# TODO: add support for computing evaluation loss
assert eval_loss == False, 'not impl'

def eval(self, dataloader, num_columns_to_net=1, num_keys_of_labels=None):
# create iterator
self.iterator = dataloader.create_tuple_iterator(num_epochs=num_epochs_train, output_numpy=False, do_copy=False)
self.num_batches_eval = dataloader.get_dataset_size()

def eval(self, num_columns_to_net=1, num_keys_of_labels=None):
"""
Args:
dataloader (Dataset): data iterator which generates tuple of Tensor defined by the transform pipeline and 'output_columns'
"""
eval_res = {}

self.net.set_train(False)
iterator = dataloader.create_tuple_iterator(num_epochs=1, output_numpy=False, do_copy=False)
for m in self.metrics:
m.clear()

for i, data in tqdm(enumerate(iterator), total=dataloader.get_dataset_size()):
for i, data in tqdm(enumerate(self.iterator), total=self.num_batches_eval):
# start = time.time()
# TODO: if network input is not just an image.
img = data[0] # ms.Tensor(batch[0])
gt = data[1:] # ground truth, (polys, ignore_tags) for det,
#print(i, img.shape, img.sum())

net_preds = self.net(img)
# net_inputs = data[:num_columns_to_net]
# gt = data[num_columns_to_net:] # ground truth
# preds = self.net(*net_inputs)
Expand All @@ -83,7 +87,6 @@ def eval(self, dataloader, num_columns_to_net=1, num_keys_of_labels=None):
for m in self.metrics:
res_dict = m.eval()
eval_res.update(res_dict)
# fps = total_frame / total_time

self.net.set_train(True)

Expand Down Expand Up @@ -125,7 +128,7 @@ def __init__(self,
self.log_interval = log_interval
self.batch_size = batch_size
if self.loader_eval is not None:
self.net_evaluator = Evaluator(network, loss_fn, postprocessor, metrics)
self.net_evaluator = Evaluator(network, loader, loss_fn, postprocessor, metrics)
self.main_indicator = main_indicator
self.best_perf = -1e8
else:
Expand Down Expand Up @@ -204,7 +207,7 @@ def on_train_epoch_end(self, run_context):
if self.loader_eval is not None:
if cur_epoch >= self.val_start_epoch and (cur_epoch - self.val_start_epoch) % self.val_interval == 0:
eval_start = time.time()
measures = self.net_evaluator.eval(self.loader_eval)
measures = self.net_evaluator.eval()
eval_done = True
if self.is_main_device:
perf = measures[self.main_indicator]
Expand Down
14 changes: 7 additions & 7 deletions tools/eval.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'''
Model evaluation
Model evaluation
'''
import sys
import os
Expand Down Expand Up @@ -55,15 +55,15 @@ def main(cfg):
if cfg.system.amp_level != 'O0':
print('INFO: Evaluation will run in full-precision(fp32)')

# TODO: check float type conversion in official Model.eval
#ms.amp.auto_mixed_precision(network, amp_level='O0')
# TODO: check float type conversion in official Model.eval
#ms.amp.auto_mixed_precision(network, amp_level='O0')

# postprocess, metric
postprocessor = build_postprocess(cfg.postprocess)
# postprocess network prediction
metric = build_metric(cfg.metric)

net_evaluator = Evaluator(network, None, postprocessor, [metric])
net_evaluator = Evaluator(network, loader_eval, None, postprocessor, [metric])

# log
print('='*40)
Expand All @@ -73,8 +73,8 @@ def main(cfg):
else:
print(f'Model: {cfg.model.backbone.name}-{cfg.model.neck.name}-{cfg.model.head.name}')
print('='*40)
measures = net_evaluator.eval(loader_eval)

measures = net_evaluator.eval()
if is_main_device:
print('Performance: ', measures)

Expand All @@ -97,5 +97,5 @@ def parse_args():
config = Dict(config)

#print(config)

main(config)