Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/7/scheduler calls #93

Merged
merged 3 commits into from
Sep 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 98 additions & 22 deletions src/meson/client/impl/master/scheduler.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
(ns meson.client.impl.master.scheduler
(:require [clj-http.conn-mgr :as http-conn-mgr]
[clojure.data.json :as json]
[clojure.string :as string]
[clojure.tools.logging :as log]
[meson.http.core :as http]
[meson.util.core :as util])
Expand Down Expand Up @@ -31,6 +30,20 @@
(log/debug "Stopping connection manager for the scheduler ...")
(http-conn-mgr/shutdown-manager (:conn-mgr this)))

(defn payload-key
"Determine the payload's key.
Some API endpoints may break the 1 to 1 pattern of :type
and payload key."
[^Keyword type]
(condp = type
:request (str (util/keyword->lower type) "s")
(util/keyword->lower type)))

(comment
(payload-key :request) ; plural expected
(payload-key :accept) ; singular expected
(payload-key :subscribe)) ; singular expected

(defn call
([this ^Keyword type]
(call this type nil nil))
Expand All @@ -40,7 +53,7 @@
(call this type payload framework-id content-type {}))
([this ^Keyword type payload framework-id content-type opts]
(let [data {:type (util/keyword->upper type)
(util/keyword->lower type) payload}]
(payload-key type) payload}]
(http/post
this
scheduler-path
Expand All @@ -63,51 +76,107 @@

(defn acknowledge
([this payload stream-id framework-id]
:not-yet-implemented)
(acknowledge this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
:not-yet-implemented))
(call
this
:acknowledge
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn decline
([this payload stream-id framework-id]
:not-yet-implemented)
(decline this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
:not-yet-implemented))
(call
this
:decline
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn kill-task
([this payload stream-id framework-id]
:not-yet-implemented)
(kill-task this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
:not-yet-implemented))
(call
this
:kill
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn message
([this payload stream-id framework-id]
:not-yet-implemented)
(message this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
:not-yet-implemented))
(call
this
:message
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn reconcile
([this payload stream-id framework-id]
:not-yet-implemented)
(reconcile this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
:not-yet-implemented))
(call
this
:reconcile
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn request
([this payload stream-id framework-id]
:not-yet-implemented)
(request this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
:not-yet-implemented))
(call
this
:request ; type is request, json keyword is requests (plural)
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn revive
([this payload stream-id framework-id]
:not-yet-implemented)
(revive this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
:not-yet-implemented))
(call
this
:revive
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn shutdown-executor
([this payload stream-id framework-id]
:not-yet-implemented)
(shutdown-executor this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
:not-yet-implemented))
(call
this
:shutdown
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn subscribe
([this payload]
Expand All @@ -126,10 +195,17 @@
:connection-manager (:conn-mgr this)})))

(defn teardown
([this payload stream-id framework-id]
:not-yet-implemented)
([this payload stream-id framework-id content-type]
:not-yet-implemented))
([this stream-id framework-id]
(teardown this stream-id framework-id http/json-content-type))
([this stream-id framework-id content-type]
(call
this
:teardown
nil
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(def behaviour
{:accept accept
Expand Down