Skip to content

Commit b3ceb92

Browse files
[computes] fixed session-type extraction for connectors
The problem is that connector-based query execution is not able to reuse a session to fetch results. The frontend sends the correct session_id, but our session-fetching logic broke when computes were implemented. We now look up the session type in compute['name'] for computes, connector['name'] for connectors, and finally snippet['type'] for old config-file based Hive/Impala sessions. Change-Id: I386f3af1b0057e078e7b804f447f401e976b433a
1 parent c1875de commit b3ceb92

File tree

1 file changed

+17
-43
lines changed

1 file changed

+17
-43
lines changed

desktop/libs/notebook/src/notebook/connectors/hiveserver2.py

+17-43
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ class HS2Api(Api):
179179
def get_properties(lang='hive'):
180180
return ImpalaConfiguration.PROPERTIES if lang == 'impala' else HiveConfiguration.PROPERTIES
181181

182-
183182
@query_error_handler
184183
def create_session(self, lang='hive', properties=None):
185184
application = 'beeswax' if lang == 'hive' or lang == 'llap' else lang
@@ -246,7 +245,6 @@ def create_session(self, lang='hive', properties=None):
246245

247246
return response
248247

249-
250248
@query_error_handler
251249
def close_session(self, session):
252250
app_name = session.get('type')
@@ -281,7 +279,6 @@ def close_session(self, session):
281279

282280
return response
283281

284-
285282
def close_session_idle(self, notebook, session):
286283
idle = True
287284
response = {'result': []}
@@ -317,16 +314,14 @@ def execute(self, notebook, snippet):
317314
db = self._get_db(snippet, interpreter=self.interpreter)
318315

319316
statement = self._get_current_statement(notebook, snippet)
320-
compute = snippet.get('compute', {})
321-
session_type = compute['name'] if is_compute(snippet) and compute.get('name') else snippet['type']
317+
session_type = self._find_session_type(snippet)
322318
session = self._get_session(notebook, session_type)
323319

324320
query = self._prepare_hql_query(snippet, statement['statement'], session)
325321
_session = self._get_session_by_id(notebook, session_type)
326322

327-
328323
try:
329-
if statement.get('statement_id') == 0: # TODO: move this to client
324+
if statement.get('statement_id') == 0: # TODO: move this to client
330325
if query.database and not statement['statement'].lower().startswith('set'):
331326
result = db.use(query.database, session=_session)
332327
if result.session:
@@ -356,6 +351,14 @@ def execute(self, notebook, snippet):
356351

357352
return response
358353

354+
def _find_session_type(self, snippet):
    """Resolve the session type identifier for a snippet.

    Preference order:
      1. ``compute['name']`` when the snippet targets a compute,
      2. ``connector['name']`` for connector-based execution,
      3. the legacy ``snippet['type']`` used by config-file based
         Hive/Impala sessions.

    Returns the resolved name, or None when the snippet carries no type.
    """
    if is_compute(snippet):
        compute_name = snippet.get('compute', {}).get('name')
        if compute_name:
            return compute_name

    connector_name = (snippet.get('connector') or {}).get('name')
    if connector_name:
        return connector_name

    return snippet.get('type')
359362

360363
@query_error_handler
361364
def check_status(self, notebook, snippet):
@@ -384,7 +387,6 @@ def check_status(self, notebook, snippet):
384387

385388
return response
386389

387-
388390
@query_error_handler
389391
def fetch_result(self, notebook, snippet, rows, start_over):
390392
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -411,7 +413,6 @@ def fetch_result(self, notebook, snippet, rows, start_over):
411413
'type': 'table'
412414
}
413415

414-
415416
@query_error_handler
416417
def fetch_result_size(self, notebook, snippet):
417418
resp = {
@@ -440,7 +441,6 @@ def fetch_result_size(self, notebook, snippet):
440441

441442
return resp
442443

443-
444444
@query_error_handler
445445
def cancel(self, notebook, snippet):
446446
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -449,15 +449,13 @@ def cancel(self, notebook, snippet):
449449
db.cancel_operation(handle)
450450
return {'status': 0}
451451

452-
453452
@query_error_handler
454453
def get_log(self, notebook, snippet, startFrom=None, size=None):
455454
db = self._get_db(snippet, interpreter=self.interpreter)
456455

457456
handle = self._get_handle(snippet)
458457
return db.get_log(handle, start_over=startFrom == 0)
459458

460-
461459
@query_error_handler
462460
def close_statement(self, notebook, snippet):
463461
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -472,7 +470,6 @@ def close_statement(self, notebook, snippet):
472470
raise e
473471
return {'status': 0}
474472

475-
476473
def can_start_over(self, notebook, snippet):
477474
try:
478475
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -484,13 +481,12 @@ def can_start_over(self, notebook, snippet):
484481
raise e
485482
return can_start_over
486483

487-
488484
@query_error_handler
489485
def progress(self, notebook, snippet, logs=''):
490486
patch_snippet_for_connector(snippet)
491487

492488
if snippet['dialect'] == 'hive':
493-
match = re.search('Total jobs = (\d+)', logs, re.MULTILINE)
489+
match = re.search(r'Total jobs = (\d+)', logs, re.MULTILINE)
494490
total = int(match.group(1)) if match else 1
495491

496492
started = logs.count('Starting Job')
@@ -499,13 +495,12 @@ def progress(self, notebook, snippet, logs=''):
499495
progress = int((started + ended) * 100 / (total * 2))
500496
return max(progress, 5) # Return 5% progress as a minimum
501497
elif snippet['dialect'] == 'impala':
502-
match = re.findall('(\d+)% Complete', logs, re.MULTILINE)
498+
match = re.findall(r'(\d+)% Complete', logs, re.MULTILINE)
503499
# Retrieve the last reported progress percentage if it exists
504500
return int(match[-1]) if match and isinstance(match, list) else 0
505501
else:
506502
return 50
507503

508-
509504
@query_error_handler
510505
def get_jobs(self, notebook, snippet, logs):
511506
jobs = []
@@ -552,7 +547,6 @@ def get_jobs(self, notebook, snippet, logs):
552547

553548
return jobs
554549

555-
556550
@query_error_handler
557551
def autocomplete(self, snippet, database=None, table=None, column=None, nested=None, operation=None):
558552
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -577,7 +571,6 @@ def autocomplete(self, snippet, database=None, table=None, column=None, nested=N
577571

578572
return resp
579573

580-
581574
@query_error_handler
582575
def get_sample_data(self, snippet, database=None, table=None, column=None, is_async=False, operation=None):
583576
try:
@@ -586,7 +579,6 @@ def get_sample_data(self, snippet, database=None, table=None, column=None, is_as
586579
except QueryServerException as ex:
587580
raise QueryError(ex.message)
588581

589-
590582
@query_error_handler
591583
def explain(self, notebook, snippet):
592584
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -613,7 +605,6 @@ def explain(self, notebook, snippet):
613605
'statement': statement,
614606
}
615607

616-
617608
@query_error_handler
618609
def export_data_as_hdfs_file(self, snippet, target_file, overwrite):
619610
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -626,8 +617,7 @@ def export_data_as_hdfs_file(self, snippet, target_file, overwrite):
626617

627618
return '/filebrowser/view=%s' % urllib_quote(
628619
urllib_quote(target_file.encode('utf-8'), safe=SAFE_CHARACTERS_URI_COMPONENTS)
629-
) # Quote twice, because of issue in the routing on client
630-
620+
) # Quote twice, because of issue in the routing on client
631621

632622
def export_data_as_table(self, notebook, snippet, destination, is_temporary=False, location=None):
633623
db = self._get_db(snippet, interpreter=self.interpreter)
@@ -654,7 +644,6 @@ def export_data_as_table(self, notebook, snippet, destination, is_temporary=Fals
654644

655645
return hql, success_url
656646

657-
658647
def export_large_data_to_hdfs(self, notebook, snippet, destination):
659648
response = self._get_current_statement(notebook, snippet)
660649
session = self._get_session(notebook, snippet['type'])
@@ -684,7 +673,6 @@ def export_large_data_to_hdfs(self, notebook, snippet, destination):
684673

685674
return hql, success_url
686675

687-
688676
def upgrade_properties(self, lang='hive', properties=None):
689677
upgraded_properties = copy.deepcopy(self.get_properties(lang))
690678

@@ -708,7 +696,6 @@ def upgrade_properties(self, lang='hive', properties=None):
708696

709697
return upgraded_properties
710698

711-
712699
def _get_session(self, notebook, type='hive'):
713700
session = next((session for session in notebook['sessions'] if session['type'] == type), None)
714701
return session
@@ -723,7 +710,6 @@ def _get_session_by_id(self, notebook, type='hive'):
723710
filters['owner'] = self.user
724711
return Session.objects.get(**filters)
725712

726-
727713
def _get_hive_execution_engine(self, notebook, snippet):
728714
# Get hive.execution.engine from snippet properties, if none, then get from session
729715
properties = snippet['properties']
@@ -746,7 +732,6 @@ def _get_hive_execution_engine(self, notebook, snippet):
746732

747733
return engine
748734

749-
750735
def _prepare_hql_query(self, snippet, statement, session):
751736
settings = snippet['properties'].get('settings', None)
752737
file_resources = snippet['properties'].get('files', None)
@@ -775,7 +760,6 @@ def _prepare_hql_query(self, snippet, statement, session):
775760
database=database
776761
)
777762

778-
779763
def get_browse_query(self, snippet, database, table, partition_spec=None):
780764
db = self._get_db(snippet, interpreter=self.interpreter)
781765
table = db.get_table(database, table)
@@ -789,7 +773,6 @@ def get_browse_query(self, snippet, database, table, partition_spec=None):
789773
else:
790774
return db.get_select_star_query(database, table, limit=100)
791775

792-
793776
def _get_handle(self, snippet):
794777
try:
795778
handle = snippet['result']['handle'].copy()
@@ -805,7 +788,6 @@ def _get_handle(self, snippet):
805788

806789
return HiveServerQueryHandle(**handle)
807790

808-
809791
def _get_db(self, snippet, is_async=False, interpreter=None):
810792
if interpreter and interpreter.get('dialect'):
811793
dialect = interpreter['dialect']
@@ -828,7 +810,6 @@ def _get_db(self, snippet, is_async=False, interpreter=None):
828810
# Note: name is not used if interpreter is present
829811
return dbms.get(self.user, query_server=get_query_server_config(name=name, connector=interpreter))
830812

831-
832813
def _parse_job_counters(self, job_id):
833814
# Attempt to fetch total records from the job's Hive counter
834815
total_records, total_size = None, None
@@ -864,7 +845,6 @@ def _parse_job_counters(self, job_id):
864845

865846
return total_records, total_size
866847

867-
868848
def _get_hive_result_size(self, notebook, snippet):
869849
total_records, total_size, msg = None, None, None
870850
engine = self._get_hive_execution_engine(notebook, snippet).lower()
@@ -879,8 +859,8 @@ def _get_hive_result_size(self, notebook, snippet):
879859
else:
880860
msg = _('Hive query did not execute any jobs.')
881861
elif engine == 'spark':
882-
total_records_re = "RECORDS_OUT_0: (?P<total_records>\d+)"
883-
total_size_re = "Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P<total_size>\d+)"
862+
total_records_re = r"RECORDS_OUT_0: (?P<total_records>\d+)"
863+
total_size_re = r"Spark Job\[[a-z0-9-]+\] Metrics[A-Za-z0-9:\s]+ResultSize: (?P<total_size>\d+)"
884864
total_records_match = re.search(total_records_re, logs, re.MULTILINE)
885865
total_size_match = re.search(total_size_re, logs, re.MULTILINE)
886866

@@ -891,7 +871,6 @@ def _get_hive_result_size(self, notebook, snippet):
891871

892872
return total_records, total_size, msg
893873

894-
895874
def _get_impala_result_size(self, notebook, snippet):
896875
total_records_match = None
897876
total_records, total_size, msg = None, None, None
@@ -904,7 +883,7 @@ def _get_impala_result_size(self, notebook, snippet):
904883

905884
fragment = self._get_impala_query_profile(server_url, query_id=query_id)
906885
total_records_re = \
907-
"Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? \((?P<total_records>\d+)\).*?(Averaged Fragment F\d\d)"
886+
r"Coordinator Fragment F\d\d.+?RowsReturned: \d+(?:.\d+[KMB])? \((?P<total_records>\d+)\).*?(Averaged Fragment F\d\d)"
908887
total_records_match = re.search(total_records_re, fragment, re.MULTILINE | re.DOTALL)
909888

910889
if total_records_match:
@@ -917,7 +896,6 @@ def _get_impala_result_size(self, notebook, snippet):
917896

918897
return total_records, total_size, msg
919898

920-
921899
def _get_impala_query_id(self, snippet):
922900
guid = None
923901
if 'result' in snippet and 'handle' in snippet['result'] and 'guid' in snippet['result']['handle']:
@@ -929,7 +907,6 @@ def _get_impala_query_id(self, snippet):
929907
LOG.warning('Snippet does not contain a valid result handle, cannot extract Impala query ID.')
930908
return guid
931909

932-
933910
def _get_impala_query_profile(self, server_url, query_id):
934911
api = get_impalad_api(user=self.user, url=server_url)
935912

@@ -944,18 +921,15 @@ def _get_impala_query_profile(self, server_url, query_id):
944921

945922
return profile
946923

947-
948924
def _get_impala_profile_plan(self, query_id, profile):
949-
query_plan_re = "Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id': query_id}
925+
query_plan_re = r"Query \(id=%(query_id)s\):.+?Execution Profile %(query_id)s" % {'query_id': query_id}
950926
query_plan_match = re.search(query_plan_re, profile, re.MULTILINE | re.DOTALL)
951927
return query_plan_match.group() if query_plan_match else None
952928

953-
954929
def describe_column(self, notebook, snippet, database=None, table=None, column=None):
955930
db = self._get_db(snippet, interpreter=self.interpreter)
956931
return db.get_table_columns_stats(database, table, column)
957932

958-
959933
def describe_table(self, notebook, snippet, database=None, table=None):
960934
db = self._get_db(snippet, interpreter=self.interpreter)
961935
tb = db.get_table(database, table)

0 commit comments

Comments (0)