Skip to content

Commit 48a31cc

Browse files
authored
Merge pull request #69 from fluree/feature/no-consensus
Use new no-consensus by default
2 parents d21f261 + 7597ddc commit 48a31cc

20 files changed

+298
-133
lines changed

resources/config-raft.edn

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{:fluree/raft {:log-history #or [#env FLUREE_RAFT_LOG_HISTORY
2+
10]
3+
:entries-max #or [#env FLUREE_RAFT_ENTRIES_MAX
4+
50]
5+
:catch-up-rounds #or [#env FLUREE_RAFT_CATCH_UP_ROUNDS
6+
10]
7+
:storage-type #or [#keyword #env FLUREE_RAFT_STORAGE_TYPE
8+
:file]
9+
:servers #or [#env FLUREE_RAFT_SERVERS
10+
#profile {:dev "/ip4/127.0.0.1/tcp/62071"
11+
:prod nil
12+
:docker "/ip4/127.0.0.1/tcp/62071"}]
13+
:this-server #or [#env FLUREE_RAFT_THIS_SERVER
14+
#profile {:dev "/ip4/127.0.0.1/tcp/62071"
15+
:prod nil
16+
:docker "/ip4/127.0.0.1/tcp/62071"}]
17+
:log-directory #env FLUREE_RAFT_LOG_DIRECTORY
18+
:ledger-directory #env FLUREE_RAFT_LEDGER_DIRECTORY
19+
:conn #ig/ref :fluree/connection
20+
:watcher #ig/ref :fluree/watcher
21+
:subscriptions #ig/ref :fluree/subscriptions}}

resources/config.edn

+6-22
Original file line numberDiff line numberDiff line change
@@ -21,34 +21,18 @@
2121
:defaults {:indexer {:reindex-min-bytes 1000
2222
:reindex-max-bytes 10000000}}}
2323

24-
:fluree/watcher {:max-tx-wait-ms #or [#env FLUREE_HTTP_MAX_TX_WAIT_MS
24+
:fluree/watcher {:max-tx-wait-ms #or [#env FLUREE_MAX_TX_WAIT_MS
2525
#profile {:dev 45000
2626
:prod 45000
2727
:docker 45000}]}
2828

2929
:fluree/subscriptions {}
3030

31-
:fluree/raft {:log-history #or [#env FLUREE_RAFT_LOG_HISTORY
32-
10]
33-
:entries-max #or [#env FLUREE_RAFT_ENTRIES_MAX
34-
50]
35-
:catch-up-rounds #or [#env FLUREE_RAFT_CATCH_UP_ROUNDS
36-
10]
37-
:storage-type #or [#keyword #env FLUREE_RAFT_STORAGE_TYPE
38-
:file]
39-
:servers #or [#env FLUREE_RAFT_SERVERS
40-
#profile {:dev "/ip4/127.0.0.1/tcp/62071"
41-
:prod nil
42-
:docker "/ip4/127.0.0.1/tcp/62071"}]
43-
:this-server #or [#env FLUREE_RAFT_THIS_SERVER
44-
#profile {:dev "/ip4/127.0.0.1/tcp/62071"
45-
:prod nil
46-
:docker "/ip4/127.0.0.1/tcp/62071"}]
47-
:log-directory #env FLUREE_RAFT_LOG_DIRECTORY
48-
:ledger-directory #env FLUREE_RAFT_LEDGER_DIRECTORY
49-
:conn #ig/ref :fluree/connection
50-
:watcher #ig/ref :fluree/watcher
51-
:subscriptions #ig/ref :fluree/subscriptions}
31+
:fluree/standalone {:conn #ig/ref :fluree/connection
32+
:watcher #ig/ref :fluree/watcher
33+
:subscriptions #ig/ref :fluree/subscriptions
34+
:max-pending-txs #or [#env FLUREE_STANDALONE_MAX_PENDING_TXS
35+
16]}
5236

5337
:fluree/handler {:conn #ig/ref :fluree/connection
5438
:consensus #ig/ref :fluree/consensus

src/fluree/server/consensus.clj

+19-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,24 @@
11
(ns fluree.server.consensus
2-
"To allow for pluggable consensus, we have a TxGroup protocol. In order to allow
2+
"To allow for pluggable consensus, we have a Transactor protocol. In order to allow
33
for a new consensus type, we need to create a record with all of the following
4-
methods. Currently, we support Raft and Solo.")
4+
methods. Currently, we support Raft and Standalone."
5+
(:require [fluree.db.util.log :as log]
6+
[fluree.server.consensus.events :as events]))
57

68
(set! *warn-on-reflection* true)
79

8-
(defprotocol TxGroup
9-
(queue-new-ledger [group ledger-id tx-id txn opts])
10-
(queue-new-transaction [group ledger-id tx-id txn opts]))
10+
(defprotocol Transactor
11+
(-queue-new-ledger [group event-params])
12+
(-queue-new-transaction [group event-params]))
13+
14+
(defn queue-new-ledger
15+
[group ledger-id tx-id txn opts]
16+
(log/trace "queue-new-ledger:" ledger-id tx-id txn)
17+
(let [event-params (events/create-ledger ledger-id tx-id txn opts)]
18+
(-queue-new-ledger group event-params)))
19+
20+
(defn queue-new-transaction
21+
[group ledger-id tx-id txn opts]
22+
(log/trace "queue-new-transaction:" txn)
23+
(let [event-params (events/commit-transaction ledger-id tx-id txn opts)]
24+
(-queue-new-transaction group event-params)))
+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
(ns fluree.server.consensus.broadcast
2+
(:require [fluree.db.util.log :as log]
3+
[fluree.server.consensus.subscriptions :as subscriptions]
4+
[fluree.server.consensus.watcher :as watcher]))
5+
6+
(set! *warn-on-reflection* true)
7+
8+
(defn announce-new-ledger!
9+
[subscriptions watcher ledger-created-event]
10+
(let [{:keys [ledger-id server tx-id commit-file-meta]} ledger-created-event]
11+
(log/info (str "New Ledger successfully created by server " server
12+
": " ledger-id " with tx-id: " tx-id "."))
13+
(watcher/deliver-watch watcher tx-id ledger-created-event)
14+
(subscriptions/send-message-to-all subscriptions "ledger-created" ledger-id (:json commit-file-meta))
15+
:success))
16+
17+
(defn announce-new-commit!
18+
[subscriptions watcher transaction-commited-event]
19+
(let [{:keys [ledger-id tx-id server commit-file-meta]} transaction-commited-event]
20+
(log/info "New transaction completed for" ledger-id
21+
"tx-id: " tx-id "by server:" server)
22+
(watcher/deliver-watch watcher tx-id transaction-commited-event)
23+
(subscriptions/send-message-to-all subscriptions "new-commit" ledger-id
24+
(:json commit-file-meta))
25+
:success))
26+
27+
(defn announce-error!
28+
[watcher error-event]
29+
(let [{:keys [tx-id ex-message ex-data]} error-event]
30+
(log/debug "Delivering tx-exception to watcher with msg/data: " ex-message ex-data)
31+
(watcher/deliver-watch watcher tx-id (ex-info ex-message ex-data))))
+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
(ns fluree.server.consensus.events
2+
"Common namespace for defining consensus event messages shared across consensus
3+
protocols")
4+
5+
(defn create-ledger
6+
"Upon receiving a request to create a new ledger, an event
7+
message must be queued into the consensus state machine.
8+
9+
Format is [event-name event-body]"
10+
[ledger-id tx-id txn opts]
11+
[:ledger-create {:txn txn
12+
:size (count txn)
13+
:tx-id tx-id
14+
:ledger-id ledger-id
15+
:opts opts
16+
:instant (System/currentTimeMillis)}])
17+
18+
(defn commit-transaction
19+
"Upon receiving a request to create a new ledger, an event
20+
message must be queued into the consensus state machine.
21+
22+
Format is [event-name event-body]"
23+
[ledger-id tx-id txn opts]
24+
[:tx-queue {:txn txn
25+
:size (count txn)
26+
:tx-id tx-id
27+
:ledger-id ledger-id
28+
:opts opts
29+
:instant (System/currentTimeMillis)}])
30+
31+
(defn transaction-committed
32+
"Post-transaction, the message we will broadcast out and/or deliver
33+
to a client awaiting a response."
34+
([{:keys [ledger-id tx-id] :as _event-params}
35+
{:keys [db data-file-meta commit-file-meta] :as _commit-result}]
36+
{:ledger-id ledger-id
37+
:data-file-meta data-file-meta
38+
:commit-file-meta commit-file-meta
39+
;; below is metadata for quickly validating into the state machine, not retained
40+
:t (:t db) ;; for quickly validating this is the next 'block'
41+
:tx-id tx-id ;; for quickly removing from the queue
42+
})
43+
([processing-server event-params commit-result]
44+
(-> (transaction-committed event-params commit-result)
45+
(assoc :server processing-server))))
46+
47+
(defn error
48+
([params exception]
49+
(-> params
50+
(select-keys [:ledger-id :tx-id])
51+
(assoc :ex-message (ex-message exception)
52+
:ex-data (ex-data exception))))
53+
([processing-server params exception]
54+
(-> (error params exception)
55+
(assoc :server processing-server))))

src/fluree/server/consensus/raft.clj

+5-33
Original file line numberDiff line numberDiff line change
@@ -322,41 +322,13 @@
322322
;; send command to leader
323323
(send-rpc raft leader :new-command command-data nil)))))))
324324

325-
(defn queue-new-ledger-raft
326-
"Queues a new ledger into the consensus layer for processing.
327-
Returns a core async channel that will eventually contain true if successful."
328-
[group ledger-id tx-id txn opts]
329-
(log/debug "Consensus - queue new ledger:" ledger-id tx-id txn)
330-
(new-entry-async
331-
group
332-
[:ledger-create {:txn txn
333-
:size (count txn)
334-
:tx-id tx-id
335-
:ledger-id ledger-id
336-
:opts opts
337-
:instant (System/currentTimeMillis)}]))
338-
339-
(defn queue-new-transaction-raft
340-
"Queues a new transaction into the consensus layer for processing.
341-
Returns a core async channel that will eventually contain a truthy value if successful."
342-
[group ledger-id tx-id txn opts]
343-
(log/trace "queue-new-transaction txn:" txn)
344-
(new-entry-async
345-
group
346-
[:tx-queue {:txn txn
347-
:size (count txn)
348-
:tx-id tx-id
349-
:ledger-id ledger-id
350-
:opts opts
351-
:instant (System/currentTimeMillis)}]))
352-
353325
(defrecord RaftGroup [state-atom event-chan command-chan this-server port
354326
close raft raft-initialized open-api private-keys]
355-
consensus/TxGroup
356-
(queue-new-ledger [group ledger-id tx-id txn opts]
357-
(queue-new-ledger-raft group ledger-id tx-id txn opts))
358-
(queue-new-transaction [group ledger-id tx-id txn opts]
359-
(queue-new-transaction-raft group ledger-id tx-id txn opts)))
327+
consensus/Transactor
328+
(-queue-new-ledger [group ledger-msg]
329+
(new-entry-async group ledger-msg))
330+
(-queue-new-transaction [group txn-msg]
331+
(new-entry-async group txn-msg)))
360332

361333
(defn leader-change-fn
362334
"Function called every time there is a leader change to provide any extra

src/fluree/server/consensus/raft/handlers/create_ledger.clj

+5-10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
[fluree.db.constants :as const]
44
[fluree.db.util.log :as log]
55
[fluree.raft.leader :refer [is-leader?]]
6+
[fluree.server.consensus.events :as events]
67
[fluree.server.consensus.raft.participant :as participant]
78
[fluree.server.consensus.raft.producers.new-index-file :as new-index-file]
89
[fluree.server.handlers.shared :refer [deref!]]))
@@ -55,16 +56,10 @@
5556
"Pushes create-ledger request in consensus only if leader.
5657
5758
Returns promise that will have the eventual response once committed."
58-
[{:keys [consensus/raft-state] :as config}
59-
{:keys [ledger-id tx-id] :as _params}
60-
{:keys [db data-file-meta commit-file-meta]}]
61-
(let [created-body {:ledger-id ledger-id
62-
:data-file-meta data-file-meta
63-
:commit-file-meta commit-file-meta
64-
;; below is metadata for quickly validating into the state machine, not retained
65-
:t (:t db) ;; for quickly validating this is the next 'block'
66-
:tx-id tx-id ;; for quickly removing from the queue
67-
:server (participant/this-server raft-state)}] ;; for quickly ensuring this server *is* still the leader
59+
[{:keys [consensus/raft-state] :as config} params commit-result]
60+
(let [created-body (events/transaction-committed ; same as new-commit message
61+
(participant/this-server raft-state)
62+
params commit-result)]
6863

6964
;; returns promise
7065
(participant/leader-new-command! config :ledger-created created-body)))

src/fluree/server/consensus/raft/handlers/ledger_created.clj

+4-15
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
[clojure.java.io :as io]
44
[fluree.db.util.filesystem :as fs]
55
[fluree.db.util.log :as log]
6-
[fluree.server.consensus.raft.handlers.new-commit :as new-commit]
7-
[fluree.server.subscriptions :as subscriptions]
8-
[fluree.server.watcher :as watcher])
6+
[fluree.server.consensus.broadcast :as broadcast]
7+
[fluree.server.consensus.raft.handlers.new-commit :as new-commit])
98
(:import (java.io File)))
109

1110
(set! *warn-on-reflection* true)
@@ -47,12 +46,6 @@
4746
:error :db/unexpected-error}
4847
e)))))
4948

50-
(defn return-success-response
51-
[watcher {:keys [ledger-id server tx-id] :as params} state-map]
52-
(log/info (str "New Ledger successfully created by server " server ": " ledger-id " with tx-id: " tx-id "."))
53-
(watcher/deliver-watch watcher tx-id params)
54-
(get-in state-map [:ledgers ledger-id]))
55-
5649
(defn clean-up-files
5750
[{:keys [fluree/conn] :as _config} {:keys [ledger-id] :as _params}]
5851
(let [local-path (fs/local-path (:storage-path conn))
@@ -90,9 +83,5 @@
9083

9184
(defn broadcast!
9285
"Responsible for producing the event broadcast to connected peers."
93-
[{:keys [fluree/watcher fluree/subscriptions] :as _config}
94-
{:keys [ledger-id server tx-id commit-file-meta] :as handler-result}]
95-
(log/info (str "New Ledger successfully created by server " server ": " ledger-id " with tx-id: " tx-id "."))
96-
(watcher/deliver-watch watcher tx-id handler-result)
97-
(subscriptions/send-message-to-all subscriptions "ledger-created" ledger-id (:json commit-file-meta))
98-
:success) ;; result of this function is not used
86+
[{:keys [fluree/watcher fluree/subscriptions] :as _config} handler-result]
87+
(broadcast/announce-new-ledger! subscriptions watcher handler-result))

src/fluree/server/consensus/raft/handlers/new_commit.clj

+3-8
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
[fluree.db.storage :as storage]
55
[fluree.db.util.async :refer [<? go-try]]
66
[fluree.db.util.log :as log]
7-
[fluree.server.subscriptions :as subscriptions]
8-
[fluree.server.watcher :as watcher]))
7+
[fluree.server.consensus.broadcast :as broadcast]))
98

109
(set! *warn-on-reflection* true)
1110

@@ -85,9 +84,5 @@
8584

8685
(defn broadcast!
8786
"Responsible for producing the event broadcast to connected peers."
88-
[{:keys [fluree/watcher fluree/subscriptions] :as _config}
89-
{:keys [ledger-id tx-id server commit-file-meta] :as commit-result}]
90-
(log/info "New transaction completed for" ledger-id "tx-id: " tx-id "by server:" server)
91-
(watcher/deliver-watch watcher tx-id commit-result)
92-
(subscriptions/send-message-to-all subscriptions "new-commit" ledger-id (:json commit-file-meta))
93-
:success) ;; result of this function is not used
87+
[{:keys [fluree/watcher fluree/subscriptions] :as _config} commit-result]
88+
(broadcast/announce-new-commit! subscriptions watcher commit-result))

src/fluree/server/consensus/raft/handlers/tx_exception.clj

+3-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
(ns fluree.server.consensus.raft.handlers.tx-exception
22
(:require [fluree.db.util.log :as log]
3-
[fluree.server.watcher :as watcher]))
3+
[fluree.server.consensus.broadcast :as broadcast]))
44

55
(defn update-ledger-state
66
"Updates the latest commit in the ledger, and removes the processed transaction in the queue"
@@ -27,10 +27,8 @@
2727
e)))))
2828

2929
(defn broadcast!
30-
[{:keys [fluree/watcher] :as _config}
31-
{:keys [tx-id ex-message ex-data] :as _exception-meta}]
32-
(log/debug "Delivering tx-exception to watcher with msg/data: " ex-message ex-data)
33-
(watcher/deliver-watch watcher tx-id (ex-info ex-message ex-data)))
30+
[{:keys [fluree/watcher] :as _config} exception-meta]
31+
(broadcast/announce-error! watcher exception-meta))
3432

3533
(defn handler
3634
"Handles transaction exceptions and broadcasts them to network."

src/fluree/server/consensus/raft/producers/new_commit.clj

+6-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
(ns fluree.server.consensus.raft.producers.new-commit
2-
(:require [fluree.server.consensus.raft.participant :as participant]))
2+
(:require [fluree.server.consensus.events :as events]
3+
[fluree.server.consensus.raft.participant :as participant]))
34

45
(set! *warn-on-reflection* true)
56

@@ -9,16 +10,10 @@
910
This is called with new commits immediately after processing a transaction.
1011
1112
Returns promise that will have the eventual response once committed."
12-
[{:keys [consensus/raft-state] :as config}
13-
{:keys [ledger-id tx-id] :as _params}
14-
{:keys [db data-file-meta commit-file-meta]}]
15-
(let [created-body {:ledger-id ledger-id
16-
:data-file-meta data-file-meta
17-
:commit-file-meta commit-file-meta
18-
;; below is metadata for quickly validating into the state machine, not retained
19-
:t (:t db) ;; for quickly validating this is the next 'block'
20-
:tx-id tx-id ;; for quickly removing from the queue
21-
:server (participant/this-server raft-state)}] ;; for quickly ensuring this server *is* still the leader
13+
[{:keys [consensus/raft-state] :as config} params commit-result]
14+
(let [created-body (events/transaction-committed
15+
(participant/this-server raft-state)
16+
params commit-result)]
2217

2318
;; returns promise
2419
(participant/leader-new-command! config :new-commit created-body)))
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
(ns fluree.server.consensus.raft.producers.tx-exception
2-
(:require [fluree.server.consensus.raft.participant :as participant]))
2+
(:require [fluree.server.consensus.events :as events]
3+
[fluree.server.consensus.raft.participant :as participant]))
34

45
(defn consensus-push-tx-exception
5-
[{:keys [consensus/raft-state] :as config}
6-
{:keys [ledger-id tx-id] :as _params}
7-
tx-exception]
8-
(let [created-body {:ledger-id ledger-id
9-
:ex-message (ex-message tx-exception)
10-
:ex-data (ex-data tx-exception)
11-
:tx-id tx-id ;; for quickly removing from the queue
12-
:server (participant/this-server raft-state)}]
6+
[{:keys [consensus/raft-state] :as config} params tx-exception]
7+
(let [server (participant/this-server raft-state)
8+
error-msg (events/error server params tx-exception)]
139
;; returns promise
14-
(participant/leader-new-command! config :tx-exception created-body)))
10+
(participant/leader-new-command! config :tx-exception error-msg)))

0 commit comments

Comments
 (0)