feat: support for reservations #5

Merged: 2 commits, Jan 18, 2025
28 changes: 16 additions & 12 deletions README.md
@@ -195,6 +195,20 @@ Next I'll finish up job (not a lot to do) and work on edge cases of cancel to Fl

## Development

### Reservations

> How do reservations work?

- We have a queue with one worker that sends jobs to the scheduler. This is important because Fluxion is single-threaded, and we need the "clear reservation" job to run last.
- The scheduler does everything that it can, and then it starts reserving resources (e.g., the easy strategy allows one reservation).
- Reservations block off resources in Fluxion and give an estimated start time.
- Reservations are cleared after the loop. This means that later (smaller) jobs aren't given resources that should go to larger jobs with higher priority.
- A single job to clear reservations is added to the end of each schedule loop (see the sketch after this list). It works as follows:
  - We retrieve the reservation IDs.
  - We issue a cancel to Fluxion.
  - On success, we delete the reservation ID from the table.


### Debugging Postgres

It is often helpful to shell into the postgres container to see the database directly:
@@ -213,7 +227,8 @@ psql -U postgres
\dt

# test a query
SELECT group_name, group_size from pods_provisional;
SELECT * from pending_queue;
SELECT * from reservations;
```

### TODO
@@ -222,17 +237,6 @@ SELECT group_name, group_size from pods_provisional;
- we need to use shrink here. And a shrink down to size 0 I assume is a cancel.
- [ ] For cancel, we would issue a cancel for every pod associated with a job. How can we avoid that (or is that OK?)
- [ ] we will eventually need another mechanism to move the schedule queue along aside from new submissions
- [ ] River is task based, so the entire concept of a loop is not relevant. We are asking Flux asynchronously. The schedule loop, in that it kicks off different River jobs, isn't actually a loop; the jobs may run concurrently.
- So the reservation strategy doesn't make sense because there is no "end" point.
- Currently I'm just going to set the reservationDepth to 0
- Reservations notes:
- We have a queue that sends jobs to the scheduler
- The scheduler does everything that it can, and then it starts reserving things
- Reservations block off resources and give an estimated start time.
- Reservations are cleared after the loop.
- If reservation set to true, it will keep looking for earliest time in future.
- Reservations are saving state of those jobs for the scheduler JUST during the loop
- The reservations are cleared out after the loop.
- [ ] scheduleAt can be used to AskFlux in the future
- [ ] Nodes that are currently assigned need to be taken into account
- Right now they aren't included in resources, but instead should be "given" to Fluxion.
10 changes: 5 additions & 5 deletions build/postgres/create-tables.sql
@@ -13,8 +13,8 @@ CREATE TABLE pending_queue (

CREATE UNIQUE INDEX pending_index ON pending_queue (name, namespace);

-- We only need the fluxid for a reservation
-- CREATE TABLE reservations (
-- group_name TEXT NOT NULL,
-- flux_id INTEGER NOT NULL
-- );
-- We only need the fluxid for a reservation, but store the group name for debugging
CREATE TABLE reservations (
name TEXT NOT NULL,
flux_id INTEGER NOT NULL
);
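
For reference, the lifecycle of this table over one schedule loop maps onto the queries added in `pkg/fluxqueue/queries` later in this diff. A hypothetical sequence (values invented for illustration):

```sql
-- During scheduling: record a reservation made in Fluxion
insert into reservations (name, flux_id) values ('job-group-a', 42);

-- At the end of the loop: look up what to cancel
select name, flux_id from reservations;

-- After a successful cancel: drop just that reservation...
delete from reservations where flux_id=42;

-- ...or clear everything for the next loop
truncate reservations;
```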
45 changes: 43 additions & 2 deletions dist/fluxqueue-dev.yaml
@@ -78,6 +78,9 @@ spec:
jobspec:
description: JobSpec is the Flux jobspec
type: string
name:
description: Original name of the job
type: string
nodes:
description: Nodes needed for the job
format: int32
@@ -232,6 +235,42 @@ rules:
- get
- list
- watch
- apiGroups:
- batch
resources:
- jobs
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- batch
resources:
- jobs/exec
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- batch
resources:
- jobs/log
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
@@ -444,7 +483,7 @@ spec:
- name: PGPASSWORD
value: postgres
image: ghcr.io/converged-computing/fluxqueue:test
imagePullPolicy: IfNotPresent
imagePullPolicy: Never
livenessProbe:
httpGet:
path: /healthz
@@ -464,6 +503,8 @@ spec:
- |
status=$(curl -ks http://localhost:8081/readyz); if [ "$status" != "ok" ]; then exit 1; fi
pg_isready -d postgres -h postgres -p 5432 -U postgres;
initialDelaySeconds: 20
periodSeconds: 20
resources:
limits:
cpu: 500m
@@ -484,7 +525,7 @@ spec:
- /code/bin/server
- --port=4242
image: ghcr.io/converged-computing/fluxion:latest
imagePullPolicy: Always
imagePullPolicy: IfNotPresent
name: fluxion
securityContext:
seccompProfile:
36 changes: 36 additions & 0 deletions dist/fluxqueue.yaml
@@ -235,6 +235,42 @@ rules:
- get
- list
- watch
- apiGroups:
- batch
resources:
- jobs
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- batch
resources:
- jobs/exec
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- batch
resources:
- jobs/log
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
3 changes: 2 additions & 1 deletion hack/quick-build-kind.sh
@@ -20,13 +20,14 @@ kind load docker-image ${REGISTRY}/fluxqueue-scheduler:latest
helm uninstall fluxqueue --namespace fluxqueue-system --wait || true

# So we don't try to interact with old webhook, etc.
sleep 5
sleep 10
helm install \
--set controllerManager.manager.image.repository=${REGISTRY}/fluxqueue \
--set controllerManager.manager.image.tag=latest \
--set scheduler.image=${REGISTRY}/fluxqueue-scheduler:latest \
--set postgres.image=${REGISTRY}/fluxqueue-postgres:latest \
--set controllerManager.manager.imagePullPolicy=Never \
--set controllerManager.fluxion.image.tag=grow-api \
--namespace fluxqueue-system \
--create-namespace \
--set scheduler.pullPolicy=Never \
3 changes: 3 additions & 0 deletions pkg/fluxqueue/defaults/defaults.go
@@ -9,6 +9,9 @@ const (
// The database column is an int16
MaxAttempts = math.MaxInt16

// We assume if a fluxion cancel is not successful, it was already deleted
MaxCancelAttempts = 3

// Default duration is 0 (unset) so we honor kubernetes objects
DefaultDuration = 0
)
29 changes: 25 additions & 4 deletions pkg/fluxqueue/fluxqueue.go
@@ -18,14 +18,16 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

api "github.com/converged-computing/fluxqueue/api/v1alpha1"
"github.com/converged-computing/fluxqueue/pkg/fluxqueue/defaults"
"github.com/converged-computing/fluxqueue/pkg/fluxqueue/queries"
strategies "github.com/converged-computing/fluxqueue/pkg/fluxqueue/strategy"
"github.com/converged-computing/fluxqueue/pkg/fluxqueue/strategy/workers"
"github.com/converged-computing/fluxqueue/pkg/fluxqueue/types"
)

const (
queueMaxWorkers = 10
// IMPORTANT: must be one because fluxion is run single threaded
queueMaxWorkers = 1
mutexLocked = 1
)

@@ -102,7 +104,6 @@ func NewQueue(ctx context.Context, cfg rest.Config) (*Queue, error) {
// Default queue handles job allocation
river.QueueDefault: {MaxWorkers: queueMaxWorkers},

// TODO do we have control of deletions?
// Cleanup queue is typically for cancel
"cleanup_queue": {MaxWorkers: queueMaxWorkers},
},
@@ -133,7 +134,7 @@ func NewQueue(ctx context.Context, cfg rest.Config) (*Queue, error) {
}, nil
}

// StopQueue creates a client (without calling start) only intended to
// Stop creates a client (without calling start) only intended to
// issue stop, so we can leave out workers and queue from Config
func (q *Queue) Stop(ctx context.Context) error {
if q.riverClient != nil {
@@ -212,14 +213,21 @@ func (q *Queue) Schedule() error {
defer q.lock.Unlock()

// This generates a batch of jobs to send to ask Fluxion for nodes
// It's a batch, but sent to only one worker.
batch, err := q.Strategy.Schedule(q.Context, q.Pool, q.ReservationDepth)
if err != nil {
return err
}

// Run each job task to schedule nodes for it (or ask again later)
if len(batch) > 0 {
_, err := q.riverClient.InsertMany(q.Context, batch)

// Add the reservation clean up job (run at the end)
reservationJob := q.clearReservationJob()
batchComplete := append(batch, reservationJob)

// Submit all jobs to batch, including clearing reservations
_, err = q.riverClient.InsertMany(q.Context, batchComplete)
if err != nil {
return err
}
@@ -236,3 +244,16 @@ func (q *Queue) Schedule() error {
// Post submit functions for a queue strategy
return q.Strategy.PostSubmit(q.Context, q.Pool, q.riverClient)
}

// clearReservationJob
// 1. retrieve flux ids from the table
// 2. Issue cancel to fluxion to remove from graph
// 3. clear table for next loop
func (q *Queue) clearReservationJob() river.InsertManyParams {
insertOpts := river.InsertOpts{
MaxAttempts: defaults.MaxAttempts,
Tags: []string{"reservation"},
Queue: river.QueueDefault,
}
return river.InsertManyParams{Args: workers.ReservationArgs{}, InsertOpts: &insertOpts}
}
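
The worker this job kicks off is not shown in this diff; a rough shape for it under River's worker API, with `ReservationArgs` left as an empty payload (field and package names here are assumptions, not the merged code):

```go
package workers

import (
	"context"

	"github.com/riverqueue/river"
)

// ReservationArgs carries no payload; the worker reads the reservations
// table itself. Kind satisfies river.JobArgs.
type ReservationArgs struct{}

func (ReservationArgs) Kind() string { return "reservation" }

// ReservationWorker cancels reserved jobs in Fluxion and clears the table.
type ReservationWorker struct {
	river.WorkerDefaults[ReservationArgs]
}

// Work runs after all job tasks in the loop, because the default queue
// has exactly one worker and this job is inserted last.
func (w *ReservationWorker) Work(ctx context.Context, job *river.Job[ReservationArgs]) error {
	// 1. select name, flux_id from reservations
	// 2. issue a cancel to Fluxion for each flux_id
	// 3. on success, delete that reservation row
	return nil
}
```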
7 changes: 4 additions & 3 deletions pkg/fluxqueue/queries/queries.go
@@ -19,7 +19,8 @@ const (
SelectPendingByCreation = "select jobspec, name, flux_job_name, namespace, type, reservation, duration, size from pending_queue order by created_at desc;"

// Reservations
AddReservationQuery = "insert into reservations (group_name, flux_id) values ($1, $2);"
DeleteReservationsQuery = "truncate reservations; delete from reservations;"
GetReservationsQuery = "select (group_name, flux_id) from reservations;"
AddReservationQuery = "insert into reservations (name, flux_id) values ($1, $2);"
DeleteReservationsQuery = "truncate reservations; delete from reservations;"
DeleteSingleReservationsQuery = "delete from reservations where flux_id=$1;"
GetReservationsQuery = "select (name, flux_id) from reservations;"
)
16 changes: 6 additions & 10 deletions pkg/fluxqueue/strategy/easy.go
@@ -51,14 +51,17 @@ func (EasyBackfill) AddWorkers(workers *river.Workers, cfg rest.Config) error {
if err != nil {
return err
}

cleanupWorker, err := work.NewCleanupWorker(cfg)
if err != nil {
return err
}

reservationWorker, err := work.NewReservationWorker(cfg)
if err != nil {
return err
}
river.AddWorker(workers, jobWorker)
river.AddWorker(workers, cleanupWorker)
river.AddWorker(workers, reservationWorker)
return nil
}

@@ -73,7 +76,7 @@ func (s EasyBackfill) Cleanup(
// Tags can eventually be specific to job attributes, queues, etc.
// This also sets the queue to the cleanup queue
insertOpts := river.InsertOpts{
MaxAttempts: defaults.MaxAttempts,
MaxAttempts: defaults.MaxCancelAttempts,
Tags: []string{s.Name()},
Queue: "cleanup_queue",
}
@@ -97,13 +100,6 @@ if err != nil {
if err != nil {
return err
}
// TODO do we need to delete from database table?
// Now cleanup!
//dRows, err := pool.Query(ctx, queries.DeleteReservationsQuery)
//if err != nil {
// return err
//}
//defer dRows.Close()
}
return nil
}
6 changes: 1 addition & 5 deletions pkg/fluxqueue/strategy/workers/job.go
@@ -247,11 +247,7 @@ func (w JobWorker) unsuspendJob(namespace, name string, nodes []string, fluxId i
nodesStr := strings.Join(nodes, "__")
payload := `{"spec": {"suspend": false, "template": {"metadata": {"labels": {"` + defaults.NodesLabel + `": "` + nodesStr + `", "` + defaults.FluxJobIdLabel + `": "` + jobid + `"}}}}}`
_, err = client.BatchV1().Jobs(namespace).Patch(ctx, name, patchTypes.StrategicMergePatchType, []byte(payload), metav1.PatchOptions{})
if err != nil {
return err
}
// And unsuspend the job
return patchUnsuspend(ctx, client, name, namespace)
return err
}

// patchUnsuspend patches a pod to unsuspend it.