[CodeStyle][UP031] fix /python/paddle/distributed/fleet/* - part 12
gouzil authored Jun 29, 2024
1 parent e9a4c42 commit 7384d3b
Showing 18 changed files with 53 additions and 68 deletions.
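For context: UP031 is the Ruff/pyupgrade rule that flags printf-style "%" string formatting. Every hunk below rewrites such calls into f-strings (or into str.format where quoting would make an f-string awkward). A minimal sketch of the conversion pattern, illustrative only and not taken from this commit:

# Illustrative sketch only -- not code from this commit.
table_class = "DownpourSparseSSDTable"

# Before: printf-style formatting, flagged by UP031.
msg_old = "unsupported sparse_table_class: %s" % (table_class,)

# After: the equivalent f-string, as applied throughout this commit.
msg_new = f"unsupported sparse_table_class: {table_class}"

assert msg_old == msg_new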
8 changes: 3 additions & 5 deletions python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -776,14 +776,13 @@ def sparse_optimizer_config(sgd, strategy, prefix):
def set_sparse_table_config(table_data, config):
for key in config:
if key not in support_sparse_key_list:
raise ValueError("strategy key '%s' not support" % (key))
raise ValueError(f"strategy key '{key}' not support")
table_class = config.get(
"sparse_table_class", "DownpourSparseTable"
)
if table_class not in support_sparse_table_class:
raise ValueError(
"support sparse_table_class: ['DownpourSparseTable, DownpourSparseSSDTable'], but actual %s"
% (table_class)
f"support sparse_table_class: ['DownpourSparseTable, DownpourSparseSSDTable'], but actual {table_class}"
)
if table_class == "DownpourSparseSSDTable":
table_data.table_class = 'SSDSparseTable'
@@ -806,8 +805,7 @@ def set_sparse_table_config(table_data, config):
)
if accessor_class not in support_sparse_accessor_class:
raise ValueError(
"support sparse_accessor_class: ['DownpourSparseValueAccessor', 'DownpourCtrAccessor', 'DownpourCtrDoubleAccessor', 'DownpourUnitAccessor', 'DownpourDoubleUnitAccessor', 'DownpourCtrDymfAccessor'], but actual %s"
% (accessor_class)
f"support sparse_accessor_class: ['DownpourSparseValueAccessor', 'DownpourCtrAccessor', 'DownpourCtrDoubleAccessor', 'DownpourUnitAccessor', 'DownpourDoubleUnitAccessor', 'DownpourCtrDymfAccessor'], but actual {accessor_class}"
)

if accessor_class.find("Double") >= 0:
4 changes: 2 additions & 2 deletions python/paddle/distributed/fleet/base/graphviz.py
@@ -21,7 +21,7 @@

def crepr(v):
if isinstance(v, str):
return '"%s"' % v
return f'"{v}"'
return str(v)


@@ -255,7 +255,7 @@ def add_op(self, opType, **kwargs):
highlight = kwargs['highlight']
del kwargs['highlight']
return self.graph.node(
"<<B>%s</B>>" % opType,
f"<<B>{opType}</B>>",
prefix="op",
description=opType,
shape="box",
2 changes: 1 addition & 1 deletion python/paddle/distributed/fleet/base/strategy_group.py
@@ -115,7 +115,7 @@ def __repr__(self):
if not self.list_of_group:
return debug_str + "No group."
for i in range(len(self.list_of_group)):
debug_str += f"Group[{i}]: {str(self.list_of_group[i])}; "
debug_str += f"Group[{i}]: {self.list_of_group[i]}; "
return debug_str


5 changes: 2 additions & 3 deletions python/paddle/distributed/fleet/base/util_factory.py
@@ -404,8 +404,7 @@ def _proto_check(self, config):
train_prog_var = train_prog.global_block().var(var_name)
except ValueError as e:
print(
"Not find variable '%s' in train program. please check pruning."
% var_name
f"Not find variable '{var_name}' in train program. please check pruning."
)
is_match = False
continue
@@ -669,7 +668,7 @@ def check_not_expected_ops(prog, not_expected_op_types):
return_numpy=return_numpy,
)
for i, v in enumerate(fetch_list):
print("fetch_targets name: %s" % v.name)
print(f"fetch_targets name: {v.name}")
print(f"fetch_targets: {results[i]}")
return results

14 changes: 6 additions & 8 deletions python/paddle/distributed/fleet/data_generator/data_generator.py
@@ -318,10 +318,10 @@ def _gen_str(self, line):
for item in line:
name, elements = item
if not isinstance(name, str):
raise ValueError("name%s must be in str type" % type(name))
raise ValueError(f"name{type(name)} must be in str type")
if not isinstance(elements, list):
raise ValueError(
"elements%s must be in list type" % type(elements)
f"elements{type(elements)} must be in list type"
)
if not elements:
raise ValueError(
@@ -336,8 +335,7 @@ def _gen_str(self, line):
self._proto_info[-1] = (name, "float")
elif not isinstance(elem, int):
raise ValueError(
"the type of element%s must be in int or float"
% type(elem)
f"the type of element{type(elem)} must be in int or float"
)
output += " " + str(elem)
else:
@@ -348,10 +347,10 @@ def _gen_str(self, line):
for index, item in enumerate(line):
name, elements = item
if not isinstance(name, str):
raise ValueError("name%s must be in str type" % type(name))
raise ValueError(f"name{type(name)} must be in str type")
if not isinstance(elements, list):
raise ValueError(
"elements%s must be in list type" % type(elements)
f"elements{type(elements)} must be in list type"
)
if not elements:
raise ValueError(
@@ -370,8 +369,7 @@ def _gen_str(self, line):
self._proto_info[index] = (name, "float")
elif not isinstance(elem, int):
raise ValueError(
"the type of element%s must be in int or float"
% type(elem)
f"the type of element{type(elem)} must be in int or float"
)
output += " " + str(elem)
return output + "\n"
3 changes: 1 addition & 2 deletions python/paddle/distributed/fleet/dataset/dataset.py
@@ -315,8 +315,7 @@ def _check_use_var_with_data_generator(
for i, ele in enumerate(user_parsed_line):
if len(ele[1]) == 0:
raise ValueError(
"var length error: var %s's length in data_generator is 0"
% ele[0]
f"var length error: var {ele[0]}'s length in data_generator is 0"
)

if var_list[i].dtype == paddle.float32 and not all(
3 changes: 1 addition & 2 deletions python/paddle/distributed/fleet/fleet.py
@@ -83,8 +83,7 @@ def __impl__(*args, **kwargs):
and cls._role_maker._is_non_distributed() is True
):
logger.warning(
"%s() function doesn't work when use non_distributed fleet."
% (func.__name__)
f"{func.__name__}() function doesn't work when use non_distributed fleet."
)
return

29 changes: 14 additions & 15 deletions python/paddle/distributed/fleet/launch_utils.py
@@ -291,7 +291,7 @@ def get_cluster(
trainer.accelerators.extend(devices_per_proc[i])
else:
trainer.accelerators.append(devices_per_proc[i])
trainer.endpoint = "%s" % (cur_node_endpoints[i])
trainer.endpoint = f"{cur_node_endpoints[i]}"
trainer.rank = trainer_rank
trainer_rank += 1

@@ -498,7 +498,7 @@ def start_local_trainers(
for idx, t in enumerate(pod.trainers):
proc_env = {
"PADDLE_TRAINER_ID": "%d" % t.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
"PADDLE_CURRENT_ENDPOINT": f"{t.endpoint}",
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
"PADDLE_RANK_IN_NODE": str(idx),
@@ -523,18 +523,18 @@
]

if len(t.accelerators) > 0 and pod.device_mode == DeviceMode.GPU:
proc_env["FLAGS_selected_gpus"] = "%s" % ",".join(
[str(g) for g in t.accelerators]
proc_env["FLAGS_selected_gpus"] = "{}".format(
",".join([str(g) for g in t.accelerators])
)

if len(t.accelerators) > 0:
proc_env["FLAGS_selected_accelerators"] = "%s" % ",".join(
[str(g) for g in t.accelerators]
proc_env["FLAGS_selected_accelerators"] = "{}".format(
",".join([str(g) for g in t.accelerators])
)
# to do: same code style in future
if framework.core.is_compiled_with_xpu() and len(t.accelerators) > 0:
proc_env["FLAGS_selected_xpus"] = "%s" % ",".join(
[str(g) for g in t.accelerators]
proc_env["FLAGS_selected_xpus"] = "{}".format(
",".join([str(g) for g in t.accelerators])
)

current_env.update(proc_env)
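Note that the three FLAGS_selected_* assignments in the hunk above switch to str.format rather than an f-string, presumably because the embedded ",".join(...) reuses the surrounding double quote, which f-strings only permit from Python 3.12 onward (PEP 701). A hedged sketch of the two equivalent spellings, not repository code:

# Illustrative sketch only -- not code from this commit.
accelerators = [0, 1, 3]

# As applied in this commit: str.format keeps the double-quoted join
# outside the replacement field.
selected = "{}".format(",".join(str(g) for g in accelerators))

# An f-string also works when the inner quotes differ (or on Python >= 3.12,
# where PEP 701 allows reusing the same quote inside the expression).
selected_alt = f"{','.join(str(g) for g in accelerators)}"

assert selected == selected_alt == "0,1,3"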
@@ -571,9 +571,9 @@ def start_local_trainers(
pre_fn = None if os.name == 'nt' else os.setsid
if log_dir is not None:
os.makedirs(log_dir, exist_ok=True)
if os.path.exists("%s/endpoints.log" % log_dir):
if os.path.exists(f"{log_dir}/endpoints.log"):
os.remove(f"{log_dir}/endpoints.log")
with open("%s/endpoints.log" % log_dir, "w") as f:
with open(f"{log_dir}/endpoints.log", "w") as f:
f.write("PADDLE_TRAINER_ENDPOINTS: \n")
f.write("\n".join(cluster.trainers_endpoints()))
if (
@@ -613,8 +613,7 @@ def pull_worker_log(tp):
except UnicodeEncodeError:
sys.stdout.write(
'UnicodeEncodeError occurs at this line. '
'Please refer to the original log file "%s"\n'
% tp.log_fn.name
f'Please refer to the original log file "{tp.log_fn.name}"\n'
)
tp.log_offset = fin.tell()

@@ -883,7 +882,7 @@ def get_mapped_cluster_without_rank_mapping(
assert len(ranks_per_node) == 1
for i in range(len(ranks_per_node)):
trainer = Trainer()
trainer.endpoint = "%s" % (cur_node_endpoints[i])
trainer.endpoint = f"{cur_node_endpoints[i]}"
trainer.rank = ranks_per_node[i]
pod.trainers.append(trainer)
cluster.pods.append(pod)
@@ -1002,7 +1001,7 @@ def get_relative_gpu_id(gpu_id):
trainer.accelerators.append(
get_relative_gpu_id(local_device_ids[0])
)
trainer.endpoint = "%s" % (cur_node_endpoints[i])
trainer.endpoint = f"{cur_node_endpoints[i]}"
trainer.rank = ranks_per_node[i]
pod.trainers.append(trainer)
cluster.pods.append(pod)
@@ -1936,7 +1935,7 @@ def check_backend(backend):
"paddle.distributed initialize error, "
"backend argument can only be one of "
"'nccl', 'gloo', 'bkcl', 'auto', 'heter', 'xccl' "
"but got %s" % backend
f"but got {backend}"
)

if backend == 'nccl' and not framework.core.is_compiled_with_cuda():
@@ -82,10 +82,9 @@ def __init__(
raise TypeError(
"The type of grad_clip should be 'ClipGradByNorm', because DGCMomentumOptimizer only support ClipGradByNorm"
)
assert isinstance(num_trainers, int), (
"The type of num_trainers should be 'int', but received %s"
% type(num_trainers)
)
assert isinstance(
num_trainers, int
), f"The type of num_trainers should be 'int', but received {type(num_trainers)}"
assert (
num_trainers > 0
), "The value of num_trainers should be greater than 0!"
@@ -288,8 +288,7 @@ def get_sys_free_mem():
return free
else:
raise ValueError(
"%s platform is unsupported is parameter server optimizer"
% (platform.system())
f"{platform.system()} platform is unsupported is parameter server optimizer"
)

if not isinstance(self.inner_opt, paddle.optimizer.SGD):
@@ -334,8 +333,7 @@ def get_sys_free_mem():
if x < 0:
if neg_dim_count >= 1:
raise ValueError(
"Var %s has more than one negative dim."
% (var_name)
f"Var {var_name} has more than one negative dim."
)
neg_dim_count += 1
data_count *= -x
@@ -208,8 +208,7 @@ def get_sys_free_mem():
return free
else:
raise ValueError(
"%s platform is unsupported is parameter server optimizer"
% (platform.system())
f"{platform.system()} platform is unsupported is parameter server optimizer"
)

if not isinstance(self.inner_opt, paddle.optimizer.SGD):
@@ -248,8 +247,7 @@ def get_sys_free_mem():
if x < 0:
if neg_dim_count >= 1:
raise ValueError(
"Var %s has more than one negative dim."
% (var_name)
f"Var {var_name} has more than one negative dim."
)
neg_dim_count += 1
data_count *= -x
@@ -132,7 +132,7 @@ def _get_sharding_segment_strategy(self):
self._forward_remain_anchors = []
else:
raise NotImplementedError(
f"the sharding segment strategy [{str(segment_strategy)}] is not implemented"
f"the sharding segment strategy [{segment_strategy}] is not implemented"
)
self._sharding_segment_strategy = segment_strategy

@@ -631,7 +631,7 @@ def _print_segmentation_for_debug(self):
)

for index, layer in enumerate(self._layers_desc[start:end]):
logger.info(f"{index + start}: {str(layer)}")
logger.info(f"{index + start}: {layer}")

if self._num_virtual_pipeline_stages > 1:
for stage in range(self._num_stages):
@@ -67,8 +67,8 @@ def initialize_p2p_groups(
) = _hcg.get_p2p_groups()

debug_str = (
f"P2pInfo: send_next_group: {repr(send_next_group)}, send_prev_group: {repr(send_prev_group)}, "
f"recv_next_group: {repr(recv_next_group)}, recv_prev_group: {repr(recv_prev_group)}"
f"P2pInfo: send_next_group: {send_next_group!r}, send_prev_group: {send_prev_group!r}, "
f"recv_next_group: {recv_next_group!r}, recv_prev_group: {recv_prev_group!r}"
)
logger.info(debug_str)
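The change above replaces explicit repr(...) calls with the !r conversion specifier, which applies repr() to the interpolated value and renders identically. A tiny illustrative sketch, not repository code:

# Illustrative sketch only -- not code from this commit.
send_next_group = None
assert f"group: {repr(send_next_group)}" == f"group: {send_next_group!r}" == "group: None"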

10 changes: 5 additions & 5 deletions python/paddle/distributed/fleet/runtime/the_one_ps.py
@@ -472,11 +472,11 @@ def __init__(self):
def to_string(self, indent):
program_str = "{}tensor {{{}\n{}}}"
attrs = ""
attrs += f"feed_var_name: \"{str(self.feed_var_name)}\" "
attrs += f"fetch_var_name: \"{str(self.fetch_var_name)}\" "
attrs += f"startup_program_id: {str(self.startup_program_id)} "
attrs += f"main_program_id: {str(self.main_program_id)} "
attrs += f"tensor_table_class: \"{str(self.tensor_table_class)}\" "
attrs += f"feed_var_name: \"{self.feed_var_name}\" "
attrs += f"fetch_var_name: \"{self.fetch_var_name}\" "
attrs += f"startup_program_id: {self.startup_program_id} "
attrs += f"main_program_id: {self.main_program_id} "
attrs += f"tensor_table_class: \"{self.tensor_table_class}\" "
attrs += "\n"
return program_str.format(
conv_indent(indent), attrs, conv_indent(indent)
4 changes: 2 additions & 2 deletions python/paddle/distributed/fleet/utils/fs.py
@@ -481,7 +481,7 @@ def __init__(
sleep_inter=1000,
): # ms
self.pre_commands = []
hadoop_bin = '%s/bin/hadoop' % hadoop_home
hadoop_bin = f'{hadoop_home}/bin/hadoop'
self.pre_commands.append(hadoop_bin)
dfs = 'fs'
self.pre_commands.append(dfs)
@@ -1216,7 +1216,7 @@ def list_files_info(self, path_list):
)
ret, lines = self._run_cmd(cmd)
if len(lines) == 0:
logger.warning("list_files empty, path[%s]" % path_list)
logger.warning(f"list_files empty, path[{path_list}]")
return []
for line in lines:
arr = line.split(' ')
4 changes: 1 addition & 3 deletions python/paddle/distributed/fleet/utils/log_util.py
@@ -67,8 +67,6 @@ def layer_to_str(base, *args, **kwargs):
if kwargs:
name += ", "
if kwargs:
name += ", ".join(
f"{key}={str(value)}" for key, value in kwargs.items()
)
name += ", ".join(f"{key}={value}" for key, value in kwargs.items())
name += ")"
return name
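The log_util.py change also drops a redundant str(...) wrapper: an f-string replacement field already formats the value via format(), which falls back to __str__ for ordinary objects, so {value} and {str(value)} render the same here. An illustrative sketch, not repository code:

# Illustrative sketch only -- not code from this commit.
kwargs = {"axis": 1, "keepdim": True}
with_str = ", ".join(f"{key}={str(value)}" for key, value in kwargs.items())
without_str = ", ".join(f"{key}={value}" for key, value in kwargs.items())
assert with_str == without_str == "axis=1, keepdim=True"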
8 changes: 4 additions & 4 deletions python/paddle/distributed/fleet/utils/pp_parallel_adaptor.py
@@ -109,7 +109,7 @@ def apply(self, src_model_path: str, dst_model_path: str):
# first rank extract shared layer
with_shared = True
for dir in src_dirs:
print("extract layer params in dir %s" % dir)
print(f"extract layer params in dir {dir}")
layers.extend(self.extract_layers(dir, with_shared))
with_shared = False
# 2、sort and unique layers
@@ -240,15 +240,15 @@ def priority(elem):
return float(match.group(1).lstrip("."))

# strictly sort layers
print("before sort %s" % ("|".join([e[0] for e in layers])))
print("before sort {}".format("|".join([e[0] for e in layers])))
layers.sort(key=priority)
# unique
unique_layers = []
for e in layers:
if unique_layers and e[0] == unique_layers[-1][0]:
continue
unique_layers.append(e)
print("after sort %s " % ("|".join([e[0] for e in unique_layers])))
print("after sort {} ".format("|".join([e[0] for e in unique_layers])))
return unique_layers

def segment_layers(
@@ -358,7 +358,7 @@ def merge(src, dst, map_k=None):

lr_scheduler = None
for layer_names, file_path in layers_segment:
print("load %s" % file_path)
print(f"load {file_path}")
layer = paddle.load(file_path)

def get_param_name_mapper(layer_name):
