-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.R
205 lines (172 loc) · 6.81 KB
/
db.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#' Connect to a database using YAML configs
#' @param db name of database, must be present in the file specified by \code{getOption('dbr.db_config_path')}
#' @param cache optional caching of the connection. If set to \code{TRUE}, the connection will be cached in the background and all future \code{db_connect} calls will simply return that (even if called automatically from eg \code{db_query}) until the end of the R session or when caching on the \code{db} is disabled in a future \code{db_connect} call with explicit \code{cache = FALSE}, which also disconnects the previously cached connection. See the examples for more details.
#' @param ... extra (named) parameters passed to the database driver, even ones overriding the default values loaded from the YAML config
#' @return the connection object created by \code{dbConnect} (or the cached one), with two extra attributes: \code{db} (the name of the database) and \code{cached} (the value of \code{cache}, or \code{'default'} when it was not set)
#' @importFrom DBI dbConnect dbDriver
#' @export
#' @seealso \code{\link{db_close}} \code{\link{db_query}}
#' @examples \dontrun{
#' ## create new connection
#' optbak <- options()
#' options('dbr.db_config_path' = system.file('example_db_config.yaml', package = 'dbr'))
#' con <- db_connect('sqlite')
#' str(con)
#' db_query('SELECT 42', 'sqlite')
#'
#' ## reusing the connection
#' str(db_connect('sqlite', cache = TRUE))
#' str(db_connect('sqlite'))
#' str(db_connect('sqlite'))
#' ## kill cached connection
#' db_close(db_connect('sqlite', cache = FALSE))
#'
#' ## restore options
#' options(optbak)
#' }
db_connect <- function(db, cache, ...) {

  ## 'default' = reuse a cached connection if there is one, but do not cache a new one
  cache <- if (missing(cache)) 'default' else cache

  ## look only inside the cache itself (inherits = FALSE): otherwise a database
  ## named eg 'c' would "exist" via the enclosing environments and NULL would be returned
  if (exists(db, envir = dbs, inherits = FALSE)) {
    if (cache != FALSE) {
      return(dbs[[db]])
    }
    ## reset cached connection: db_close leaves connections flagged as cached
    ## open, so drop the flag first to actually disconnect it
    cached_con <- dbs[[db]]
    attr(cached_con, 'cached') <- FALSE
    db_close(cached_con)
    rm(list = db, envir = dbs)
  }

  params <- db_config(db)

  ## override defaults from the YAML config with the extra parameters
  extraparams <- list(...)
  if (length(extraparams) > 0 &&
      (is.null(names(extraparams)) || any(names(extraparams) == ''))) {
    stop('All extra parameters passed to db_connect must be named.')
  }
  for (i in seq_along(extraparams)) {
    params[[names(extraparams)[i]]] <- extraparams[[i]]
  }

  ## NOTE(review): extra parameters (which may include credentials) are logged verbatim
  extralog <- if (length(extraparams) > 0) {
    paste0(' [', paste(paste(names(extraparams), extraparams, sep = '='), collapse = ', '), ']')
  } else {
    ''
  }
  log_info(paste('Connecting to', db, extralog))

  con <- structure(do.call(dbConnect, params), db = db, cached = cache)

  ## cache connection
  if (isTRUE(cache)) {
    dbs[[db]] <- con
  }
  con
}
#' Close a database connection
#'
#' Disconnects the given connection, unless it is flagged as cached by
#' \code{db_connect(..., cache = TRUE)}, in which case it is left open.
#' @param db database object returned by \code{\link{db_connect}}
#' @return invisible \code{NULL} for cached connections, otherwise the value returned by \code{dbDisconnect}
#' @importFrom DBI dbDisconnect
#' @export
#' @seealso \code{\link{db_connect}}
#' @note To close a cached connection, call \code{db_close} on an object returned by \code{db_connect(..., cache = FALSE)}
db_close <- function(db) {
  assert_attr(db, 'db')
  ## cached connections are shared by later db_connect calls: keep them alive
  if (isTRUE(attr(db, 'cached'))) {
    return(invisible(NULL))
  }
  log_info(paste('Closing connection to', attr(db, 'db')))
  dbDisconnect(db)
}
#' Execute an SQL query in a database
#' @param sql string
#' @param db database reference by name or object
#' @param ... passed to \code{sql_formatter}
#' @param sql_formatter function to be applied on \code{sql} potentially with \code{...}, eg using \code{glue} for string interpolation
#' @param output_format preferred output format that defaults to \code{data.frame}, but could be also \code{data.table} or \code{tibble} as well if the related R package is installed
#' @return data.frame (or \code{data.table} / \code{tibble}, see \code{output_format}) with query metadata stored in the \code{when}, \code{db}, \code{time_to_exec} and \code{statement} attributes
#' @export
#' @importFrom DBI dbGetQuery
#' @importFrom logger log_info skip_formatter
#' @importFrom checkmate assert_string assert_function
#' @seealso \code{\link{db_connect}} \code{\link{db_refresh}}
#' @examples \dontrun{
#' options('dbr.db_config_path' = system.file('example_db_config.yaml', package = 'dbr'))
#' db_query('SELECT 42', 'sqlite')
#' db_query('SELECT {40 + 2}', 'sqlite')
#' }
db_query <- function(sql, db, ...,
                     sql_formatter = getOption('dbr.sql_formatter'),
                     output_format = getOption('dbr.output_format')) {

  ## a database name (instead of a connection object) gets a temporary connection
  if (!is.object(db)) {
    db <- db_connect(db)
    on.exit(db_close(db))
  }
  assert_attr(db, 'db')
  assert_string(sql)
  assert_function(sql_formatter)

  ## run the formatter in the caller's frame, so eg glue sees the caller's variables
  formatter_args <- append(list(sql), list(...))
  sql <- do.call(sql_formatter, formatter_args, envir = parent.frame())

  log_info('Executing:**********')
  log_info(skip_formatter(sql))
  log_info('********************')

  started_at <- Sys.time()
  result_set <- dbGetQuery(db, sql)
  elapsed <- Sys.time() - started_at
  log_info('Finished in %s returning %s rows', format(elapsed, digits = 4), nrow(result_set))

  ## keep enough metadata on the result to re-run it later (see db_refresh)
  metadata <- list(
    when = started_at,
    db = attr(db, 'db'),
    time_to_exec = elapsed,
    statement = sql)
  for (key in names(metadata)) {
    attr(result_set, key) <- metadata[[key]]
  }

  ## convert to the preferred output format
  if (output_format != 'data.frame') {
    result_set <- switch(
      output_format,
      'data.table' = data.table::setDT(result_set),
      'tibble' = tibble::as_tibble(result_set),
      stop('Unsupported output_format -- please use data.frame, data.table or tibble.'))
  }
  result_set
}
#' Refresh SQL query
#'
#' Re-runs the SQL statement that produced \code{x} on the same database.
#' @param x object returned by \code{db_query}
#' @return the fresh result set, as returned by \code{\link{db_query}}
#' @seealso \code{\link{db_query}}
#' @importFrom checkmate assert_data_frame
#' @export
db_refresh <- function(x) {
  assert_data_frame(x)
  assert_attr(x, 'db')
  assert_attr(x, 'statement')
  ## the stored statement has already been through sql_formatter in db_query,
  ## so run it untouched: formatting it again (eg with glue) would
  ## re-interpolate any literal braces left in the final SQL
  db_query(attr(x, 'statement', exact = TRUE),
           attr(x, 'db', exact = TRUE),
           sql_formatter = identity)
}
#' Insert rows into a database table
#' @param df data.frame
#' @param table character vector of an optional schema and table name
#' @param db database reference by name or object (as returned by \code{\link{db_connect}}); when a name is given, a temporary connection is opened and closed on exit
#' @param ... further parameters passed to \code{dbWriteTable}, eg to modify \code{row.names} or \code{append} (depends on driver)
#' @return the value returned by \code{dbWriteTable}
#' @seealso \code{RMySQL::\link[RMySQL]{dbWriteTable}}, \code{RPostgreSQL::\link[RPostgreSQL]{dbReadTable-methods}}, \code{\link{db_append}}
#' @importFrom DBI dbWriteTable
#' @importFrom checkmate assert_character
#' @export
#' @examples \dontrun{
#' options('dbr.db_config_path' = system.file('example_db_config.yaml', package = 'dbr'))
#' db_insert(mtcars, 'mtcars', 'sqlite')
#' db_append(mtcars, c('dm', 'mtcars'), 'sqlite')
#' }
db_insert <- function(df, table, db, ...) {
  ## validate the inputs first, so invalid calls do not open a connection
  assert_data_frame(df)
  assert_character(table, min.len = 1)
  if (!is.object(db)) {
    db <- db_connect(db)
    on.exit(db_close(db))
  }
  assert_attr(db, 'db')
  log_info('Inserting %s rows into %s', nrow(df), paste(table, collapse = "."))
  dbWriteTable(conn = db, name = table, value = df, ...)
}
#' Append rows into a database table
#'
#' This is a wrapper around \code{\link{db_insert}} with the default parameters set to append to a table.
#' On Redshift the rows are loaded via S3 instead (and \code{...} is not used).
#' @inheritParams db_insert
#' @export
db_append <- function(df, table, db, ...) {
  if (!is.redshift(db)) {
    ## regular databases: append through db_insert / dbWriteTable
    db_insert(df = df, table = table, db = db,
              overwrite = FALSE, append = TRUE, row.names = FALSE, ...)
  } else {
    ## Redshift does not support COPY FROM stdin, so go through S3
    redshift_insert_via_copy_from_s3(df = df, table = table, db = db)
  }
}