Skip to content

Separate broadcast and watcher #121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
bed4a1f
remove reference to no longer existing parsed-initialize fn
zonotope Feb 23, 2025
3ada84b
pass derive function instead of transducer
zonotope Feb 23, 2025
95caf95
add function to read config files from disk
zonotope Feb 24, 2025
7b87575
add dedicated ns for processing cli options
zonotope Feb 24, 2025
9fa527f
update db dep
zonotope Feb 25, 2025
0a70dc0
better error messages for missing configs
zonotope Feb 26, 2025
6bfa0cf
update db dep
zonotope Feb 26, 2025
f053ca1
update db dep
zonotope Feb 27, 2025
1db09aa
update db dep
zonotope Feb 27, 2025
19e6df8
update db dep
zonotope Feb 28, 2025
687d9f7
generalize middleware
zonotope Mar 2, 2025
8933a2b
allow passing custom routes in app config
zonotope Mar 2, 2025
55b9b1d
make fluree routes more configurable
zonotope Mar 2, 2025
ba0caaa
remove callback routes
zonotope Mar 3, 2025
ff03683
remove unnecessary prepare task
zonotope Mar 3, 2025
f382672
use keyword namespace aliases
zonotope Mar 3, 2025
cb98066
remove nonexistent make dependency
zonotope Mar 3, 2025
512fab3
make broadcaster explicitly available to handler
zonotope Mar 4, 2025
dbe6ee2
final-resp -> result
zonotope Mar 4, 2025
69abdb2
move watcher outside of consensus ns hierarchy
zonotope Mar 4, 2025
2eead8d
get tx-id from event when delivering a new commit
zonotope Mar 4, 2025
8f3f895
deliver watch directly from transactors instead of when broadcasting
zonotope Mar 4, 2025
3c2dd2e
broadcast! -> deliver!
zonotope Mar 4, 2025
d760569
separate deliver watch from broadcast; broadcast after txn returns
zonotope Mar 5, 2025
3a05f5c
use nil placeholders for ledger and tx id in raft for now; cleanup
zonotope Mar 5, 2025
fccf514
separate methods for broadcasting commits and errors
zonotope Mar 5, 2025
6c59b30
cljfmt fixes
zonotope Mar 5, 2025
7e9eb70
add fn to broadcast events generally
zonotope Mar 5, 2025
d9b4777
expose the watcher within the standalone transactor
zonotope Mar 5, 2025
b4db11f
add helpers to recognize certain event types
zonotope Mar 10, 2025
1702e1c
add method to determine if a transaction is being tracked
zonotope Mar 10, 2025
623f102
Merge remote-tracking branch 'origin/main' into refactor/config-parsing
zonotope Mar 11, 2025
56e1d0a
update db dep
zonotope Mar 11, 2025
6cc78c8
update db dep
zonotope Mar 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps.edn
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{:deps {org.clojure/clojure {:mvn/version "1.11.3"}
org.clojure/core.async {:mvn/version "1.6.681"}
com.fluree/db {:git/url "https://github.com/fluree/db.git"
:git/sha "745d7e734b9d617edf4a3a8c9333732c75453075"}
:git/sha "de63ab3e47a53bce0bb4b038487f91c57cfab580"}
com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git"
:git/sha "73a990a4b803d0b4cfbbbe4dc16275b39a3add4e"}

Expand Down
105 changes: 18 additions & 87 deletions src/fluree/server.clj
Original file line number Diff line number Diff line change
@@ -1,97 +1,28 @@
(ns fluree.server
(:require [clojure.string :as str]
[clojure.tools.cli :as cli]
[fluree.db.util.log :as log]
(:require [fluree.db.util.log :as log]
[fluree.server.command :as command]
[fluree.server.system :as system])
(:gen-class))

(set! *warn-on-reflection* true)

(defn strip-leading-colon
[s]
(if (str/starts-with? s ":")
(subs s 1)
s))

(defn profile-string->keyword
[s]
(-> s str/trim strip-leading-colon keyword))

(def cli-options
[["-p" "--profile PROFILE" "Run profile"
:default :prod
:parse-fn profile-string->keyword]
["-c" "--config FILE" "Load configuration at a file path"]
["-s" "--string STRING" "Load stringified configuration"]
["-r" "--resource NAME" "Load pre-defined configuration resource"]
["-h" "--help" "Print this usage summary and exit"]])

(defn single-configuration?
[{:keys [config string resource]}]
(->> [config string resource] (remove nil?) count (>= 1)))

(def multiple-configuration-error
(str "Only a single configuration option from"
"-c/--config, -s/--string, and -r/--resource"
"is allowed."))

(defn validate-opts
[{:keys [options] :as parsed-opts}]
(if (single-configuration? options)
parsed-opts
(update parsed-opts :errors conj multiple-configuration-error)))

(defn parse-cli
[args]
(-> args
(cli/parse-opts cli-options)
validate-opts))

(defn usage
[summary]
(str/join \newline ["Fluree Ledger Server"
""
"Options:"
summary]))

(defn error-message
[errors]
(str/join \newline errors))

(defn exit
[status message]
(println message)
(System/exit status))

(defn start
[{:keys [profile] :as options}]
(if-let [config-string (:string options)]
(do (log/info "Starting Fluree server from command line configuration with profile:"
profile)
(system/start-config config-string profile))
(if-let [config-path (:config options)]
(do (log/info "Starting Fluree server configuration at path:" config-path
"with profile:" profile)
(system/start-file config-path profile))
(if-let [config-resource (:resource options)]
(do (log/info "Starting Fluree server configuration from resource:" config-resource)
(system/start-resource config-resource profile))
(do (log/info "Starting Fluree server with profile:" profile)
(system/start profile))))))

(defn run
[{:keys [options errors summary]}]
(cond (seq errors)
(let [msg (error-message errors)]
(exit 1 msg))

(:help options)
(let [msg (usage summary)]
(exit 0 msg))

:else
(start options)))
[{:keys [options] :as _cli}]
(let [{:keys [profile]} options]
(if-let [config-string (:string options)]
(do (log/info "Starting Fluree server from command line configuration with profile:"
profile)
(system/start-config config-string profile))
(if-let [config-path (:config options)]
(do (log/info "Starting Fluree server configuration at path:" config-path
"with profile:" profile)
(system/start-file config-path profile))
(if-let [config-resource (:resource options)]
(do (log/info "Starting Fluree server configuration from resource:" config-resource)
(system/start-resource config-resource profile))
(do (log/info "Starting Fluree server with profile:" profile)
(system/start profile)))))))

(defn -main
[& args]
(-> args parse-cli run))
(-> args command/formulate start))
44 changes: 19 additions & 25 deletions src/fluree/server/broadcast.clj
Original file line number Diff line number Diff line change
@@ -1,36 +1,30 @@
(ns fluree.server.broadcast
(:require [fluree.db.util.log :as log]
[fluree.server.consensus.events :as events]
[fluree.server.consensus.watcher :as watcher]))
(:require [fluree.server.consensus.events :as events]))

(set! *warn-on-reflection* true)

(defprotocol Broadcaster
(-broadcast [b ledger-id event]))
(-broadcast-commit [b ledger-id event])
(-broadcast-error [b ledger-id error-event]))

(defn broadcast-new-ledger!
[broadcaster watcher new-ledger-params new-ledger-result]
(let [{:keys [ledger-id server tx-id] :as ledger-created-event}
(events/ledger-created new-ledger-params new-ledger-result)]
(log/info (str "New Ledger successfully created by server " server
": " ledger-id " with tx-id: " tx-id "."))
(watcher/deliver-commit watcher tx-id ledger-created-event)
(-broadcast broadcaster ledger-id ledger-created-event)
::new-ledger))
[broadcaster {:keys [ledger-id] :as ledger-created-event}]
(-broadcast-commit broadcaster ledger-id ledger-created-event)
::new-ledger)

(defn broadcast-new-commit!
[broadcaster watcher commit-params commit-result]
(let [{:keys [ledger-id tx-id server] :as transaction-committed-event}
(events/transaction-committed commit-params commit-result)]
(log/info "New transaction completed for" ledger-id
"tx-id: " tx-id "by server:" server)
(watcher/deliver-commit watcher tx-id transaction-committed-event)
(-broadcast broadcaster ledger-id transaction-committed-event)
::new-commit))
[broadcaster {:keys [ledger-id] :as transaction-committed-event}]
(-broadcast-commit broadcaster ledger-id transaction-committed-event)
::new-commit)

(defn broadcast-error!
[_broadcaster watcher event error]
(let [{:keys [tx-id]} event]
(log/debug error "Delivering tx-exception to watcher")
(watcher/deliver-error watcher tx-id error)
::error))
[broadcaster {:keys [ledger-id] :as error-event}]
(-broadcast-error broadcaster ledger-id error-event)
::error)

(defn broadcast-event!
[broadcaster result]
(case (events/event-type result)
:transaction-committed (broadcast-new-commit! broadcaster result)
:ledger-created (broadcast-new-ledger! broadcaster result)
:error (broadcast-error! broadcaster result)))
5 changes: 4 additions & 1 deletion src/fluree/server/broadcast/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,13 @@

(defrecord Subscriptions [state]
Broadcaster
(-broadcast [_ ledger-id event]
(-broadcast-commit [_ ledger-id event]
(let [action (events/event-type event)
message (select-keys event [:tx-id :commit :t])]
(send-message-to-all state action ledger-id message)))
(-broadcast-error [_ ledger-id error-event]
;; no-op as subscribers are only concerned with successful transactions
(log/debug "Skipping error broadcast of event" error-event "for ledger" ledger-id))

Closeable
(close [_]
Expand Down
79 changes: 79 additions & 0 deletions src/fluree/server/command.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
(ns fluree.server.command
(:require [clojure.string :as str]
[clojure.tools.cli :as cli]))

(set! *warn-on-reflection* true)

(defn strip-leading-colon
[s]
(if (str/starts-with? s ":")
(subs s 1)
s))

(defn profile-string->keyword
[s]
(-> s str/trim strip-leading-colon keyword))

(def cli-options
[["-p" "--profile PROFILE" "Run profile"
:default :prod
:parse-fn profile-string->keyword]
["-c" "--config FILE" "Load configuration at a file path"]
["-s" "--string STRING" "Load stringified configuration"]
["-r" "--resource NAME" "Load pre-defined configuration resource"]
["-h" "--help" "Print this usage summary and exit"]])

(defn single-configuration?
[{:keys [config string resource]}]
(->> [config string resource] (remove nil?) count (>= 1)))

(def multiple-configuration-error
(str "Only a single configuration option from"
"-c/--config, -s/--string, and -r/--resource"
"is allowed."))

(defn validate-opts
[{:keys [options] :as parsed-opts}]
(if (single-configuration? options)
parsed-opts
(update parsed-opts :errors conj multiple-configuration-error)))

(defn usage
[summary]
(str/join \newline ["Fluree Ledger Server"
""
"Options:"
summary]))

(defn error-message
[errors]
(str/join \newline errors))

(defn exit
[status message]
(println message)
(System/exit status))

(defn parse
[args]
(-> args
(cli/parse-opts cli-options)
validate-opts))

(defn verify
[{:keys [errors] :as cli}]
(if (seq errors)
(let [msg (error-message errors)]
(exit 1 msg))
cli))

(defn describe
[{:keys [options summary] :as cli}]
(if (:help options)
(let [msg (usage summary)]
(exit 0 msg))
cli))

(defn formulate
[args]
(-> args parse verify describe))
18 changes: 16 additions & 2 deletions src/fluree/server/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,24 @@

(defn read-resource
[resource-name]
(-> resource-name io/resource slurp))
(try
(-> resource-name io/resource slurp)
(catch IllegalArgumentException e
(throw (ex-info (str "Unable to load configuration resource " resource-name)
{:status 400, :error :server/missing-config}
e)))))

(defn read-file
[path]
(try
(-> path io/file slurp)
(catch IllegalArgumentException e
(throw (ex-info (str "Unable to load configuration file at path: " path)
{:status 400, :error :server/missing-config}
e)))))

(defn parse
[cfg]
(-> cfg
(conn-config/parse (map derive-server-node-id))
(conn-config/parse derive-server-node-id)
(assoc :fluree.server/subscriptions {})))
58 changes: 34 additions & 24 deletions src/fluree/server/consensus/events.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
[event]
(:type event))

(defn create-ledger
"Upon receiving a request to create a new ledger, an event
message must be queued into the consensus state machine.
(defn type?
[evt type]
(-> evt event-type (= type)))

Format is [event-name event-body]"
(defn create-ledger
"Create a new event message to create a new ledger"
[ledger-id tx-id txn opts]
{:type :ledger-create
:txn txn
Expand All @@ -21,10 +22,7 @@
:instant (System/currentTimeMillis)})

(defn commit-transaction
"Upon receiving a request to create a new ledger, an event
message must be queued into the consensus state machine.

Format is [event-name event-body]"
"Create a new event message to commit a new transaction"
[ledger-id tx-id txn opts]
{:type :tx-queue
:txn txn
Expand All @@ -37,32 +35,44 @@
(defn transaction-committed
"Post-transaction, the message we will broadcast out and/or deliver
to a client awaiting a response."
([{:keys [ledger-id tx-id] :as _event-params}
{:keys [db address] :as _commit-result}]
([ledger-id tx-id {:keys [db address] :as _commit-result}]
{:type :transaction-committed
:ledger-id ledger-id
:t (:t db)
:tx-id tx-id
:commit address})
([processing-server event-params commit-result]
(-> (transaction-committed event-params commit-result)
([processing-server ledger-id tx-id commit-result]
(-> (transaction-committed ledger-id tx-id commit-result)
(assoc :server processing-server))))

(defn transaction-committed?
[evt]
(type? evt :transaction-committed))

(defn ledger-created
([event-params commit-result]
(-> event-params
(transaction-committed commit-result)
([ledger-id tx-id commit-result]
(-> (transaction-committed ledger-id tx-id commit-result)
(assoc :type :ledger-created)))
([processing-server event-params commit-result]
(-> (ledger-created event-params commit-result)
([processing-server ledger-id tx-id commit-result]
(-> (ledger-created ledger-id tx-id commit-result)
(assoc :server processing-server))))

(defn ledger-created?
[evt]
(type? evt :ledger-created))

(defn error
([params exception]
(-> params
(select-keys [:ledger-id :tx-id])
(assoc :error-message (ex-message exception)
:error-data (ex-data exception))))
([processing-server params exception]
(-> (error params exception)
([ledger-id tx-id exception]
(-> {:type :error
:ledger-id ledger-id
:tx-id tx-id
:error exception
:error-message (ex-message exception)
:error-data (ex-data exception)}))
([processing-server ledger-id tx-id exception]
(-> (error ledger-id tx-id exception)
(assoc :server processing-server))))

(defn error?
[evt]
(type? evt :error))
Loading