add job submit commands, database, and new client
This change adds SubmitJob to the proto definition and to
both the client and the server. At this point we can request
that a job be submitted to a specific cluster, and the
token that was generated when the cluster registered
is required to "authenticate." We then validate the cluster
name and token and add the job to the database! Next we need a
small client that runs from within a Flux instance and
checks for jobs assigned to it; when it receives
one, the job will be removed from the database. I think I
want to make flux-core "bindings" for Go first.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Feb 12, 2024
1 parent da49f6a commit b454bf3
Showing 23 changed files with 788 additions and 242 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -50,7 +50,7 @@ stream: ## Runs the interface client

.PHONY: register
register: ## Run mock registration
go run cmd/register/register.go
go run cmd/rainbow/rainbow.go register

.PHONY: tag
tag: ## Creates release tag
@@ -64,6 +64,7 @@ tagless: ## Delete the current release tag

.PHONY: clean
clean: ## Cleans bin and temp directories
rm -rf ./rainbow.db
go clean
rm -fr ./vendor
rm -fr ./bin
57 changes: 49 additions & 8 deletions README.md
@@ -54,7 +54,7 @@ go run cmd/server/server.go
2024/02/11 18:06:57 server listening: [::]:50051
2024/02/11 18:06:59 📝️ received register: keebler
SELECT count(*) from clusters WHERE name LIKE "keebler": (0)
INSERT into clusters VALUES ("keebler", "2804a013-89df-433d-a904-4666a6415ad0"): (1)
INSERT into clusters VALUES ("keebler", "712747b7-b2a9-4bea-b630-056cd64856e6"): (1)
```

And then mock a registration:
@@ -68,22 +68,63 @@ go run cmd/register/register.go
2024/02/11 18:06:59 🌈️ starting client (localhost:50051)...
2024/02/11 18:06:59 registering cluster: keebler
2024/02/11 18:06:59 status: REGISTER_SUCCESS
2024/02/11 18:06:59 token: 2804a013-89df-433d-a904-4666a6415ad0
2024/02/11 18:06:59 token: 712747b7-b2a9-4bea-b630-056cd64856e6
```

In the above, we are providing a cluster name (keebler) and it is being registered to the database, and a token and status returned.
We would then use this token for subsequent requests to interact with the cluster.
In the above, we provide a cluster name (keebler), which is registered to the database, and a token and status are returned. Note that if we want to submit a job to the "keebler" cluster, from anywhere, we need this token! Let's try that next.

```bash
# Look at help
go run ./cmd/rainbow/rainbow.go submit --help
```
```
usage: rainbow submit [-h|--help] [-s|--secret "<value>"] [-n|--nodes
<integer>] [-t|--tasks <integer>] [-c|--command "<value>"]
[--job-name "<value>"] [--host "<value>"] [--cluster-name
"<value>"]
Submit a job to a rainbow cluster
Arguments:
-h --help Print help information
-s --secret Registration 'secret'. Default: chocolate-cookies
-n --nodes Number of nodes to request. Default: 1
-t --tasks Number of tasks to request (per node? total?)
-c --command Command to submit. Default: chocolate-cookies
--job-name Name for the job (defaults to first command).
--host Scheduler server address (host:port). Default:
localhost:50051
--cluster-name Name of cluster to register. Default: keebler
```
Let's try doing that.

```bash
go run ./cmd/rainbow/rainbow.go submit --secret "712747b7-b2a9-4bea-b630-056cd64856e6" --command hostname
```
```console
2024/02/11 21:43:17 🌈️ starting client (localhost:50051)...
2024/02/11 21:43:17 submit job: hostname
2024/02/11 21:43:17 status:SUBMIT_SUCCESS
```

Hooray! On the server log side we see...

```console
SELECT * from clusters WHERE name LIKE "keebler" LIMIT 1: keebler
2024/02/11 21:43:17 📝️ received job hostname for cluster keebler
```

Now we have a job in the database, and it is assigned to a specific cluster.
Next we need to write the logic for the cluster to poll for jobs assigned to it and receive them! I'll work on that next.
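
To make the check on submit more concrete, here is a minimal sketch of validating a cluster name and token before storing a job. It is not the server code from this commit: the `Store` type, its in-memory map and slice, and `ErrDenied` are hypothetical stand-ins for the database, and the constant-time comparison is an added precaution rather than something the source shows.

```go
package main

import (
	"crypto/subtle"
	"errors"
	"fmt"
)

// Job is a hypothetical stand-in for a row in the jobs table.
type Job struct {
	ID      int32
	Name    string
	Cluster string
	Command string
}

// ErrDenied is a hypothetical error for a failed cluster or token check.
var ErrDenied = errors.New("submit denied")

// Store is a hypothetical in-memory replacement for the database.
type Store struct {
	tokens map[string]string // cluster name -> token issued at registration
	jobs   []Job
}

// NewStore returns an empty store.
func NewStore() *Store {
	return &Store{tokens: map[string]string{}}
}

// Register records the token issued to a cluster.
func (s *Store) Register(cluster, token string) {
	s.tokens[cluster] = token
}

// Submit stores the job only if the cluster is known and the token matches.
func (s *Store) Submit(job Job, token string) (int32, error) {
	expected, ok := s.tokens[job.Cluster]
	if !ok || subtle.ConstantTimeCompare([]byte(expected), []byte(token)) != 1 {
		return 0, ErrDenied
	}
	job.ID = int32(len(s.jobs) + 1)
	s.jobs = append(s.jobs, job)
	return job.ID, nil
}

func main() {
	s := NewStore()
	s.Register("keebler", "712747b7-b2a9-4bea-b630-056cd64856e6")

	job := Job{Name: "hostname", Cluster: "keebler", Command: "hostname"}
	id, err := s.Submit(job, "712747b7-b2a9-4bea-b630-056cd64856e6")
	fmt.Println(id, err) // 1 <nil>

	_, err = s.Submit(job, "not-the-token")
	fmt.Println(err) // submit denied
}
```

An unknown cluster and a wrong token return the same error here, so a caller cannot use the response to discover which cluster names exist.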

## Container Images

**Coming soon**

## TODO

- Write the job submission endpoint, which should take a cluster name and command, and return status (success, denied, etc.)
- Make a nicer (single UI entrypoint) for client with different functions

At this point we will have a dumb little database with jobs assigned to clusters. We can then modify the client to add a polling command (intended to be run on a flux instance) that will use the cluster-specific token to say "Do you have any jobs for me?" at some interval. This can run anywhere there is a Flux instance. It can receive the job, and run it. When it receives the job, the job will be deleted from the database, because we don't care anymore.
At this point we have a dumb little database with jobs assigned to clusters. We can then modify the client to add a polling command (intended to be run on a flux instance) that will use the cluster-specific token to say "Do you have any jobs for me?" at some interval. This can run anywhere there is a Flux instance. It can receive the job, and run it. When it receives the job, the job will be deleted from the database, because we don't care anymore.

And that should be a very basic prototype - we can then build this into containers and deploy in different places (and deploy a client separate from a Flux instance) and demonstrate submitting jobs across different places. For the Flux instance logic, we could write the grpc endpoints in Python, but it would be more fun to (finally) make Go bindings for flux core.
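
The polling flow described above could be sketched roughly as follows. This is only an illustration under stated assumptions: `Queue`, `Take`, `Poll`, and `demo` are hypothetical names, the queue is an in-memory stand-in for the database, there is no gRPC call, and the token check is left out to keep the loop itself visible.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Job is a hypothetical, trimmed-down job record.
type Job struct {
	ID      int32
	Command string
}

// Queue is a hypothetical in-memory stand-in for the jobs table, keyed by cluster.
type Queue struct {
	mu   sync.Mutex
	jobs map[string][]Job
}

// NewQueue returns an empty queue.
func NewQueue() *Queue {
	return &Queue{jobs: map[string][]Job{}}
}

// Add assigns a job to a cluster.
func (q *Queue) Add(cluster string, job Job) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.jobs[cluster] = append(q.jobs[cluster], job)
}

// Take returns the oldest job for a cluster and deletes it from the queue.
func (q *Queue) Take(cluster string) (Job, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	pending := q.jobs[cluster]
	if len(pending) == 0 {
		return Job{}, false
	}
	q.jobs[cluster] = pending[1:]
	return pending[0], true
}

// Poll asks "do you have any jobs for me?" at each interval until ctx is done.
func Poll(ctx context.Context, q *Queue, cluster string, interval time.Duration, run func(Job)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if job, ok := q.Take(cluster); ok {
				run(job)
			}
		}
	}
}

// demo queues one job, polls briefly, and returns the commands received.
func demo() []string {
	q := NewQueue()
	q.Add("keebler", Job{ID: 1, Command: "hostname"})

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	var received []string
	Poll(ctx, q, "keebler", 10*time.Millisecond, func(j Job) {
		received = append(received, j.Command)
	})
	return received
}

func main() {
	fmt.Println(demo())
}
```

Deleting the job inside `Take`, under the same lock that reads it, means two pollers for the same cluster can never receive the same job.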

30 changes: 18 additions & 12 deletions api/v1/api.proto
@@ -15,7 +15,7 @@ service Service {
rpc Register(RegisterRequest) returns (RegisterResponse);

// Job Submission - request for submitting a job to a named cluster
rpc SubmitJob(Request) returns (SubmitJobResponse);
rpc SubmitJob(SubmitJobRequest) returns (SubmitJobResponse);

// TESTING ENDPOINTS
// Serial checks the connectivity and response time of the service.
@@ -41,19 +41,31 @@ message Content {
// Request represents the request for a method invocation.
// It includes the content to be sent and a timestamp.
message Request {
// The content to be sent in the request.
Content content = 1;

// Timestamp when the message was sent, represented in Unix epoch format.
google.protobuf.Timestamp sent = 2;
}

// RegisterRequest registers a cluster to the scheduler service
// The shared secret is required to validate the request
message RegisterRequest {
string name = 1;
string secret = 2;
google.protobuf.Timestamp sent = 3;
}

// SubmitJobRequest takes a job name, cluster name
// and requires the cluster token. Since we want to be generic,
// we currently accept nodes, tasks, and the command
message SubmitJobRequest {
string name = 1;
string cluster = 2;
string token = 3;
int32 nodes = 4;
int32 tasks = 5;
string command = 6;
google.protobuf.Timestamp sent = 7;
}


// Testing response - the server's response to a request.
message Response {
@@ -65,16 +77,9 @@ message Response {
RESULT_TYPE_ERROR = 2;
}

// Unique identifier correlating to the request.
string request_id = 1;

// Total number of messages received in the request.
int64 message_count = 2;

// Number of messages successfully processed.
int64 messages_processed = 3;

// Detailed information or description of the processing result.
string processing_details = 4;
}

@@ -105,5 +110,6 @@ message SubmitJobResponse {
SUBMIT_DENIED = 3;
}
string request_id = 1;
string job_id = 2;
int32 jobid = 2;
ResultType status = 3;
}
73 changes: 73 additions & 0 deletions cmd/rainbow/rainbow.go
@@ -0,0 +1,73 @@
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/akamensky/argparse"
	register "github.com/converged-computing/rainbow/cmd/rainbow/register"
	submit "github.com/converged-computing/rainbow/cmd/rainbow/submit"
	"github.com/converged-computing/rainbow/pkg/types"
)

var (
	Header = `
• ┓
┏┓┏┓┓┏┓┣┓┏┓┓┏┏
┛ ┗┻┗┛┗┗┛┗┛┗┻┛
`

	defaultSecret = "chocolate-cookies"
)

// RunVersion prints the rainbow version
func RunVersion() {
	fmt.Printf("🌈️ rainbow version %s\n", types.Version)
}

func main() {

	parser := argparse.NewParser("rainbow", "Interact with a rainbow multi-cluster")
	versionCmd := parser.NewCommand("version", "See the version of rainbow")
	registerCmd := parser.NewCommand("register", "Register a new cluster")
	submitCmd := parser.NewCommand("submit", "Submit a job to a rainbow cluster")

	// Shared values
	host := parser.String("", "host", &argparse.Options{Default: "localhost:50051", Help: "Scheduler server address (host:port)"})
	clusterName := parser.String("", "cluster-name", &argparse.Options{Default: "keebler", Help: "Name of cluster to register"})

	// Register
	secret := registerCmd.String("s", "secret", &argparse.Options{Default: defaultSecret, Help: "Registration 'secret'"})

	// Submit (note that command for now needs to be in quotes to get the whole thing)
	submitSecret := submitCmd.String("s", "secret", &argparse.Options{Default: defaultSecret, Help: "Registration 'secret'"})
	nodes := submitCmd.Int("n", "nodes", &argparse.Options{Default: 1, Help: "Number of nodes to request"})
	tasks := submitCmd.Int("t", "tasks", &argparse.Options{Help: "Number of tasks to request (per node? total?)"})
	command := submitCmd.String("c", "command", &argparse.Options{Default: defaultSecret, Help: "Command to submit"})
	jobName := submitCmd.String("", "job-name", &argparse.Options{Help: "Name for the job (defaults to first command)"})

	// Now parse the arguments
	err := parser.Parse(os.Args)
	if err != nil {
		fmt.Println(Header)
		fmt.Println(parser.Usage(err))
		return
	}

	// Dispatch to the subcommand that was requested
	if registerCmd.Happened() {
		err := register.Run(*host, *clusterName, *secret)
		if err != nil {
			log.Fatalf("Issue with register: %s\n", err)
		}
	} else if submitCmd.Happened() {
		err := submit.Run(*host, *jobName, *command, *nodes, *tasks, *submitSecret, *clusterName)
		if err != nil {
			log.Fatal(err.Error())
		}
	} else if versionCmd.Happened() {
		RunVersion()
	} else {
		fmt.Println(Header)
		fmt.Println(parser.Usage(nil))
	}
}
29 changes: 29 additions & 0 deletions cmd/rainbow/register/register.go
@@ -0,0 +1,29 @@
package register

import (
	"context"
	"log"

	"github.com/converged-computing/rainbow/pkg/client"
)

// Run registers a cluster with the scheduler service
// and logs the returned status and token
func Run(host, clusterName, secret string) error {
	c, err := client.NewClient(host)
	if err != nil {
		return err
	}

	log.Printf("registering cluster: %s", clusterName)

	// Last argument is the shared registration secret
	response, err := c.Register(context.Background(), clusterName, secret)
	if err != nil {
		return err
	}

	// If we get here, success! Dump all the stuff.
	log.Printf("status: %s", response.Status)
	log.Printf(" token: %s", response.Token)
	return nil
}
55 changes: 55 additions & 0 deletions cmd/rainbow/submit/submit.go
@@ -0,0 +1,55 @@
package submit

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/converged-computing/rainbow/pkg/client"
	"github.com/converged-computing/rainbow/pkg/types"
)

// Run submits a job to a named cluster
// The cluster token from registration is passed as submitSecret
func Run(
	host, jobName, command string,
	nodes, tasks int,
	submitSecret, clusterName string,
) error {

	c, err := client.NewClient(host)
	if err != nil {
		// Return the error so a failed connection is not reported as success
		return err
	}

	// Further validation of job happens with client below
	if command == "" {
		return fmt.Errorf("a command is required")
	}
	log.Printf("submit job: %s", command)

	// Prepare a JobSpec, defaulting the name to the first word of the command
	if jobName == "" {
		parts := strings.Split(command, " ")
		jobName = parts[0]
	}
	jobspec := types.JobSpec{
		Name:    jobName,
		Nodes:   int32(nodes),
		Tasks:   int32(tasks),
		Command: command,
	}

	// Last argument is the cluster token returned at registration
	response, err := c.SubmitJob(context.Background(), jobspec, clusterName, submitSecret)
	if err != nil {
		return err
	}

	// If we get here, success! Dump all the stuff.
	//log.Printf("status: %s", response.Status)
	//log.Printf(" token: %s", response.Token)
	log.Println(response)
	return nil
}
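
The job-name default in `Run` takes the first space-separated piece of the command. A minimal sketch of that step in isolation is below. It assumes a hypothetical helper name, `defaultJobName`, and swaps `strings.Split` for `strings.Fields` so that leading or repeated whitespace does not produce an empty name; that swap is a suggestion, not what the commit does.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultJobName is a hypothetical helper: it keeps an explicit job name,
// and otherwise falls back to the first word of the command.
func defaultJobName(jobName, command string) string {
	if jobName != "" {
		return jobName
	}
	// Fields splits on any run of whitespace and never returns empty strings
	fields := strings.Fields(command)
	if len(fields) == 0 {
		return ""
	}
	return fields[0]
}

func main() {
	fmt.Println(defaultJobName("", "hostname -f"))    // hostname
	fmt.Println(defaultJobName("", "  sleep 10"))     // sleep
	fmt.Println(defaultJobName("my-job", "sleep 10")) // my-job
}
```

With `strings.Split(command, " ")`, the second call would yield an empty name because the first element before the leading space is the empty string.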
42 changes: 0 additions & 42 deletions cmd/register/register.go

This file was deleted.
