feat: python support for poll (request jobs) and accept
After this is added, we have the basic logic to be able to submit,
and then query for and accept some number of jobs. We can next
build containers and prototype this with actual flux instances...
somewhere.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Feb 14, 2024
1 parent e028ba6 commit 392f54c
Showing 10 changed files with 352 additions and 63 deletions.
6 changes: 3 additions & 3 deletions Makefile
@@ -23,7 +23,7 @@ proto: protoc ## Generates the API code and documentation

.PHONY: python
python: python ## Generate python proto files in python
	pip freeze | grep grpcio-tools
	# pip freeze | grep grpcio-tools
	mkdir -p python/v1/rainbow/protos
	cd python/v1/rainbow/protos
	python -m grpc_tools.protoc -I./api/v1 --python_out=./python/v1/rainbow/protos --pyi_out=./python/v1/rainbow/protos --grpc_python_out=./python/v1/rainbow/protos ./api/v1/rainbow.proto
@@ -35,12 +35,12 @@ version: ## Prints the current version
	@echo $(VERSION)

.PHONY: tidy
tidy: ## Updates the go modules and vendors all dependancies
tidy: ## Updates the go modules and vendors all dependencies
	go mod tidy
	go mod vendor

.PHONY: upgrade
upgrade: ## Upgrades all dependancies
upgrade: ## Upgrades all dependencies
	go get -d -u ./...
	go mod tidy
	go mod vendor
22 changes: 18 additions & 4 deletions README.md
@@ -203,18 +203,32 @@ What this does is randomly select from the set you receive, and send back a resp
The logic you would expect is there - you can't accept more than the number available.
You could try asking for a high max number of jobs again, and see that there is one fewer than before. It was deleted from the database.

## Python

To build the Python gRPC code, ensure you have grpcio-tools installed:

```bash
pip install grpcio-tools
```

Then:

```bash
make python
```

and cd into [python/v1](python/v1) and follow the README instructions there.


## Container Images

**Coming soon**

## TODO

- endpoint to summarize jobs? Or update the request jobs to return a summary?
- request jobs should accept way to filter or specify criteria for request
- receiving endpoint to accept (meaning just deleting from the database)
- (advanced) request jobs should accept way to filter or specify criteria for request

Next steps: Python bindings so we can run the client in a flux instance and:
Next steps: Containers and example (kind?) to run in a flux instance and:

- Run in poll, at some increment
- "Do you have jobs for me?"
96 changes: 90 additions & 6 deletions python/v1/README.md
@@ -41,10 +41,12 @@ python ./examples/flux/register.py keebler
```
```console
$ python ./examples/flux/register.py keebler
token: "956580b8-7339-40aa-84c2-489539bbdc16"
token: "e2d48b92-7801-4ad3-8b4d-87e30c1441e0"
secret: "39ef6c99-ac20-40de-92df-d16e9fe651e1"
status: REGISTER_SUCCESS

The token you will need to submit jobs to this cluster is 956580b8-7339-40aa-84c2-489539bbdc16
🤫️ The token you will need to submit jobs to this cluster is e2d48b92-7801-4ad3-8b4d-87e30c1441e0
🔐️ The secret you will need to accept jobs is 39ef6c99-ac20-40de-92df-d16e9fe651e1
```

Try running it again - you can't register a cluster twice.
@@ -56,19 +58,101 @@ status: REGISTER_EXISTS
The cluster keebler already exists.
```

But of course you can register other cluster names. A "cluster" can be an actual cluster, a flux instance, or any entity that can accept jobs.
But of course you can register other cluster names. A "cluster" can be an actual cluster, a flux instance, or any entity that can accept jobs. The script also accepts arguments (see `register.py --help`):

```console
python ./examples/flux/register.py --help

🌈️ Rainbow scheduler register

options:
-h, --help show this help message and exit
--cluster CLUSTER cluster name to register
--host HOST host of rainbow cluster
--secret SECRET Rainbow cluster registration secret
```
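
For reference, registration is a single RPC against the scheduler. Here is a minimal sketch, condensed from the `register.py` example included in this commit:

```python
import grpc

from rainbow.protos import rainbow_pb2, rainbow_pb2_grpc

# Register the "keebler" cluster using the registration secret
# ("chocolate-cookies" is the example default)
request = rainbow_pb2.RegisterRequest(name="keebler", secret="chocolate-cookies")

with grpc.insecure_channel("localhost:50051") as channel:
    stub = rainbow_pb2_grpc.RainbowSchedulerStub(channel)
    response = stub.Register(request)
    # On success the response carries the submit token and the accept secret
    print(response.token, response.secret)
```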

### Submit Job

Now let's submit a job to our faux cluster. We need to provide the token we received above.
Now let's submit a job to our faux cluster. We need to provide the token we received above. Note you can provide other arguments too:

```console
$ python examples/flux/submit-job.py 956580b8-7339-40aa-84c2-489539bbdc16
python ./examples/flux/submit-job.py --help

🌈️ Rainbow scheduler register

positional arguments:
command Command to submit

options:
-h, --help show this help message and exit
--cluster CLUSTER cluster name to register
--host HOST host of rainbow cluster
--token TOKEN Cluster token for permission to submit jobs
--nodes NODES Nodes for job (defaults to 1)
```

And then submit!

```console
$ python examples/flux/submit-job.py --token $token --cluster keebler echo hello world
⭐️ Submitting job: echo hello world
status: SUBMIT_SUCCESS
```
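
Under the hood this is also a single RPC. The sketch below is illustrative only: the message and RPC names (`SubmitJobRequest`, `SubmitJob`) and the field names are assumptions based on the other examples here, so check `submit-job.py` and `rainbow.proto` for the actual definitions:

```python
import grpc

from rainbow.protos import rainbow_pb2, rainbow_pb2_grpc

# Hypothetical request -- SubmitJobRequest/SubmitJob and the field names are
# assumed, not taken from rainbow.proto; adjust to match the real schema.
request = rainbow_pb2.SubmitJobRequest(
    token="<cluster-token>",
    cluster="keebler",
    nodes=1,
    command="echo hello world",
)

with grpc.insecure_channel("localhost:50051") as channel:
    stub = rainbow_pb2_grpc.RainbowSchedulerStub(channel)
    response = stub.SubmitJob(request)
    print(response)  # expect a SUBMIT_SUCCESS status
```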

Nice! We will next be writing a receiving endpoint that can poll the server at some increment to ask for jobs, and then accept some number. TBA!
### Poll and Accept

Given the above (we have submitted jobs to the keebler cluster, not necessarily from it), we would then want to
receive them from the keebler cluster and accept some number to run. Let's mock that next. This involves two steps (sketched in code below):

- request jobs: a request from the keebler cluster to list jobs (requires the secret)
- accept jobs: given the list of jobs requested, tell the rainbow scheduler you accept some subset to run.
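
Both steps are plain RPCs against the same stub. A minimal sketch, condensed from the `poll-jobs.py` example added in this commit (substitute your own cluster secret):

```python
import grpc

from rainbow.protos import rainbow_pb2, rainbow_pb2_grpc

with grpc.insecure_channel("localhost:50051") as channel:
    stub = rainbow_pb2_grpc.RainbowSchedulerStub(channel)

    # Step 1: request (poll for) jobs - this only lists them, up to maxJobs
    poll = rainbow_pb2.RequestJobsRequest(
        secret="<cluster-secret>", maxJobs=3, cluster="keebler"
    )
    listing = stub.RequestJobs(poll)

    # Step 2: accept a subset - listing.jobs is keyed by job id, so the keys
    # are what we send back
    accept = rainbow_pb2.AcceptJobsRequest(
        secret="<cluster-secret>", jobids=list(listing.jobs)[:1], cluster="keebler"
    )
    stub.AcceptJobs(accept)
```

The `poll-jobs.py` script wraps these two calls with the command line options shown below.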


```console
python ./examples/flux/poll-jobs.py --help

🌈️ Rainbow scheduler poll (request jobs) and accept

options:
-h, --help show this help message and exit
--cluster CLUSTER cluster name to register
--host HOST host of rainbow cluster
--max-jobs MAX_JOBS Maximum jobs to request (unset defaults to all)
--secret SECRET Cluster secret to access job queue
--nodes NODES Nodes for job (defaults to 1)
--accept ACCEPT Number of jobs to accept
```

And then request (poll for) jobs. This does not accept any; it's akin to asking for a listing, up to a maximum
number. We will eventually want to add logic to better filter or query for what we _can_ accept.

```console
$ python examples/flux/poll-jobs.py --secret $secret --cluster keebler
Status: REQUEST_JOBS_SUCCESS
Received 3 jobs for inspection!
{"id":6,"cluster":"blueberry","name":"","nodes":1,"tasks":0,"command":"echo hello world"}
{"id":4,"cluster":"blueberry","name":"","nodes":1,"tasks":0,"command":"echo hello world"}
{"id":5,"cluster":"blueberry","name":"","nodes":1,"tasks":0,"command":"echo hello world"}
```
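
Each job in the listing is a JSON string (poll-jobs.py just prints it as-is), so it can be parsed client-side if you want to inspect or filter jobs before accepting:

```python
import json

# One entry from the listing above, parsed into a dictionary
job = json.loads(
    '{"id":6,"cluster":"blueberry","name":"","nodes":1,"tasks":0,"command":"echo hello world"}'
)
print(job["id"], job["command"])  # 6 echo hello world
```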

Now let's try accepting!

```console
$ python examples/flux/poll-jobs.py --secret $secret --cluster keebler --accept 1
Status: REQUEST_JOBS_SUCCESS
Received 3 jobs for inspection!
{"id":4,"cluster":"blueberry","name":"","nodes":1,"tasks":0,"command":"echo hello world"}
{"id":6,"cluster":"blueberry","name":"","nodes":1,"tasks":0,"command":"echo hello world"}
{"id":5,"cluster":"blueberry","name":"","nodes":1,"tasks":0,"command":"echo hello world"}
Accepting 1 jobs...
[4]
```

If you were to ask again (for all jobs) you'd only see two, because you took one off the dispatcher and it's now owned by your keebler cluster.

And that's it! Next we can build the above into a container with an actual flux instance and actually run
the jobs we accept.

## License

85 changes: 85 additions & 0 deletions python/v1/examples/flux/poll-jobs.py
@@ -0,0 +1,85 @@
from __future__ import print_function

import logging

import argparse
import grpc
import random
from rainbow.protos import rainbow_pb2
from rainbow.protos import rainbow_pb2_grpc


def get_parser():
    parser = argparse.ArgumentParser(
        description="🌈️ Rainbow scheduler poll (request jobs) and accept"
    )
    parser.add_argument("--cluster", help="cluster name to register", default="keebler")
    parser.add_argument(
        "--host", help="host of rainbow cluster", default="localhost:50051"
    )
    parser.add_argument(
        "--max-jobs", help="Maximum jobs to request (unset defaults to all)", type=int
    )
    parser.add_argument(
        "--secret", help="Cluster secret to access job queue", required=True
    )
    parser.add_argument(
        "--nodes", help="Nodes for job (defaults to 1)", default=1, type=int
    )
    parser.add_argument("--accept", help="Number of jobs to accept", type=int)
    return parser


def main():

    parser = get_parser()
    args, _ = parser.parse_known_args()

    # These are the poll (request jobs) variables - the cluster secret authorizes the request
    pollRequest = rainbow_pb2.RequestJobsRequest(
        secret=args.secret, maxJobs=args.max_jobs, cluster=args.cluster
    )

    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    with grpc.insecure_channel(args.host) as channel:
        stub = rainbow_pb2_grpc.RainbowSchedulerStub(channel)
        response = stub.RequestJobs(pollRequest)
        # A status of 1 corresponds to REQUEST_JOBS_SUCCESS
        if response.status != 1:
            print("Issue with requesting jobs:")
            print(response)
            return

        # Unwrap ourselves to prettier print
        print("Status: REQUEST_JOBS_SUCCESS")
        print(f"Received {len(response.jobs)} jobs for inspection!")
        for _, job in response.jobs.items():
            # Note this can be json loaded, it's a json string
            print(job)

        # Cut out early if not accepting
        if not args.accept or args.accept < 0:
            return

        # We would normally save metadata to submit to flux, but just faux accept
        # for now (meaning we just need the job ids)
        jobs = list(response.jobs)
        random.shuffle(jobs)

        # We can only accept up to the max that we have
        if args.accept > len(jobs):
            args.accept = len(jobs)

        accepted = jobs[: args.accept]
        print(f"Accepting {args.accept} jobs...")
        print(accepted)
        acceptRequest = rainbow_pb2.AcceptJobsRequest(
            secret=args.secret, jobids=accepted, cluster=args.cluster
        )
        response = stub.AcceptJobs(acceptRequest)


if __name__ == "__main__":
    logging.basicConfig()
    main()
45 changes: 31 additions & 14 deletions python/v1/examples/flux/register.py
@@ -2,35 +2,52 @@

import logging

import sys
import argparse
import grpc
from rainbow.protos import api_pb2
from rainbow.protos import api_pb2_grpc
from rainbow.protos import rainbow_pb2
from rainbow.protos import rainbow_pb2_grpc


def main(cluster):
def get_parser():
    parser = argparse.ArgumentParser(description="🌈️ Rainbow scheduler register")
    parser.add_argument("--cluster", help="cluster name to register", default="keebler")
    parser.add_argument(
        "--host", help="host of rainbow cluster", default="localhost:50051"
    )
    parser.add_argument(
        "--secret",
        help="Rainbow cluster registration secret",
        default="chocolate-cookies",
    )
    return parser


def main():

    parser = get_parser()
    args, _ = parser.parse_known_args()

    # These are the variables for our cluster - name for now
    registerRequest = api_pb2.RegisterRequest(name=cluster, secret="chocolate-cookies")
    registerRequest = rainbow_pb2.RegisterRequest(name=args.cluster, secret=args.secret)

    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = api_pb2_grpc.RainbowSchedulerStub(channel)
    with grpc.insecure_channel(args.host) as channel:
        stub = rainbow_pb2_grpc.RainbowSchedulerStub(channel)
        response = stub.Register(registerRequest)
        print(response)
        if response.status == api_pb2.RegisterResponse.ResultType.REGISTER_EXISTS:
            print(f"The cluster {cluster} alreadey exists.")
        if response.status == rainbow_pb2.RegisterResponse.ResultType.REGISTER_EXISTS:
            print(f"The cluster {args.cluster} already exists.")
        else:
            print(
                f"The token you will need to submit jobs to this cluster is {response.token}",
                f"🤫️ The token you will need to submit jobs to this cluster is {response.token}",
            )
            print(
                f"🔐️ The secret you will need to accept jobs is {response.secret}",
            )


if __name__ == "__main__":
    logging.basicConfig()
    cluster = "keebler"
    if len(sys.argv) > 1:
        cluster = sys.argv[1]
    main(cluster)
    main()
