Skip to content

Commit

Permalink
Insert batching
Browse files Browse the repository at this point in the history
  • Loading branch information
obivan committed Oct 27, 2020
1 parent d15d589 commit 2ba875d
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 36 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@
• Изменено: xml-файлы обрабатываются параллельно.

[0.2.0]: https://github.com/obivan/fiaser/compare/v0.1.0...v0.2.0

## [0.3.0] - 2020-10-27
• Изменено: значительное ускорение вставки путем пакетирования записей

[0.3.0]: https://github.com/obivan/fiaser/compare/v0.2.0...v0.3.0
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject fiaser "0.2.0-SNAPSHOT"
(defproject fiaser "0.3.0-SNAPSHOT"
:description "Utilities for working with FIAS"
:url "http://github.com/obivan/fiaser"
:license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
Expand Down
48 changes: 23 additions & 25 deletions src/fiaser/convert.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
[fiaser.xml :as xml]
[fiaser.sqlite :as sqlite]
[fiaser.postgres :as pg]
[camel-snake-kebab.core :as csk]
[next.jdbc :as jdbc])
(:import (java.nio.file FileSystems)))

Expand Down Expand Up @@ -73,28 +72,26 @@
(run! (fn [[_ v]] (.close v)) connections))

(defmulti make-insert-handler :dbtype)
(defmethod make-insert-handler :sqlite [_ [conn table-name]]
(partial sqlite/skip-insert! conn table-name))
(defmethod make-insert-handler :postgres [_ [conn table-name]]
(defmethod make-insert-handler :sqlite [_ [conn schema]]
(partial sqlite/skip-insert! conn schema))
(defmethod make-insert-handler :postgres [_ [conn schema]]
(jdbc/execute-one! conn ["set synchronous_commit = off"])
(partial pg/skip-insert! conn table-name))
(partial pg/skip-insert! conn schema))

(defn make-insert-handlers
[cons schema]
(let [coll-name (:collection schema)
table-name (keyword (csk/->snake_case coll-name))]
(into [] (for [[k v] cons]
(make-insert-handler {:dbtype k} [v table-name])))))
(into [] (for [[k v] cons] (make-insert-handler {:dbtype k} [v schema]))))

(defn proceed-file
[datasources file-name schema]
(locking *out* (println "processing file" file-name))
(with-open [input-stream (io/input-stream file-name)]
(let [connections (open-connections datasources)
handlers (make-insert-handlers connections schema)]
(doseq [row (xml/stream input-stream)
handlers (make-insert-handlers connections schema)
batch-size 250]
(doseq [rows-batch (partition-all batch-size (xml/stream input-stream))
handler handlers]
(apply handler [row]))
(apply handler [rows-batch]))
(close-connections connections))))

(defn proceed-data
Expand All @@ -103,16 +100,17 @@
(let [scope (processing-scope xml-dir xsd-dir)
schemas (set (vals scope))
datasources (make-datasources targets)]
(println "creating tables ...")
(when-let [datasource (:sqlite datasources)]
(sqlite/prepare-database! datasource schemas))
(when-let [datasource (:postgres datasources)]
(let [pg-schema-name (-> targets :postgres :schema)
pg-tbs-name (or (-> targets :postgres :tablespace)
"pg_default")]
(pg/prepare-database! datasource pg-schema-name pg-tbs-name schemas)))
(println "done")
(println "loading data ...")
(doall
(pmap (fn [[file schema]] (proceed-file datasources file schema)) scope))
(println "done")))
(when (not-empty datasources)
(println "creating tables ...")
(when-let [datasource (:sqlite datasources)]
(sqlite/prepare-database! datasource schemas))
(when-let [datasource (:postgres datasources)]
(let [pg-schema-name (-> targets :postgres :schema)
pg-tbs-name (or (-> targets :postgres :tablespace)
"pg_default")]
(pg/prepare-database! datasource pg-schema-name pg-tbs-name schemas)))
(println "done")
(println "loading data ...")
(doall (pmap (fn [[file schema]]
(proceed-file datasources file schema)) scope))
(println "done"))))
4 changes: 2 additions & 2 deletions src/fiaser/postgres.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@
(jdbc/with-options spec opts)))

(defn skip-insert!
[tx table-name row]
(sqlite/skip-insert! tx table-name row))
[conn schema rows-batch]
(sqlite/skip-insert! conn schema rows-batch))
30 changes: 23 additions & 7 deletions src/fiaser/sqlite.clj
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,27 @@
opts {:builder-fn rs/as-unqualified-lower-maps}]
(jdbc/with-options spec opts)))

(defn make-row-stub
[schema]
(let [fields (:fields schema)]
(into {} (for [{:keys [name]} fields] [(keyword name) nil]))))

(defn prepare-multi-insert
[schema rows]
(let [stub (make-row-stub schema)
cols (keys stub)
rows (map (partial merge stub) rows)]
[cols (map vals rows)]))

(defn skip-insert!
[tx table-name row]
(try
(sql/insert! tx table-name row)
(catch Exception e
(let [msg (.getMessage e)]
(binding [*out* *err*]
(println "skip row" row "because of error:" msg))))))
[conn schema rows-batch]
(let [coll-name (:collection schema)
table-name (keyword (csk/->snake_case coll-name))
[cols rows] (prepare-multi-insert schema rows-batch)]
(try
(sql/insert-multi! conn table-name cols rows)
(catch Exception e
(let [msg (.getMessage e)]
(binding [*out* *err*]
(locking *out*
(println "skip row" rows-batch "because of error:" msg))))))))
2 changes: 1 addition & 1 deletion src/fiaser/xml.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
xml/parse
:content
(map :attrs)
(seque 8000)))
(seque 500)))

(defn top-tag
[file-name]
Expand Down

0 comments on commit 2ba875d

Please sign in to comment.