
Merge pull request #54 from TidierOrg/improve_CH
Improve clickhouse backend, improve docs
drizk1 authored Aug 11, 2024
2 parents d909d8a + 7823eff commit 0303dc9
Showing 9 changed files with 197 additions and 54 deletions.
7 changes: 6 additions & 1 deletion NEWS.md
@@ -1,10 +1,15 @@
# TidierDB.jl updates

## v0.3.2 - 2024-08-**
- adds `@head` for limiting the number of collected rows (see the sketch below)
- adds support for reading URLs in `db_table` with ClickHouse
- adds support for reading from multiple files at once as a vector of URLs in `db_table` when using ClickHouse
  - i.e., `db_table(db, ["url1", "url2"])`
- adds docs around using `*` for reading in multiple files from a folder
- adds docs for `db_table`
- adds docs for previewing or saving intermediate tables in ongoing `@chain`
- Bugfix: `@count` updates metadata
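
For example, a minimal sketch of `@head` in a chain (the `db` connection and the `mtcars` table are assumed for illustration):

```julia
@chain db_table(db, "mtcars") begin
    @head(5)    # limit the collected result to 5 rows
    @collect
end
```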

## v0.3.1 - 2024-07-28
- adds support for reading from multiple files at once as a vector of paths in `db_table` when using DuckDB
47 changes: 47 additions & 0 deletions docs/examples/UserGuide/from_queryex.jl
@@ -59,4 +59,51 @@
# 1 │ Pontiac Firebird 8 19.2
# 2 │ Toyota Corolla 4 33.9
# 3 │ Hornet 4 Drive 6 21.4
# ```

# ## Preview or save an intermediate table
# While querying a dataset, you may wish to preview an intermediate table, or even save it. You can use `@aside` together with `from_query(_)`, illustrated below, to do just that.
# While we opted to print the intermediate results in this simple example, we could have saved them instead by writing `name = DB.@chain...` inside `@aside`.

# ```julia
# import ClickHouse;
# conn = DB.connect(DB.clickhouse(); host="localhost", port=19000, database="default", user="default", password="")
# path = "https://huggingface.co/datasets/maharshipandya/spotify-tracks-dataset/resolve/refs%2Fconvert%2Fparquet/default/train/0000.parquet"
# DB.@chain DB.db_table(conn, path) begin
# DB.@count(artists)
# @aside println(DB.@chain DB.from_query(_) DB.@head(5) DB.@collect)
# DB.@arrange(desc(count))
# DB.@collect
# end
# ```
# ```
# 5×2 DataFrame
# Row │ artists count
# │ String? UInt64
# ─────┼─────────────────
# 1 │ missing 1
# 2 │ Wizo 3
# 3 │ MAGIC! 3
# 4 │ Macaco 1
# 5 │ SOYOU 1
# 31438×2 DataFrame
# Row │ artists count
# │ String? UInt64
# ───────┼─────────────────────────
# 1 │ The Beatles 279
# 2 │ George Jones 271
# 3 │ Stevie Wonder 236
# 4 │ Linkin Park 224
# 5 │ Ella Fitzgerald 222
# 6 │ Prateek Kuhad 217
# 7 │ Feid 202
# ⋮ │ ⋮ ⋮
# 31432 │ Leonard 1
# 31433 │ marcos g 1
# 31434 │ BLVKSHP 1
# 31435 │ Memtrix 1
# 31436 │ SOYOU 1
# 31437 │ Macaco 1
# 31438 │ missing 1
# 31424 rows omitted
# ```
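
# To save the intermediate table instead of printing it, assign a name inside `@aside` (a sketch; `intermediate` is a hypothetical name):

# ```julia
# @aside intermediate = DB.@chain DB.from_query(_) DB.@head(5) DB.@collect
# ```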
1 change: 1 addition & 0 deletions docs/examples/UserGuide/functions_pass_to_DB.jl
@@ -10,6 +10,7 @@

# ```
# using TidierDB
# db = connect(duckdb())
# path = "https://gist.githubusercontent.com/seankross/a412dfbd88b3db70b74b/raw/5f23f993cd87c283ce766e7ac6b329ee7cc2e1d1/mtcars.csv"
# copy_to(db, path, "mtcars");
#
20 changes: 17 additions & 3 deletions docs/examples/UserGuide/getting_started.jl
@@ -9,22 +9,25 @@

# Alternatively, `using Tidier` will import TidierDB in the above manner for you, where TidierDB functions and macros will be available as `DB.@mutate()` and so on, and the TidierData equivalent would be `@mutate()`.
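
# For example (a sketch; the `db` connection and the `mtcars` table are assumed):

# ```julia
# using Tidier
# df = DB.@chain DB.db_table(db, "mtcars") DB.@collect # TidierDB macros through the DB alias
# ```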

# ## Connecting
# To connect to a database, you can use the `connect` function as shown below, or establish your own connection through the respective libraries.

# For example, connecting to MySQL:
# ```julia
# conn = DB.connect(DB.mysql(); host="localhost", user="root", password="password", db="mydb")
# ```
# versus connecting to DuckDB
# ```julia
# conn = DB.connect(DB.duckdb())
# ```

# You can also establish a connection through another method that you prefer and use that as your connection.
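
# For instance, a DuckDB connection created directly with `DBInterface` can be used (a minimal sketch, assuming an in-memory database):

# ```julia
# using DuckDB
# con = DBInterface.connect(DuckDB.DB, ":memory:")
# ```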

# ## Package Extensions
# The following backends utilize package extensions. To use one of the backends listed below, you will need to load its corresponding package first (see the sketch after the list).

# - ClickHouse: `import ClickHouse`
# - MySQL and MariaDB: `using MySQL`
# - MSSQL: `using ODBC`
# - Postgres: `using LibPQ`
@@ -33,3 +36,14 @@
# - Oracle: `using ODBC`
# - Google BigQuery: `using GoogleCloud`
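
# For example, to set up the ClickHouse backend (a sketch; the host, port, and credentials are assumptions):

# ```julia
# import ClickHouse
# import TidierDB as DB
# conn = DB.connect(DB.clickhouse(); host="localhost", port=19000, database="default", user="default", password="")
# ```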

# ## `db_table`
# What does `db_table` do?
# `db_table` starts the underlying SQL query struct, in addition to pulling the table metadata and storing it there. Storing metadata is what enables a lazy interface that also supports tidy selection.
# `db_table` has two required arguments: `connection` and `table`.
# `table` can be the name of a table on the database or a path/URL to a file to read. When passing `db_table` a path or URL, the table is not copied into memory.
# With DuckDB and ClickHouse, if you have a folder of multiple files to read, you can use `*` to read in all files matching the pattern.
# For example, the below would read all files that end in `.csv` in the given folder.
# ```julia
# db_table(db, "folder/path/*.csv")
# ```
# `db_table` also supports iceberg, delta, and S3 file paths via DuckDB.
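# For instance (sketches; the paths are hypothetical):
# ```julia
# db_table(db, "s3://bucket/file.parquet")
# db_table(db, "path/to/iceberg_table", iceberg = true)
# db_table(db, "path/to/delta_table", delta = true)
# ```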
56 changes: 41 additions & 15 deletions ext/CHExt.jl
@@ -25,20 +25,46 @@ end

# ClickHouse
function TidierDB.get_table_metadata(conn::ClickHouse.ClickHouseSock, table_name::String)
if occursin("/", table_name) || occursin("http", table_name)


query = "DESCRIBE url($table_name)
SETTINGS enable_url_encoding=0,
max_http_get_redirects=10
"
# println(query)
column_info = ClickHouse.select_df(conn, query)
column_info = select(column_info, :name, :type)

# Prepare the column_info DataFrame

# Add the table name and selection marker
column_info[!, :current_selxn] .= 1
table_name = if occursin(r"[:/]", table_name)
split(basename(table_name), '.')[1]
#"'$table_name'"
else
table_name
end
column_info[!, :table_name] .= table_name

else
# Standard case: Querying from system.columns
query = """
SELECT
name AS column_name,
type AS data_type
FROM system.columns
WHERE table = '$table_name' AND database = 'default'
"""
column_info = ClickHouse.select_df(conn, query)

# Add the table name and selection marker
column_info[!, :current_selxn] .= 1
column_info[!, :table_name] .= table_name
end
# Return the result with relevant columns
return select(column_info, 1 => :name, 2 => :type, :current_selxn, :table_name)
end


@@ -51,4 +77,4 @@ function TidierDB.final_collect(sqlquery, ::Type{<:clickhouse})
return df_result
end

end
end
11 changes: 11 additions & 0 deletions src/TBD_macros.jl
@@ -455,15 +455,26 @@ macro count(sqlquery, group_by_columns...)
group_clause = join(group_by_cols_str, ", ")

return quote

sq = $(esc(sqlquery))
if isa(sq, SQLQuery)
# Interpolate `group_clause` directly into the quoted code to avoid scope issues
if !isempty($group_clause)
for col in $group_by_cols_str
$(esc(sqlquery)).metadata.current_selxn .= 0
matching_indices = findall($(esc(sqlquery)).metadata.name .== col)
$(esc(sqlquery)).metadata.current_selxn[matching_indices] .= 1
end
sq.select = "SELECT " * $group_clause * ", COUNT(*) AS count"
sq.groupBy = "GROUP BY " * $group_clause
push!(sq.metadata, Dict("name" => "count", "type" => "UNKNOWN", "current_selxn" => 1, "table_name" => sq.from))

else
# If no grouping columns are specified, just count all records
$(esc(sqlquery)).metadata.current_selxn .= 0
sq.select = "SELECT COUNT(*) AS count"
push!(sq.metadata, Dict("name" => "count", "type" => "UNKNOWN", "current_selxn" => 1, "table_name" => sq.from))

end

# Adjustments for previously set GROUP BY or ORDER BY clauses might be needed here
66 changes: 49 additions & 17 deletions src/TidierDB.jl
@@ -14,7 +14,6 @@ using GZip
@reexport using Chain
@reexport using DuckDB


export db_table, set_sql_mode, @arrange, @group_by, @filter, @select, @mutate, @summarize, @summarise,
@distinct, @left_join, @right_join, @inner_join, @count, @window_order, @window_frame, @show_query, @collect, @slice_max,
@slice_min, @slice_sample, @rename, copy_to, duckdb_open, duckdb_connect, @semi_join, @full_join,
@@ -145,8 +144,12 @@ function finalize_query(sqlquery::SQLQuery)
if !isempty(sqlquery.having) push!(query_parts, " " * sqlquery.having) end
if !isempty(sqlquery.orderBy) push!(query_parts, " " * sqlquery.orderBy) end
if !isempty(sqlquery.limit) push!(query_parts, " LIMIT " * sqlquery.limit) end

complete_query = join(filter(!isempty, query_parts), " ")

if !isempty(sqlquery.ch_settings) && current_sql_mode[] == clickhouse()
complete_query = complete_query * " \n " * string(sqlquery.ch_settings)
end
complete_query = replace(complete_query, "&&" => " AND ", "||" => " OR ",
"FROM )" => ")" , "SELECT SELECT " => "SELECT ", "SELECT SELECT " => "SELECT ", "DISTINCT SELECT " => "DISTINCT ",
"SELECT SELECT SELECT " => "SELECT ", "PARTITION BY GROUP BY" => "PARTITION BY", "GROUP BY GROUP BY" => "GROUP BY", "HAVING HAVING" => "HAVING", )
@@ -224,11 +227,14 @@ function db_table(db, table, athena_params::Any=nothing; iceberg::Bool=false, de
else
error("Unsupported SQL mode: $(current_sql_mode[])")
end

clickhouse_settings =""
formatted_table_name = if current_sql_mode[] == snowflake()
"$(db.database).$(db.schema).$table_name"
elseif db isa DatabricksConnection || current_sql_mode[] == databricks()
"$(db.database).$(db.schema).$table_name"
elseif current_sql_mode[] == clickhouse() && occursin(r"[:/]", table_name)
clickhouse_settings = " SETTINGS enable_url_encoding=0, max_http_get_redirects=10 "
"url('$table_name')"
elseif iceberg
"iceberg_scan('$table_name', allow_moved_paths = true)"
elseif delta
@@ -241,7 +247,7 @@
table_name
end

return SQLQuery(from=formatted_table_name, metadata=metadata, db=db, athena_params=athena_params)
return SQLQuery(from=formatted_table_name, metadata=metadata, db=db, athena_params=athena_params, ch_settings=clickhouse_settings)
end

function db_table(db, table::Vector{String}, athena_params::Any=nothing)
@@ -250,23 +256,49 @@ function db_table(db, table::Vector{String}, athena_params::Any=nothing)
end

# Check the current SQL mode
if current_sql_mode[] == duckdb()
file_type = lowercase(splitext(first(table))[2])

# Format paths: wrap each in single quotes and join with commas
formatted_paths = join(map(path -> "'$path'", table), ", ")

formatted_table_name = if file_type == ".csv"
"read_csv([$formatted_paths])"
elseif file_type == ".parquet"
"read_parquet([$formatted_paths])"
else
error("Unsupported file type: $file_type")
end

# Get metadata from the first file
meta_vec = first(table)
metadata = get_table_metadata(db, "'$meta_vec'")

return SQLQuery(from=formatted_table_name, metadata=metadata, db=db, athena_params=athena_params)

elseif current_sql_mode[] == clickhouse()

# Construct the ClickHouse SQL query with UNION ALL for each file
union_queries = join(map(path -> """
SELECT *
FROM url('$path')
""", table), " UNION ALL ")

# Wrap the union_queries in a subquery for further processing
formatted_table_name = "($union_queries)"
if occursin(r"[:/]", first(table))
clickhouse_settings = " SETTINGS enable_url_encoding=0, max_http_get_redirects=10 "
end
meta_vec = first(table)
metadata = get_table_metadata(db, "'$meta_vec'")

return SQLQuery(from=formatted_table_name, metadata=metadata, db=db, athena_params=athena_params, ch_settings = clickhouse_settings)

error("Unsupported SQL mode: $(current_sql_mode[])")
end
end

"""
28 changes: 15 additions & 13 deletions src/docstrings.jl
@@ -1049,21 +1049,23 @@ const docstring_db_table =
"""
db_table(database, table_name, athena_params, delta = false, iceberg = false)
`db_table` starts the underlying SQL query struct, adding the metadata and table. If a path or URL is passed directly to `db_table` instead of a
table name, it will not copy the data into memory, but rather read directly from the file.
# Arguments
- `database`: The Database or connection object
- `table_name`: tablename as a string (name, local path, or URL).
- CSV/TSV
- Parquet
- Json
- Iceberg
- Delta
- S3 tables from AWS or Google Cloud
- DuckDB and ClickHouse support vectors of paths and URLs.
- DuckDB and ClickHouse also support use of `*` wildcards to read all files of a type in a location such as:
- `db_table(db, "Path/to/testing_files/*.parquet")`
- `delta`: must be true to read delta files
- `iceberg`: must be true to read iceberg tables
# Example
```julia
15 changes: 10 additions & 5 deletions src/structs.jl
@@ -28,11 +28,14 @@ mutable struct SQLQuery
cte_count::Int
athena_params::Any
limit::String
ch_settings::String

function SQLQuery(;select::String="", from::String="", where::String="", groupBy::String="", orderBy::String="", having::String="",
window_order::String="", windowFrame::String="", is_aggregated::Bool=false, post_aggregation::Bool=false, metadata::DataFrame=DataFrame(),
distinct::Bool=false, db::Any=nothing, ctes::Vector{CTE}=Vector{CTE}(), cte_count::Int=0, athena_params::Any=nothing, limit::String="",
ch_settings::String="")
new(select, from, where, groupBy, orderBy, having, window_order, windowFrame, is_aggregated,
post_aggregation, metadata, distinct, db, ctes, cte_count, athena_params, limit, ch_settings)
end
end

@@ -93,7 +96,9 @@ function from_query(query::TidierDB.SQLQuery)
db=query.db,
ctes=[copy(cte) for cte in query.ctes],
cte_count=query.cte_count,
athena_params = query.athena_params,
limit = query.limit,
ch_settings = query.ch_settings
)
return new_query
end
