Skip to content

Commit

Permalink
Add support for AWS Athena
Browse files Browse the repository at this point in the history
  • Loading branch information
aleDsz committed Jun 29, 2022
1 parent b53629c commit 0cfd8b3
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 16 deletions.
78 changes: 76 additions & 2 deletions lib/assets/connection_cell/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,73 @@ export function init(ctx, info) {
`
};

const AthenaForm = {
name: "AthenaForm",

components: {
BaseInput: BaseInput
},

props: {
fields: {
type: Object,
default: {}
},
},

template: `
<div class="row mixed-row">
<BaseInput
name="access_key_id"
label="Access Key ID"
type="text"
placeholder="Access Key ID"
v-model="fields.access_key_id"
inputClass="input"
:grow
/>
<BaseInput
name="secret_access_key"
label="Secret Access Key"
type="password"
placeholder="Secret Access Key"
v-model="fields.secret_access_key"
inputClass="input"
:grow
/>
<BaseInput
name="region"
label="Region"
type="text"
placeholder="Region"
v-model="fields.region"
inputClass="input"
:grow
/>
</div>
<div class="row mixed-row">
<BaseInput
name="database"
label="Database"
type="text"
placeholder="Database"
v-model="fields.database"
inputClass="input"
:grow
/>
<BaseInput
name="output_location"
label="Output Location"
type="url"
placeholder="Output Location"
v-model="fields.output_location"
inputClass="input"
:grow
/>
</div>
`
};

const BigQueryForm = {
name: "BigQueryForm",

Expand Down Expand Up @@ -291,7 +358,8 @@ export function init(ctx, info) {
BaseSelect: BaseSelect,
SQLiteForm: SQLiteForm,
DefaultSQLForm: DefaultSQLForm,
BigQueryForm: BigQueryForm
BigQueryForm: BigQueryForm,
AthenaForm: AthenaForm
},

template: `
Expand Down Expand Up @@ -328,6 +396,7 @@ export function init(ctx, info) {
<SQLiteForm v-bind:fields="fields" v-if="isSQLite" />
<BigQueryForm v-bind:fields="fields" v-if="isBigQuery" />
<AthenaForm v-bind:fields="fields" v-if="isAthena" />
<DefaultSQLForm v-bind:fields="fields" v-if="isDefaultDatabase" />
</div>
</form>
Expand All @@ -342,7 +411,8 @@ export function init(ctx, info) {
{label: "PostgreSQL", value: "postgres"},
{label: "MySQL", value: "mysql"},
{label: "SQLite", value: "sqlite"},
{label: "Google BigQuery", value: "bigquery"}
{label: "Google BigQuery", value: "bigquery"},
{label: "AWS Athena", value: "athena"}
]
}
},
Expand All @@ -356,6 +426,10 @@ export function init(ctx, info) {
return this.fields.type === "bigquery";
},

isAthena() {
return this.fields.type === "athena";
},

isDefaultDatabase() {
return ["postgres", "mysql"].includes(this.fields.type);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/assets/sql_cell/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ export function init(ctx, payload) {
postgres: "PostgreSQL",
mysql: "MySQL",
sqlite: "SQLite",
bigquery: "Google BigQuery"
bigquery: "Google BigQuery",
athena: "AWS Athena"
};

connectionEl.innerHTML = connections.map((connection) => `
Expand Down
2 changes: 1 addition & 1 deletion lib/kino_db.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
results = [Postgrex.Result, MyXQL.Result, Exqlite.Result, ReqBigQuery.Result]
results = [Postgrex.Result, MyXQL.Result, Exqlite.Result, ReqBigQuery.Result, ReqAthena.Result]

for mod <- results do
defimpl Kino.Render, for: mod do
Expand Down
32 changes: 31 additions & 1 deletion lib/kino_db/connection_cell.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ defmodule KinoDB.ConnectionCell do
"database" => attrs["database"] || "",
"project_id" => attrs["project_id"] || "",
"default_dataset_id" => attrs["default_dataset_id"] || "",
"credentials" => attrs["credentials"] || %{}
"credentials" => attrs["credentials"] || %{},
"access_key_id" => attrs["access_key_id"] || "",
"secret_access_key" => attrs["secret_access_key"] || "",
"region" => attrs["region"] || "",
"output_location" => attrs["output_location"] || ""
}

{:ok, assign(ctx, fields: fields, missing_dep: missing_dep(fields))}
Expand Down Expand Up @@ -97,6 +101,9 @@ defmodule KinoDB.ConnectionCell do
"bigquery" ->
~w|project_id default_dataset_id credentials|

"athena" ->
~w|access_key_id secret_access_key region output_location database|

type when type in ["postgres", "mysql"] ->
~w|database hostname port username password|
end
Expand Down Expand Up @@ -185,6 +192,22 @@ defmodule KinoDB.ConnectionCell do
end
end

defp to_quoted(%{"type" => "athena"} = attrs) do
quote do
unquote(quoted_var(attrs["variable"])) =
Req.new(http_errors: :raise)
|> ReqAthena.attach(
access_key_id: unquote(attrs["access_key_id"]),
secret_access_key: unquote(attrs["secret_access_key"]),
region: unquote(attrs["region"]),
database: unquote(attrs["database"]),
output_location: unquote(attrs["output_location"])
)

:ok
end
end

defp shared_options(attrs) do
quote do
[
Expand All @@ -205,6 +228,7 @@ defmodule KinoDB.ConnectionCell do
Code.ensure_loaded?(MyXQL) -> "mysql"
Code.ensure_loaded?(Exqlite) -> "sqlite"
Code.ensure_loaded?(ReqBigQuery) -> "bigquery"
Code.ensure_loaded?(ReqAthena) -> "athena"
true -> "postgres"
end
end
Expand Down Expand Up @@ -233,6 +257,12 @@ defmodule KinoDB.ConnectionCell do
end
end

defp missing_dep(%{"type" => "athena"}) do
unless Code.ensure_loaded?(ReqAthena) do
~s|{:req_athena, github: "livebook-dev/req_athena"}|
end
end

defp missing_dep(_ctx), do: nil

defp join_quoted(quoted_blocks) do
Expand Down
31 changes: 20 additions & 11 deletions lib/kino_db/sql_cell.ex
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ defmodule KinoDB.SQLCell do
defp connection_type(%{request_steps: request_steps}) do
cond do
Keyword.has_key?(request_steps, :bigquery_run) -> "bigquery"
Keyword.has_key?(request_steps, :athena_run) -> "athena"
true -> nil
end
end
Expand Down Expand Up @@ -157,18 +158,11 @@ defmodule KinoDB.SQLCell do
end

defp to_quoted(%{"connection" => %{"type" => "bigquery"}} = attrs) do
{query, params} = parameterize(attrs["query"], fn _n -> "?" end)
bigquery = {quoted_query(query), params}
opts = query_opts_args(attrs)
req_opts = opts |> Enum.at(0, []) |> Keyword.put(:bigquery, bigquery)
to_req_quoted(attrs, fn _n -> "?" end, :bigquery)
end

quote do
unquote(quoted_var(attrs["result_variable"])) =
Req.post!(
unquote(quoted_var(attrs["connection"]["variable"])),
unquote(req_opts)
).body
end
defp to_quoted(%{"connection" => %{"type" => "athena"}} = attrs) do
to_req_quoted(attrs, fn _n -> "?" end, :athena)
end

defp to_quoted(_ctx) do
Expand All @@ -191,6 +185,21 @@ defmodule KinoDB.SQLCell do
end
end

defp to_req_quoted(attrs, next, req_key) do
{query, params} = parameterize(attrs["query"], next)
query = {quoted_query(query), params}
opts = query_opts_args(attrs)
req_opts = opts |> Enum.at(0, []) |> Keyword.put(req_key, query)

quote do
unquote(quoted_var(attrs["result_variable"])) =
Req.post!(
unquote(quoted_var(attrs["connection"]["variable"])),
unquote(req_opts)
).body
end
end

defp quoted_var(nil), do: nil
defp quoted_var(string), do: {String.to_atom(string), [], nil}

Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ defmodule KinoDB.MixProject do
{:myxql, "~> 0.6.2 or ~> 0.7", optional: true},
{:db_connection, "~> 2.4.2", optional: true},
{:req_bigquery, github: "livebook-dev/req_bigquery", optional: true},
{:req_athena, github: "livebook-dev/req_athena", optional: true},
{:ex_doc, "~> 0.28", only: :dev, runtime: false}
]
end
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
%{
"aws_signature": {:hex, :aws_signature, "0.3.1", "67f369094cbd55ffa2bbd8cc713ede14b195fcfb45c86665cd7c5ad010276148", [:rebar3], [], "hexpm", "50fc4dc1d1f7c2d0a8c63f455b3c66ecd74c1cf4c915c768a636f9227704a674"},
"castore": {:hex, :castore, "0.1.17", "ba672681de4e51ed8ec1f74ed624d104c0db72742ea1a5e74edbc770c815182f", [:mix], [], "hexpm", "d9844227ed52d26e7519224525cb6868650c272d4a3d327ce3ca5570c12163f9"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"db_connection": {:hex, :db_connection, "2.4.2", "f92e79aff2375299a16bcb069a14ee8615c3414863a6fef93156aee8e86c2ff3", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4fe53ca91b99f55ea249693a0229356a08f4d1a7931d8ffa79289b145fe83668"},
Expand Down
29 changes: 29 additions & 0 deletions test/kino_db/connection_cell_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,35 @@ defmodule KinoDB.ConnectionCellTest do
:ok\
"""
end

test "restores source code from attrs with Athena" do
attrs = %{
"variable" => "db",
"type" => "athena",
"access_key_id" => "id",
"secret_access_key" => "secret",
"region" => "region",
"database" => "default",
"output_location" => "s3://my-bucket"
}

{_kino, source} = start_smart_cell!(ConnectionCell, attrs)

assert source ==
"""
db =
Req.new(http_errors: :raise)
|> ReqAthena.attach(
access_key_id: "id",
secret_access_key: "secret",
region: "region",
database: "default",
output_location: "s3://my-bucket"
)
:ok\
"""
end
end

test "when a field changes, broadcasts the change and sends source update" do
Expand Down
38 changes: 38 additions & 0 deletions test/kino_db/sql_cell_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ defmodule KinoDB.SQLCellTest do
assert SQLCell.to_source(put_in(attrs["connection"]["type"], "bigquery")) == """
result = Req.post!(conn, bigquery: {\"SELECT id FROM users\", []}).body\
"""

assert SQLCell.to_source(put_in(attrs["connection"]["type"], "athena")) == """
result = Req.post!(conn, athena: {\"SELECT id FROM users\", []}).body\
"""
end

test "uses heredoc string for a multi-line query" do
Expand Down Expand Up @@ -142,6 +146,17 @@ defmodule KinoDB.SQLCellTest do
""", []}
).body\
'''

assert SQLCell.to_source(put_in(attrs["connection"]["type"], "athena")) == ~s'''
result =
Req.post!(conn,
athena:
{"""
SELECT id FROM users
WHERE last_name = 'Sherlock'
""", []}
).body\
'''
end

test "parses parameter expressions" do
Expand Down Expand Up @@ -183,6 +198,13 @@ defmodule KinoDB.SQLCellTest do
{"SELECT id FROM users WHERE id ? AND name LIKE ?", [user_id, search <> "%"]}
).body\
'''

assert SQLCell.to_source(put_in(attrs["connection"]["type"], "athena")) == ~s'''
result =
Req.post!(conn,
athena: {"SELECT id FROM users WHERE id ? AND name LIKE ?", [user_id, search <> "%"]}
).body\
'''
end

test "ignores parameters inside comments" do
Expand Down Expand Up @@ -247,6 +269,18 @@ defmodule KinoDB.SQLCellTest do
""", [user_id3]}
).body\
'''

assert SQLCell.to_source(put_in(attrs["connection"]["type"], "athena")) == ~s'''
result =
Req.post!(conn,
athena:
{"""
SELECT id from users
-- WHERE id = {{user_id1}}
/* WHERE id = {{user_id2}} */ WHERE id = ?
""", [user_id3]}
).body\
'''
end

test "passes timeout option when a timeout is specified" do
Expand All @@ -272,6 +306,10 @@ defmodule KinoDB.SQLCellTest do
assert SQLCell.to_source(put_in(attrs["connection"]["type"], "bigquery")) == """
result = Req.post!(conn, bigquery: {"SELECT id FROM users", []}, timeout: 30000).body\
"""

assert SQLCell.to_source(put_in(attrs["connection"]["type"], "athena")) == """
result = Req.post!(conn, athena: {"SELECT id FROM users", []}, timeout: 30000).body\
"""
end
end
end

0 comments on commit 0cfd8b3

Please sign in to comment.