-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils_.py
118 lines (95 loc) · 3.62 KB
/
utils_.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import re
import numpy as np
import torch
import torch.distributed as dist
import collections
import logging
def get_world_size() -> int:
if not dist.is_available():
return 1
if not dist.is_initialized():
return 1
return dist.get_world_size()
class LossMeter(object):
def __init__(self, maxlen=100):
"""Computes and stores the running average"""
self.vals = collections.deque([], maxlen=maxlen)
def __len__(self):
return len(self.vals)
def update(self, new_val):
self.vals.append(new_val)
@property
def val(self):
return sum(self.vals) / len(self.vals)
def __repr__(self):
return str(self.val)
def count_parameters(model):
return sum(p.numel() for p in model.parameters() if p.requires_grad)
def load_state_dict(state_dict_path, loc='cpu'):
state_dict = torch.load(state_dict_path, map_location=loc)
# Change Multi GPU to single GPU
original_keys = list(state_dict.keys())
for key in original_keys:
if key.startswith("module."):
new_key = key[len("module."):]
state_dict[new_key] = state_dict.pop(key)
return state_dict
def set_global_logging_level(level=logging.ERROR, prefices=[""]):
"""
Override logging levels of different modules based on their name as a prefix.
It needs to be invoked after the modules have been loaded so that their loggers have been initialized.
Args:
- level: desired level. e.g. logging.INFO. Optional. Default is logging.ERROR
- prefices: list of one or more str prefices to match (e.g. ["transformers", "torch"]). Optional.
Default is `[""]` to match all active loggers.
The match is a case-sensitive `module_name.startswith(prefix)`
"""
prefix_re = re.compile(fr'^(?:{ "|".join(prefices) })')
for name in logging.root.manager.loggerDict:
if re.match(prefix_re, name):
logging.getLogger(name).setLevel(level)
def reduce_dict(input_dict, average=True):
"""
Reduce the values in the dictionary from all processes so that process with rank
0 has the reduced results.
Args:
input_dict (dict): inputs to be reduced. (values not necessarily tensors).
average (bool): whether to do average or sum
Returns:
a dict with the same keys as input_dict, after reduction.
"""
world_size = get_world_size()
if world_size < 2:
return input_dict
with torch.no_grad():
# Convert to CUDA Tensor for dist.reduce()
input_dict_cuda_vals = {}
for k, v in input_dict.items():
if type(v) == torch.Tensor:
input_dict_cuda_vals[k] = v.to('cuda')
else:
input_dict_cuda_vals[k] = torch.tensor(v, device='cuda')
names = []
values = []
for k, v in sorted(input_dict_cuda_vals.items()):
names.append(k)
values.append(v)
values = torch.stack(values, dim=0)
dist.reduce(values, dst=0) # reduce to gpu 0
if dist.get_rank() == 0 and average:
# only main process gets accumulated, so only divide by
# world_size in this case
values /= world_size
reduced_dict = {k: v for k, v in zip(names, values)}
return reduced_dict
def setup_for_distributed(is_master):
"""
This function disables printing when not in master process
"""
import builtins as __builtin__
builtin_print = __builtin__.print
def print(*args, **kwargs):
force = kwargs.pop("force", False)
if is_master or force:
builtin_print(*args, **kwargs)
__builtin__.print = print