Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 89 additions & 4 deletions src/fluree/db/api.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
[fluree.db.json-ld.iri :as iri]
[fluree.db.json-ld.policy :as policy]
[fluree.db.ledger :as ledger]
[fluree.db.nameservice :as nameservice]
[fluree.db.nameservice.query :as ns-query]
[fluree.db.nameservice.storage :as nameservice.storage]
[fluree.db.query.api :as query-api]
[fluree.db.query.fql.parse :as parse]
[fluree.db.query.range :as query-range]
Expand Down Expand Up @@ -46,11 +48,14 @@
(resolve res))))))))

(defn- validate-connection
"Throws exception if x is not a valid connection"
"Throws exception if x is not a valid connection or is disconnecting"
[x]
(when-not (connection? x)
(throw (ex-info "Unable to create new ledger, connection is not valid. fluree/connect returns a promise, did you deref it?"
{:status 400 :error :db/invalid-connection}))))
{:status 400 :error :db/invalid-connection})))
(when (-> x :state deref :disconnecting?)
(throw (ex-info "Connection is disconnecting, cannot perform new operations"
{:status 400 :error :db/connection-disconnecting}))))

(defn connect
"Creates a connection from a JSON-LD configuration map.
Expand All @@ -73,12 +78,88 @@

(defn disconnect
"Terminates a connection and releases associated resources.

First stops the idle cleanup loop (if running), then releases all cached ledgers
(cleaning up their channels, subscriptions, and background processes), and finally
terminates the connection's system resources.

Returns a promise that resolves when disconnection is complete."
[conn]
(validate-connection conn)
(promise-wrap
(go-try
(let [{:keys [state]} conn]
(swap! state assoc :disconnecting? true))

(when-let [cleanup-ch (:idle-cleanup-ch conn)]
(log/debug "Stopping idle cleanup loop")
(async/close! cleanup-ch))

(let [{:keys [state]} conn
ledger-aliases (-> @state :ledger keys)
num-ledgers (count ledger-aliases)]
(log/debug "Disconnecting - releasing" num-ledgers "cached ledgers")
(when (pos? num-ledgers)
(let [release-chs (mapv #(connection/release-ledger conn %) ledger-aliases)
merged-ch (async/merge release-chs)]
(dotimes [_ num-ledgers]
(let [result (async/<! merged-ch)]
(when (util/exception? result)
(log/warn result "Error releasing ledger during disconnect")))))))

(-> conn ::system-map system/terminate))))

(defn release-ledger
"Releases a ledger from the connection cache and cleans up all associated resources.

This removes the ledger from memory and stops all background processes including:
- Index queue processing and go-loops
- Nameservice subscription monitoring
- All async channels associated with the ledger

Useful for freeing resources when a ledger is no longer needed, or can be called
automatically via an idle timeout mechanism.

Parameters:
conn - Connection object
ledger-alias - Ledger alias (with optional :branch) to release

Returns a promise that resolves to :released when cleanup is complete."
[conn ledger-alias]
(validate-connection conn)
(let [normalized-alias (util.ledger/ensure-ledger-branch ledger-alias)]
(promise-wrap
(connection/release-ledger conn normalized-alias))))

(defn indexing-status
"Returns the current indexing status for a ledger, or nil if not indexing.

Queries the nameservice to determine if background indexing is in progress
for the specified ledger.

Parameters:
conn - Connection object
ledger-alias - Ledger alias (with optional :branch) to query

Returns a promise that resolves to a map with indexing metadata:
{:target-t <t-value> ; The 't' value being indexed
:started <iso-8601>} ; ISO-8601 timestamp when indexing started

Returns nil if the ledger is not currently indexing.

Example:
@(indexing-status conn \"my-ledger\")
;; => {:target-t 123, :started \"2025-11-02T10:30:00Z\"}
;; or nil if not indexing"
[conn ledger-alias]
(validate-connection conn)
(let [normalized-alias (util.ledger/ensure-ledger-branch ledger-alias)]
(promise-wrap
(go-try
(when-let [primary-publisher (:primary-publisher conn)]
(when-let [ns-record (<? (nameservice/lookup primary-publisher normalized-alias))]
(nameservice.storage/get-indexing-status ns-record)))))))

(defn convert-config-key
[[k v]]
(if (#{:id :type} k)
Expand All @@ -101,7 +182,9 @@
Options map (all optional):
:parallelism - Number of parallel operations (default: 4)
:cache-max-mb - Maximum cache size in MB (default: half of JVM -Xmx, or 1000 MB for Node.js)
:defaults - Default settings map for operations"
:defaults - Default settings map for operations, which can include:
:ledger-cache-idle-minutes - Minutes of inactivity before releasing a ledger
from cache (default: nil, disabled)"
([]
(connect-memory {}))
([{:keys [parallelism cache-max-mb defaults]}]
Expand Down Expand Up @@ -132,7 +215,9 @@
When provided, all data will be encrypted using AES-256-CBC with PKCS5 padding.
The key should be exactly 32 bytes long for optimal security.
Example: \"my-secret-32-byte-encryption-key!\"
- defaults (optional): Default options for ledgers created with this connection
- defaults (optional): Default options for ledgers created with this connection, which can include:
:ledger-cache-idle-minutes - Minutes of inactivity before releasing a ledger
from cache (default: nil, disabled)

Returns a core.async channel that resolves to a connection, or an exception if
the connection cannot be established.
Expand Down
84 changes: 62 additions & 22 deletions src/fluree/db/branch.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -117,30 +117,70 @@
db))

(defn index-queue
[publishers branch-state]
[publishers branch-state ledger-alias]
(let [buf (async/sliding-buffer 1)
queue (async/chan buf)]
queue (async/chan buf)
;; Find primary publisher (first one, typically storage nameservice)
primary-publisher (first publishers)]
(go-loop [last-index-commit nil]
(when-let [{:keys [db index-files-ch complete-ch]} (<! queue)]
(let [db* (use-latest-index db last-index-commit branch-state)
result (try*
(let [indexed-db (<? (indexer/index db* index-files-ch)) ; indexer/index always returns a FlakeDB (never AsyncDB)
[{prev-commit :commit} {indexed-commit :commit}]
(swap-vals! branch-state update-index indexed-db)]
(if-not (= prev-commit indexed-commit)
(do (log/debug "Publishing new index commit:" indexed-commit)
(let [commit-jsonld (commit-data/->json-ld indexed-commit)]
(nameservice/publish-to-all commit-jsonld publishers)))
(log/debug "Not publishing unchanged index commit:" indexed-commit))
{:status :success, :db indexed-db, :commit indexed-commit})
(catch* e
(log/error e "Error updating index")
{:status :error, :error e}))]
(when complete-ch
(async/put! complete-ch result))
(if (= :success (:status result))
(recur (:commit result))
(recur last-index-commit)))))
(let [db* (use-latest-index db last-index-commit branch-state)
target-t (commit-data/t (:commit db*))
machine-id (util/machine-id)
start-result (try*
(<? (nameservice/index-start primary-publisher ledger-alias target-t machine-id))
(catch* e
(log/warn e "Failed to start indexing")
{:status :error}))]
(if (= :started (:status start-result))
;; Start heartbeat loop
(let [heartbeat-ch (async/chan)]
(async/go-loop []
(let [[_ ch] (async/alts! [(async/timeout 60000) heartbeat-ch])]
(when-not (= ch heartbeat-ch) ; timeout fired, not stop signal
(try*
(<? (nameservice/index-heartbeat primary-publisher ledger-alias))
(log/debug "Published heartbeat for" ledger-alias)
(catch* e
(log/warn e "Failed to publish heartbeat")))
(recur))))

;; Perform indexing
(let [result (try*
(let [indexed-db (<? (indexer/index db* index-files-ch))
[{prev-commit :commit} {indexed-commit :commit}]
(swap-vals! branch-state update-index indexed-db)]
(if-not (= prev-commit indexed-commit)
(do (log/debug "Publishing new index commit:" indexed-commit)
(let [commit-jsonld (commit-data/->json-ld indexed-commit)]
(nameservice/publish-to-all commit-jsonld publishers)))
(log/debug "Not publishing unchanged index commit:" indexed-commit))
{:status :success, :db indexed-db, :commit indexed-commit})
(catch* e
(log/error e "Error updating index")
{:status :error, :error e}))]

;; Stop heartbeat
(async/close! heartbeat-ch)

;; Finish indexing (clears status)
(try*
(<? (nameservice/index-finish primary-publisher ledger-alias))
(catch* e
(log/warn e "Failed to finish indexing")))

(when complete-ch
(async/put! complete-ch result))
(if (= :success (:status result))
(recur (:commit result))
(recur last-index-commit))))

;; Indexing already in progress or failed to start
(do
(log/warn "Skipping indexing - already in progress or failed to start:" start-result)
(when complete-ch
(async/put! complete-ch {:status :skipped, :reason start-result}))
(recur last-index-commit))))))
queue))

(defn enqueue-index!
Expand All @@ -159,7 +199,7 @@
commit-jsonld commit-map indexing-opts)
state (atom {:commit commit-map
:current-db initial-db})
idx-q (index-queue publishers state)]
idx-q (index-queue publishers state alias)]
{:name branch-name
:alias alias
:state state
Expand Down
Loading