
Commit

Avoid returning duplicates when joining on indexed tags, support universal [{}] filters, plus some optimizations
atdixon committed Dec 19, 2022
1 parent 21880a9 commit a3ebe6e
Showing 10 changed files with 212 additions and 118 deletions.
168 changes: 95 additions & 73 deletions src/me/untethr/nostr/app.clj
@@ -10,6 +10,7 @@
[me.untethr.nostr.fulfill :as fulfill]
[me.untethr.nostr.json-facade :as json-facade]
[me.untethr.nostr.metrics :as metrics]
[me.untethr.nostr.query :as query]
[me.untethr.nostr.store :as store]
[me.untethr.nostr.subscribe :as subscribe]
[me.untethr.nostr.validation :as validation]
@@ -78,15 +79,6 @@
true
(format "duplicate:%s" ok-message-str))))

(defn- handle-duplicate-event!
[metrics ch event ok-message-str]
(metrics/duplicate-event! metrics)
(hk/send! ch
(create-ok-message
(:id event)
true
(format "duplicate:%s" ok-message-str))))

(defn- handle-stored-event!
[metrics ch event ok-message-str]
(metrics/duplicate-event! metrics)
@@ -142,7 +134,7 @@
(subscribe/notify! metrics subs-atom e raw-event)))
(handle-invalid-event! metrics ch e verified-event-or-err-map))))

(def max-filters 12)
(def max-filters 15)

;; some clients may still send legacy filter format that permits singular id
;; in filter; so we'll support this for a while.
@@ -154,52 +146,70 @@
(not (contains? f :ids)))
(-> (assoc :ids [(:id f)]) (dissoc :id))))

(defn- prepare-req-filters*
[req-filters]
(->> req-filters
;; conform...
(map (comp validation/conform-filter-lenient
interpret-legacy-filter))
;; remove null filters (ie filters that could never match anything)...
(filter (complement validation/filter-has-empty-attr?))
;; remove duplicates...
distinct
vec))
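
As a rough illustration (not part of the diff; it assumes the me.untethr.nostr.app namespace is aliased as app), the new prepare-req-filters* drops filters that can never match anything and collapses duplicates; the new test/test/app_test.clj below exercises the same cases:

  ;; the fn is private, so the tests (and this sketch) go through the var
  (#'app/prepare-req-filters* [{} {} {:authors []}]) ;=> [{}]
  (#'app/prepare-req-filters* [{:authors []}])       ;=> []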

(defn- receive-req
[metrics db subs-atom fulfill-atom channel-id ch [_ req-id & req-filters]]
(let [use-req-filters (mapv (comp validation/conform-filter-lenient
interpret-legacy-filter) req-filters)]
(if-let [err (validation/req-err req-id use-req-filters)]
(log/warn "invalid req" {:err err :req [req-id (vec use-req-filters)]})
(do
;; just in case we're still fulfilling prior subscription w/ same req-id
(fulfill/cancel! fulfill-atom channel-id req-id)
(subscribe/unsubscribe! subs-atom channel-id req-id)
(when-not (validation/filters-empty? use-req-filters)
(if (> (subscribe/num-filters subs-atom channel-id) max-filters)
(do
(metrics/inc-excessive-filters! metrics)
(hk/send! ch
(create-notice-message
(format
(str
"Too many subscription filters."
" Max allowed is %d, but you have %d.")
max-filters
(subscribe/num-filters subs-atom channel-id)))))
(do
;; subscribe first so we are guaranteed to dispatch new arrivals
(metrics/time-subscribe! metrics
(subscribe/subscribe! subs-atom channel-id req-id use-req-filters
(fn [raw-event]
;; "some" safety if we're notified and our channel has closed,
;; but we've not yet unsubscribed in response; this isn't thread
;; safe so could still see channel close before the send!;
;; upstream observer invocation should catch and log.
(when (hk/open? ch)
(hk/send! ch (create-event-message req-id raw-event))))))
;; after subscription, capture fulfillment target rowid; in rare cases we
;; may double-deliver an event or few but we will never miss an event
(if-let [target-row-id (store/max-event-rowid db)]
(fulfill/submit! metrics db fulfill-atom channel-id req-id use-req-filters target-row-id
(fn [raw-event]
;; see note above; we may see channel close before we cancel
;; fulfillment
(when (hk/open? ch)
(hk/send! ch (create-event-message req-id raw-event))))
(fn []
(hk/send! ch (create-eose-message req-id))))
;; should only occur on epochal first event
(log/warn "no max rowid; nothing yet to fulfill")))))))))
(if-not (every? map? req-filters)
(log/warn "invalid req" {:msg "expected objects"})
;; else -- req has basic form...
(let [use-req-filters (prepare-req-filters* req-filters)
req-err (validation/req-err req-id use-req-filters)]
(if req-err
(log/warn "invalid req" {:req-err req-err :req [req-id use-req-filters]})
;; else --
(do
;; just in case we're still fulfilling prior subscription w/ same req-id
(fulfill/cancel! fulfill-atom channel-id req-id)
(subscribe/unsubscribe! subs-atom channel-id req-id)
;; from here on, we'll completely ignore empty filters -- that is,
;; filters that are empty after we've done our prepare step above.
(when-not (empty? use-req-filters)
(if (> (subscribe/num-filters subs-atom channel-id) max-filters)
(do
(metrics/inc-excessive-filters! metrics)
(hk/send! ch
(create-notice-message
(format
(str
"Too many subscription filters."
" Max allowed is %d, but you have %d.")
max-filters
(subscribe/num-filters subs-atom channel-id)))))
(do
;; subscribe first so we are guaranteed to dispatch new arrivals
(metrics/time-subscribe! metrics
(subscribe/subscribe! subs-atom channel-id req-id use-req-filters
(fn [raw-event]
;; "some" safety if we're notified and our channel has closed,
;; but we've not yet unsubscribed in response; this isn't thread
;; safe so could still see channel close before the send!;
;; upstream observer invocation should catch and log.
(when (hk/open? ch)
(hk/send! ch (create-event-message req-id raw-event))))))
;; after subscription, capture fulfillment target rowid; in rare cases we
;; may double-deliver an event or few but we will never miss an event
(if-let [target-row-id (store/max-event-rowid db)]
(fulfill/submit! metrics db fulfill-atom channel-id req-id use-req-filters target-row-id
(fn [raw-event]
;; see note above; we may see channel close before we cancel
;; fulfillment
(when (hk/open? ch)
(hk/send! ch (create-event-message req-id raw-event))))
(fn []
(hk/send! ch (create-eose-message req-id))))
;; should only occur on epochal first event
(log/warn "no max rowid; nothing yet to fulfill"))))))))))

(defn- receive-close
[metrics db subs-atom fulfill-atom channel-id ch [_ req-id]]
@@ -291,25 +301,37 @@
nostr-url untethr-url maybe-server-hostname maybe-server-hostname)}))

(defn- handler-q [db req]
(let [rows (jdbc/execute! db
["select rowid, sys_ts, raw_event from n_events order by rowid desc limit 25"]
{:builder-fn rs/as-unqualified-lower-maps})
rows' (mapv
(fn [row]
(let [parsed-event (-> row :raw_event parse)]
(-> row
(dissoc :raw_event)
(merge
(select-keys parsed-event [:kind :pubkey]))
(assoc :content
(str
(subs (:content parsed-event) 0
(max 0 (min 75 (dec (count (:content parsed-event)))))) "..."))))) rows)]
{:status 200
:headers {"Content-Type" "text/plain"}
:body (with-out-str
(pprint/print-table
[:rowid :sys_ts :kind :pubkey :content] rows'))}))
(let [parsed-body (or (some->> req :body slurp parse) [{}])
_ (when-not (and
(every? map? parsed-body)
(nil? (validation/req-err "http:q" parsed-body)))
(throw (ex-info "bad request" {:req req})))
prepared-filters (prepare-req-filters* parsed-body)
modified (mapv #(update % :limit (fn [a b] (min (or a b) 50)) 25) prepared-filters)
as-query (query/filters->query modified)]
(let [rows (jdbc/execute! db as-query
{:builder-fn rs/as-unqualified-lower-maps})
rows' (mapv
(fn [row]
(let [parsed-event (-> row :raw_event parse)]
(-> row
(dissoc :raw_event)
(merge
(select-keys parsed-event [:kind :pubkey]))
(assoc :content
(let [max-summary-len 75
the-content (:content parsed-event)
the-content-len (count the-content)
needs-summary? (> the-content-len max-summary-len)
the-summary (if needs-summary?
(subs the-content 0 max-summary-len) the-content)
suffix (if needs-summary? "..." "")]
(str the-summary suffix)))))) rows)]
{:status 200
:headers {"Content-Type" "text/plain"}
:body (with-out-str
(pprint/print-table
[:rowid :sys_ts :kind :pubkey :content] rows'))})))
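
One detail worth spelling out (illustration only, not from the commit): in the new handler-q, update calls the clamping fn with the filter's current :limit and the default 25, so a missing :limit defaults to 25 and any requested :limit is capped at 50:

  (update {} :limit (fn [a b] (min (or a b) 50)) 25)           ;=> {:limit 25}
  (update {:limit 100} :limit (fn [a b] (min (or a b) 50)) 25) ;=> {:limit 50}
  (update {:limit 10} :limit (fn [a b] (min (or a b) 50)) 25)  ;=> {:limit 10}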

(defn- handler-metrics [metrics _req]
{:status 200
20 changes: 17 additions & 3 deletions src/me/untethr/nostr/query.clj
@@ -55,20 +55,34 @@
(not-empty e#) (conj join-e)
(not-empty p#) (conj join-p)
(not-empty generic-tags) (conj join-x)))
q (if (empty? join-clause)
q (cond
(empty? base-clause)
(format "select v.rowid, v.raw_event from n_events v" base-clause)
(empty? join-clause)
(format "select v.rowid, v.raw_event from n_events v where %s" base-clause)
:else
(format "select v.rowid, v.raw_event from n_events v %s where %s" join-clause base-clause))
q (if (some? target-row-id) (str q " and v.rowid <= " target-row-id) q)
q (str q " and v.deleted_ = 0")]
q (format "%s %s deleted_ = 0" q (if (empty? base-clause) "where" "and"))
q (if (empty? join-clause) q (str q " group by v.rowid"))]
(if (some? limit)
;; note: can't do order by w/in union query unless you leverage sub-queries like so:
(apply vector (str "select * from (" q " order by v.created_at desc limit ?)") (conj base-params limit))
(apply vector q base-params))))
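
Reconstructed from this hunk only (a sketch, not verified output; it assumes the query alias used in app.clj and an empty parameter list for the universal filter), the new branches yield SQL of roughly these shapes:

  ;; universal filter: empty base clause, no join, only the deleted_ guard
  (query/filters->query [{}])
  ;;=> ["select v.rowid, v.raw_event from n_events v where deleted_ = 0"]
  ;; a filter that joins an indexed-tag table (e.g. :#e) can match several tag
  ;; rows for one event; the new "group by v.rowid" collapses them to one row:
  ;;   select v.rowid, v.raw_event from n_events v <tag-index join>
  ;;     where <base clause> and deleted_ = 0 group by v.rowid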

(defn filters->query
"Convert nostr filters to SQL query. Provided filters must be non-empty.
However, [{}] is supported and produces all values.
Callers that care about efficiency should provide a de-duplicated list
of filters; i.e., we won't do any de-duping here.
Filter attributes that are empty colls are ignored. So upstream callers that
want to return zero results in such cases are obligated to short-circuit
before invoking this function."
([filters] (filters->query filters nil))
([filters target-row-id]
{:pre [(or (nil? target-row-id) (number? target-row-id))]}
{:pre [(not-empty filters) (or (nil? target-row-id) (number? target-row-id))]}
(vec
(reduce
(fn [[q & p] [q+ & p+]]
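
Two caller-facing consequences of the tightened contract (illustration, not from the commit):

  ;; an empty filter list now trips the :pre assertion (not-empty filters)
  (query/filters->query [])
  ;; empty-coll attributes are ignored here, so a caller that wants {:authors []}
  ;; to mean "match nothing" must short-circuit before calling, which app.clj's
  ;; prepare-req-filters* now does by dropping such filters up front
  (query/filters->query [{:authors []}]) ;; behaves like [{}], not like zero results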
2 changes: 1 addition & 1 deletion src/me/untethr/nostr/subscribe.clj
Expand Up @@ -159,7 +159,7 @@
"Does not wipe out prior subscription with same req-id; upstream is expected
to unsubscribe! priors before subscribe!."
[subs-atom channel-id req-id filters observer]
{:pre [(not (validation/filters-empty? filters))]}
{:pre [(not (empty? filters))]}
(let [sid (str channel-id ":" req-id)]
(swap! subs-atom #(subscribe!* % channel-id sid filters observer))))

12 changes: 4 additions & 8 deletions src/me/untethr/nostr/validation.clj
Expand Up @@ -86,6 +86,10 @@

(def zero-hex-str (apply str (repeat 32 "0")))

(defn filter-has-empty-attr?
[the-filter]
(some #(and (vector? %) (empty? %)) (vals the-filter)))
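
For illustration (not part of the diff), the new predicate flags filters carrying an attribute that is present but empty, i.e. filters that could never match and that app.clj's prepare-req-filters* now drops:

  (filter-has-empty-attr? {:authors []})        ;=> true
  (filter-has-empty-attr? {:authors ["ab12"]})  ;=> nil
  (filter-has-empty-attr? {})                   ;=> nil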

(defn conform-filter-lenient
[the-filter]
;; based on what we see from clients in the wild, we'll forgive some mistakes
@@ -115,11 +119,3 @@
[req-id]
(cond
(not (string? req-id)) :err/req-id))

(defn filter-empty?
[{:keys [ids kinds since until authors] e# :#e p# :#p :as _filter}]
(and (empty? ids) (empty? kinds) (nil? since) (nil? until) (empty? authors) (empty? e#) (empty? p#)))

(defn filters-empty?
[filters]
(or (empty? filters) (every? filter-empty? filters)))
17 changes: 17 additions & 0 deletions test/test/app_test.clj
@@ -0,0 +1,17 @@
(ns test.app-test
(:require
[clojure.test :refer :all]
[me.untethr.nostr.app :as app]
[test.support :as support]))

(deftest prepare-req-filters*-test
(is (= [] (#'app/prepare-req-filters* [])))
(is (= [] (#'app/prepare-req-filters* [{:authors []}])))
(is (= [{}] (#'app/prepare-req-filters* [{}])))
(is (= [{}] (#'app/prepare-req-filters* [{} {}])))
(is (= [{}] (#'app/prepare-req-filters* [{} {} {:authors []}])))
(is (= [{} {:authors [support/fake-hex-str]}]
(#'app/prepare-req-filters* [{} {} {:authors [support/fake-hex-str]}])))
(is (= [{} {:authors [support/fake-hex-str]}]
(#'app/prepare-req-filters* [{} {} {:authors [support/fake-hex-str]}
{:authors [support/fake-hex-str]}]))))
12 changes: 7 additions & 5 deletions test/test/query_test.clj
@@ -3,7 +3,6 @@
[next.jdbc :as jdbc]
[next.jdbc.result-set :as rs]
[me.untethr.nostr.app :as app]
[me.untethr.nostr.metrics :as metrics]
[me.untethr.nostr.query :as query]
[test.support :as support]
[test.test-data :as test-data]))
@@ -22,10 +21,13 @@
(deftest query-test
(support/with-memory-db [db]
(support/load-data db (:pool test-data/pool-with-filters))
(doseq [[filters results] (:filters->results test-data/pool-with-filters)]
(is (= (set results)
(into #{} (map (partial row-id->id db)) (query* db filters)))
(pr-str [filters results])))
(doseq [[filters expected-results] (:filters->results test-data/pool-with-filters)
:let [query-results (query* db filters)]]
(is (= (set expected-results)
(into #{} (map (partial row-id->id db)) query-results))
(pr-str [filters expected-results]))
(is (= (count expected-results) (count query-results))
(pr-str [filters query-results])))
;; with the well-known data set, let's test some w/ target-row-id..
(is (= #{1 2 4} (-> (query* db
[{:ids ["100" "101"]}