Skip to content

Commit 79428fb

Browse files
[computes] fixed session-type extraction for connectors
The problem is that connector-based query execution is not able to reuse the session to fetch results. The frontend is sending the correct session_id, but our session-fetching logic got broken when computes were implemented. We are now looking for the session_type from compute['name'] for computes, connector['name'] for connectors, and then snippet['type'] for old config-file-based hive/impala sessions. A related change is to make use of the session for get_log and check_status calls if the frontend is sending it.
1 parent ca87369 commit 79428fb

File tree

7 files changed

+40
-33
lines changed

7 files changed

+40
-33
lines changed

apps/beeswax/src/beeswax/common.py

+17-10
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,27 @@ def tokenize_and_convert(item, key=None):
9191
return sorted(collection, key=lambda i: tokenize_and_convert(i, key=key))
9292

9393

94-
def is_compute(cluster):
94+
def find_compute_in_cluster(cluster):
9595
if not cluster:
96-
return False
96+
return None
9797
connector = cluster.get('connector')
9898
compute = cluster.get('compute')
9999

100-
def compute_check(x):
100+
def _compute_check(x):
101101
return x and x.get('type') in COMPUTE_TYPES
102-
return compute_check(cluster) or compute_check(connector) or compute_check(compute)
102+
103+
return (
104+
cluster if _compute_check(cluster)
105+
else compute if _compute_check(compute)
106+
else connector if _compute_check(connector) else None)
107+
108+
109+
def extract_session_type(snippet):
110+
compute = find_compute_in_cluster(snippet)
111+
if compute and compute.get('name'):
112+
return compute['name']
113+
return snippet.get('type') if snippet else None
114+
103115

104116

105117
'''
@@ -119,13 +131,8 @@ def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
119131
connector = cluster.get('connector')
120132
compute = cluster.get('compute')
121133

122-
def compute_check(x):
123-
return x and x.get('type') in COMPUTE_TYPES
124-
125134
# Pick the most probable compute object
126-
selected_compute = (cluster if compute_check(cluster)
127-
else compute if compute_check(compute)
128-
else connector if compute_check(connector) else None)
135+
selected_compute = find_compute_in_cluster(cluster)
129136

130137
# If found, we will attempt to reload it, first by id then by name
131138
if selected_compute:

apps/beeswax/src/beeswax/server/dbms.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from kazoo.client import KazooClient
3030

3131
from azure.abfs import abfspath
32-
from beeswax.common import apply_natural_sort, is_compute
32+
from beeswax.common import apply_natural_sort, find_compute_in_cluster
3333
from beeswax.conf import (
3434
APPLY_NATURAL_SORT_MAX,
3535
AUTH_PASSWORD,
@@ -164,7 +164,7 @@ def get(user, query_server=None, cluster=None):
164164

165165

166166
def get_query_server_config(name='beeswax', connector=None):
167-
if connector and (has_connectors() or is_compute(connector)):
167+
if connector and (has_connectors() or find_compute_in_cluster(connector)):
168168
LOG.debug("Query via connector %s (%s)" % (name, connector.get('type')))
169169
query_server = get_query_server_config_via_connector(connector)
170170
else:
@@ -1042,14 +1042,14 @@ def use(self, database, session=None):
10421042
query = hql_query('USE `%s`' % database)
10431043
return self.client.use(query, session=session)
10441044

1045-
def get_log(self, query_handle, start_over=True):
1046-
return self.client.get_log(query_handle, start_over)
1045+
def get_log(self, query_handle, start_over=True, session=None):
1046+
return self.client.get_log(query_handle, start_over, session=session)
10471047

10481048
def get_state(self, handle):
10491049
return self.client.get_state(handle)
10501050

1051-
def get_operation_status(self, handle):
1052-
return self.client.get_operation_status(handle)
1051+
def get_operation_status(self, handle, session=None):
1052+
return self.client.get_operation_status(handle, session=session)
10531053

10541054
def execute_and_wait(self, query, timeout_sec=30.0, sleep_interval=0.5):
10551055
"""

apps/beeswax/src/beeswax/server/hive_metastore_server.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def get_state(self, handle):
142142
def close(self, handle):
143143
pass
144144

145-
def get_operation_status(self, handle):
145+
def get_operation_status(self, handle, session=None):
146146
return MockFinishedOperation()
147147

148148
def get_default_configuration(self, *args, **kwargs):

apps/beeswax/src/beeswax/server/hive_server2_lib.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -1103,18 +1103,18 @@ def fetch_log(self, operation_handle, orientation=TFetchOrientation.FETCH_NEXT,
11031103

11041104
return '\n'.join(lines)
11051105

1106-
def get_operation_status(self, operation_handle):
1106+
def get_operation_status(self, operation_handle, session=None):
11071107
req = TGetOperationStatusReq(operationHandle=operation_handle)
1108-
(res, session) = self.call(self._client.GetOperationStatus, req)
1108+
(res, session) = self.call(self._client.GetOperationStatus, req, session=session)
11091109
return res
11101110

1111-
def get_log(self, operation_handle):
1111+
def get_log(self, operation_handle, session=None):
11121112
try:
11131113
req = TGetLogReq(operationHandle=operation_handle)
1114-
(res, session) = self.call(self._client.GetLog, req)
1114+
(res, session) = self.call(self._client.GetLog, req, session=session)
11151115
return res.log
11161116
except Exception as e:
1117-
if 'Invalid query handle' in str(e):
1117+
if 'Invalid query handle' in str(e) or 'Invalid or unknown query handle' in str(e):
11181118
message = 'Invalid query handle'
11191119
LOG.error('%s: %s' % (message, e))
11201120
else:
@@ -1436,9 +1436,9 @@ def get_state(self, handle):
14361436
res = self._client.get_operation_status(operationHandle)
14371437
return HiveServerQueryHistory.STATE_MAP[res.operationState]
14381438

1439-
def get_operation_status(self, handle):
1439+
def get_operation_status(self, handle, session=None):
14401440
operationHandle = handle.get_rpc_handle()
1441-
return self._client.get_operation_status(operationHandle)
1441+
return self._client.get_operation_status(operationHandle, session=session)
14421442

14431443
def use(self, query, session=None):
14441444
data = self._client.execute_query(query, session=session)
@@ -1482,11 +1482,11 @@ def close_session(self, session):
14821482
def dump_config(self):
14831483
return 'Does not exist in HS2'
14841484

1485-
def get_log(self, handle, start_over=True):
1485+
def get_log(self, handle, start_over=True, session=None):
14861486
operationHandle = handle.get_rpc_handle()
14871487

14881488
if beeswax_conf.USE_GET_LOG_API.get() or self.query_server.get('dialect') == 'impala':
1489-
return self._client.get_log(operationHandle)
1489+
return self._client.get_log(operationHandle, session=session)
14901490
else:
14911491
if start_over:
14921492
orientation = TFetchOrientation.FETCH_FIRST

apps/jobbrowser/src/jobbrowser/apis/query_api.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from babel import localtime
3030
from django.utils.translation import gettext as _
3131

32+
from beeswax.common import extract_session_type
3233
from desktop.lib import export_csvxls
3334
from impala.conf import COORDINATOR_UI_SPNEGO
3435
from jobbrowser.apis.base_api import Api
@@ -54,7 +55,7 @@ def _get_api(user, cluster=None):
5455
server_url = compute['options'].get('api_url')
5556
else:
5657
# TODO: multi computes if snippet.get('compute') or snippet['type'] has computes
57-
application = cluster['compute']['type'] if cluster.get('compute') else cluster.get('interface', 'impala')
58+
application = extract_session_type(cluster) or 'impala'
5859
session = Session.objects.get_session(user, application=application)
5960
server_url = _get_impala_server_url(session)
6061
return get_impalad_api(user=user, url=server_url)

desktop/libs/notebook/src/notebook/connectors/base.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from django.utils.encoding import smart_str
2626
from django.utils.translation import gettext as _
2727

28-
from beeswax.common import find_compute, is_compute
28+
from beeswax.common import find_compute, find_compute_in_cluster
2929
from desktop.auth.backend import is_admin
3030
from desktop.conf import TASK_SERVER, has_connectors, is_cdw_compute_enabled
3131
from desktop.lib import export_csvxls
@@ -402,7 +402,7 @@ def patch_snippet_for_connector(snippet, user=None):
402402
Connector backward compatibility switcher.
403403
# TODO Connector unification
404404
"""
405-
if is_compute(snippet):
405+
if find_compute_in_cluster(snippet):
406406
snippet['connector'] = find_compute(cluster=snippet, user=user)
407407
if snippet['connector'] and snippet['connector'].get('dialect'):
408408
snippet['dialect'] = snippet['connector']['dialect']
@@ -433,7 +433,7 @@ def get_api(request, snippet):
433433
if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user):
434434
LOG.debug('Using the interpreter from snippet')
435435
interpreter = snippet.get('interpreter')
436-
elif is_cdw_compute_enabled():
436+
elif find_compute_in_cluster(snippet):
437437
LOG.debug("Finding the compute from db using snippet: %s" % snippet)
438438
interpreter = find_compute(cluster=snippet, user=request.user)
439439
if interpreter is None:

desktop/libs/notebook/src/notebook/connectors/hiveserver2.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from django.urls import reverse
2929
from django.utils.translation import gettext as _
3030

31-
from beeswax.common import is_compute
31+
from beeswax.common import extract_session_type
3232
from desktop.auth.backend import is_admin
3333
from desktop.conf import USE_DEFAULT_CONFIGURATION, has_connectors
3434
from desktop.lib.conf import BoundConfig
@@ -321,8 +321,7 @@ def execute(self, notebook, snippet):
321321
db = self._get_db(snippet, interpreter=self.interpreter)
322322

323323
statement = self._get_current_statement(notebook, snippet)
324-
compute = snippet.get('compute', {})
325-
session_type = compute['name'] if is_compute(snippet) and compute.get('name') else snippet['type']
324+
session_type = extract_session_type(snippet)
326325
session = self._get_session(notebook, session_type)
327326

328327
query = self._prepare_hql_query(snippet, statement['statement'], session)

0 commit comments

Comments
 (0)