|
16 | 16 |
|
17 | 17 | import pytest
|
18 | 18 |
|
19 |
| -from sqlalchemy.testing import config |
| 19 | +from sqlalchemy.testing import config, db |
20 | 20 | from sqlalchemy.testing import eq_
|
21 | 21 | from sqlalchemy.testing import provide_metadata
|
22 | 22 | from sqlalchemy.testing.schema import Column
|
23 | 23 | from sqlalchemy.testing.schema import Table
|
24 | 24 | from sqlalchemy import literal_column
|
25 |
| -from sqlalchemy import select, case, bindparam |
| 25 | + |
| 26 | +from sqlalchemy import bindparam, case, literal, select, util |
26 | 27 | from sqlalchemy import exists
|
27 | 28 | from sqlalchemy import Boolean
|
28 | 29 | from sqlalchemy import String
|
29 |
| -from sqlalchemy.testing import requires |
30 | 30 | from sqlalchemy.types import Integer
|
| 31 | +from sqlalchemy.testing import requires |
| 32 | + |
31 | 33 | from google.api_core.datetime_helpers import DatetimeWithNanoseconds
|
32 | 34 |
|
33 | 35 | from sqlalchemy.testing.suite.test_ddl import * # noqa: F401, F403
|
|
44 | 46 | from sqlalchemy.testing.suite.test_dialect import EscapingTest as _EscapingTest
|
45 | 47 | from sqlalchemy.testing.suite.test_select import ExistsTest as _ExistsTest
|
46 | 48 | from sqlalchemy.testing.suite.test_types import BooleanTest as _BooleanTest
|
47 |
| - |
48 |
| -config.test_schema = "" |
49 |
| - |
| 49 | +from sqlalchemy.testing.suite.test_types import IntegerTest as _IntegerTest |
50 | 50 |
|
51 | 51 | from sqlalchemy.testing.suite.test_types import ( # noqa: F401, F403
|
52 | 52 | DateTest as _DateTest,
|
|
59 | 59 | TimestampMicrosecondsTest,
|
60 | 60 | )
|
61 | 61 |
|
| 62 | +config.test_schema = "" |
| 63 | + |
62 | 64 |
|
63 | 65 | class EscapingTest(_EscapingTest):
|
64 | 66 | @provide_metadata
|
@@ -422,3 +424,88 @@ class TimeTests(_TimeMicrosecondsTest, _TimeTest):
|
422 | 424 | @pytest.mark.skip("Spanner doesn't coerce dates from datetime.")
|
423 | 425 | class DateTimeCoercedToDateTimeTest(_DateTimeCoercedToDateTimeTest):
|
424 | 426 | pass
|
| 427 | + |
| 428 | + |
| 429 | +class IntegerTest(_IntegerTest): |
| 430 | + @provide_metadata |
| 431 | + def _round_trip(self, datatype, data): |
| 432 | + """ |
| 433 | + SPANNER OVERRIDE: |
| 434 | +
|
| 435 | + This is the helper method for integer class tests which creates a table and |
| 436 | + performs an insert operation. |
| 437 | + Cloud Spanner supports tables with an empty primary key, but only one |
| 438 | + row can be inserted into such a table - following insertions will fail with |
| 439 | + `400 id must not be NULL in table date_table`. |
| 440 | + Overriding the tests and adding a manual primary key value to avoid the same |
| 441 | + failures and deleting the table at the end. |
| 442 | + """ |
| 443 | + metadata = self.metadata |
| 444 | + int_table = Table( |
| 445 | + "integer_table", |
| 446 | + metadata, |
| 447 | + Column("id", Integer, primary_key=True, test_needs_autoincrement=True), |
| 448 | + Column("integer_data", datatype), |
| 449 | + ) |
| 450 | + |
| 451 | + metadata.create_all(config.db) |
| 452 | + |
| 453 | + config.db.execute(int_table.insert(), {"id": 1, "integer_data": data}) |
| 454 | + |
| 455 | + row = config.db.execute(select([int_table.c.integer_data])).first() |
| 456 | + |
| 457 | + eq_(row, (data,)) |
| 458 | + |
| 459 | + if util.py3k: |
| 460 | + assert isinstance(row[0], int) |
| 461 | + else: |
| 462 | + assert isinstance(row[0], (long, int)) # noqa |
| 463 | + |
| 464 | + config.db.execute(int_table.delete()) |
| 465 | + |
| 466 | + @provide_metadata |
| 467 | + def _literal_round_trip(self, type_, input_, output, filter_=None): |
| 468 | + """ |
| 469 | + SPANNER OVERRIDE: |
| 470 | +
|
| 471 | + Spanner DBAPI does not execute DDL statements unless followed by a |
| 472 | + non DDL statement, which is preventing correct table clean up. |
| 473 | + The table already exists after related tests finish, so it doesn't |
| 474 | + create a new table and when running tests for other data types |
| 475 | + insertions will fail with `400 Duplicate name in schema: t`. |
| 476 | + Overriding the tests to create and drop a new table to prevent |
| 477 | + database existence errors. |
| 478 | + """ |
| 479 | + |
| 480 | + # for literal, we test the literal render in an INSERT |
| 481 | + # into a typed column. we can then SELECT it back as its |
| 482 | + # official type; ideally we'd be able to use CAST here |
| 483 | + # but MySQL in particular can't CAST fully |
| 484 | + t = Table("int_t", self.metadata, Column("x", type_)) |
| 485 | + t.create() |
| 486 | + |
| 487 | + with db.connect() as conn: |
| 488 | + for value in input_: |
| 489 | + ins = ( |
| 490 | + t.insert() |
| 491 | + .values(x=literal(value)) |
| 492 | + .compile( |
| 493 | + dialect=db.dialect, compile_kwargs=dict(literal_binds=True), |
| 494 | + ) |
| 495 | + ) |
| 496 | + conn.execute(ins) |
| 497 | + conn.execute("SELECT 1") |
| 498 | + |
| 499 | + if self.supports_whereclause: |
| 500 | + stmt = t.select().where(t.c.x == literal(value)) |
| 501 | + else: |
| 502 | + stmt = t.select() |
| 503 | + |
| 504 | + stmt = stmt.compile( |
| 505 | + dialect=db.dialect, compile_kwargs=dict(literal_binds=True), |
| 506 | + ) |
| 507 | + for row in conn.execute(stmt): |
| 508 | + value = row[0] |
| 509 | + if filter_ is not None: |
| 510 | + value = filter_(value) |
| 511 | + assert value in output |
0 commit comments