Skip to content

Commit

Permalink
Modernize SQLA pessimistic handling (#3256)
Browse files Browse the repository at this point in the history
Looks like SQLAlchemy has redefined the best practice around
pessimistic connection handling.
  • Loading branch information
mistercrunch authored Aug 9, 2017
1 parent 6da68ab commit cc36428
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 14 deletions.
2 changes: 1 addition & 1 deletion superset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def get_js_manifest():
if conf.get('WTF_CSRF_ENABLED'):
csrf = CSRFProtect(app)

utils.pessimistic_connection_handling(db.engine.pool)
utils.pessimistic_connection_handling(db.engine)

cache = utils.setup_cache(app, conf.get('CACHE_CONFIG'))
tables_cache = utils.setup_cache(app, conf.get('TABLE_NAMES_CACHE_CONFIG'))
Expand Down
49 changes: 36 additions & 13 deletions superset/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import markdown as md
from past.builtins import basestring
from pydruid.utils.having import Having
from sqlalchemy import event, exc
from sqlalchemy import event, exc, select
from sqlalchemy.types import TypeDecorator, TEXT

logging.getLogger('MARKDOWN').setLevel(logging.INFO)
Expand Down Expand Up @@ -436,19 +436,42 @@ def __exit__(self, type, value, traceback):
logging.warning("timeout can't be used in the current context")
logging.exception(e)

def pessimistic_connection_handling(target):
@event.listens_for(target, "checkout")
def ping_connection(dbapi_connection, connection_record, connection_proxy):
"""
Disconnect Handling - Pessimistic, taken from:
http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
"""
cursor = dbapi_connection.cursor()

def pessimistic_connection_handling(some_engine):
@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
if branch:
# "branch" refers to a sub-connection of a connection,
# we don't want to bother pinging on these.
return

# turn off "close with result". This flag is only used with
# "connectionless" execution, otherwise will be False in any case
save_should_close_with_result = connection.should_close_with_result
connection.should_close_with_result = False

try:
cursor.execute("SELECT 1")
except:
raise exc.DisconnectionError()
cursor.close()
# run a SELECT 1. use a core select() so that
# the SELECT of a scalar value without a table is
# appropriately formatted for the backend
connection.scalar(select([1]))
except exc.DBAPIError as err:
# catch SQLAlchemy's DBAPIError, which is a wrapper
# for the DBAPI's exception. It includes a .connection_invalidated
# attribute which specifies if this connection is a "disconnect"
# condition, which is based on inspection of the original exception
# by the dialect in use.
if err.connection_invalidated:
# run the same SELECT again - the connection will re-validate
# itself and establish a new connection. The disconnect detection
# here also causes the whole connection pool to be invalidated
# so that all stale connections are discarded.
connection.scalar(select([1]))
else:
raise
finally:
# restore "close with result"
connection.should_close_with_result = save_should_close_with_result


class QueryStatus(object):
Expand Down

0 comments on commit cc36428

Please sign in to comment.