[CodeStyle][UP031] fix `/python/paddle/distributed/{passes, transpiler, utils}*` - part 13 (PaddlePaddle#65573)
gouzil authored Jun 29, 2024
1 parent 7384d3b commit 87fa2d9
Showing 14 changed files with 47 additions and 47 deletions.
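
For context, UP031 is the pyupgrade rule (enforced here through Ruff) that flags printf-style `%` interpolation in favor of f-strings. A minimal sketch of the before/after pattern this commit applies throughout, with an illustrative variable:

    # flagged by UP031: printf-style formatting
    backend = "gloo"
    msg = "Unsupported backend: %s." % backend

    # the fix: an equivalent f-string
    msg = f"Unsupported backend: {backend}."

Assuming the repository's usual lint setup, a command along the lines of `ruff check --select UP031 --fix python/paddle/distributed/` would generate fixes of this shape automatically.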
2 changes: 1 addition & 1 deletion python/paddle/distributed/collective.py
@@ -151,7 +151,7 @@ def _new_process_group_impl(
):
pg = None
genv = _get_global_env()
- assert backend in _valid_backend_list, "Unsupported backend: %s." % backend
+ assert backend in _valid_backend_list, f"Unsupported backend: {backend}."
if backend == "gloo":
pg = core.ProcessGroupGloo.create(store, rank, world_size, group_id)
elif backend == "nccl":
2 changes: 1 addition & 1 deletion python/paddle/distributed/io.py
@@ -561,7 +561,7 @@ def load_inference_model_distributed(
if dirname is not None:
load_dirname = os.path.normpath(dirname)
if not os.path.isdir(load_dirname):
raise ValueError("There is no directory named '%s'" % dirname)
raise ValueError(f"There is no directory named '{dirname}'")

if model_filename is None:
model_filename = '__model__'
@@ -140,8 +140,8 @@ def replace_training_script(self):
print("----------- PopRun Command -----------")
print("poprun \\")
for i in range(len(poprun_command) - 1):
print("%s \\" % (poprun_command[i]))
print("%s" % (poprun_command[len(poprun_command) - 1]))
print(f"{poprun_command[i]} \\")
print(f"{poprun_command[len(poprun_command) - 1]}")
print("---------------------------------------")

# replace training_script_args
4 changes: 2 additions & 2 deletions python/paddle/distributed/passes/auto_parallel_amp.py
@@ -342,7 +342,7 @@ def _cast_block(self, block):
for in_var_name in op.input_arg_names:
assert (
in_var.dtype == block.var(in_var_name).dtype
), f"{in_var}, {block.var(in_var_name)}, {str(op)}"
), f"{in_var}, {block.var(in_var_name)}, {op}"
out_var.desc.set_dtype(in_var.dtype)
elif int(op.attr('op_role')) == 257:
pass
@@ -547,7 +547,7 @@ def _keep_fp32_output(op, out_name):
else:
assert (
in_var.dtype == dst_dtype
), f"op [{op.type}] expect input [{in_name}] to be dtype [{dst_dtype}] BUT got [{in_var.dtype}]. {str(op)}"
), f"op [{op.type}] expect input [{in_name}] to be dtype [{dst_dtype}] BUT got [{in_var.dtype}]. {op}"

for out_name in op.output_names:
if src_dtype == paddle.float32 and _keep_fp32_output(op, out_name):
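
A side note on the `{str(op)}` → `{op}` rewrites in this file and several below: f-string interpolation already falls back to `str()` for objects that do not define a custom `__format__`, so the explicit call is redundant. A small illustration (hypothetical class, not from Paddle):

    class Op:
        def __str__(self):
            return "my_op"

    op = Op()
    # both render via str(); the explicit conversion adds nothing
    assert f"{str(op)}" == f"{op}" == "my_op"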
@@ -152,12 +152,12 @@ def _analyze_program(self):
continue
assert op.has_attr(
"ring_id"
), f"Unexpected: comm op [{str(op)}] has NOT ring id."
), f"Unexpected: comm op [{op}] has NOT ring id."
group = ring_id_to_process_group(op.attr("ring_id"))

assert (
group is not None
), f"Unexpected: data parallel group of [{grad_name}] from op [{str(op)}] is None"
), f"Unexpected: data parallel group of [{grad_name}] from op [{op}] is None"

self._grad_name_to_group_map[grad_name] = group

@@ -241,10 +241,10 @@ def _update_opt_rescale_grad(self):
):
assert op.has_attr(
'rescale_grad'
), f"Unexpected: op [{str(op)}] is supported to have [rescale_grad] attribute."
), f"Unexpected: op [{op}] is supported to have [rescale_grad] attribute."
assert (
len(op.input("Grad")) == 1
), f"Unexpected: op [{str(op)}] is supported to have only one input grad var."
), f"Unexpected: op [{op}] is supported to have only one input grad var."

grad_name = op.input("Grad")[0]
dp_degree = len(
@@ -482,7 +482,7 @@ def _update_program(self, grad_groups):
scale_op = block.ops[group.scale_op_idx]
assert (
scale_op.type == 'scale'
), f"should found scale op but found {str(scale_op)}"
), f"should found scale op but found {scale_op}"
scale_op._rename_input(
scale_op.input_arg_names[0], group.coalesce_var.name
)
@@ -494,7 +494,7 @@ def _update_program(self, grad_groups):
assert allreduce_op.type in [
'c_allreduce_avg',
'c_allreduce_sum',
], f"should found c_allreduce_avg or c_allreduce_sum op but found {str(allreduce_op)}"
], f"should found c_allreduce_avg or c_allreduce_sum op but found {allreduce_op}"
allreduce_op_dist_attr = (
self.dist_context.get_op_dist_attr_for_program(allreduce_op)
)
@@ -527,7 +527,7 @@ def _update_program(self, grad_groups):
for idx in sorted(remove_op_indices, reverse=True):
assert (
block.ops[idx].type in remove_op_types
), f"Unexpected: try to remove op {str(block.ops[idx])}"
), f"Unexpected: try to remove op {block.ops[idx]}"
block._remove_op(idx, False)

# insert coalesce op
@@ -752,7 +752,7 @@ def add(self, grad_var, ring_id, i):
grad_op = self.ops[grad_op_idx]
assert (
grad_var.name in grad_op.output_arg_names
), f"grad [{grad_var.name}] should be output of {str(grad_op)}"
), f"grad [{grad_var.name}] should be output of {grad_op}"
self.coalesce_op_idx = grad_op_idx

def finalize(self):
12 changes: 6 additions & 6 deletions python/paddle/distributed/passes/auto_parallel_fp16.py
@@ -272,7 +272,7 @@ def _mark_op(self, op):
elif is_backward_op(op) == int(OpRole.Backward):
if op.desc.original_id() in self.grad_op_to_op_map:
fwd_op_id = self.grad_op_to_op_map[op.desc.original_id()]
- assert fwd_op_id in self._op_fp16_dict, f"{str(op)}"
+ assert fwd_op_id in self._op_fp16_dict, f"{op}"
self._op_fp16_dict[op.desc.original_id()] = self._op_fp16_dict[
fwd_op_id
]
@@ -433,7 +433,7 @@ def cast_block(self, block):
for in_var_name in op.input_arg_names:
assert (
in_var.dtype == block.var(in_var_name).dtype
), f"{in_var}, {block.var(in_var_name)}, {str(op)}"
), f"{in_var}, {block.var(in_var_name)}, {op}"
out_var.desc.set_dtype(in_var.dtype)

idx += num_cast_ops + 1
@@ -548,7 +548,7 @@ def _insert_backward_cast_ops(
out_var = block.var(out_var_name)
if _keep_fp32_output(op, out_var.name):
continue
- assert out_var.dtype == dst_dtype, f"{str(out_var)}, {dst_dtype}"
+ assert out_var.dtype == dst_dtype, f"{out_var}, {dst_dtype}"

for (
cast_name,
@@ -562,7 +562,7 @@ def _insert_backward_cast_ops(
if slot_name in op.input_names:
assert src_name in op.input(
slot_name
), f"var: {src_name} not in op's {slot_name}. {str(op)}"
), f"var: {src_name} not in op's {slot_name}. {op}"
src_var_dist_attr = grad_op_attr.get_input_dist_attr(src_name)
assert src_var_dist_attr is not None
op._rename_input(src_name, cast_name)
@@ -576,7 +576,7 @@ def _insert_backward_cast_ops(
continue
assert (
len(op.output(grad_slot_name)) == 1
), f"[{grad_slot_name}], Current Op: {str(op)}"
), f"[{grad_slot_name}], Current Op: {op}"
grad_name = op.output(grad_slot_name)[0]
grad = block.var(grad_name)
grad_dist_attr = grad_op_attr.get_output_dist_attr(grad_name)
@@ -805,7 +805,7 @@ def is_initialization_op(op):
if param_to_dtype.get(output_name, None) == __target_dtype__:
assert op.has_attr(
'dtype'
), f"initialization op is supported to has dtype attribute but got {str(op)}."
), f"initialization op is supported to has dtype attribute but got {op}."
out_var = startup_program.global_block().var(output_name)
if out_var.dtype == paddle.float32:
out_var.desc.set_dtype(__target_dtype__)
2 changes: 1 addition & 1 deletion python/paddle/distributed/passes/auto_parallel_sharding.py
@@ -1123,7 +1123,7 @@ def op_depend_on_group(op, group):
first_grad_name = group.vars[0].name
assert (
first_grad_name in op.output_arg_names
), f"Unexpected: op is supposed to generate grad [{first_grad_name}] but got [{str(op)}]"
), f"Unexpected: op is supposed to generate grad [{first_grad_name}] but got [{op}]"
grad_names = [grad.name for grad in group.vars]

concated_shapes = []
@@ -297,7 +297,7 @@ def _op_cost(self, op):
time *= 8
return time
except Exception as e:
logger.info(f"The cost of {op} is unknown since {repr(e)}.")
logger.info(f"The cost of {op} is unknown since {e!r}.")
return 0.0

def _partial_programs(self, program):
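
The `{repr(e)}` → `{e!r}` change above is a related cleanup (a separate rule from UP031): the `!r` conversion flag applies `repr()` before formatting, which is the idiomatic spelling inside an f-string. A quick sketch:

    e = ValueError("bad input")
    # the conversion flag and the explicit call produce identical text
    assert f"{e!r}" == f"{repr(e)}" == "ValueError('bad input')"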
2 changes: 1 addition & 1 deletion python/paddle/distributed/ps/utils/public.py
@@ -1801,7 +1801,7 @@ def check_program(program):
for var_name in input_var_names + output_var_names:
if not block._find_var_recursive(str(var_name)):
raise ValueError(
- f'var: {str(var_name)} needed by op is not found in block: {block_idx}'
+ f'var: {var_name} needed by op is not found in block: {block_idx}'
)
block_idx += 1
print('program checked valid')
8 changes: 3 additions & 5 deletions python/paddle/distributed/spawn.py
@@ -89,15 +89,13 @@ def _options_valid_check(options):
if key not in supported_options:
if key in deprecated_options:
warnings.warn(
"The config option (%s) of `paddle.distributed.spawn` is deprecated. "
"Please use the latest config options stated in the `spawn` API documentation."
% key,
f"The config option ({key}) of `paddle.distributed.spawn` is deprecated. "
"Please use the latest config options stated in the `spawn` API documentation.",
DeprecationWarning,
)
else:
raise ValueError(
"The config option (%s) of `paddle.distributed.spawn` is not supported."
% key
f"The config option ({key}) of `paddle.distributed.spawn` is not supported."
)


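
Note how the deprecation warning above relies on implicit concatenation of adjacent string literals: only the first literal interpolates a value, so only it gains the `f` prefix. A minimal sketch with an illustrative key:

    key = "cluster_node_ips"  # hypothetical option name
    msg = (
        f"The config option ({key}) of `paddle.distributed.spawn` is deprecated. "
        "Please use the latest config options stated in the `spawn` API documentation."
    )
    # adjacent literals are joined at compile time; the second needs no f prefix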
4 changes: 2 additions & 2 deletions python/paddle/distributed/transpiler/collective.py
@@ -553,7 +553,7 @@ def _transpile_main_program(self):
return
# fuse allreduce
if self.fuse_allreduce > 0:
print("begin used fuse_allreduce param count = %s" % (param_cnt))
print(f"begin used fuse_allreduce param count = {param_cnt}")
# use fuse allreduce
self._insert_fuse_allreduce_ops()
else:
@@ -592,7 +592,7 @@ def _insert_scale_loss_grad_ops(self, param_cnt):
scale = 1.0 / self.nranks / self.gpu_nums
else:
scale = 1.0 / self.gpu_nums
print("begin _insert_scale_loss_grad_ops scale = %s" % (scale))
print(f"begin _insert_scale_loss_grad_ops scale = {scale}")
block = self.main_program.global_block()
for idx, op in reversed(list(enumerate(block.ops))):
if not self._is_loss_grad_op(op):
6 changes: 3 additions & 3 deletions python/paddle/distributed/transpiler/distribute_transpiler.py
@@ -470,7 +470,7 @@ def _transpile_collective(
elif collective_mode == "single_process_multi_thread":
transpiler = collective.SingleProcessMultiThread()
else:
- raise ValueError('invalid collective_mode: %s' % collective_mode)
+ raise ValueError(f'invalid collective_mode: {collective_mode}')

transpiler.transpile(
startup_program=startup_program,
@@ -2414,7 +2414,7 @@ def _get_optimizer_input_shape(
pass
else:
raise ValueError(
"Not supported optimizer for distributed training: %s" % op_type
f"Not supported optimizer for distributed training: {op_type}"
)
return orig_shape

@@ -2497,7 +2497,7 @@ def _append_pserver_grad_merge_ops(
def _append_dc_asgd_ops(self, block, param_var, grad_var):
# NOTE: can not use grammar candy here, should put ops in specific block
local_param_bak = block.create_var(
name="%s.local_bak" % param_var.name,
name=f"{param_var.name}.local_bak",
shape=param_var.shape,
type=param_var.type,
dtype=param_var.dtype,
2 changes: 1 addition & 1 deletion python/paddle/distributed/transpiler/geo_sgd_transpiler.py
@@ -253,7 +253,7 @@ def get_pserver_program(self, endpoint):
pserver_block = per_opt_block.program.global_block()
param = pserver_block.vars[var_name]

delta_var_name = "%s.delta" % (param.name)
delta_var_name = f"{param.name}.delta"
if var.name in self.sparse_var_splited_list:
delta_type = core.VarDesc.VarType.SELECTED_ROWS
sparse_grad_to_param.append(
28 changes: 15 additions & 13 deletions python/paddle/distributed/utils/launch_utils.py
@@ -292,7 +292,7 @@ def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
for i in range(len(selected_gpus)):
trainer = Trainer()
trainer.gpus.append(selected_gpus[i])
trainer.endpoint = "%s" % (cur_node_endpoints[i])
trainer.endpoint = f"{cur_node_endpoints[i]}"
trainer.rank = trainer_rank
trainer_rank += 1

@@ -392,27 +392,29 @@ def _prepare_trainer_env(cluster, trainer, backend=None):
backend = get_backend_by_compile_flag() # for compatibility
if backend == 'bkcl':
proc_env = {
"FLAGS_selected_xpus": "%s"
% ",".join([str(g) for g in trainer.gpus]),
"FLAGS_selected_xpus": "{}".format(
",".join([str(g) for g in trainer.gpus])
),
"PADDLE_TRAINER_ID": "%d" % trainer.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % trainer.endpoint,
"PADDLE_CURRENT_ENDPOINT": f"{trainer.endpoint}",
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
}
elif backend == 'nccl':
proc_env = {
"FLAGS_selected_gpus": "%s"
% ",".join([str(g) for g in trainer.gpus]),
"FLAGS_selected_gpus": "{}".format(
",".join([str(g) for g in trainer.gpus])
),
"PADDLE_TRAINER_ID": "%d" % trainer.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % trainer.endpoint,
"PADDLE_CURRENT_ENDPOINT": f"{trainer.endpoint}",
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
}
elif backend == 'gloo':
# NOTE (xiongkun) default fall back into cpu only
proc_env = {
"PADDLE_TRAINER_ID": "%d" % trainer.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % trainer.endpoint,
"PADDLE_CURRENT_ENDPOINT": f"{trainer.endpoint}",
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
"PADDLE_DISTRI_BACKEND": backend, # only add here, other will be auto
@@ -422,10 +424,11 @@

custom_device_name = core.get_all_custom_device_type()[0]
proc_env = {
f"FLAGS_selected_{custom_device_name}s": "%s"
% ",".join([str(g) for g in trainer.gpus]),
f"FLAGS_selected_{custom_device_name}s": "{}".format(
",".join([str(g) for g in trainer.gpus])
),
"PADDLE_TRAINER_ID": "%d" % trainer.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % trainer.endpoint,
"PADDLE_CURRENT_ENDPOINT": f"{trainer.endpoint}",
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
}
@@ -498,8 +501,7 @@ def pull_worker_log(tp):
except UnicodeEncodeError:
sys.stdout.write(
'UnicodeEncodeError occurs at this line. '
- 'Please refer to the original log file "%s"\n'
- % tp.log_fn.name
+ f'Please refer to the original log file "{tp.log_fn.name}"\n'
)
tp.log_offset = fin.tell()

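
One detail in this file: the `FLAGS_selected_*` values were rewritten to `"{}".format(...)` rather than f-strings, plausibly because the replacement expression itself contains double quotes (`",".join(...)`), which an f-string delimited by double quotes could not nest before Python 3.12. A hedged sketch of the trade-off:

    gpus = [0, 1, 3]

    # form chosen in the diff: safe on every supported Python version
    flags = "{}".format(",".join(str(g) for g in gpus))

    # f-string alternative: requires flipping the inner quotes
    flags_alt = f"{','.join(str(g) for g in gpus)}"

    assert flags == flags_alt == "0,1,3"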
