[CI] Format Python code with Black (ray-project#21975)
See ray-project#21316 and ray-project#21311 for the motivation behind these changes.
bveeramani authored and Jules S. Damji committed Jan 30, 2022
1 parent 2d0428a commit b40b619
Showing 1,637 changed files with 75,458 additions and 59,446 deletions.
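
All of the hunks below are the same kind of mechanical, layout-only rewrite that Black's default style (88-character lines) produces: a hand-wrapped call that now fits on one line is collapsed, a call that does not fit is re-wrapped with its closing parenthesis on its own line, and multi-line argument lists and collection literals gain a trailing comma. As a rough, hypothetical sketch of that style (illustrative only; the flag names and help text are invented, loosely modeled on the .buildkite/copy_files.py hunks that follow, and none of it is code from this PR):

import argparse

# Black collapses a wrapped call onto a single line when it fits in 88 characters.
parser = argparse.ArgumentParser(description="Upload build artifacts")
parser.add_argument("--path", type=str, required=False)

# When a call does not fit, Black keeps the arguments on an indented line (or one
# argument per line) and puts the closing parenthesis on its own line; a trailing
# comma in the source forces the one-argument-per-line layout.
parser.add_argument(
    "--destination",
    type=str,
    help="One of: wheels, branch_wheels, jars, branch_jars, logs, docker_login.",
    required=True,
)

args = parser.parse_args()
print(args)

The "magic trailing comma" in the second call is what keeps it exploded one argument per line even when it would otherwise fit; the same trailing commas show up throughout the diff below.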
19 changes: 13 additions & 6 deletions .buildkite/copy_files.py
@@ -39,14 +39,16 @@ def perform_auth():
     resp = requests.get(
         "https://vop4ss7n22.execute-api.us-west-2.amazonaws.com/endpoint/",
         auth=auth,
-        params={"job_id": os.environ["BUILDKITE_JOB_ID"]})
+        params={"job_id": os.environ["BUILDKITE_JOB_ID"]},
+    )
     return resp


 def handle_docker_login(resp):
     pwd = resp.json()["docker_password"]
     subprocess.call(
-        ["docker", "login", "--username", "raytravisbot", "--password", pwd])
+        ["docker", "login", "--username", "raytravisbot", "--password", pwd]
+    )


 def gather_paths(dir_path) -> List[str]:
@@ -86,7 +88,7 @@ def upload_paths(paths, resp, destination):
             "branch_wheels": f"{branch}/{sha}/{fn}",
             "jars": f"jars/latest/{current_os}/{fn}",
             "branch_jars": f"jars/{branch}/{sha}/{current_os}/{fn}",
-            "logs": f"bazel_events/{branch}/{sha}/{bk_job_id}/{fn}"
+            "logs": f"bazel_events/{branch}/{sha}/{bk_job_id}/{fn}",
         }[destination]
         of["file"] = open(path, "rb")
         r = requests.post(c["url"], files=of)
@@ -95,14 +97,19 @@ def upload_paths(paths, resp, destination):

 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
-        description="Helper script to upload files to S3 bucket")
+        description="Helper script to upload files to S3 bucket"
+    )
     parser.add_argument("--path", type=str, required=False)
     parser.add_argument("--destination", type=str)
     args = parser.parse_args()

     assert args.destination in {
-        "branch_jars", "branch_wheels", "jars", "logs", "wheels",
-        "docker_login"
+        "branch_jars",
+        "branch_wheels",
+        "jars",
+        "logs",
+        "wheels",
+        "docker_login",
     }
     assert "BUILDKITE_JOB_ID" in os.environ
     assert "BUILDKITE_COMMIT" in os.environ
8 changes: 5 additions & 3 deletions benchmarks/distributed/test_many_actors.py
@@ -51,8 +51,10 @@ def no_resource_leaks():
 test_utils.wait_for_condition(no_resource_leaks)

 rate = MAX_ACTORS_IN_CLUSTER / (end_time - start_time)
-print(f"Success! Started {MAX_ACTORS_IN_CLUSTER} actors in "
-      f"{end_time - start_time}s. ({rate} actors/s)")
+print(
+    f"Success! Started {MAX_ACTORS_IN_CLUSTER} actors in "
+    f"{end_time - start_time}s. ({rate} actors/s)"
+)

 if "TEST_OUTPUT_JSON" in os.environ:
     out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
@@ -62,6 +64,6 @@ def no_resource_leaks():
         "time": end_time - start_time,
         "success": "1",
         "_peak_memory": round(used_gb, 2),
-        "_peak_process_memory": usage
+        "_peak_process_memory": usage,
     }
     json.dump(results, out_file)
8 changes: 5 additions & 3 deletions benchmarks/distributed/test_many_pgs.py
@@ -77,8 +77,10 @@ def no_resource_leaks():
 test_utils.wait_for_condition(no_resource_leaks)

 rate = MAX_PLACEMENT_GROUPS / (end_time - start_time)
-print(f"Success! Started {MAX_PLACEMENT_GROUPS} pgs in "
-      f"{end_time - start_time}s. ({rate} pgs/s)")
+print(
+    f"Success! Started {MAX_PLACEMENT_GROUPS} pgs in "
+    f"{end_time - start_time}s. ({rate} pgs/s)"
+)

 if "TEST_OUTPUT_JSON" in os.environ:
     out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
@@ -88,6 +90,6 @@ def no_resource_leaks():
         "time": end_time - start_time,
         "success": "1",
         "_peak_memory": round(used_gb, 2),
-        "_peak_process_memory": usage
+        "_peak_process_memory": usage,
     }
     json.dump(results, out_file)
15 changes: 7 additions & 8 deletions benchmarks/distributed/test_many_tasks.py
@@ -16,9 +16,7 @@ def test_max_running_tasks(num_tasks):
     def task():
         time.sleep(sleep_time)

-    refs = [
-        task.remote() for _ in tqdm.trange(num_tasks, desc="Launching tasks")
-    ]
+    refs = [task.remote() for _ in tqdm.trange(num_tasks, desc="Launching tasks")]

     max_cpus = ray.cluster_resources()["CPU"]
     min_cpus_available = max_cpus
@@ -48,8 +46,7 @@ def no_resource_leaks():


 @click.command()
-@click.option(
-    "--num-tasks", required=True, type=int, help="Number of tasks to launch.")
+@click.option("--num-tasks", required=True, type=int, help="Number of tasks to launch.")
 def test(num_tasks):
     ray.init(address="auto")

@@ -66,8 +63,10 @@ def test(num_tasks):
     test_utils.wait_for_condition(no_resource_leaks)

     rate = num_tasks / (end_time - start_time - sleep_time)
-    print(f"Success! Started {num_tasks} tasks in {end_time - start_time}s. "
-          f"({rate} tasks/s)")
+    print(
+        f"Success! Started {num_tasks} tasks in {end_time - start_time}s. "
+        f"({rate} tasks/s)"
+    )

     if "TEST_OUTPUT_JSON" in os.environ:
         out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
@@ -77,7 +76,7 @@ def test(num_tasks):
             "time": end_time - start_time,
             "success": "1",
             "_peak_memory": round(used_gb, 2),
-            "_peak_process_memory": usage
+            "_peak_process_memory": usage,
         }
         json.dump(results, out_file)

56 changes: 31 additions & 25 deletions benchmarks/distributed/test_scheduling.py
@@ -25,10 +25,12 @@ def do_job(self):


 def start_tasks(num_task, num_cpu_per_task, task_duration):
-    ray.get([
-        simple_task.options(num_cpus=num_cpu_per_task).remote(task_duration)
-        for _ in range(num_task)
-    ])
+    ray.get(
+        [
+            simple_task.options(num_cpus=num_cpu_per_task).remote(task_duration)
+            for _ in range(num_task)
+        ]
+    )


 def measure(f):
@@ -40,47 +42,49 @@ def measure(f):

 def start_actor(num_actors, num_actors_per_nodes, job):
     resources = {"node": floor(1.0 / num_actors_per_nodes)}
-    submission_cost, actors = measure(lambda: [
-        SimpleActor.options(resources=resources, num_cpus=0).remote(job)
-        for _ in range(num_actors)])
-    ready_cost, _ = measure(
-        lambda: ray.get([actor.ready.remote() for actor in actors]))
+    submission_cost, actors = measure(
+        lambda: [
+            SimpleActor.options(resources=resources, num_cpus=0).remote(job)
+            for _ in range(num_actors)
+        ]
+    )
+    ready_cost, _ = measure(lambda: ray.get([actor.ready.remote() for actor in actors]))
     actor_job_cost, _ = measure(
-        lambda: ray.get([actor.do_job.remote() for actor in actors]))
+        lambda: ray.get([actor.do_job.remote() for actor in actors])
+    )
     return (submission_cost, ready_cost, actor_job_cost)


 if __name__ == "__main__":
     parser = argparse.ArgumentParser(prog="Test Scheduling")
     # Task workloads
     parser.add_argument(
-        "--total-num-task",
-        type=int,
-        help="Total number of tasks.",
-        required=False)
+        "--total-num-task", type=int, help="Total number of tasks.", required=False
+    )
     parser.add_argument(
         "--num-cpu-per-task",
         type=int,
         help="Resources needed for tasks.",
-        required=False)
+        required=False,
+    )
     parser.add_argument(
         "--task-duration-s",
         type=int,
         help="How long does each task execute.",
         required=False,
-        default=1)
+        default=1,
+    )

     # Actor workloads
     parser.add_argument(
-        "--total-num-actors",
-        type=int,
-        help="Total number of actors.",
-        required=True)
+        "--total-num-actors", type=int, help="Total number of actors.", required=True
+    )
     parser.add_argument(
         "--num-actors-per-nodes",
         type=int,
         help="How many actors to allocate for each nodes.",
-        required=True)
+        required=True,
+    )

     ray.init(address="auto")

@@ -92,13 +96,14 @@ def start_actor(num_actors, num_actors_per_nodes, job):
     job = None
     if args.total_num_task is not None:
         if args.num_cpu_per_task is None:
-            args.num_cpu_per_task = floor(
-                1.0 * total_cpus / args.total_num_task)
+            args.num_cpu_per_task = floor(1.0 * total_cpus / args.total_num_task)
         job = lambda: start_tasks(  # noqa: E731
-            args.total_num_task, args.num_cpu_per_task, args.task_duration_s)
+            args.total_num_task, args.num_cpu_per_task, args.task_duration_s
+        )

     submission_cost, ready_cost, actor_job_cost = start_actor(
-        args.total_num_actors, args.num_actors_per_nodes, job)
+        args.total_num_actors, args.num_actors_per_nodes, job
+    )

     output = os.environ.get("TEST_OUTPUT_JSON")

@@ -118,6 +123,7 @@ def start_actor(num_actors, num_actors_per_nodes, job):

     if output is not None:
         from pathlib import Path
+
         p = Path(output)
         p.write_text(json.dumps(result))

3 changes: 1 addition & 2 deletions benchmarks/distributed/wait_cluster.py
@@ -12,8 +12,7 @@ def num_alive_nodes():


 @click.command()
-@click.option(
-    "--num-nodes", required=True, type=int, help="The target number of nodes")
+@click.option("--num-nodes", required=True, type=int, help="The target number of nodes")
 def wait_cluster(num_nodes: int):
     ray.init(address="auto")
     while num_alive_nodes() != num_nodes:
4 changes: 2 additions & 2 deletions benchmarks/object_store/test_object_store.py
@@ -9,7 +9,7 @@
 from tqdm import tqdm

 NUM_NODES = 50
-OBJECT_SIZE = 2**30
+OBJECT_SIZE = 2 ** 30


 def num_alive_nodes():
@@ -60,6 +60,6 @@ def sum(self, arr):
         "broadcast_time": end - start,
         "object_size": OBJECT_SIZE,
         "num_nodes": NUM_NODES,
-        "success": "1"
+        "success": "1",
     }
     json.dump(results, out_file)
7 changes: 3 additions & 4 deletions benchmarks/single_node/test_single_node.py
@@ -13,7 +13,7 @@
 MAX_RETURNS = 3000
 MAX_RAY_GET_ARGS = 10000
 MAX_QUEUED_TASKS = 1_000_000
-MAX_RAY_GET_SIZE = 100 * 2**30
+MAX_RAY_GET_SIZE = 100 * 2 ** 30


 def assert_no_leaks():
@@ -189,8 +189,7 @@ def test_large_object():
 print(f"Many returns time: {returns_time} ({MAX_RETURNS} returns)")
 print(f"Ray.get time: {get_time} ({MAX_RAY_GET_ARGS} args)")
 print(f"Queued task time: {queued_time} ({MAX_QUEUED_TASKS} tasks)")
-print(f"Ray.get large object time: {large_object_time} "
-      f"({MAX_RAY_GET_SIZE} bytes)")
+print(f"Ray.get large object time: {large_object_time} " f"({MAX_RAY_GET_SIZE} bytes)")

 if "TEST_OUTPUT_JSON" in os.environ:
     out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
@@ -205,6 +204,6 @@ def test_large_object():
         "num_queued": MAX_QUEUED_TASKS,
         "large_object_time": large_object_time,
         "large_object_size": MAX_RAY_GET_SIZE,
-        "success": "1"
+        "success": "1",
     }
     json.dump(results, out_file)