Skip to content

Updated dependencies and added query-chunked #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
:url "http://www.eclipse.org/legal/epl-v10.html"}
:min-lein-version "2.5.0"
:dependencies [[org.clojure/clojure "1.8.0"]
[org.influxdb/influxdb-java "2.2"]
[org.influxdb/influxdb-java "2.7"]
[org.clojure/tools.logging "0.3.1"]
[clj-time "0.12.0"]]
:source-paths ["src/main/clojure"]
Expand Down
77 changes: 37 additions & 40 deletions src/main/clojure/influxdb_clojure/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
[clj-time.core :as t])
(:import (org.influxdb InfluxDB InfluxDBFactory InfluxDB$ConsistencyLevel)
(org.influxdb.dto BatchPoints Point BatchPoints$Builder Point$Builder Query QueryResult QueryResult$Result QueryResult$Series)
(java.util.concurrent TimeUnit)
(retrofit.client OkClient)
(com.squareup.okhttp OkHttpClient)))
(java.util.concurrent TimeUnit)))

(def ^:private default-uri "http://localhost:8086")

Expand All @@ -19,18 +17,6 @@
:read-timeout (* 5 1000)
:write-timeout (* 5 1000)})

(defn- default-client [opts]
(let [{:keys [connect-timeout
read-timeout
write-timeout]} (merge connection-default-opts opts)
^OkHttpClient http-client (OkHttpClient.)
^TimeUnit time-unit (TimeUnit/MILLISECONDS)]
(doto http-client
(.setConnectTimeout connect-timeout time-unit)
(.setReadTimeout read-timeout time-unit)
(.setWriteTimeout write-timeout time-unit))
(OkClient. http-client)))

(defn connect
"Connects to the given InfluxDB endpoint and returns a connection"
(^InfluxDB []
Expand All @@ -39,13 +25,9 @@
(connect uri default-user default-password))
(^InfluxDB [uri user password]
(connect uri user password {}))
(^InfluxDB [uri user password opts]
;; Create our own OkHttpClient so that we can set connection parameters.
;; The default timeouts set by the underlying Retrofit/OkHttpClient implementation
;; are rather high: connect timeout: 15s, read timeout: 20s.
;; See: retrofit.client.Defaults.
(let [{:keys [client]
:or {client (default-client opts)}} opts]
(^InfluxDB [uri user password {:keys [client]}]
(if-not client
(InfluxDBFactory/connect uri user password)
(InfluxDBFactory/connect uri user password client))))

(defn create-database
Expand Down Expand Up @@ -88,7 +70,7 @@
(let [{:keys [tags consistency retention-policy]
:or {tags {}
consistency :any
retention-policy "default"}} opts
retention-policy "autogen"}} opts
^BatchPoints$Builder batch-builder (BatchPoints/database database-name)
batch-time (System/currentTimeMillis)
point-objects (map (partial convert-point batch-time tags) points)]
Expand All @@ -102,29 +84,44 @@
(.point batch-builder point-object))
(.write conn (.build batch-builder)))))

(defn convert-query-result [^QueryResult query-result]
(letfn [(convert-series [^QueryResult$Series series]
{:name (.getName series)
:colums (into [] (.getColumns series))
:values (into [] (map #(into [] %) (.getValues series)))})
(convert-result [^QueryResult$Result result]
(if (.hasError result)
{:error (.getError result)})
{:series (into [] (map convert-series (.getSeries result)))})]
(let [response {}]
(if (.hasError query-result)
(assoc response :error (.getError query-result))
(->> query-result
.getResults
(map convert-result)
(into [])
(assoc response :results))))))

(defn query
"Executes a database query"
([^InfluxDB conn ^String query-str]
(query conn query-str nil))
([^InfluxDB conn ^String query-str ^String database-name]
(defn convert-series [^QueryResult$Series series]
{:name (.getName series)
:colums (into [] (.getColumns series))
:values (into [] (map #(into [] %) (.getValues series)))})
(defn convert-result [^QueryResult$Result result]
(if (.hasError result)
{:error (.getError result)})
{:series (into [] (map convert-series (.getSeries result)))})
(let [^Query query (Query. query-str database-name)
^QueryResult query-result (.query conn query)
response {}]
(if (.hasError query-result)
(assoc response :error (.getError query-result)))
(->> query-result
.getResults
(map convert-result)
(into [])
(assoc response :results)))))
^QueryResult query-result (.query conn query)]
(convert-query-result query-result))))

(defn query-chunked
"Executes a chunked database query executing callback every chunk
after last chunk has been processed, sends a last chunk in the
form of {:error :done}"
([^InfluxDB conn ^String query-str ^Integer chunk-size ^String database-name callback-fn]
(let [^Query query (Query. query-str database-name)
rewrite-done #(if (= (:error %) "DONE") {:error :done} %)
consumer (reify java.util.function.Consumer
(accept [this t]
(callback-fn (-> t convert-query-result rewrite-done))))]
(.query conn query chunk-size consumer))))

(defn measurements
"Returns a list of measurements present in a database"
Expand Down
31 changes: 18 additions & 13 deletions src/test/clojure/influxdb_clojure/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,25 @@
[influxdb-clojure.core :refer :all]
[clj-time.core :refer [now]]
[clj-time.coerce :refer [to-long]]
[version-clj.core :refer [version-compare]]))
[version-clj.core :refer [version-compare]]
[clj-time.core :as t]))

(defn fixture-write-points []
(let [conn (connect)
test-db-name "influxdb_clojure_test"
fields {:value 23}
point {:measurement "test.write"
:time (to-long (t/date-time 2000 1 1))
:fields fields}]
(write-points conn test-db-name [point])))

(defn with-test-db [f]
(let [conn (connect)
test-db-name "influxdb_clojure_test"]
(delete-database conn test-db-name)
(create-database conn test-db-name)
(fixture-write-points)
(f)))
; (delete-database conn test-db-name)))

(use-fixtures :each with-test-db)

Expand All @@ -36,15 +47,7 @@
(delete-database conn db-name)
(is (not (db-exists? conn db-name)))))))

(deftest write-tests
(testing "write-points"
(let [conn (connect)
test-db-name "influxdb_clojure_test"
fields {:value 23}
point {:measurement "test.write"
:time (to-long (now))
:fields fields}]
(write-points conn test-db-name [point]))))


;; Helper functions for query tests
(defn- count-results [query-result]
Expand Down Expand Up @@ -88,8 +91,8 @@
(is (= 1 (count-series query-result)))
(is (= "test.write" (series-name query-result)))
(is (= ["time" "value"] (series-columns query-result)))
(is (< 34 (count (series-values query-result))))
(is (= "2016-04-12T04:40:41.664Z" (series-nth-value query-result 0 "time")))))
(is (= 1 (count (series-values query-result))))
(is (= "2000-01-01T00:00:00Z" (series-nth-value query-result 0 "time")))))
(testing "query with error"
(let [conn (connect)
test-db-name "influxdb_clojure_test"
Expand All @@ -112,3 +115,5 @@
ss (series conn db-name)]
(log/debug "Series:" ss)
(is (= '("test.write") ss)))))

#_ (series (connect) "influxdb_clojure_test")