Skip to content

Commit

Permalink
Jetty leverages WS control frames for heartbeat
Browse files Browse the repository at this point in the history
Cleaner design. HTTPKit doesn't support it, so we emulate it.
  • Loading branch information
ggeoffrey committed Jan 31, 2024
1 parent 7b81126 commit ae1341b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
10 changes: 8 additions & 2 deletions src/hyperfiddle/electric_httpkit_adapter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@
(success-cb)
(failure-cb (ex-info "Can't send message to client, remote channel is closed" {}))))

;; ping and pong are not exposed by HTTPKit
;; HTTPKit will automatically answer pings with an immediate echo pong.
;; ping and pong are not exposed by HTTPKit. Instead ping is replaced by a
;; special "HEARTBEAT" message that the client will echo. HTTPKit will
;; automatically answer client pings with an immediate echo pong.
ering/Pingable
(ping [this] (ering/ping this "HEARTBEAT"))
(ping [this value] (assert (= "HEARTBEAT" value)) (ering/send this value))
(pong [this] (throw (ex-info "Pong is not supported" {})))
(pong [this value] (throw (ex-info "Pong with arbitrary data is not supported" {})))
)

(defn reject-websocket-handler
Expand Down
10 changes: 5 additions & 5 deletions src/hyperfiddle/electric_ring_adapter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
(send [_this value success-cb failure-cb] (ws/send socket value success-cb failure-cb))
Pingable
(ping [_this] (ws/ping socket))
(ping [_this value] (ws/ping socket value))
(ping [_this value] (ws/ping socket (if (string? value) (java.nio.ByteBuffer/wrap (.getBytes value)) value)))
(pong [_this] (ws/pong socket))
(pong [_this value] (ws/pong socket value)))
(pong [_this value] (ws/pong socket (if (string? value) (java.nio.ByteBuffer/wrap (.getBytes value)) value))))

(defn reject-websocket-handler
"Will accept socket connection upgrade and immediately close the socket on
Expand Down Expand Up @@ -90,8 +90,8 @@
(throw (ex-info "No message received after specified time" {::type ::timeout, ::time-seconds (int (/ time 1000))})))
(recur))))

(defn send-hf-heartbeat [delay send!]
(m/sp (loop [] (m/? (m/sleep delay)) (send! "HEARTBEAT") (recur))))
(defn send-hf-heartbeat [delay ping!]
(m/sp (loop [] (m/? (m/sleep delay)) (ping!) (recur))))

(defn- boot! [entrypoint ring-req write-msg read-msg]
((entrypoint ring-req) (comp write-msg io/encode) (fn [cb] (read-msg (comp cb io/decode)))))
Expand Down Expand Up @@ -156,7 +156,7 @@
((m/join (fn [& _])
(timeout keepalive-mailbox ELECTRIC-CONNECTION-TIMEOUT)
(boot! boot-fn ring-req (partial write-msg socket) (r/subject-at state on-message-slot))
(send-hf-heartbeat ELECTRIC-HEARTBEAT-INTERVAL #(send socket %)))
(send-hf-heartbeat ELECTRIC-HEARTBEAT-INTERVAL #(ping socket "HEARTBEAT")))
{} (partial failure socket)))) ; Start Electric process
:on-close (fn on-close [_socket _status-code & [_reason]]
((aget state on-close-slot)))
Expand Down

0 comments on commit ae1341b

Please sign in to comment.