diff --git a/csv.lisp b/csv.lisp index fa30a179..fdecb7de 100644 --- a/csv.lisp +++ b/csv.lisp @@ -4,9 +4,6 @@ (in-package :pgloader.csv) -(defparameter *csv-path-root* - (merge-pathnames "csv/" (user-homedir-pathname))) - (defun get-pathname (dbname table-name) "Return a pathname where to read or write the file data" (make-pathname diff --git a/package.lisp b/package.lisp index dae7b34a..7186345f 100644 --- a/package.lisp +++ b/package.lisp @@ -5,6 +5,9 @@ (defpackage #:pgloader.utils (:use #:cl) + (:import-from #:pgloader.params + #:*reject-path-root* + #:*state*) (:export #:report-header #:report-table-name #:report-results @@ -16,6 +19,9 @@ #:pgstate-add-table #:pgstate-setf #:pgstate-incf + #:pgstate-decf + #:pgtable-reject-data + #:pgtable-reject-logs #:report-pgtable-stats #:report-pgstate-stats)) @@ -26,16 +32,21 @@ (defpackage #:pgloader.csv (:use #:cl) + (:import-from #:pgloader.params + #:*csv-path-root*) (:export #:*csv-path-root* #:get-pathname)) (defpackage #:pgloader.mysql (:use #:cl) - (:import-from #:pgloader + (:import-from #:pgloader.params + #:*csv-path-root* + #:*reject-path-root* #:*loader-kernel* #:*myconn-host* #:*myconn-user* - #:*myconn-pass*) + #:*myconn-pass* + #:*state*) (:import-from #:pgloader.utils #:report-header #:report-table-name @@ -48,10 +59,9 @@ #:pgstate-add-table #:pgstate-setf #:pgstate-incf + #:pgstate-decf #:report-pgtable-stats #:report-pgstate-stats) - (:import-from #:pgloader - #:*state*) (:export #:map-rows #:copy-from #:list-databases @@ -63,6 +73,11 @@ (defpackage #:pgloader.pgsql (:use #:cl) + (:import-from #:pgloader.params + #:*csv-path-root* + #:*reject-path-root* + #:*loader-kernel* + #:*state*) (:import-from #:pgloader.utils #:report-header #:report-table-name @@ -75,10 +90,11 @@ #:pgstate-add-table #:pgstate-setf #:pgstate-incf + #:pgstate-decf + #:pgtable-reject-data + #:pgtable-reject-logs #:report-pgtable-stats #:report-pgstate-stats) - (:import-from #:pgloader - #:*state*) (:export #:truncate-table #:copy-from-file #:copy-from-queue @@ -89,11 +105,25 @@ (defpackage #:pgloader (:use #:cl) + (:import-from #:pgloader.params + #:*csv-path-root* + #:*reject-path-root* + #:*loader-kernel* + #:*myconn-host* + #:*myconn-user* + #:*myconn-pass* + #:*state*) (:import-from #:pgloader.pgsql #:copy-from-file #:list-databases #:list-tables) (:export #:*state* + #:*csv-path-root* + #:*reject-path-root* + #:*loader-kernel* + #:*myconn-host* + #:*myconn-user* + #:*myconn-pass* #:copy-from-file #:list-databases #:list-tables)) diff --git a/params.lisp b/params.lisp new file mode 100644 index 00000000..c95bfb4f --- /dev/null +++ b/params.lisp @@ -0,0 +1,31 @@ +;;; +;;; pgloader parameters +;;; +;;; in a separate file to break circular dependencies + +(defpackage #:pgloader.params + (:use #:cl) + (:export #:*csv-path-root* + #:*reject-path-root* + #:*loader-kernel* + #:*myconn-host* + #:*myconn-user* + #:*myconn-pass* + #:*state*)) + +(in-package :pgloader.params) + +(defparameter *csv-path-root* + (merge-pathnames "csv/" (user-homedir-pathname))) + +(defparameter *reject-path-root* + (make-pathname :directory "/tmp")) + +;;; package nicknames are only defined later, in package.lisp +(defparameter *loader-kernel* (lparallel:make-kernel 2) + "lparallel kernel to use for loading data in parallel") + +(defparameter *myconn-host* "myhost") +(defparameter *myconn-user* "myuser") +(defparameter *myconn-pass* "mypass") + diff --git a/pgloader.asd b/pgloader.asd index 4f30cfc7..9099e3d6 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -12,7 +12,8 @@ #:split-sequence #:cl-csv #:lparallel) - :components ((:file "package") + :components ((:file "params") + (:file "package" :depends-on ("params")) (:file "utils" :depends-on ("package")) (:file "pgloader" :depends-on ("package" "utils")) diff --git a/pgloader.lisp b/pgloader.lisp index 19846848..44777c11 100644 --- a/pgloader.lisp +++ b/pgloader.lisp @@ -6,21 +6,11 @@ (in-package #:pgloader) ;;; -;;; Parameters you might want to change +;;; Internal Parameters. The one to change are in params.lisp ;;; -(defparameter *loader-kernel* (lp:make-kernel 2) - "lparallel kernel to use for loading data in parallel") - -(defparameter *myconn-host* "myhost") -(defparameter *myconn-user* "myuser") -(defparameter *myconn-pass* "mypass") - (defparameter *state* (pgloader.utils:make-pgstate) - "pgloader state, global stats and per-table stats") + "State of the current loading.") ;;; ;;; TODO: define a top level API ;;; - -(defparameter *state* (pgloader.utils:make-pgstate) - "State of the current loading.") diff --git a/pgsql.lisp b/pgsql.lisp index a06f5020..7ee32bd0 100644 --- a/pgsql.lisp +++ b/pgsql.lisp @@ -253,17 +253,35 @@ Finally returns how many rows where read and processed." ;;; split 1000 rows in 10 batches of 100 rows ;;; split 352 rows in 3 batches of 100 rows + 1 batch of 52 rows ;;; + (defun process-bad-row (dbname table-name condition row) - "Process bad row" - ;; first, the stats. + "Add the row to the reject file, in PostgreSQL COPY TEXT format" + ;; first, update the stats. (pgstate-incf *state* table-name :errs 1) + (pgstate-decf *state* table-name :rows 1) ;; now, the bad row processing - (let* ((str (format nil "~a" row)) - (str (if (< 72 (length str)) (subseq str 0 72) - str))) - (format t "ERROR: ~a~%" condition) - (format t "DATA: ~a...~%" str))) + (let* ((table (pgstate-get-table *state* table-name)) + (data (pgtable-reject-data table)) + (logs (pgtable-reject-logs table))) + + ;; first log the rejected data + (with-open-file (reject-data-file data + :direction :output + :if-exists :append + :if-does-not-exist :create + :external-format :utf8) + ;; the row has already been processed when we get here + (pgloader.pgsql:format-row reject-data-file row)) + + ;; now log the condition signaled to reject the data + (with-open-file (reject-logs-file logs + :direction :output + :if-exists :append + :if-does-not-exist :create + :external-format :utf8) + ;; the row has already been processed when we get here + (format reject-logs-file "~a~%" condition)))) ;;; ;;; Compute the next batch size, must be smaller than the previous one or diff --git a/utils.lisp b/utils.lisp index 0837bcdc..aace5c88 100644 --- a/utils.lisp +++ b/utils.lisp @@ -3,9 +3,6 @@ ;;; (in-package :pgloader.utils) -(defparameter *reject-path-root* - (make-pathname :directory "/tmp")) - ;;; ;;; Timing Macro ;;; @@ -73,14 +70,14 @@ (merge-pathnames (format nil "~a" dbname) *reject-path-root*)) :name table-name - :type "rej.dat") + :type "dat") (pgtable-reject-logs table) (make-pathname :directory (pathname-directory (merge-pathnames (format nil "~a" dbname) *reject-path-root*)) :name table-name - :type "rej.log")) + :type "log")) table))) (defun pgstate-setf (pgstate name &key read rows errs secs) @@ -100,7 +97,6 @@ pgtable)) (defun pgstate-incf (pgstate name &key rows errs secs) - (format t "~&pgstate-incf: ~d rows, ~d errs, ~f secs~%" rows errs secs) (let ((pgtable (pgstate-get-table pgstate name))) (when rows (incf (pgtable-rows pgtable) rows) @@ -113,6 +109,19 @@ (incf (pgstate-secs pgstate) secs)) pgtable)) +(defun pgstate-decf (pgstate name &key rows errs secs) + (let ((pgtable (pgstate-get-table pgstate name))) + (when rows + (decf (pgtable-rows pgtable) rows) + (decf (pgstate-rows pgstate) rows)) + (when errs + (decf (pgtable-errs pgtable) errs) + (decf (pgstate-errs pgstate) errs)) + (when secs + (decf (pgtable-secs pgtable) secs) + (decf (pgstate-secs pgstate) secs)) + pgtable)) + (defun report-pgtable-stats (pgstate name) (with-slots (rows errs secs) (pgstate-get-table pgstate name) (format t "~9@a ~9@a ~9@a" rows errs (format-interval secs nil))))