Skip to content

Implement HTTP GET /status to get the State of the Queue #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
126 changes: 96 additions & 30 deletions go/src/pythia/backend/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package backend

import (
"container/list"
"encoding/json"
"flag"
"fmt"
"log"
"pythia"
"strings"
"sync"
"time"
)

func init() {
Expand All @@ -41,7 +43,7 @@ type queueClient struct {
Id int

// The response channel.
Response chan<- pythia.Message
Response chan<- pythia.Message `json:"-"`

// The number of parallel jobs this pool can handle.
Capacity int
Expand All @@ -67,7 +69,7 @@ type queueJob struct {
// The client having submitted this job.
Origin *queueClient

// Element of the queue.waiting list pointing to this job, or nil if the job
// Element of the queue.Waiting list pointing to this job, or nil if the job
// is currently running.
WaitingElement *list.Element

Expand Down Expand Up @@ -96,6 +98,17 @@ const (
quitMsg pythia.MsgType = "-quit"
)

// A queueStatus is an internal structure required to marshal the state of the Queue
// in a semantically right JSON.
type QueueStatus struct {
Capacity int `json:"capacity"`
Available int `json:"available"`
Clients []*queueClient `json:"clients, omitempty"`
Jobs []*queueJob `json:"jobs, omitempty"`
Waiting *list.List `json:"waiting"`
CreationDate time.Time `json:"creation_date"`
}

// The Queue is the central component of Pythia.
// It receives jobs (tasks with inputs) from front-ends and dispatches them
// to the sandboxes.
Expand All @@ -116,20 +129,24 @@ type Queue struct {
wg sync.WaitGroup

// Active connections
clients map[int]*queueClient
Clients map[int]*queueClient

// Jobs to be processed/currently processing
jobs map[string]*queueJob
Jobs map[string]*queueJob

// List of jobs (*queueJob) waiting to be assigned.
waiting *list.List
Waiting *list.List

// Get the Queue creation datetime
CreationDate time.Time
}

// NewQueue returns a new queue with default parameters.
func NewQueue() *Queue {
queue := new(Queue)
queue.Capacity = 500
queue.quit = make(chan bool, 1)
queue.CreationDate = time.Now()
return queue
}

Expand All @@ -149,6 +166,7 @@ func (queue *Queue) Run() {
closing := false
master := make(chan queueMessage)
queue.master = master

go func() {
<-queue.quit
closing = true
Expand Down Expand Up @@ -192,29 +210,29 @@ func (queue *Queue) Shutdown() {
// Main goroutine responsible for scheduling the jobs.
func (queue *Queue) main(master <-chan queueMessage) {
defer queue.wg.Done()
queue.clients = make(map[int]*queueClient)
queue.jobs = make(map[string]*queueJob)
queue.waiting = list.New()
queue.Clients = make(map[int]*queueClient)
queue.Jobs = make(map[string]*queueJob)
queue.Waiting = list.New()
for qm := range master {
switch qm.Msg.Message {
case connectMsg:
log.Print("Client ", qm.Client.Id, ": connected.")
queue.clients[qm.Client.Id] = qm.Client
queue.Clients[qm.Client.Id] = qm.Client
case pythia.RegisterPoolMsg:
log.Print("Client ", qm.Client.Id, ": pool capacity ",
qm.Msg.Capacity)
qm.Client.Capacity = qm.Msg.Capacity
case pythia.LaunchMsg:
id := qm.Msg.Id
if _, ok := queue.jobs[id]; ok {
if _, ok := queue.Jobs[id]; ok {
log.Print("Job ", id, ": already launched, rejecting.")
qm.Client.Response <- pythia.Message{
Message: pythia.DoneMsg,
Id: id,
Status: pythia.Fatal,
Output: "Job already launched",
}
} else if queue.waiting.Len() >= queue.Capacity {
} else if queue.Waiting.Len() >= queue.Capacity {
log.Print("Job ", id, ": queue full, rejecting.")
qm.Client.Response <- pythia.Message{
Message: pythia.DoneMsg,
Expand All @@ -229,14 +247,14 @@ func (queue *Queue) main(master <-chan queueMessage) {
Origin: qm.Client,
}
qm.Client.Submitted[id] = job
queue.jobs[id] = job
job.WaitingElement = queue.waiting.PushBack(job)
queue.Jobs[id] = job
job.WaitingElement = queue.Waiting.PushBack(job)
log.Print("Job ", id, ": queued.")
}
case pythia.DoneMsg:
id := qm.Msg.Id
log.Print("Job ", id, ": done.")
job := queue.jobs[id]
job := queue.Jobs[id]
if job == nil {
log.Println("Ignoring message for unknown job", qm.Msg)
break
Expand All @@ -246,7 +264,7 @@ func (queue *Queue) main(master <-chan queueMessage) {
log.Println("Ignoring message from wrong source", qm.Msg)
break
}
delete(queue.jobs, id)
delete(queue.Jobs, id)
delete(pool.Running, id)
if job.Origin != nil {
// job.Origin is nil if the submitting client has disconnected
Expand All @@ -257,35 +275,51 @@ func (queue *Queue) main(master <-chan queueMessage) {
case closedMsg:
log.Print("Client ", qm.Client.Id, ": disconnected.")
close(qm.Client.Response)
delete(queue.clients, qm.Client.Id)
delete(queue.Clients, qm.Client.Id)
for _, job := range qm.Client.Running {
if job.Origin == nil {
// Submitter disconnected, we can discard the job.
delete(queue.jobs, job.Id)
delete(queue.Jobs, job.Id)
} else {
// Otherwise, reschedule it.
job.Pool = nil
job.WaitingElement = queue.waiting.PushFront(job)
job.WaitingElement = queue.Waiting.PushFront(job)
}
}
for _, job := range qm.Client.Submitted {
if job.WaitingElement != nil {
// Job is in waiting queue, discard it.
queue.waiting.Remove(job.WaitingElement)
delete(queue.jobs, job.Id)
queue.Waiting.Remove(job.WaitingElement)
delete(queue.Jobs, job.Id)
} else if job.Pool != nil {
// Job is running, abort it.
job.Origin = nil
job.Pool.Response <- pythia.Message{
Message: pythia.AbortMsg,
Id: job.Id,
}
// Keep job in queue.jobs to handle abort result
// Keep job in queue.Jobs to handle abort result
}
}
case quitMsg:
log.Println("Quitting.")
goto quit

case pythia.StatusMsg:
status := fillQueueStatus(queue)
id := qm.Msg.Id
serializedStatus, err := json.Marshal(status)
if err != nil {
log.Fatal("Queue is in an invalid state")
log.Fatal(err)
}
qm.Client.Response <- pythia.Message{
Message: pythia.DoneMsg,
Id: id,
Status: pythia.Success,
Output: string(serializedStatus),
}
log.Println("Client ", qm.Client.Id, " : Status sent")
default:
log.Fatal("Invalid internal message", qm.Msg)
}
Expand All @@ -295,19 +329,19 @@ func (queue *Queue) main(master <-chan queueMessage) {
}

quit:
if len(queue.clients) == 0 {
if len(queue.Clients) == 0 {
return
}
for _, client := range queue.clients {
for _, client := range queue.Clients {
close(client.Response)
}
// Wait for all clients to quit. We flush messages from the master channel
// Wait for all Clients to quit. We flush messages from the master channel
// to ensure no connection handler is in a deadlock.
for qm := range master {
switch qm.Msg.Message {
case closedMsg:
delete(queue.clients, qm.Client.Id)
if len(queue.clients) == 0 {
delete(queue.Clients, qm.Client.Id)
if len(queue.Clients) == 0 {
return
}
default:
Expand All @@ -320,17 +354,17 @@ quit:
// This function shall be called from the main goroutine, as it manipulates
// the queue data structures.
func (queue *Queue) schedule() {
if queue.waiting.Len() == 0 {
if queue.Waiting.Len() == 0 {
return
}
for _, client := range queue.clients {
for _, client := range queue.Clients {
for len(client.Running) < client.Capacity {
job := queue.waiting.Remove(queue.waiting.Front()).(*queueJob)
job := queue.Waiting.Remove(queue.Waiting.Front()).(*queueJob)
job.WaitingElement = nil
job.Pool = client
client.Running[job.Id] = job
client.Response <- job.Msg
if queue.waiting.Len() == 0 {
if queue.Waiting.Len() == 0 {
return
}
}
Expand Down Expand Up @@ -361,6 +395,8 @@ func (queue *Queue) handle(conn *pythia.Conn, client *queueClient, response chan
queue.master <- queueMessage{msg, client}
case pythia.DoneMsg:
queue.master <- queueMessage{msg, client}
case pythia.StatusMsg:
queue.master <- queueMessage{msg, client}
default:
log.Println("Ignoring message", msg)
}
Expand All @@ -374,10 +410,40 @@ func (queue *Queue) handle(conn *pythia.Conn, client *queueClient, response chan
case pythia.DoneMsg:
msg.Id = msg.Id[strings.Index(msg.Id, ":")+1:]
conn.Send(msg)
case pythia.StatusMsg:
conn.Send(msg)
default:
log.Fatal("Invalid internal message", msg)
}
}
}

func convertClientsToSlice(clients map[int]*queueClient) (clientsSlice []*queueClient) {
clientsSlice = make([]*queueClient, 0)
for _, element := range clients {
clientsSlice = append(clientsSlice, element)
}
return clientsSlice
}

func convertJobsToSlice(jobs map[string]*queueJob) (jobsSlice []*queueJob) {
jobsSlice = make([]*queueJob, 0)
for _, element := range jobs {
jobsSlice = append(jobsSlice, element)
}
return jobsSlice
}

// Return a QueueStatus struct filled with information coming from the Queue
func fillQueueStatus(queue *Queue) (status QueueStatus) {
status.Capacity = queue.Capacity
status.Available = queue.Capacity - len(queue.Jobs) - queue.Waiting.Len()
status.Clients = convertClientsToSlice(queue.Clients)
status.Jobs = convertJobsToSlice(queue.Jobs)
status.Waiting = queue.Waiting
status.CreationDate = queue.CreationDate

return status
}

// vim:set ts=4 sw=4 noet:
45 changes: 45 additions & 0 deletions go/src/pythia/backend/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package backend

import (
"encoding/json"
"pythia"
"reflect"
"strconv"
"testing"
"testutils"
"testutils/pytest"
Expand Down Expand Up @@ -114,4 +117,46 @@ func TestQueueSimple(t *testing.T) {
f.TearDown()
}

func TestQueueStatus(t *testing.T) {
f := SetupQueueFixture(t, 500, 2)
frontend := f.Clients[0]

frontend.Send(pythia.Message{
Message: pythia.StatusMsg,
Id: "test",
})

// Removing Clients array from Message as this part will differ due to client list
// referencing the test client as a client but said client is no longer connected
// when the status is emitted
status := fillQueueStatus(f.Queue)
status.Clients = make([]*queueClient, 0)

msg := <-frontend.Conn.Receive()
if msg.Message != pythia.DoneMsg || msg.Id != "test" || msg.Status != pythia.Success {
t.Fatal("Message content mismatching")
}

var expected QueueStatus
expected.Clients = make([]*queueClient, 0)
json.Unmarshal([]byte(msg.Output), &expected)

// The content of the Waiting list is not compared because it's not efficient
// and it's not really interesting because the list is supposed to be empty
if expected.Capacity != status.Capacity ||
expected.Available != status.Available ||
!reflect.DeepEqual(expected.Jobs, status.Jobs) ||
!reflect.DeepEqual(expected.Waiting.Len(), status.Waiting.Len()) ||
!expected.CreationDate.Equal(status.CreationDate) {

t.Error("Capacity : " + strconv.FormatBool(expected.Capacity != status.Capacity))
t.Error("Available : " + strconv.FormatBool(expected.Available != status.Available))
t.Error("Jobs : " + strconv.FormatBool(!reflect.DeepEqual(expected.Jobs, status.Jobs)))
t.Error("Waiting : " + strconv.FormatBool(!reflect.DeepEqual(expected.Waiting, status.Waiting)))
t.Error("CreationDate : " + strconv.FormatBool(!expected.CreationDate.Equal(status.CreationDate)))
}

f.TearDown()
}

// vim:set sw=4 ts=4 noet:
25 changes: 25 additions & 0 deletions go/src/pythia/frontend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (server *Server) Run() {
}()
// Start the web server
http.HandleFunc("/execute", handler)
http.HandleFunc("/status", statusHandler)
log.Println("Server listening on", server.Port)
if err := http.ListenAndServe(fmt.Sprint(":", server.Port), nil); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -140,4 +141,28 @@ func handler(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}

// Handle for /status route to get the status of the Queue
func statusHandler(rw http.ResponseWriter, req *http.Request) {
log.Println("Client connected: ", req.URL)
if req.Method != "GET" {
rw.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Connection to the pool
conn := pythia.DialRetry(pythia.QueueAddr)
defer conn.Close()
conn.Send(pythia.Message{
Message: pythia.StatusMsg,
})
if msg, ok := <-conn.Receive(); ok {
switch msg.Status {
case "success":
rw.Header().Set("Content-Type", "application/json")
fmt.Fprintf(rw, msg.Output)
}
return
}
rw.WriteHeader(http.StatusInternalServerError)
}

// vim:set sw=4 ts=4 noet:
Loading