fix(sql): only return tables in current_database #9748

Merged · 21 commits · Aug 5, 2024

Commits (changes from all commits):
19ac32a
fix(postgres): scope `get_schema` to current database + temp tables
gforsyth Aug 1, 2024
0ec2503
fix(duckdb): scope `get_schema` to current database + temp tables
gforsyth Aug 1, 2024
b14ee2e
test(duckdb): add cross schema/catalog test
cpcloud Aug 1, 2024
af48c66
chore(duckdb): simplify `get_schema` by using `DESCRIBE`
cpcloud Aug 3, 2024
4cb3300
chore: clean up errors a bit in duplicate schema test
cpcloud Aug 3, 2024
86fd26f
chore: remove bogus postgres marker
cpcloud Aug 3, 2024
5e84fcf
chore: clean up error message for mysql and exasol
cpcloud Aug 3, 2024
7dce011
chore: xfail mysql
cpcloud Aug 3, 2024
83c422a
fix(trino): handle tables with the same name in different schemas in …
cpcloud Aug 3, 2024
0ebb7ae
chore: remove unnecessary `sg.and_` calls
cpcloud Aug 3, 2024
f7b4bd4
fix(mssql): always search a single schema when getting table informat…
cpcloud Aug 3, 2024
6e1cf4a
feat(mysql): enable creating tables in other databases
cpcloud Aug 4, 2024
cef5969
test(flink): xfail
cpcloud Aug 4, 2024
a348460
chore(risingwave): allow reuse of postgres `get_schema`
cpcloud Aug 4, 2024
6773416
fix(exasol): allow creating tables in other databases
cpcloud Aug 4, 2024
d9627b4
chore: commentary about exceptions
cpcloud Aug 4, 2024
607682b
test(exasol): remove testing hacks that seem to be implemented becaus…
cpcloud Aug 4, 2024
3c1296d
test: ensure that the type are different enough to test the right beh…
cpcloud Aug 4, 2024
7046a2e
fix(snowflake): ensure that schema lookup works with identically name…
cpcloud Aug 4, 2024
56f6f46
test(bigquery): import googlenotfound error to handle overlapping tab…
cpcloud Aug 4, 2024
936873b
chore: unnest
cpcloud Aug 5, 2024
3 changes: 1 addition & 2 deletions docker/mysql/startup.sql

@@ -1,5 +1,4 @@
 CREATE USER 'ibis'@'localhost' IDENTIFIED BY 'ibis';
-CREATE SCHEMA IF NOT EXISTS test_schema;
 GRANT CREATE, DROP ON *.* TO 'ibis'@'%';
-GRANT CREATE,SELECT,DROP ON `test_schema`.* TO 'ibis'@'%';
+GRANT CREATE,SELECT,DROP ON *.* TO 'ibis'@'%';
 FLUSH PRIVILEGES;

Reviewer comment (Member), on the new grant: Necessary to allow the ibis user to operate on newly created databases.
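For context, a minimal sketch of what the broader grant enables, using ibis itself (the connection details and database names are illustrative, and assume the MySQL test container set up by this script):

import ibis

# Hypothetical connection as the `ibis` user created above.
con = ibis.mysql.connect(
    user="ibis", password="ibis", host="localhost", database="ibis_testing"
)

# Under the old grant, scoped to `test_schema`.*, the user could not operate
# on databases created after startup; with `GRANT ... ON *.*` both steps work.
con.create_database("newdb")
con.create_table("t", schema=ibis.schema({"x": "int64"}), database="newdb")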
42 changes: 15 additions & 27 deletions ibis/backends/duckdb/__init__.py

@@ -337,41 +337,29 @@ def get_schema(
         -------
         sch.Schema
             Ibis schema

         """
-        conditions = [sg.column("table_name").eq(sge.convert(table_name))]
-
-        if catalog is not None:
-            conditions.append(sg.column("table_catalog").eq(sge.convert(catalog)))
-
-        if database is not None:
-            conditions.append(sg.column("table_schema").eq(sge.convert(database)))
-
-        query = (
-            sg.select(
-                "column_name",
-                "data_type",
-                sg.column("is_nullable").eq(sge.convert("YES")).as_("nullable"),
-            )
-            .from_(sg.table("columns", db="information_schema"))
-            .where(sg.and_(*conditions))
-            .order_by("ordinal_position")
-        )
+        query = sge.Describe(
+            this=sg.table(
+                table_name, db=database, catalog=catalog, quoted=self.compiler.quoted
+            )
+        ).sql(self.dialect)

-        with self._safe_raw_sql(query) as cur:
-            meta = cur.fetch_arrow_table()
-
-        if not meta:
+        try:
+            result = self.con.sql(query)
+        except duckdb.CatalogException:
             raise exc.IbisError(f"Table not found: {table_name!r}")
+        else:
+            meta = result.fetch_arrow_table()

         names = meta["column_name"].to_pylist()
-        types = meta["data_type"].to_pylist()
-        nullables = meta["nullable"].to_pylist()
+        types = meta["column_type"].to_pylist()
+        nullables = meta["null"].to_pylist()

+        type_mapper = self.compiler.type_mapper
         return sch.Schema(
             {
-                name: self.compiler.type_mapper.from_string(typ, nullable=nullable)
-                for name, typ, nullable in zip(names, types, nullables)
+                name: type_mapper.from_string(typ, nullable=null == "YES")
+                for name, typ, null in zip(names, types, nullables)
             }
         )

Reviewer comment (Member), on the switch to `sge.Describe`: DESCRIBE in DuckDB used to be different than using information_schema, but only for CSV files, and that case has been fixed in a version that we no longer support.
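For reference, a standalone sketch (runnable against an in-memory DuckDB) of the `DESCRIBE` output shape that the new code consumes:

import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE t (x INTEGER, y TEXT NOT NULL)")

# DESCRIBE yields one row per column; the new code reads the
# `column_name`, `column_type`, and `null` ("YES"/"NO") fields.
print(con.sql("DESCRIBE t").fetchall())
# e.g. [('x', 'INTEGER', 'YES', None, None, None),
#       ('y', 'VARCHAR', 'NO', None, None, None)]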

@@ -512,7 +500,7 @@ def _load_extensions(
         query = (
             sg.select(f.anon.unnest(f.list_append(C.aliases, C.extension_name)))
             .from_(f.duckdb_extensions())
-            .where(sg.and_(C.installed, C.loaded))
+            .where(C.installed, C.loaded)
         )
         with self._safe_raw_sql(query) as cur:
             installed = map(itemgetter(0), cur.fetchall())
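Dropping `sg.and_` is safe here because sqlglot's `.where()` already combines multiple predicates with AND. A standalone sketch:

import sqlglot as sg

a, b = sg.column("installed"), sg.column("loaded")
print(sg.select("*").from_("exts").where(a, b).sql())
print(sg.select("*").from_("exts").where(sg.and_(a, b)).sql())
# Both render as: SELECT * FROM exts WHERE installed AND loaded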
30 changes: 30 additions & 0 deletions ibis/backends/duckdb/tests/test_client.py

@@ -345,3 +345,33 @@ def test_hugging_face(con, url, method_name):
     method = getattr(con, method_name)
     t = method(url)
     assert t.count().execute() > 0
+
+
+def test_multiple_tables_with_the_same_name(tmp_path):
+    # check within the same database
+    path = tmp_path / "test1.ddb"
+    with duckdb.connect(str(path)) as con:
+        con.execute("CREATE TABLE t (x INT)")
+        con.execute("CREATE SCHEMA s")
+        con.execute("CREATE TABLE s.t (y STRING)")
+
+    con = ibis.duckdb.connect(path)
+    t1 = con.table("t")
+    t2 = con.table("t", database="s")
+    assert t1.schema() == ibis.schema({"x": "int32"})
+    assert t2.schema() == ibis.schema({"y": "string"})
+
+    path = tmp_path / "test2.ddb"
+    with duckdb.connect(str(path)) as c:
+        c.execute("CREATE TABLE t (y DOUBLE[])")
+
+    # attach another catalog and check that too
+    con.attach(path, name="w")
+    t1 = con.table("t")
+    t2 = con.table("t", database="s")
+    assert t1.schema() == ibis.schema({"x": "int32"})
+    assert t2.schema() == ibis.schema({"y": "string"})
+
+    t3 = con.table("t", database="w.main")
+
+    assert t3.schema() == ibis.schema({"y": "array<float64>"})
21 changes: 11 additions & 10 deletions ibis/backends/exasol/__init__.py

@@ -345,16 +345,9 @@
         if temp:
             raise com.UnsupportedOperationError(
-                "Creating temp tables is not supported by Exasol."
+                f"Creating temp tables is not supported by {self.name}"
             )

-        if database is not None and database != self.current_database:
-            raise com.UnsupportedOperationError(
-                "Creating tables in other databases is not supported by Exasol"
-            )
-        else:
-            database = None
-
         quoted = self.compiler.quoted

         temp_memtable_view = None

@@ -435,7 +428,11 @@
             raise NotImplementedError(
                 "`catalog` argument is not supported for the Exasol backend"
             )
-        drop_schema = sg.exp.Drop(kind="SCHEMA", this=name, exists=force)
+        drop_schema = sg.exp.Drop(
+            kind="SCHEMA",
+            this=sg.to_identifier(name, quoted=self.compiler.quoted),
+            exists=force,
+        )
         with self.begin() as con:
             con.execute(drop_schema.sql(dialect=self.dialect))

@@ -446,7 +443,11 @@
             raise NotImplementedError(
                 "`catalog` argument is not supported for the Exasol backend"
             )
-        create_database = sg.exp.Create(kind="SCHEMA", this=name, exists=force)
+        create_database = sg.exp.Create(
+            kind="SCHEMA",
+            this=sg.to_identifier(name, quoted=self.compiler.quoted),
+            exists=force,
+        )
         open_database = self.current_database
         with self.begin() as con:
             con.execute(create_database.sql(dialect=self.dialect))
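The point of routing the name through `sg.to_identifier` is quoting: a quoted identifier preserves case-sensitive or keyword-like schema names that a bare string would not. A standalone sqlglot sketch (the schema name is illustrative):

import sqlglot as sg

bare = sg.exp.Drop(kind="SCHEMA", this=sg.to_identifier("MySchema"), exists=True)
quoted = sg.exp.Drop(
    kind="SCHEMA", this=sg.to_identifier("MySchema", quoted=True), exists=True
)
print(bare.sql())    # DROP SCHEMA IF EXISTS MySchema
print(quoted.sql())  # DROP SCHEMA IF EXISTS "MySchema"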
33 changes: 13 additions & 20 deletions ibis/backends/mssql/__init__.py

@@ -178,19 +178,15 @@ def get_schema(
         if name.startswith("ibis_cache_"):
             catalog, database = ("tempdb", "dbo")
             name = "##" + name
-        conditions = [sg.column("table_name").eq(sge.convert(name))]
-
-        if database is not None:
-            conditions.append(sg.column("table_schema").eq(sge.convert(database)))
-
         query = (
             sg.select(
-                "column_name",
-                "data_type",
-                "is_nullable",
-                "numeric_precision",
-                "numeric_scale",
-                "datetime_precision",
+                C.column_name,
+                C.data_type,
+                C.is_nullable,
+                C.numeric_precision,
+                C.numeric_scale,
+                C.datetime_precision,
             )
             .from_(
                 sg.table(

@@ -199,8 +195,11 @@
                     catalog=catalog or self.current_catalog,
                 )
             )
-            .where(*conditions)
-            .order_by("ordinal_position")
+            .where(
+                C.table_name.eq(sge.convert(name)),
+                C.table_schema.eq(sge.convert(database or self.current_database)),
+            )
+            .order_by(C.ordinal_position)
         )

         with self._safe_raw_sql(query) as cur:

@@ -487,26 +486,20 @@ def list_tables(
         """
         table_loc = self._warn_and_create_table_loc(database, schema)
         catalog, db = self._to_catalog_db_tuple(table_loc)
-        conditions = []
-
-        if db:
-            conditions.append(C.table_schema.eq(sge.convert(db)))
-
         sql = (
-            sg.select("table_name")
+            sg.select(C.table_name)
             .from_(
                 sg.table(
                     "TABLES",
                     db="INFORMATION_SCHEMA",
                     catalog=catalog if catalog is not None else self.current_catalog,
                 )
             )
+            .where(C.table_schema.eq(sge.convert(db or self.current_database)))
             .distinct()
         )

-        if conditions:
-            sql = sql.where(*conditions)

         sql = sql.sql(self.dialect)

         with self._safe_raw_sql(sql) as cur:
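Roughly the query the new `get_schema` now builds, as a standalone sqlglot sketch (ibis's `C` column helper is approximated with `sg.column`, and the table and schema names are illustrative). The key point is that `table_schema` is always pinned to exactly one schema:

import sqlglot as sg
import sqlglot.expressions as sge

query = (
    sg.select("column_name", "data_type", "is_nullable")
    .from_(sg.table("columns", db="INFORMATION_SCHEMA", catalog="tempdb"))
    .where(
        sg.column("table_name").eq(sge.convert("##ibis_cache_x")),
        sg.column("table_schema").eq(sge.convert("dbo")),
    )
    .order_by("ordinal_position")
)
print(query.sql("tsql"))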
7 changes: 0 additions & 7 deletions ibis/backends/mysql/__init__.py

@@ -397,13 +397,6 @@ def create_table(
         if obj is None and schema is None:
             raise ValueError("Either `obj` or `schema` must be specified")

-        if database is not None and database != self.current_database:
-            raise com.UnsupportedOperationError(
-                "Creating tables in other databases is not supported by Postgres"
-            )
-        else:
-            database = None
-
         properties = []

         if temp:
28 changes: 26 additions & 2 deletions ibis/backends/postgres/__init__.py

@@ -303,6 +303,20 @@ def _post_connect(self) -> None:
         with self.begin() as cur:
             cur.execute("SET TIMEZONE = UTC")

+    @property
+    def _session_temp_db(self) -> str | None:
+        # Postgres doesn't assign the temporary table database until the first
+        # temp table is created in a given session.
+        # Before that temp table is created, this will return `None`.
+        # After a temp table is created, it will return `pg_temp_N` where N is
+        # some integer.
+        res = self.raw_sql(
+            "select nspname from pg_namespace where oid = pg_my_temp_schema()"
+        ).fetchone()
+        if res is not None:
+            return res[0]
+        return res
+
     def list_tables(
         self,
         like: str | None = None,

@@ -458,7 +472,7 @@ def function(self, name: str, *, database: str | None = None) -> Callable:
                 on=n.oid.eq(p.pronamespace),
                 join_type="LEFT",
             )
-            .where(sg.and_(*predicates))
+            .where(*predicates)
         )

     def split_name_type(arg: str) -> tuple[str, dt.DataType]:

@@ -571,6 +585,16 @@ def get_schema(

         format_type = self.compiler.f["pg_catalog.format_type"]

+        # If no database is specified, assume the current database
+        db = database or self.current_database
+
+        dbs = [sge.convert(db)]
+
+        # If a database isn't specified, then include temp tables in the
+        # returned values
+        if database is None and (temp_table_db := self._session_temp_db) is not None:
+            dbs.append(sge.convert(temp_table_db))
+
         type_info = (
             sg.select(
                 a.attname.as_("column_name"),

@@ -591,7 +615,7 @@
             .where(
                 a.attnum > 0,
                 sg.not_(a.attisdropped),
-                n.nspname.eq(sge.convert(database)) if database is not None else TRUE,
+                n.nspname.isin(*dbs),
                 c.relname.eq(sge.convert(name)),
             )
             .order_by(a.attnum)
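The session temp-schema behavior described in the new property, demonstrated with plain psycopg (a sketch; the connection string is illustrative):

import psycopg

with psycopg.connect("dbname=postgres user=postgres") as con:
    q = "SELECT nspname FROM pg_namespace WHERE oid = pg_my_temp_schema()"

    # Before any temp table exists in this session, no namespace matches.
    print(con.execute(q).fetchone())  # None

    # Creating the first temp table makes Postgres assign a pg_temp_N schema.
    con.execute("CREATE TEMP TABLE scratch (x int)")
    print(con.execute(q).fetchone())  # e.g. ('pg_temp_3',)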
6 changes: 6 additions & 0 deletions ibis/backends/risingwave/__init__.py

@@ -586,3 +586,9 @@ def drop_sink(
         )
         with self._safe_raw_sql(src):
             pass
+
+    @property
+    def _session_temp_db(self) -> str | None:
+        # Return `None`, because RisingWave does not implement temp tables like
+        # Postgres
+        return None

Reviewer comment (Member): This was actually enough to make tests pass, since otherwise get_schema works just fine.
7 changes: 7 additions & 0 deletions ibis/backends/snowflake/__init__.py

@@ -646,6 +646,13 @@ def get_schema(
         catalog: str | None = None,
         database: str | None = None,
     ) -> Iterable[tuple[str, dt.DataType]]:
+        # this will always show temp tables with the same name as a non-temp
+        # table first
+        #
+        # snowflake puts temp tables in the same catalog and database as
+        # non-temp tables and differentiates between them using a different
+        # mechanism than other databases, which often put temp tables in a
+        # hidden or intentionally-difficult-to-access catalog/database
         table = sg.table(
             table_name, db=database, catalog=catalog, quoted=self.compiler.quoted
         )
3 changes: 2 additions & 1 deletion ibis/backends/tests/errors.py

@@ -55,8 +55,9 @@
 try:
     from google.api_core.exceptions import BadRequest as GoogleBadRequest
+    from google.api_core.exceptions import NotFound as GoogleNotFound
 except ImportError:
-    GoogleBadRequest = None
+    GoogleBadRequest = GoogleNotFound = None

 try:
     from polars.exceptions import ColumnNotFoundError as PolarsColumnNotFoundError