Skip to content

Implement tables row count ordering for MySQL. #1120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/load/copy-data.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
(defmethod queue-raw-data ((copy copy) rawq concurrency)
"Stream data as read by the map-queue method on the COPY argument into QUEUE,
as given."
(log-message :notice "COPY ~a ~@[with ~d rows estimated~] [~a/~a]"
(format-table-name (target copy))
(table-row-count-estimate (target copy))
(lp:kernel-worker-index)
(lp:kernel-worker-count))
(log-message :debug "Reader started for ~a" (format-table-name (target copy)))
(let* ((start-time (get-internal-real-time))
(row-count 0)
Expand Down Expand Up @@ -93,7 +98,6 @@
(trivial-backtrace:print-backtrace condition
:output nil))
(lp::invoke-transfer-error condition))))
(log-message :notice "COPY ~a" table-name)

;; Check for Read Concurrency Support from our source
(when (and multiple-readers (< 1 concurrency))
Expand Down
40 changes: 34 additions & 6 deletions src/load/migrate-database.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,24 @@
(setf (catalog-distribution-rules catalog)
(citus-distribute-schema catalog distribute))))

(defun optimize-table-copy-ordering (catalog)
  "Return the list of tables of CATALOG to copy over, in optimized order.

   When at least one table has a non-zero row count estimate, the tables are
   ordered by decreasing estimate and the resulting order is logged at the
   :notice level. Otherwise the catalog order is kept as-is. In both cases
   the entries of the catalog view-list are appended after the tables."
  ;; Work on fresh copies: STABLE-SORT and NCONC below are destructive and
  ;; must not alter the lists owned by the catalog.
  (let ((table-list (copy-list (table-list catalog)))
        (view-list  (copy-list (view-list catalog))))
    ;; when materialized views are not supported, view-list is empty here
    (cond
      ((notevery #'zerop (mapcar #'table-row-count-estimate table-list))
       (let ((sorted-table-list
              ;; STABLE-SORT keeps the catalog order among equal estimates,
              ;; which makes the processing order deterministic.
              (stable-sort table-list #'> :key #'table-row-count-estimate)))
         ;; After a destructive sort only the returned list is valid, the
         ;; TABLE-LIST variable may point in the middle of it: always log
         ;; from SORTED-TABLE-LIST so that the message matches the result.
         (log-message :notice
                      "Processing tables in this order: ~{~a: ~d rows~^, ~}"
                      (loop :for table :in (append sorted-table-list view-list)
                         :collect (format-table-name table)
                         :collect (table-row-count-estimate table)))
         (nconc sorted-table-list view-list)))
      (t
       (nconc table-list view-list)))))


;;;
;;; Generic enough implementation of the copy-database method.
Expand Down Expand Up @@ -414,10 +432,7 @@
(return-from copy-database)))

(loop
:for table :in (append (table-list catalog)
;; when materialized views are not supported,
;; view-list is empty here
(view-list catalog))
:for table :in (optimize-table-copy-ordering catalog)

:do (let ((table-source (instanciate-table-copy-object copy table)))
;; first COPY the data from source to PostgreSQL, using copy-kernel
Expand Down Expand Up @@ -472,8 +487,21 @@

(when (and create-indexes
(zerop (gethash table writers-count)))
(log-message :notice "DONE copying ~a"
(format-table-name table))

(let* ((stats pgloader.monitor::*sections*)
(section (get-state-section stats :data))
(table-stats (pgstate-get-label section table))
(pprint-secs
(pgloader.state::format-interval seconds nil)))
;; in CCL we have access to the *sections* dynamic
;; binding from another thread, in SBCL we access
;; an empty copy.
(log-message :notice
"DONE copying ~a in ~a~@[ for ~d rows~]"
(format-table-name table)
pprint-secs
(when table-stats
(pgtable-rows table-stats))))
(alexandria:appendf
pkeys
(create-indexes-in-kernel (target-db copy)
Expand Down
1 change: 1 addition & 0 deletions src/package.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
#:table-comment
#:table-storage-parameter-list
#:table-tablespace
#:table-row-count-estimate
#:table-field-list
#:table-column-list
#:table-index-list
Expand Down
3 changes: 3 additions & 0 deletions src/sources/common/api.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@
;; Generic function taking a CATALOG and a DB-COPY object, plus the optional
;; INCLUDING and EXCLUDING keyword arguments.
;; NOTE(review): the expected format of the filters is not visible here.
(defgeneric fetch-foreign-keys (catalog db-copy &key including excluding)
(:documentation "Get the list of foreign keys from the source database."))

;; Generic function taking a CATALOG and a DB-COPY object, plus the optional
;; INCLUDING and EXCLUDING keyword arguments. Per its documentation, methods
;; are expected to store the estimate on the tables rather than return it.
;; NOTE(review): the MySQL method specializes its first argument on SCHEMA,
;; not on a whole catalog -- confirm the intended argument name.
(defgeneric fetch-table-row-count (catalog db-copy &key including excluding)
(:documentation "Retrieve and set the row count estimate for given tables."))

;; Generic function taking a CATALOG and a DB-COPY object, plus the optional
;; INCLUDING and EXCLUDING keyword arguments.
;; NOTE(review): the expected format of the filters is not visible here.
(defgeneric fetch-comments (catalog db-copy &key including excluding)
(:documentation "Get the list of comments from the source database."))

Expand Down
21 changes: 21 additions & 0 deletions src/sources/mysql/mysql-schema.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,27 @@
:finally
(return schema)))

;;;
;;; MySQL table row count estimate
;;;
(defmethod fetch-table-row-count ((schema schema)
                                  (mysql copy-mysql)
                                  &key
                                    including
                                    excluding)
  "Retrieve and set the row count estimate for given MySQL tables.

   Runs the /mysql/list-table-rows.sql query against the current
   *connection* and, for each (table-name count) row whose table is found in
   SCHEMA, stores the parsed count in the table-row-count-estimate slot.
   Rows without a matching table or without a count are ignored, leaving
   the slot untouched."
  (loop
     :for (table-name count)
     :in (mysql-query (sql "/mysql/list-table-rows.sql"
                           (db-name *connection*)
                           including    ; do we print the clause?
                           including
                           excluding    ; do we print the clause?
                           excluding))
     :do (let ((table (find-table schema table-name)))
           ;; The query divides by avg_row_length, which gives a SQL NULL
           ;; for a zero divisor; presumably that reaches us as NIL, and
           ;; PARSE-INTEGER would then signal an error, so skip such rows.
           (when (and table count)
             (setf (table-row-count-estimate table)
                   (parse-integer count))))))


;;;
;;; Queries to get the MySQL comments.
Expand Down
5 changes: 5 additions & 0 deletions src/sources/mysql/mysql.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ Illegal ~a character starting at position ~a~@[: ~a~].~%"
:including including
:excluding excluding)

;; fetch tables row count estimate
(fetch-table-row-count schema mysql
:including including
:excluding excluding)

;; fetch view (and their columns) metadata, covering comments too
(let* ((view-names (unless (eq :all materialize-views)
(mapcar #'matview-source-name materialize-views)))
Expand Down
12 changes: 12 additions & 0 deletions src/sources/mysql/sql/list-table-rows.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- params: db-name
--         including
--         filter-list-to-where-clause including
--         excluding
--         filter-list-to-where-clause excluding
--
-- Row count estimate per base table of the given schema, computed as
-- data_length / avg_row_length. The division gives NULL when avg_row_length
-- is zero, in which case 0 is returned instead. The cast target is SIGNED
-- because MySQL does not document a bare INTEGER target for CAST.
  SELECT table_name,
         coalesce(cast(data_length/avg_row_length as signed), 0)
    FROM information_schema.tables
   WHERE table_schema = '~a'
         and table_type = 'BASE TABLE'
         ~:[~*~;and (~{table_name ~a~^ or ~})~]
         ~:[~*~;and (~{table_name ~a~^ and ~})~];
8 changes: 7 additions & 1 deletion src/utils/catalog.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,16 @@

(defstruct table source-name name schema oid comment
storage-parameter-list tablespace
(row-count-estimate 0 :type fixnum)
;; field is for SOURCE
field-list
;; column is for TARGET
column-list
index-list
fkey-list
trigger-list
;; citus is an extra slot for citus support
field-list column-list index-list fkey-list trigger-list citus-rule)
citus-rule)

;; Structure with four slots: SOURCE-NAME, NAME, SCHEMA and DEFINITION.
;; NOTE(review): presumably describes a materialized view -- confirm against
;; the callers.
(defstruct matview source-name name schema definition)

Expand Down