Skip to content

Commit

Permalink
Safe GC
Browse files Browse the repository at this point in the history
  • Loading branch information
tonsky committed Aug 3, 2023
1 parent ab4cc23 commit 9b06325
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 94 deletions.
25 changes: 8 additions & 17 deletions src/datascript/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -652,14 +652,6 @@
(callback report))
db)))

#?(:clj
(defn collect-garbage-conn!
    "Removes everything from storage that isn’t directly reachable from current DB roots"
[conn]
{:pre [(conn? conn)
(some? (storage/storage @conn))]}
(storage/collect-garbage! @(:db-last-stored (meta conn)))))

(defn- atom? [x]
  ;; Cross-platform predicate: true when `x` is an atom
  ;; (clojure.lang.IAtom on the JVM, cljs.core/Atom in ClojureScript).
  #?(:clj  (instance? clojure.lang.IAtom x)
     :cljs (instance? Atom x)))
Expand Down Expand Up @@ -819,17 +811,16 @@
storage/restore))

#?(:clj
(def ^{:arglists '([& dbs])} addresses
"Returns all addresses in use by current db. Anything that is not in
the return set is safe to be deleted"
storage/addresses))
(defn addresses
"Returns all addresses in use by current db (as java.util.HashSet).
Anything that is not in the return set is safe to be deleted"
[& dbs]
(storage/addresses dbs)))

#?(:clj
(def ^{:arglists '([& dbs])} collect-garbage!
"Deletes all keys from storage that are not referenced by any of the provided dbs.
Careful! If you have a lazy-loaded database and do GC on a newer version of it,
old version might stop working. Make sure to always pass all currently used db references.
Has a side-effect of fully loading database into memory"
(def ^{:arglists '([storage])} collect-garbage!
"Deletes all keys from storage that are not referenced by any of the currently alive db refs.
   Has a side-effect of fully loading databases into memory, so it can be slow"
storage/collect-garbage!))

#?(:clj
Expand Down
146 changes: 93 additions & 53 deletions src/datascript/storage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
[me.tonsky.persistent-sorted-set :as set])
(:import
[datascript.db Datom]
[java.util List HashSet]
[java.io BufferedOutputStream File FileOutputStream OutputStream PushbackReader]
[java.lang.ref WeakReference]
[java.util ArrayList HashSet Iterator List]
[me.tonsky.persistent_sorted_set ANode Branch Leaf PersistentSortedSet RefType Settings]))

(defprotocol IStorage
Expand Down Expand Up @@ -60,41 +61,50 @@
addr))
(restore [_ addr]
(let [{:keys [level keys addresses]} (-restore storage addr)
keys' ^List (map (fn [[e a v tx]] (db/datom e a v tx)) keys)]
^List keys' (map (fn [[e a v tx]] (db/datom e a v tx)) keys)]
(if addresses
(Branch. (int level) keys' ^List addresses settings)
(Leaf. keys' settings)))))

(defn make-storage-adapter [storage opts]
(let [settings (@#'set/map->settings opts)]
(->StorageAdapter storage settings)))
(StorageAdapter. storage settings)))

(defn storage-adapter ^StorageAdapter [db]
(.-_storage ^PersistentSortedSet (:eavt db)))
(when db
(.-_storage ^PersistentSortedSet (:eavt db))))

(defn storage
  "Returns the storage backing `db` (the `:storage` of its adapter),
   or nil when the db has no storage adapter."
  [db]
  (some-> (storage-adapter db) :storage))

;; Global registry of WeakReferences to every db value that has been stored
;; or restored through this namespace. Weak refs let db values be garbage
;; collected normally; read-stored-dbs scans this list to find the still-alive
;; roots during collect-garbage!.
(def ^:private ^List stored-dbs
  (ArrayList.))

;; Registers `db` in stored-dbs, wrapped in a WeakReference so the registry
;; itself never keeps the db alive.
;; NOTE(review): ArrayList is not thread-safe — callers appear to rely on
;; external locking (store-impl!/restore-impl lock on the storage); confirm.
(defn- remember-db [db]
  (.add stored-dbs (WeakReference. db)))

(defn store-impl! [db adapter]
(binding [*store-buffer* (volatile! (transient []))]
(let [eavt-addr (set/store (:eavt db) adapter)
aevt-addr (set/store (:aevt db) adapter)
avet-addr (set/store (:avet db) adapter)
meta (merge
{:schema (:schema db)
:max-eid (:max-eid db)
:max-tx (:max-tx db)
:eavt eavt-addr
:aevt aevt-addr
:avet avet-addr
:max-addr @*max-addr}
(set/settings (:eavt db)))]
(when (pos? (count @*store-buffer*))
(vswap! *store-buffer* conj! [root-addr meta])
(vswap! *store-buffer* conj! [tail-addr []])
(-store (:storage adapter) (persistent! @*store-buffer*)))
db)))
(locking (:storage adapter)
(remember-db db)
(binding [*store-buffer* (volatile! (transient []))]
(let [eavt-addr (set/store (:eavt db) adapter)
aevt-addr (set/store (:aevt db) adapter)
avet-addr (set/store (:avet db) adapter)
meta (merge
{:schema (:schema db)
:max-eid (:max-eid db)
:max-tx (:max-tx db)
:eavt eavt-addr
:aevt aevt-addr
:avet avet-addr
:max-addr @*max-addr}
(set/settings (:eavt db)))]
(when (pos? (count @*store-buffer*))
(vswap! *store-buffer* conj! [root-addr meta])
(vswap! *store-buffer* conj! [tail-addr []])
(-store (:storage adapter) (persistent! @*store-buffer*)))
db))))

(defn store!
([db]
Expand All @@ -115,20 +125,22 @@
(-store (storage db) [[tail-addr (mapv #(mapv serializable-datom %) tail)]]))

(defn restore-impl [storage opts]
(let [root (-restore storage root-addr)
tail (-restore storage tail-addr)
{:keys [schema eavt aevt avet max-eid max-tx max-addr]} root
_ (vswap! *max-addr max max-addr)
opts (merge root opts)
adapter (make-storage-adapter storage opts)
db (db/restore-db
{:schema schema
:eavt (set/restore-by db/cmp-datoms-eavt eavt adapter opts)
:aevt (set/restore-by db/cmp-datoms-aevt aevt adapter opts)
:avet (set/restore-by db/cmp-datoms-avet avet adapter opts)
:max-eid max-eid
:max-tx max-tx})]
[db (mapv #(mapv (fn [[e a v tx]] (db/datom e a v tx)) %) tail)]))
(locking storage
(let [root (-restore storage root-addr)
tail (-restore storage tail-addr)
{:keys [schema eavt aevt avet max-eid max-tx max-addr]} root
_ (vswap! *max-addr max max-addr)
opts (merge root opts)
adapter (make-storage-adapter storage opts)
db (db/restore-db
{:schema schema
:eavt (set/restore-by db/cmp-datoms-eavt eavt adapter opts)
:aevt (set/restore-by db/cmp-datoms-aevt aevt adapter opts)
:avet (set/restore-by db/cmp-datoms-avet avet adapter opts)
:max-eid max-eid
:max-tx max-tx})]
(remember-db db)
[db (mapv #(mapv (fn [[e a v tx]] (db/datom e a v tx)) %) tail)])))

(defn db-with-tail [db tail]
(reduce
Expand All @@ -143,28 +155,52 @@
(let [[db tail] (restore-impl storage opts)]
(db-with-tail db tail))))

(defn- addresses-impl [db *set]
(defn- addresses-impl [db visit-fn]
{:pre [(db/db? db)]}
(let [visit-fn #(vswap! *set conj! %)]
(let []
(.walkAddresses ^PersistentSortedSet (:eavt db) visit-fn)
(.walkAddresses ^PersistentSortedSet (:aevt db) visit-fn)
(.walkAddresses ^PersistentSortedSet (:avet db) visit-fn)))

(defn addresses [& dbs]
(let [*set (volatile! (transient #{root-addr tail-addr}))]
(defn ^HashSet addresses [dbs]
(let [set (HashSet.)
visit-fn #(.add set %)]
(.add set root-addr)
(.add set tail-addr)
(doseq [db dbs]
(addresses-impl db *set))
(persistent! @*set)))

(defn collect-garbage! [& dbs]
(let [used (apply addresses dbs)]
(doseq [db dbs
:let [storage (storage db)]
:when storage
:let [unused (->> (-list-addresses storage)
(remove used)
(vec))]]
(-delete storage unused))))
(addresses-impl db visit-fn))
set))

(defn- read-stored-dbs [storage']
  ;; Walks the global stored-dbs registry and returns a vector of the
  ;; still-alive db values whose backing storage is identical? to storage'.
  ;; Entries whose WeakReference referent has been collected are pruned from
  ;; the registry as a side effect (via Iterator.remove).
  (let [iter ^Iterator (.iterator stored-dbs)]
    (loop [res (transient [])]
      (if (.hasNext iter)
        (let [ref ^WeakReference (.next iter)
              db (.get ref)]
          (cond
            ;; referent was garbage-collected — drop the stale registry entry
            (nil? db)
            (do
              (.remove iter)
              (recur res))

            ;; db belongs to the storage being collected — keep it
            (identical? (storage db) storage')
            (recur (conj! res db))

            ;; alive, but backed by some other storage — ignore
            :else
            (recur res)))
        (persistent! res)))))

(defn collect-garbage! [storage']
  ;; Deletes from storage' every address that is not reachable from any
  ;; still-alive db value backed by it. The freshly restored db is added to
  ;; the root set so the most recently stored state is never collected.
  (System/gc) ;; we want all unnecessary weak refs to die as much as possible
  (locking storage'
    (let [dbs (conj
                (read-stored-dbs storage')
                (restore storage')) ;; make sure we won’t gc currently stored db
          used (addresses dbs)
          all (-list-addresses storage')
          unused (into [] (remove #(.contains used %)) all)]
      (util/log "GC: found" (count dbs) "alive db refs," (count used) "used addrs," (count all) "total addrs," (count unused) "unused")
      (-delete storage' unused))))

(defn- output-stream
"OutputStream that ignores flushes. Workaround for slow transit-clj JSON writer.
Expand Down Expand Up @@ -205,17 +241,21 @@
(reify IStorage
(-store [_ addr+data-seq]
(doseq [[addr data] addr+data-seq]
(util/log "store" (addr->filename-fn addr))
(with-open [os (output-stream (io/file dir (addr->filename-fn addr)))]
(write-fn os data))))

(-restore [_ addr]
(util/log "restore" (addr->filename-fn addr))
(with-open [is (io/input-stream (io/file dir (addr->filename-fn addr)))]
(read-fn is)))

(-list-addresses [_]
(mapv #(-> ^File % .getName filename->addr-fn)
(into []
(keep #(-> ^File % .getName filename->addr-fn))
(.listFiles (io/file dir))))

(-delete [_ addrs-seq]
(doseq [addr addrs-seq]
(util/log "deleting" (addr->filename-fn addr))
(.delete (io/file dir (addr->filename-fn addr)))))))))
7 changes: 7 additions & 0 deletions src/datascript/util.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
(:import
[java.util UUID])))

;; When true, `log` prints its arguments. Rebind with `binding` to enable
;; debug output for a scope.
(def ^:dynamic *debug*
  false)

;; Prints `body` via println only when *debug* is true at runtime.
;; Because the expansion is a `when`, the body forms are not evaluated
;; at all while *debug* is false.
(defmacro log [& body]
  `(when *debug*
     (println ~@body)))

(defn- rand-bits [pow]
(rand-int (bit-shift-left 1 pow)))

Expand Down
65 changes: 41 additions & 24 deletions test/datascript/test/storage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,35 @@
(is (= (:avet db) (:avet db')))))))))))

(deftest test-gc
(let [storage (make-storage)
db (large-db {:storage storage})]
(d/store! db)
(is (= 135 (count (d/addresses db))))
(is (= 135 (count (storage/-list-addresses storage))))
(is (= (d/addresses db) (set (storage/-list-addresses storage))))
(let [storage (make-storage)]
(let [db (large-db {:storage storage})]
(d/store! db)
(is (= 135 (count (d/addresses db))))
(is (= 135 (count (storage/-list-addresses storage))))
(is (= (d/addresses db) (set (storage/-list-addresses storage))))

(let [db' (d/db-with db [[:db/add 1001 :str "1001"]])]
(d/store! db')
(is (> (count (storage/-list-addresses storage))
(count (d/addresses db'))))

(d/collect-garbage! db')
(is (= (d/addresses db') (set (storage/-list-addresses storage))))
(let [db' (d/db-with db [[:db/add 1001 :str "1001"]])]
(d/store! db')
(is (> (count (storage/-list-addresses storage))
(count (d/addresses db'))))

(is (= 6 (count @(:*deletes storage)))))))
;; no GC because both dbs are alive
(d/collect-garbage! storage)
(is (= (into (set (d/addresses db))
(set (d/addresses db')))
(set (storage/-list-addresses storage))))
(is (= 0 (count @(:*deletes storage))))))

;; if we lose other refs, GC will happen
(let [db'' (d/restore storage)]
(d/collect-garbage! storage)
(is (= (d/addresses db'') (set (storage/-list-addresses storage))))
(is (= 6 (count @(:*deletes storage)))))

(testing "don’t delete currently stored db"
(System/gc)
(d/collect-garbage! storage)
(is (pos? (count (storage/-list-addresses storage)))))))

(deftest test-conn
(let [storage (make-storage)
Expand Down Expand Up @@ -274,7 +287,7 @@
(is (> (count (storage/-list-addresses storage))
(count (d/addresses @(:db-last-stored (meta conn''))))))

(d/collect-garbage-conn! conn'')
(d/collect-garbage! storage)
(is (= (count (storage/-list-addresses storage))
(count (d/addresses @(:db-last-stored (meta conn''))))))

Expand All @@ -290,21 +303,25 @@
(transit/read (transit/reader is :json)))]
(def db (d/from-serializable serializable {:branching-factor 512}))
(count db))

(d/store! db (streaming-edn-storage "target/db_streaming_edn")) ;; ~10 sec
(d/store! db (inmemory-edn-storage "target/db_inmemory_edn")) ;; ~10 sec
(d/store! db (streaming-transit-json-storage "target/db_streaming_transit_json")) ;; ~7.5 sec
(d/store! db (inmemory-transit-json-storage "target/db_inmemory_transit_json")) ;; ~6.4 sec
(d/store! db (streaming-transit-msgpack-storage "target/db_streaming_transit_msgpack")) ;; ~6.3 sec

(count db)

(def storage
(streaming-edn-storage "target/db_streaming_edn"))
(streaming-edn-storage "target/db_streaming_edn")
#_(inmemory-edn-storage "target/db_inmemory_edn")
#_(streaming-transit-json-storage "target/db_streaming_transit_json")
#_(inmemory-transit-json-storage "target/db_inmemory_transit_json")
#_(streaming-transit-msgpack-storage "target/db_streaming_transit_msgpack"))

(d/store! db storage)

(def db' (d/restore storage))
(def db'
(d/restore storage))

(count (d/addresses db))
(count (d/addresses db'))
(count (storage/-list-addresses storage))
(d/collect-garbage! db')
(d/collect-garbage! storage)

(first (:eavt db'))

Expand Down

0 comments on commit 9b06325

Please sign in to comment.