Skip to content

Commit

Permalink
Send errors into separate files (logs, data), fix system loading and …
Browse files Browse the repository at this point in the history
…parameters.
  • Loading branch information
dimitri committed Feb 7, 2013
1 parent f6bdf64 commit d795ac0
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 35 deletions.
3 changes: 0 additions & 3 deletions csv.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

(in-package :pgloader.csv)

;; Root directory for this package's data files: the "csv/" subdirectory
;; of the current user's home directory, computed when this form is
;; evaluated.
;; NOTE(review): presumably the base directory used by get-pathname below;
;; confirm against its body.
(defparameter *csv-path-root*
(merge-pathnames "csv/" (user-homedir-pathname)))

(defun get-pathname (dbname table-name)
"Return a pathname where to read or write the file data"
(make-pathname
Expand Down
42 changes: 36 additions & 6 deletions package.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

(defpackage #:pgloader.utils
(:use #:cl)
(:import-from #:pgloader.params
#:*reject-path-root*
#:*state*)
(:export #:report-header
#:report-table-name
#:report-results
Expand All @@ -16,6 +19,9 @@
#:pgstate-add-table
#:pgstate-setf
#:pgstate-incf
#:pgstate-decf
#:pgtable-reject-data
#:pgtable-reject-logs
#:report-pgtable-stats
#:report-pgstate-stats))

Expand All @@ -26,16 +32,21 @@

;; Package used by csv.lisp.
(defpackage #:pgloader.csv
(:use #:cl)
;; *csv-path-root* is defined in pgloader.params; it is imported here and
;; re-exported below, so pgloader.csv:*csv-path-root* names that same symbol.
(:import-from #:pgloader.params
#:*csv-path-root*)
(:export #:*csv-path-root*
#:get-pathname))

(defpackage #:pgloader.mysql
(:use #:cl)
(:import-from #:pgloader
(:import-from #:pgloader.params
#:*csv-path-root*
#:*reject-path-root*
#:*loader-kernel*
#:*myconn-host*
#:*myconn-user*
#:*myconn-pass*)
#:*myconn-pass*
#:*state*)
(:import-from #:pgloader.utils
#:report-header
#:report-table-name
Expand All @@ -48,10 +59,9 @@
#:pgstate-add-table
#:pgstate-setf
#:pgstate-incf
#:pgstate-decf
#:report-pgtable-stats
#:report-pgstate-stats)
(:import-from #:pgloader
#:*state*)
(:export #:map-rows
#:copy-from
#:list-databases
Expand All @@ -63,6 +73,11 @@

(defpackage #:pgloader.pgsql
(:use #:cl)
(:import-from #:pgloader.params
#:*csv-path-root*
#:*reject-path-root*
#:*loader-kernel*
#:*state*)
(:import-from #:pgloader.utils
#:report-header
#:report-table-name
Expand All @@ -75,10 +90,11 @@
#:pgstate-add-table
#:pgstate-setf
#:pgstate-incf
#:pgstate-decf
#:pgtable-reject-data
#:pgtable-reject-logs
#:report-pgtable-stats
#:report-pgstate-stats)
(:import-from #:pgloader
#:*state*)
(:export #:truncate-table
#:copy-from-file
#:copy-from-queue
Expand All @@ -89,11 +105,25 @@

(defpackage #:pgloader
(:use #:cl)
(:import-from #:pgloader.params
#:*csv-path-root*
#:*reject-path-root*
#:*loader-kernel*
#:*myconn-host*
#:*myconn-user*
#:*myconn-pass*
#:*state*)
(:import-from #:pgloader.pgsql
#:copy-from-file
#:list-databases
#:list-tables)
(:export #:*state*
#:*csv-path-root*
#:*reject-path-root*
#:*loader-kernel*
#:*myconn-host*
#:*myconn-user*
#:*myconn-pass*
#:copy-from-file
#:list-databases
#:list-tables))
Expand Down
31 changes: 31 additions & 0 deletions params.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
;;;
;;; pgloader parameters
;;;
;;; in a separate file to break circular dependencies

;; Package holding the pgloader parameters (special variables). It uses
;; only CL and imports nothing from the other pgloader packages, so those
;; packages can all import from it.
(defpackage #:pgloader.params
(:use #:cl)
(:export #:*csv-path-root*
#:*reject-path-root*
#:*loader-kernel*
#:*myconn-host*
#:*myconn-user*
#:*myconn-pass*
;; NOTE(review): *state* is exported here but this file has no defining
;; form for it; the value appears to be set up elsewhere -- confirm.
#:*state*))

(in-package :pgloader.params)

;; The "csv/" subdirectory of the current user's home directory, computed
;; when this form is evaluated.
;; NOTE(review): presumably the root directory for CSV data files, going
;; by the variable name; confirm against the code that reads it.
(defparameter *csv-path-root*
(merge-pathnames "csv/" (user-homedir-pathname)))

(defparameter *reject-path-root*
  ;; Use the list form of the directory component: a bare string that
  ;; contains a directory separator (as in "/tmp") is not interpreted the
  ;; same way by every implementation, and may end up as a single
  ;; directory literally named "/tmp".
  (make-pathname :directory '(:absolute "tmp"))
  "Root directory (/tmp/) against which reject file pathnames are merged.")

;;; package nicknames are only defined later, in package.lisp
;;; (so the lparallel package is referred to by its full name here)
;;; NOTE(review): the kernel is created when this form is evaluated, i.e.
;;; at load time; the argument 2 is presumably the worker count -- confirm
;;; against the lparallel documentation.
(defparameter *loader-kernel* (lparallel:make-kernel 2)
"lparallel kernel to use for loading data in parallel")

;;; Connection parameters, exported from this package under the same names.
;;; NOTE(review): the values look like placeholders to be replaced with
;;; real host/user/password settings before use -- confirm.
(defparameter *myconn-host* "myhost")
(defparameter *myconn-user* "myuser")
(defparameter *myconn-pass* "mypass")

3 changes: 2 additions & 1 deletion pgloader.asd
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
#:split-sequence
#:cl-csv
#:lparallel)
:components ((:file "package")
:components ((:file "params")
(:file "package" :depends-on ("params"))
(:file "utils" :depends-on ("package"))
(:file "pgloader" :depends-on ("package" "utils"))

Expand Down
14 changes: 2 additions & 12 deletions pgloader.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,11 @@
(in-package #:pgloader)

;;;
;;; Parameters you might want to change
;;; Internal Parameters. The ones to change are in params.lisp
;;;
(defparameter *loader-kernel* (lp:make-kernel 2)
"lparallel kernel to use for loading data in parallel")

(defparameter *myconn-host* "myhost")
(defparameter *myconn-user* "myuser")
(defparameter *myconn-pass* "mypass")

(defparameter *state* (pgloader.utils:make-pgstate)
"pgloader state, global stats and per-table stats")
"State of the current loading.")

;;;
;;; TODO: define a top level API
;;;

;; Being a defparameter, this form builds a fresh pgstate with
;; pgloader.utils:make-pgstate and rebinds *state* each time it is
;; evaluated (for instance when the file is reloaded).
(defparameter *state* (pgloader.utils:make-pgstate)
"State of the current loading.")
32 changes: 25 additions & 7 deletions pgsql.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,35 @@ Finally returns how many rows where read and processed."
;;; split 1000 rows in 10 batches of 100 rows
;;; split 352 rows in 3 batches of 100 rows + 1 batch of 52 rows
;;;

(defun process-bad-row (dbname table-name condition row)
"Process bad row"
;; first, the stats.
"Add the row to the reject file, in PostgreSQL COPY TEXT format"
;; first, update the stats.
(pgstate-incf *state* table-name :errs 1)
(pgstate-decf *state* table-name :rows 1)

;; now, the bad row processing
(let* ((str (format nil "~a" row))
(str (if (< 72 (length str)) (subseq str 0 72)
str)))
(format t "ERROR: ~a~%" condition)
(format t "DATA: ~a...~%" str)))
(let* ((table (pgstate-get-table *state* table-name))
(data (pgtable-reject-data table))
(logs (pgtable-reject-logs table)))

;; first log the rejected data
(with-open-file (reject-data-file data
:direction :output
:if-exists :append
:if-does-not-exist :create
:external-format :utf8)
;; the row has already been processed when we get here
(pgloader.pgsql:format-row reject-data-file row))

;; now log the condition signaled to reject the data
(with-open-file (reject-logs-file logs
:direction :output
:if-exists :append
:if-does-not-exist :create
:external-format :utf8)
;; the row has already been processed when we get here
(format reject-logs-file "~a~%" condition))))

;;;
;;; Compute the next batch size, must be smaller than the previous one or
Expand Down
21 changes: 15 additions & 6 deletions utils.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
;;;
(in-package :pgloader.utils)

(defparameter *reject-path-root*
  ;; Use the list form of the directory component: a bare string that
  ;; contains a directory separator (as in "/tmp") is not interpreted the
  ;; same way by every implementation, and may end up as a single
  ;; directory literally named "/tmp".
  (make-pathname :directory '(:absolute "tmp"))
  "Root directory (/tmp/) against which reject file pathnames are merged.")

;;;
;;; Timing Macro
;;;
Expand Down Expand Up @@ -73,14 +70,14 @@
(merge-pathnames
(format nil "~a" dbname) *reject-path-root*))
:name table-name
:type "rej.dat")
:type "dat")
(pgtable-reject-logs table)
(make-pathname
:directory (pathname-directory
(merge-pathnames
(format nil "~a" dbname) *reject-path-root*))
:name table-name
:type "rej.log"))
:type "log"))
table)))

(defun pgstate-setf (pgstate name &key read rows errs secs)
Expand All @@ -100,7 +97,6 @@
pgtable))

(defun pgstate-incf (pgstate name &key rows errs secs)
(format t "~&pgstate-incf: ~d rows, ~d errs, ~f secs~%" rows errs secs)
(let ((pgtable (pgstate-get-table pgstate name)))
(when rows
(incf (pgtable-rows pgtable) rows)
Expand All @@ -113,6 +109,19 @@
(incf (pgstate-secs pgstate) secs))
pgtable))

(defun pgstate-decf (pgstate name &key rows errs secs)
  "Subtract ROWS, ERRS and SECS (each only when supplied and non-nil) from
the counters of the table entry found under NAME in PGSTATE, and from the
matching counters of PGSTATE itself.  Return that table entry."
  (let ((table (pgstate-get-table pgstate name)))
    ;; Each counter is lowered in two places: the per-table entry first,
    ;; then the state-wide total.
    (with-accessors ((table-rows pgtable-rows)
                     (table-errs pgtable-errs)
                     (table-secs pgtable-secs))
        table
      (with-accessors ((total-rows pgstate-rows)
                       (total-errs pgstate-errs)
                       (total-secs pgstate-secs))
          pgstate
        (when rows
          (decf table-rows rows)
          (decf total-rows rows))
        (when errs
          (decf table-errs errs)
          (decf total-errs errs))
        (when secs
          (decf table-secs secs)
          (decf total-secs secs))))
    table))

(defun report-pgtable-stats (pgstate name)
  "Print to standard output the rows, errs and secs slots of the table
entry found under NAME in PGSTATE, each right-aligned in a nine-column
field; secs is first passed through format-interval."
  (let* ((table (pgstate-get-table pgstate name))
         (row-count (slot-value table 'rows))
         (err-count (slot-value table 'errs))
         (elapsed (slot-value table 'secs)))
    (format t "~9@a ~9@a ~9@a"
            row-count
            err-count
            (format-interval elapsed nil))))
Expand Down

0 comments on commit d795ac0

Please sign in to comment.