Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Session leaks #957

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9d83ef2
feat: session leak changes
surbhigarg92 May 8, 2023
4d16548
Merge remote-tracking branch 'upstream/main' into session_leaks
surbhigarg92 Jun 7, 2023
920eb55
unit tests and logging
surbhigarg92 Jun 13, 2023
aff17b1
revert test changes
surbhigarg92 Jun 13, 2023
f271251
tests
surbhigarg92 Jun 14, 2023
799589e
revert noxfile chanegs
surbhigarg92 Jun 14, 2023
38f71b8
tests
surbhigarg92 Jun 15, 2023
54e1717
session traces
surbhigarg92 Jun 15, 2023
7ed07c6
tests cases
surbhigarg92 Jun 15, 2023
57309de
formatting
surbhigarg92 Jun 15, 2023
c38b15d
documentation
surbhigarg92 Jun 15, 2023
d678841
Review comments
surbhigarg92 Jun 20, 2023
286e1f4
fix system tests
surbhigarg92 Jun 20, 2023
be6eee3
review comments
surbhigarg92 Jun 21, 2023
86a2552
review comments
surbhigarg92 Jun 21, 2023
c33f930
review comments
surbhigarg92 Jun 22, 2023
cb748d8
lint
surbhigarg92 Jun 22, 2023
1b72281
unit test
surbhigarg92 Jun 22, 2023
ac9e0e4
review comments
surbhigarg92 Jul 3, 2023
ff8674d
review comments
surbhigarg92 Jul 3, 2023
5a09450
review commemts
surbhigarg92 Jul 4, 2023
a75f147
review comments
surbhigarg92 Jul 5, 2023
9540c0a
fix: long running error message
surbhigarg92 Jul 24, 2023
c419204
Merge branch 'main' into session_leaks
surbhigarg92 Jul 24, 2023
b735e5e
Merge branch 'main' into session_leaks
surbhigarg92 Jul 25, 2023
2e489c4
Merge branch 'main' into session_leaks
surbhigarg92 Jul 28, 2023
1dccb4f
Merge branch 'main' into session_leaks
asthamohta Sep 19, 2023
0e04dde
Merge branch 'main' into session_leaks
asthamohta Sep 21, 2023
ee5b3a3
race condition-session already returned to pool
surbhigarg92 Nov 6, 2023
f8c258d
merge
surbhigarg92 Nov 6, 2023
9331df2
lint
surbhigarg92 Nov 6, 2023
33988c9
Merge branch 'googleapis:main' into session_leaks
surbhigarg92 Nov 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion google/cloud/spanner_dbapi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,15 @@ def connect(

instance = client.instance(instance_id)
conn = Connection(
instance, instance.database(database_id, pool=pool) if database_id else None
instance,
instance.database(
database_id,
pool=pool,
logging_enabled=False,
close_inactive_transactions=False,
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
)
if database_id
else None,
)
if pool is not None:
conn._own_pool = False
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/spanner_v1/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
+ "numeric has a whole component with precision {}"
)

# Constants
DELETE_LONG_RUNNING_TRANSACTION_INTERVAL_SEC = 120
DELETE_LONG_RUNNING_TRANSACTION_TIMEOUT_SEC = 3600


def _try_to_coerce_bytes(bytestring):
"""Try to coerce a byte string into the right thing based on Python
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def _check_state(self):
"""
if self.committed is not None:
raise ValueError("Batch already committed")
if self._session is None:
raise ValueError(
"Transaction has been closed as it was running for more than 60 minutes"
)

def commit(self, return_commit_stats=False, request_options=None):
"""Commit mutations to the database.
Expand Down
71 changes: 53 additions & 18 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,20 @@ class Database(object):
passed, the database will construct an instance of
:class:`~google.cloud.spanner_v1.pool.BurstyPool`.

:type logging_enabled: boolean
:parama logging_enabled: (Optional) Represents whether the database
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
has logging enabled or not. Default is True

asthamohta marked this conversation as resolved.
Show resolved Hide resolved
:type logger: :class:`logging.Logger`
:param logger: (Optional) a custom logger that is used if `log_commit_stats`
is `True` to log commit statistics. If not passed, a logger
will be created when needed that will log the commit statistics
to stdout.

:type close_inactive_transactions: boolean
:param close_inactive_transactions: (Optional) Represents whether the database
has close inactive transactions enabled or not. Default is False
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved

:type encryption_config:
:class:`~google.cloud.spanner_admin_database_v1.types.EncryptionConfig`
or :class:`~google.cloud.spanner_admin_database_v1.types.RestoreDatabaseEncryptionConfig`
Expand Down Expand Up @@ -141,7 +150,9 @@ def __init__(
instance,
ddl_statements=(),
pool=None,
logging_enabled=True,
logger=None,
close_inactive_transactions=False,
encryption_config=None,
database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED,
database_role=None,
Expand All @@ -159,7 +170,9 @@ def __init__(
self._encryption_info = None
self._default_leader = None
self.log_commit_stats = False
self._logging_enabled = logging_enabled
self._logger = logger
self._close_inactive_transactions = close_inactive_transactions
self._encryption_config = encryption_config
self._database_dialect = database_dialect
self._database_role = database_role
Expand Down Expand Up @@ -339,6 +352,14 @@ def database_role(self):
"""
return self._database_role

@property
def logging_enabled(self):
"""Whether the database has logging enabled. Default: True.
:rtype: bool
:returns: True if logging is enabled, else False.
"""
return self._logging_enabled

@property
def reconciling(self):
"""Whether the database is currently reconciling.
Expand Down Expand Up @@ -366,7 +387,7 @@ def enable_drop_protection(self, value):
def logger(self):
"""Logger used by the database.

The default logger will log commit stats at the log level INFO using
The default logger will log at the log level INFO using
`sys.stderr`.

:rtype: :class:`logging.Logger` or `None`
Expand All @@ -381,6 +402,14 @@ def logger(self):
self._logger.addHandler(ch)
return self._logger

@property
def close_inactive_transactions(self):
"""Whether the database has has closing inactive transactions enabled. Default: False.
:rtype: bool
:returns: True if closing inactive transactions is enabled, else False.
"""
return self._close_inactive_transactions

@property
def spanner_api(self):
"""Helper for session-related API calls."""
Expand Down Expand Up @@ -647,7 +676,7 @@ def execute_partitioned_dml(
)

def execute_pdml():
with SessionCheckout(self._pool) as session:
with SessionCheckout(self._pool, isLongRunning=True) as session:
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved

txn = api.begin_transaction(
session=session.name, options=txn_options, metadata=metadata
Expand Down Expand Up @@ -1008,7 +1037,7 @@ class BatchCheckout(object):

def __init__(self, database, request_options=None):
self._database = database
self._session = self._batch = None
self._batch = None
if request_options is None:
self._request_options = RequestOptions()
elif type(request_options) == dict:
Expand All @@ -1018,11 +1047,12 @@ def __init__(self, database, request_options=None):

def __enter__(self):
"""Begin ``with`` block."""
session = self._session = self._database._pool.get()
batch = self._batch = Batch(session)
session = self._database._pool.get()
self._batch = Batch(session)
session._batch = self._batch
if self._request_options.transaction_tag:
batch.transaction_tag = self._request_options.transaction_tag
return batch
self._batch.transaction_tag = self._request_options.transaction_tag
return self._batch

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
Expand All @@ -1038,7 +1068,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"CommitStats: {}".format(self._batch.commit_stats),
extra={"commit_stats": self._batch.commit_stats},
)
self._database._pool.put(self._session)
if self._batch._session is not None:
self._database._pool.put(self._batch._session)


class SnapshotCheckout(object):
Expand All @@ -1061,23 +1092,27 @@ class SnapshotCheckout(object):

def __init__(self, database, **kw):
self._database = database
self._session = None
self._snapshot = None
self._kw = kw

def __enter__(self):
"""Begin ``with`` block."""
session = self._session = self._database._pool.get()
return Snapshot(session, **self._kw)
session = self._database._pool.get()
self._snapshot = Snapshot(session, **self._kw)
session._snapshot = self._snapshot
return self._snapshot

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if isinstance(exc_val, NotFound):
# If NotFound exception occurs inside the with block
# then we validate if the session still exists.
if not self._session.exists():
self._session = self._database._pool._new_session()
self._session.create()
self._database._pool.put(self._session)
# self._snapshot._session is None that means session has been returned by background task
if self._snapshot._session is not None:
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(exc_val, NotFound):
# If NotFound exception occurs inside the with block
# then we validate if the session still exists.
if not self._snapshot._session.exists():
self._snapshot._session = self._database._pool._new_session()
self._snapshot._session.create()
self._database._pool.put(self._snapshot._session)


class BatchSnapshot(object):
Expand Down
12 changes: 12 additions & 0 deletions google/cloud/spanner_v1/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,9 @@ def database(
database_id,
ddl_statements=(),
pool=None,
logging_enabled=True,
logger=None,
close_inactive_transactions=False,
encryption_config=None,
database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED,
database_role=None,
Expand All @@ -447,12 +449,20 @@ def database(
:class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`.
:param pool: (Optional) session pool to be used by database.

:type logging_enabled: boolean
:param logging_enabled: (Optional) Represents whether the database
has logging enabled or not. Default is True

:type logger: :class:`logging.Logger`
:param logger: (Optional) a custom logger that is used if `log_commit_stats`
is `True` to log commit statistics. If not passed, a logger
will be created when needed that will log the commit statistics
to stdout.

:type close_inactive_transactions: boolean
:param close_inactive_transactions: (Optional) Represents whether the database
has close inactive transactions enabled or not. Default is False

:type encryption_config:
:class:`~google.cloud.spanner_admin_database_v1.types.EncryptionConfig`
or :class:`~google.cloud.spanner_admin_database_v1.types.RestoreDatabaseEncryptionConfig`
Expand Down Expand Up @@ -480,7 +490,9 @@ def database(
self,
ddl_statements=ddl_statements,
pool=pool,
logging_enabled=logging_enabled,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are we using the instance level configs that's newly introduced here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not instance level properties. Instance class has database method which ultimately calls database class and create an object. So we need to give an option to pass the database properties in this method as well.

logger=logger,
close_inactive_transactions=close_inactive_transactions,
encryption_config=encryption_config,
database_dialect=database_dialect,
database_role=database_role,
Expand Down
Loading