Skip to content

Feature/consensus broadcast #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
98c6cc9
mark deliver-event method as public
zonotope Mar 19, 2025
0ca8828
remove broadcaster from handlers
zonotope Mar 19, 2025
feb79ae
add response ns that combines broadcasting and delivering watcher
zonotope Mar 19, 2025
e0fedab
add broadcaster to standalone config; use response fns for tx result
zonotope Mar 19, 2025
21069fc
use response ns with raft; add watcher and broadcaster
zonotope Mar 19, 2025
d3cf821
add make task for cljfmt-fix
zonotope Mar 19, 2025
fb93848
cljfmt fixes
zonotope Mar 19, 2025
952e203
remove unused requires
zonotope Mar 19, 2025
149df5a
more focused try/catch block for more relevant error messages
zonotope Mar 20, 2025
d8d454c
expect event messages to be created outside of watcher
zonotope Mar 21, 2025
25640a8
add ledger-id to ex-data response maps
zonotope Mar 21, 2025
c6b4ca4
add new error responses
zonotope Mar 25, 2025
b8ef0c5
Merge remote-tracking branch 'origin/main' into feature/consensus-bro…
zonotope Mar 25, 2025
a3aa0ba
add outcome predicate fn for events
zonotope Mar 25, 2025
9fe5a39
update db dep
zonotope Mar 26, 2025
4b7c8b5
update db dep
zonotope Mar 27, 2025
033da09
bump db dep
zonotope Mar 28, 2025
7d287aa
update db dep
zonotope Mar 28, 2025
f539dba
update db dep
zonotope Mar 28, 2025
01557fd
Merge remote-tracking branch 'origin/main' into feature/consensus-bro…
zonotope Mar 30, 2025
d986084
update db dep
zonotope Mar 30, 2025
4483c7a
update db dep
zonotope Apr 2, 2025
8da1a7b
Add commit hash to commit result message
zonotope Apr 4, 2025
23171ec
update db dep
zonotope Apr 4, 2025
73649d6
update db dep
zonotope Apr 4, 2025
4d04907
update db dep
zonotope Apr 4, 2025
fa12414
update db dep
zonotope Apr 5, 2025
e5722ea
update db dep
zonotope Apr 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ clj-kondo-lint-ci:
cljfmt-check: ## Check Clojure formatting with cljfmt
cljfmt check src test build.clj

.PHONY: cljfmt-fix ## Fix Clojure formatting errors with cljfmt
cljfmt-fix:
cljfmt fix src dev test build.clj

.PHONY: clean
clean: ## Remove build artifacts
rm -rf target
Expand Down
2 changes: 1 addition & 1 deletion deps.edn
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{:deps {org.clojure/clojure {:mvn/version "1.11.3"}
org.clojure/core.async {:mvn/version "1.6.681"}
com.fluree/db {:git/url "https://github.com/fluree/db.git"
:git/sha "6619218ac8b29348982a194c00f07cd3b806c5c1"}
:git/sha "d79f127e0d9cc8e8aa95d0120fd00bd9683a341b"}
com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git"
:git/sha "73a990a4b803d0b4cfbbbe4dc16275b39a3add4e"}

Expand Down
5 changes: 1 addition & 4 deletions dev/src/api_calls.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
(ns api-calls
(:require [fluree.db.api :as fluree]))


(comment
(def conn @(fluree/connect {:method :file,
:parallelism 2,
Expand All @@ -12,6 +11,4 @@
:indexer {:reindex-min-bytes 9
:reindex-max-bytes 10000000}}}))

(def ledger @(fluree/load conn "my/test1"))

)
(def ledger @(fluree/load conn "my/test1")))
81 changes: 38 additions & 43 deletions dev/src/http_calls.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,58 +4,53 @@
[fluree.server.consensus.network.multi-addr :refer [multi->map]]
[user :as user]))


(def ledger-name "my/test")

(def server-1-address "http://localhost:58090/")
(def server-2-address "http://localhost:58091/")
(def server-3-address "http://localhost:58092/")


(comment

(-> (client/get (str server-1-address "swagger.json"))
:body
(json/parse false))
(-> (client/get (str server-1-address "swagger.json"))
:body
(json/parse false))

(-> (client/post (str server-1-address "fluree/create")
{:body (json/stringify-UTF8
{
"@context" {"ex" "http://example.org/"}
"ledger" ledger-name
"insert" {"@id" "ex:test1"
"ex:name" "Brian"}})
(-> (client/post (str server-1-address "fluree/create")
{:body (json/stringify-UTF8
{"@context" {"ex" "http://example.org/"}
"ledger" ledger-name
"insert" {"@id" "ex:test1"
"ex:name" "Brian"}})
;:headers {"X-Api-Version" "2"}
:content-type :json
:socket-timeout 1000 ;; in milliseconds
:connection-timeout 1000 ;; in milliseconds
:accept :json}))

(-> (client/post (str server-1-address "fluree/transact")
{:body (json/stringify-UTF8
{"@context" {"ex" "http://example.org/"}
"ledger" ledger-name
"insert" {"@id" "ex:test2"
"ex:name" "Brian2"}})
:content-type :json
:socket-timeout 1000 ;; in milliseconds
:connection-timeout 1000 ;; in milliseconds
:accept :json}))

(-> (client/post (str server-1-address "fluree/transact")
{:body (json/stringify-UTF8
{"@context" {"ex" "http://example.org/"}
"ledger" ledger-name
"insert" {"@id" "ex:test2"
"ex:name" "Brian2"}})
;:headers {"X-Api-Version" "2"}
:content-type :json
:socket-timeout 1000 ;; in milliseconds
:connection-timeout 1000 ;; in milliseconds
:accept :json}))

(-> (client/post (str server-1-address "fluree/query")
{:body (json/stringify-UTF8
{"@context" {"ex" "http://example.org/"}
"select" {"?s" ["*"]}
"from" ledger-name
"where" {"@id" "?s"
"ex:name" nil}})
:content-type :json
:socket-timeout 1000 ;; in milliseconds
:connection-timeout 1000 ;; in milliseconds
:accept :json}))

(-> (client/post (str server-1-address "fluree/query")
{:body (json/stringify-UTF8
{"@context" {"ex" "http://example.org/"}
"select" {"?s" ["*"]}
"from" ledger-name
"where" {"@id" "?s"
"ex:name" nil}})
;:headers {"X-Api-Version" "2"}
:content-type :json
:socket-timeout 1000 ;; in milliseconds
:connection-timeout 1000 ;; in milliseconds
:accept :json})
:body
(json/parse false))

)
:content-type :json
:socket-timeout 1000 ;; in milliseconds
:connection-timeout 1000 ;; in milliseconds
:accept :json})
:body
(json/parse false)))
11 changes: 5 additions & 6 deletions dev/src/user.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
[clojure.string :as str]
[fluree.db.api :as fluree]
[fluree.db.connection.system :as conn-system]
[fluree.server.handlers.transact :as tx-handler]
[fluree.server.handlers.create :as create-handler]
[fluree.server.consensus.raft.handler :as consensus-handler]
[fluree.server.consensus.raft]
[fluree.server.system :as system]
[fluree.db.util.log :as log]
[fluree.server.config :as config]
[fluree.server.consensus.raft]
[fluree.server.consensus.raft.handler :as consensus-handler]
[fluree.server.handlers.create :as create-handler]
[fluree.server.handlers.transact :as tx-handler]
[fluree.server.system :as system]
[integrant.core :as ig]
[integrant.repl :refer [clear go halt init reset reset-all]]))


;; Register dev-config as the default config
(def sys-config (config/read-resource "file-config.jsonld"))

Expand Down
11 changes: 9 additions & 2 deletions src/fluree/server/consensus/events.clj
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@
(defn transaction-committed
"Post-transaction, the message we will broadcast out and/or deliver
to a client awaiting a response."
([ledger-id tx-id {:keys [db address] :as _commit-result}]
([ledger-id tx-id {:keys [db address hash] :as _commit-result}]
{:type :transaction-committed
:ledger-id ledger-id
:t (:t db)
:tx-id tx-id
:commit address})
:commit {:address address
:hash hash}})
([processing-server ledger-id tx-id commit-result]
(-> (transaction-committed ledger-id tx-id commit-result)
(assoc :server processing-server))))
Expand Down Expand Up @@ -76,3 +77,9 @@
(defn error?
[evt]
(type? evt :error))

(defn outcome?
[evt]
(or (transaction-committed? evt)
(ledger-created? evt)
(error? evt)))
8 changes: 3 additions & 5 deletions src/fluree/server/consensus/raft/handlers/ledger_created.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
[clojure.java.io :as io]
[fluree.db.util.filesystem :as fs]
[fluree.db.util.log :as log]
[fluree.server.consensus.events :as events]
[fluree.server.consensus.raft.handlers.new-commit :as new-commit]
[fluree.server.watcher :as watcher])
[fluree.server.consensus.response :as response])
(:import (java.io File)))

(set! *warn-on-reflection* true)
Expand Down Expand Up @@ -84,6 +83,5 @@

(defn deliver!
"Responsible for producing the event broadcast to connected peers."
[{:keys [fluree/watcher] :as _config} handler-result]
(let [new-ledger-event (events/ledger-created nil nil handler-result)]
(watcher/deliver-commit watcher nil nil new-ledger-event)))
[{:keys [fluree/watcher fluree/broadcaster] :as _config} {:keys [ledger-id tx-id] :as commit-result}]
(response/announce-new-ledger watcher broadcaster ledger-id tx-id commit-result))
9 changes: 4 additions & 5 deletions src/fluree/server/consensus/raft/handlers/new_commit.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
[fluree.db.util.filesystem :as fs]
[fluree.db.util.json :as json]
[fluree.db.util.log :as log]
[fluree.server.consensus.events :as events]
[fluree.server.watcher :as watcher]))
[fluree.server.consensus.response :as response]))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -100,6 +99,6 @@

(defn deliver!
"Responsible for producing the event broadcast to connected peers."
[{:keys [fluree/watcher] :as _config} commit-result]
(let [new-commit-event (events/transaction-committed nil nil commit-result)]
(watcher/deliver-commit watcher nil nil new-commit-event)))
[{:keys [fluree/watcher fluree/broadcaster] :as _config}
{:keys [ledger-id tx-id] :as commit-result}]
(response/announce-commit watcher broadcaster ledger-id tx-id commit-result))
6 changes: 3 additions & 3 deletions src/fluree/server/consensus/raft/handlers/tx_exception.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns fluree.server.consensus.raft.handlers.tx-exception
(:require [fluree.db.util.log :as log]
[fluree.server.watcher :as watcher]))
[fluree.server.consensus.response :as response]))

(defn update-ledger-state
"Updates the latest commit in the ledger, and removes the processed transaction in the queue"
Expand All @@ -27,8 +27,8 @@
e)))))

(defn deliver!
[{:keys [fluree/watcher] :as _config} exception]
(watcher/deliver-error watcher nil nil exception))
[{:keys [fluree/watcher fluree/broadcaster] :as _config} exception]
(response/announce-error watcher broadcaster nil nil exception))

(defn handler
"Handles transaction exceptions and broadcasts them to network."
Expand Down
25 changes: 25 additions & 0 deletions src/fluree/server/consensus/response.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
(ns fluree.server.consensus.response
(:require [fluree.server.broadcast :as broadcast]
[fluree.server.consensus.events :as events]
[fluree.server.watcher :as watcher]))

(defn announce-new-ledger
[watcher broadcaster ledger-id tx-id commit-result]
(let [new-ledger-event (events/ledger-created ledger-id tx-id commit-result)]
(broadcast/broadcast-new-ledger! broadcaster new-ledger-event)
(watcher/deliver-event watcher tx-id new-ledger-event)
::new-ledger))

(defn announce-commit
[watcher broadcaster ledger-id tx-id commit-result]
(let [commit-event (events/transaction-committed ledger-id tx-id commit-result)]
(broadcast/broadcast-new-commit! broadcaster commit-event)
(watcher/deliver-event watcher tx-id commit-event)
::commit))

(defn announce-error
[watcher broadcaster ledger-id tx-id ex]
(let [error-event (events/error ledger-id tx-id ex)]
(broadcast/broadcast-error! broadcaster error-event)
(watcher/deliver-event watcher tx-id error-event)
::error))
Loading