From a3ebe6e845bb45096fa11a9bfd13a18d448d8532 Mon Sep 17 00:00:00 2001 From: Aaron Dixon Date: Mon, 19 Dec 2022 14:30:20 -0600 Subject: [PATCH] Avoid returning duplicates when joining on indexed tags, support universal [{}] filters, plus some optmizations --- src/me/untethr/nostr/app.clj | 168 ++++++++++++++++------------ src/me/untethr/nostr/query.clj | 20 +++- src/me/untethr/nostr/subscribe.clj | 2 +- src/me/untethr/nostr/validation.clj | 12 +- test/test/app_test.clj | 17 +++ test/test/query_test.clj | 12 +- test/test/subscribe_test.clj | 67 ++++++++--- test/test/support.clj | 5 +- test/test/test_data.clj | 9 +- test/test/validation_test.clj | 18 +-- 10 files changed, 212 insertions(+), 118 deletions(-) create mode 100644 test/test/app_test.clj diff --git a/src/me/untethr/nostr/app.clj b/src/me/untethr/nostr/app.clj index 6a1f2e4..7631f30 100644 --- a/src/me/untethr/nostr/app.clj +++ b/src/me/untethr/nostr/app.clj @@ -10,6 +10,7 @@ [me.untethr.nostr.fulfill :as fulfill] [me.untethr.nostr.json-facade :as json-facade] [me.untethr.nostr.metrics :as metrics] + [me.untethr.nostr.query :as query] [me.untethr.nostr.store :as store] [me.untethr.nostr.subscribe :as subscribe] [me.untethr.nostr.validation :as validation] @@ -78,15 +79,6 @@ true (format "duplicate:%s" ok-message-str)))) -(defn- handle-duplicate-event! - [metrics ch event ok-message-str] - (metrics/duplicate-event! metrics) - (hk/send! ch - (create-ok-message - (:id event) - true - (format "duplicate:%s" ok-message-str)))) - (defn- handle-stored-event! [metrics ch event ok-message-str] (metrics/duplicate-event! metrics) @@ -142,7 +134,7 @@ (subscribe/notify! metrics subs-atom e raw-event))) (handle-invalid-event! metrics ch e verified-event-or-err-map)))) -(def max-filters 12) +(def max-filters 15) ;; some clients may still send legacy filter format that permits singular id ;; in filter; so we'll support this for a while. @@ -154,52 +146,70 @@ (not (contains? f :ids))) (-> (assoc :ids [(:id f)]) (dissoc :id)))) +(defn- prepare-req-filters* + [req-filters] + (->> req-filters + ;; conform... + (map (comp validation/conform-filter-lenient + interpret-legacy-filter)) + ;; remove null filters (ie filters that could never match anything)... + (filter (complement validation/filter-has-empty-attr?)) + ;; remove duplicates... + distinct + vec)) + (defn- receive-req [metrics db subs-atom fulfill-atom channel-id ch [_ req-id & req-filters]] - (let [use-req-filters (mapv (comp validation/conform-filter-lenient - interpret-legacy-filter) req-filters)] - (if-let [err (validation/req-err req-id use-req-filters)] - (log/warn "invalid req" {:err err :req [req-id (vec use-req-filters)]}) - (do - ;; just in case we're still fulfilling prior subscription w/ same req-id - (fulfill/cancel! fulfill-atom channel-id req-id) - (subscribe/unsubscribe! subs-atom channel-id req-id) - (when-not (validation/filters-empty? use-req-filters) - (if (> (subscribe/num-filters subs-atom channel-id) max-filters) - (do - (metrics/inc-excessive-filters! metrics) - (hk/send! ch - (create-notice-message - (format - (str - "Too many subscription filters." - " Max allowed is %d, but you have %d.") - max-filters - (subscribe/num-filters subs-atom channel-id))))) - (do - ;; subscribe first so we are guaranteed to dispatch new arrivals - (metrics/time-subscribe! metrics - (subscribe/subscribe! subs-atom channel-id req-id use-req-filters - (fn [raw-event] - ;; "some" safety if we're notified and our channel has closed, - ;; but we've not yet unsubscribed in response; this isn't thread - ;; safe so could still see channel close before the send!; - ;; upstream observer invocation should catch and log. - (when (hk/open? ch) - (hk/send! ch (create-event-message req-id raw-event)))))) - ;; after subscription, capture fulfillment target rowid; in rare cases we - ;; may double-deliver an event or few but we will never miss an event - (if-let [target-row-id (store/max-event-rowid db)] - (fulfill/submit! metrics db fulfill-atom channel-id req-id use-req-filters target-row-id - (fn [raw-event] - ;; see note above; we may see channel close before we cancel - ;; fulfillment - (when (hk/open? ch) - (hk/send! ch (create-event-message req-id raw-event)))) - (fn [] - (hk/send! ch (create-eose-message req-id)))) - ;; should only occur on epochal first event - (log/warn "no max rowid; nothing yet to fulfill"))))))))) + (if-not (every? map? req-filters) + (log/warn "invalid req" {:msg "expected objects"}) + ;; else -- req has basic form... + (let [use-req-filters (prepare-req-filters* req-filters) + req-err (validation/req-err req-id use-req-filters)] + (if req-err + (log/warn "invalid req" {:req-err req-err :req [req-id use-req-filters]}) + ;; else -- + (do + ;; just in case we're still fulfilling prior subscription w/ same req-id + (fulfill/cancel! fulfill-atom channel-id req-id) + (subscribe/unsubscribe! subs-atom channel-id req-id) + ;; from here on, we'll completely ignore empty filters -- that is, + ;; filters that are empty after we've done our prepare step above. + (when-not (empty? use-req-filters) + (if (> (subscribe/num-filters subs-atom channel-id) max-filters) + (do + (metrics/inc-excessive-filters! metrics) + (hk/send! ch + (create-notice-message + (format + (str + "Too many subscription filters." + " Max allowed is %d, but you have %d.") + max-filters + (subscribe/num-filters subs-atom channel-id))))) + (do + ;; subscribe first so we are guaranteed to dispatch new arrivals + (metrics/time-subscribe! metrics + (subscribe/subscribe! subs-atom channel-id req-id use-req-filters + (fn [raw-event] + ;; "some" safety if we're notified and our channel has closed, + ;; but we've not yet unsubscribed in response; this isn't thread + ;; safe so could still see channel close before the send!; + ;; upstream observer invocation should catch and log. + (when (hk/open? ch) + (hk/send! ch (create-event-message req-id raw-event)))))) + ;; after subscription, capture fulfillment target rowid; in rare cases we + ;; may double-deliver an event or few but we will never miss an event + (if-let [target-row-id (store/max-event-rowid db)] + (fulfill/submit! metrics db fulfill-atom channel-id req-id use-req-filters target-row-id + (fn [raw-event] + ;; see note above; we may see channel close before we cancel + ;; fulfillment + (when (hk/open? ch) + (hk/send! ch (create-event-message req-id raw-event)))) + (fn [] + (hk/send! ch (create-eose-message req-id)))) + ;; should only occur on epochal first event + (log/warn "no max rowid; nothing yet to fulfill")))))))))) (defn- receive-close [metrics db subs-atom fulfill-atom channel-id ch [_ req-id]] @@ -291,25 +301,37 @@ nostr-url untethr-url maybe-server-hostname maybe-server-hostname)})) (defn- handler-q [db req] - (let [rows (jdbc/execute! db - ["select rowid, sys_ts, raw_event from n_events order by rowid desc limit 25"] - {:builder-fn rs/as-unqualified-lower-maps}) - rows' (mapv - (fn [row] - (let [parsed-event (-> row :raw_event parse)] - (-> row - (dissoc :raw_event) - (merge - (select-keys parsed-event [:kind :pubkey])) - (assoc :content - (str - (subs (:content parsed-event) 0 - (max 0 (min 75 (dec (count (:content parsed-event)))))) "..."))))) rows)] - {:status 200 - :headers {"Content-Type" "text/plain"} - :body (with-out-str - (pprint/print-table - [:rowid :sys_ts :kind :pubkey :content] rows'))})) + (let [parsed-body (or (some->> req :body slurp parse) [{}]) + _ (when-not (and + (every? map? parsed-body) + (nil? (validation/req-err "http:q" parsed-body))) + (throw (ex-info "bad request" {:req req}))) + prepared-filters (prepare-req-filters* parsed-body) + modified (mapv #(update % :limit (fn [a b] (min (or a b) 50)) 25) prepared-filters) + as-query (query/filters->query modified)] + (let [rows (jdbc/execute! db as-query + {:builder-fn rs/as-unqualified-lower-maps}) + rows' (mapv + (fn [row] + (let [parsed-event (-> row :raw_event parse)] + (-> row + (dissoc :raw_event) + (merge + (select-keys parsed-event [:kind :pubkey])) + (assoc :content + (let [max-summary-len 75 + the-content (:content parsed-event) + the-content-len (count the-content) + needs-summary? (> the-content-len max-summary-len) + the-summary (if needs-summary? + (subs the-content 0 max-summary-len) the-content) + suffix (if needs-summary? "..." "")] + (str the-summary suffix)))))) rows)] + {:status 200 + :headers {"Content-Type" "text/plain"} + :body (with-out-str + (pprint/print-table + [:rowid :sys_ts :kind :pubkey :content] rows'))}))) (defn- handler-metrics [metrics _req] {:status 200 diff --git a/src/me/untethr/nostr/query.clj b/src/me/untethr/nostr/query.clj index 8ab9006..fe31fce 100644 --- a/src/me/untethr/nostr/query.clj +++ b/src/me/untethr/nostr/query.clj @@ -55,20 +55,34 @@ (not-empty e#) (conj join-e) (not-empty p#) (conj join-p) (not-empty generic-tags) (conj join-x))) - q (if (empty? join-clause) + q (cond + (empty? base-clause) + (format "select v.rowid, v.raw_event from n_events v" base-clause) + (empty? join-clause) (format "select v.rowid, v.raw_event from n_events v where %s" base-clause) + :else (format "select v.rowid, v.raw_event from n_events v %s where %s" join-clause base-clause)) q (if (some? target-row-id) (str q " and v.rowid <= " target-row-id) q) - q (str q " and v.deleted_ = 0")] + q (format "%s %s deleted_ = 0" q (if (empty? base-clause) "where" "and")) + q (if (empty? join-clause) q (str q " group by v.rowid"))] (if (some? limit) ;; note: can't do order by w/in union query unless you leverage sub-queries like so: (apply vector (str "select * from (" q " order by v.created_at desc limit ?)") (conj base-params limit)) (apply vector q base-params)))) (defn filters->query + "Convert nostr filters to SQL query. Provided filters must be non-empty. + However, [{}] is supported and produces all values. + + Callers that care about efficiency should provide a de-duplicated list + of filters; i.e., we won't do any de-duping here. + + Filter attributes that are empty colls are ignored. So upstream callers that + want to return zero results in such cases are obligated to short-circuit + before invoking this function." ([filters] (filters->query filters nil)) ([filters target-row-id] - {:pre [(or (nil? target-row-id) (number? target-row-id))]} + {:pre [(not-empty filters) (or (nil? target-row-id) (number? target-row-id))]} (vec (reduce (fn [[q & p] [q+ & p+]] diff --git a/src/me/untethr/nostr/subscribe.clj b/src/me/untethr/nostr/subscribe.clj index f1272d9..e20233e 100644 --- a/src/me/untethr/nostr/subscribe.clj +++ b/src/me/untethr/nostr/subscribe.clj @@ -159,7 +159,7 @@ "Does not wipe out prior subscription with same req-id; upstream is expected to unsubscribe! priors before subscribe!." [subs-atom channel-id req-id filters observer] - {:pre [(not (validation/filters-empty? filters))]} + {:pre [(not (empty? filters))]} (let [sid (str channel-id ":" req-id)] (swap! subs-atom #(subscribe!* % channel-id sid filters observer)))) diff --git a/src/me/untethr/nostr/validation.clj b/src/me/untethr/nostr/validation.clj index e6fab98..36020e3 100644 --- a/src/me/untethr/nostr/validation.clj +++ b/src/me/untethr/nostr/validation.clj @@ -86,6 +86,10 @@ (def zero-hex-str (apply str (repeat 32 "0"))) +(defn filter-has-empty-attr? + [the-filter] + (some #(and (vector? %) (empty? %)) (vals the-filter))) + (defn conform-filter-lenient [the-filter] ;; based on what we see from clients in the wild, we'll forgive some mistakes @@ -115,11 +119,3 @@ [req-id] (cond (not (string? req-id)) :err/req-id)) - -(defn filter-empty? - [{:keys [ids kinds since until authors] e# :#e p# :#p :as _filter}] - (and (empty? ids) (empty? kinds) (nil? since) (nil? until) (empty? authors) (empty? e#) (empty? p#))) - -(defn filters-empty? - [filters] - (or (empty? filters) (every? filter-empty? filters))) diff --git a/test/test/app_test.clj b/test/test/app_test.clj new file mode 100644 index 0000000..c68a7a6 --- /dev/null +++ b/test/test/app_test.clj @@ -0,0 +1,17 @@ +(ns test.app-test + (:require + [clojure.test :refer :all] + [me.untethr.nostr.app :as app] + [test.support :as support])) + +(deftest prepare-req-filters*-test + (is (= [] (#'app/prepare-req-filters* []))) + (is (= [] (#'app/prepare-req-filters* [{:authors []}]))) + (is (= [{}] (#'app/prepare-req-filters* [{}]))) + (is (= [{}] (#'app/prepare-req-filters* [{} {}]))) + (is (= [{}] (#'app/prepare-req-filters* [{} {} {:authors []}]))) + (is (= [{} {:authors [support/fake-hex-str]}] + (#'app/prepare-req-filters* [{} {} {:authors [support/fake-hex-str]}]))) + (is (= [{} {:authors [support/fake-hex-str]}] + (#'app/prepare-req-filters* [{} {} {:authors [support/fake-hex-str]} + {:authors [support/fake-hex-str]}])))) diff --git a/test/test/query_test.clj b/test/test/query_test.clj index 74aeceb..062d33e 100644 --- a/test/test/query_test.clj +++ b/test/test/query_test.clj @@ -3,7 +3,6 @@ [next.jdbc :as jdbc] [next.jdbc.result-set :as rs] [me.untethr.nostr.app :as app] - [me.untethr.nostr.metrics :as metrics] [me.untethr.nostr.query :as query] [test.support :as support] [test.test-data :as test-data])) @@ -22,10 +21,13 @@ (deftest query-test (support/with-memory-db [db] (support/load-data db (:pool test-data/pool-with-filters)) - (doseq [[filters results] (:filters->results test-data/pool-with-filters)] - (is (= (set results) - (into #{} (map (partial row-id->id db)) (query* db filters))) - (pr-str [filters results]))) + (doseq [[filters expected-results] (:filters->results test-data/pool-with-filters) + :let [query-results (query* db filters)]] + (is (= (set expected-results) + (into #{} (map (partial row-id->id db)) query-results)) + (pr-str [filters expected-results])) + (is (= (count expected-results) (count query-results)) + (pr-str [filters query-results]))) ;; with the well-known data set, let's test some w/ target-row-id.. (is (= #{1 2 4} (-> (query* db [{:ids ["100" "101"]} diff --git a/test/test/subscribe_test.clj b/test/test/subscribe_test.clj index 2a7361e..1f65aac 100644 --- a/test/test/subscribe_test.clj +++ b/test/test/subscribe_test.clj @@ -1,14 +1,16 @@ (ns test.subscribe-test - (:require [clojure.test :refer :all] - [clojure.test.check :as tc] - [clojure.test.check.generators :as gen] - [clojure.test.check.properties :as prop] - [clojure.tools.logging :as log] - [me.untethr.nostr.app :as app] - [me.untethr.nostr.subscribe :as subscribe] - [me.untethr.nostr.metrics :as metrics] - [test.support :as support] - [test.test-data :as test-data]) + (:require + [clojure.set :as set] + [clojure.test :refer :all] + [clojure.test.check :as tc] + [clojure.test.check.generators :as gen] + [clojure.test.check.properties :as prop] + [clojure.tools.logging :as log] + [me.untethr.nostr.app :as app] + [me.untethr.nostr.subscribe :as subscribe] + [me.untethr.nostr.metrics :as metrics] + [test.support :as support] + [test.test-data :as test-data]) (:import (java.util List))) (defn- throw-fn @@ -30,6 +32,33 @@ (:id n) (:pubkey n) (:created_at n) (:kind n) (:tags n))) (pr-str i spec))))) +(deftest subscribe-with-pool-data-test + (let [metrics-fake (metrics/create-metrics) + subs-atom (atom (subscribe/create-empty-subs)) + idx->results-atom (atom {})] + ;; subscribe everything ... + (doseq [[idx [filters _]] (map-indexed vector + (:filters->results test-data/pool-with-filters))] + (subscribe/subscribe! subs-atom + (format "chan%d" idx) (format "req%d" idx) filters + (fn [fake-raw-event] + (swap! idx->results-atom update idx (fnil conj []) fake-raw-event)))) + ;; notify everything... + (doseq [{obj-id :id :as obj} (:pool test-data/pool-with-filters)] + (subscribe/notify! metrics-fake subs-atom obj (format "" obj-id))) + ;; verify... + (doseq [[idx [filters expected-ids]] + (map-indexed vector + (:filters->results test-data/pool-with-filters)) + :let [has-limits? (some :limit filters) + actual-raw-events (get @idx->results-atom idx []) + expected-raw-events (mapv (partial format "") expected-ids)]] + (if has-limits? + (is (= (set expected-raw-events) + (set/intersection (set expected-raw-events) (set actual-raw-events))) + filters) + (is (= expected-raw-events actual-raw-events) filters))))) + (deftest candidate-filters-test (let [subs-atom (doto (atom (subscribe/create-empty-subs)) @@ -197,15 +226,21 @@ (deftest firehose-test (let [metrics-fake (metrics/create-metrics) subs-atom (atom (subscribe/create-empty-subs)) - result-atom (atom nil)] + result-0-atom (atom nil) + result-1-atom (atom nil)] (subscribe/subscribe! subs-atom "scope-0" "main-channel" [{} {:ids ["abc"]}] - #(swap! result-atom (fnil conj []) %)) + #(swap! result-0-atom (fnil conj []) %)) + (subscribe/subscribe! subs-atom "scope-1" "main-channel" + [{}] + #(swap! result-1-atom (fnil conj []) %)) (subscribe/notify! metrics-fake subs-atom {:id "abc"} "") - (is (= @result-atom [""])) - (is (= 1 (subscribe/num-subscriptions subs-atom))) - (is (= 1 (subscribe/num-firehose-filters subs-atom))) - (is (= 2 (subscribe/num-filters subs-atom "scope-0"))))) + (is (= @result-0-atom [""])) + (is (= @result-1-atom [""])) + (is (= 2 (subscribe/num-subscriptions subs-atom))) + (is (= 2 (subscribe/num-firehose-filters subs-atom))) + (is (= 2 (subscribe/num-filters subs-atom "scope-0"))) + (is (= 1 (subscribe/num-filters subs-atom "scope-1"))))) (deftest regression-test (support/with-regression-data [data-vec] diff --git a/test/test/support.clj b/test/test/support.clj index 7f66623..8781d39 100644 --- a/test/test/support.clj +++ b/test/test/support.clj @@ -4,9 +4,12 @@ [me.untethr.nostr.store :as store] [me.untethr.nostr.app :as app] [me.untethr.nostr.json-facade :as json-facade] - [me.untethr.nostr.metrics :as metrics] [clojure.java.io :as io])) +(def ^:private hex-chars "abcdef0123456789") + +(def fake-hex-str (apply str (take 32 (cycle hex-chars)))) + (defmacro with-memory-db [bindings & body] `(with-open [db# (jdbc/get-connection "jdbc:sqlite::memory:")] diff --git a/test/test/test_data.clj b/test/test/test_data.clj index 5a97c28..ad2c373 100644 --- a/test/test/test_data.clj +++ b/test/test/test_data.clj @@ -12,10 +12,14 @@ {:id "106" :pubkey "cat" :created_at 160 :kind 1 :tags [["t" "tag0"]]} {:id "107" :pubkey "dog" :created_at 170 :kind 1 :tags [["p" "cat"] ["t" "tag1"]]}] :filters->results - [[[{:ids ["100" "101"]}] ["100" "101"]] + [[[{}] ["100" "101" "102" "103" "104" "105" "106" "107"]] + [[{} {:limit 1}] ["100" "101" "102" "103" "104" "105" "106" "107"]] + [[{:limit 2} {:limit 1}] ["106" "107"]] + [[{:ids ["100" "101"]}] ["100" "101"]] [[{:ids ["100" "101"] :limit 1}] ["101"]] [[{:since 140}] ["104" "105" "106" "107"]] [[{:since 140 :limit 1}] ["107"]] + [[{:since 150 :limit 1} {:until 120 :limit 3}] ["107" "102" "101" "100"]] [[{:since 140 :until 140}] ["104"]] [[{:until 140}] ["100" "101" "102" "103" "104"]] [[{:until 140 :limit 3}] ["102" "103" "104"]] @@ -39,8 +43,7 @@ [[{:#e ["100"]} {:#p ["abe" "cat"]} {:#t ["tag0"]}] ["103" "104" "105" "106" "107"]]]}) (def filter-matches - [;; we never expect empty filter but test the base case anyway - {:filter {} + [{:filter {} :known-matches [{:id "id" :pubkey "pk" :created_at 100 :kind 1 :tags []}] :known-non-matches []} {:filter {:ids ["id0"]} diff --git a/test/test/validation_test.clj b/test/test/validation_test.clj index da75777..e056d63 100644 --- a/test/test/validation_test.clj +++ b/test/test/validation_test.clj @@ -1,14 +1,10 @@ (ns test.validation-test (:require [clojure.test :refer :all] - [me.untethr.nostr.validation :as validation])) - -(def ^:private hex-chars "abcdef0123456789") - -(def ^:private fake-hex-str (apply str (take 32 (cycle hex-chars)))) + [me.untethr.nostr.validation :as validation] + [test.support :as support])) (deftest conform-filter-lenient-test - [] (is (= {} (validation/conform-filter-lenient {}))) ;ids kinds since until authors limit] e# :#e p# :#p (doseq [k [:ids :kinds :authors :#e :#p]] @@ -24,6 +20,12 @@ (validation/conform-filter-lenient {k ["bad"]}))) (is (= {k [validation/zero-hex-str validation/zero-hex-str]} (validation/conform-filter-lenient {k ["bad" "bad"]}))) - (is (= {k [validation/zero-hex-str fake-hex-str]} + (is (= {k [validation/zero-hex-str support/fake-hex-str]} (validation/conform-filter-lenient - {k ["bad" fake-hex-str]}))))) + {k ["bad" support/fake-hex-str]}))))) + +(deftest filter-has-empty-attr?-test + (is (not (validation/filter-has-empty-attr? {}))) + (is (not (validation/filter-has-empty-attr? {:b ["x"] :a ["y"]}))) + (is (validation/filter-has-empty-attr? {:a []})) + (is (validation/filter-has-empty-attr? {:b ["x"] :a []})))