diff --git a/README.md b/README.md index 98686be..9367d6c 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # e6data Python Connector -![version](https://img.shields.io/badge/version-2.3.9-blue.svg) +![version](https://img.shields.io/badge/version-2.3.10,rc5-blue.svg) ## Introduction diff --git a/e6data_python_connector/e6data_grpc.py b/e6data_python_connector/e6data_grpc.py index 6cecfbf..b15c0e2 100644 --- a/e6data_python_connector/e6data_grpc.py +++ b/e6data_python_connector/e6data_grpc.py @@ -10,11 +10,11 @@ import re import sys import time +import uuid from decimal import Decimal from io import BytesIO from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED import threading -import multiprocessing import grpc from grpc._channel import _InactiveRpcError @@ -26,6 +26,9 @@ from e6data_python_connector.datainputstream import get_query_columns_info, read_rows_from_chunk from e6data_python_connector.server import e6x_engine_pb2_grpc, e6x_engine_pb2 from e6data_python_connector.typeId import * +import logging + +_logger = logging.getLogger(__name__) apilevel = '2.0' threadsafety = 2 # Threads may share the e6xdb and connections. @@ -78,10 +81,12 @@ def wrapper(self, *args, **kwargs): if current_retry == max_retry: raise e if e.code() == grpc.StatusCode.INTERNAL and 'Access denied' in e.details(): + _logger.debug(f'Re-authenticating due to {e}') time.sleep(0.2) self.connection.get_re_authenticate_session_id() elif 'status: 456' in e.details(): # Strategy changed, clear cache and retry + _logger.debug(f'Re-authenticating due to status: 456') _clear_strategy_cache() # Force re-authentication which will detect new strategy self.connection.get_re_authenticate_session_id() @@ -131,10 +136,15 @@ def escape_string(self, item): # Global set to track debug-enabled connections _debug_connections = set() -def _strategy_debug_log(message): +def _generate_trace_id(): + """Generate a unique trace ID for tracking requests across a session.""" + return str(uuid.uuid4())[:8] # Use first 8 characters for readability + +def _strategy_debug_log(message, trace_id=None): """Log strategy debug messages if any connection has debug enabled.""" if _debug_connections: - print(f"[E6DATA_STRATEGY_DEBUG] {time.strftime('%Y-%m-%d %H:%M:%S')} - {message}") + trace_info = f"[{trace_id}] " if trace_id else "" + _logger.debug(f"[E6DATA_STRATEGY_DEBUG] {time.strftime('%Y-%m-%d %H:%M:%S')} - {trace_info}{message}") def _get_shared_strategy(): @@ -241,7 +251,7 @@ def _invalidate_all_sessions(): shared_strategy['session_invalidated'] = True -def _register_query_strategy(query_id, strategy): +def _register_query_strategy(query_id, strategy, session_id=None): """Register the strategy used for a specific query.""" if not query_id or not strategy: return @@ -255,7 +265,8 @@ def _register_query_strategy(query_id, strategy): query_map = shared_strategy.get('query_strategy_map', {}) query_map[query_id] = normalized_strategy shared_strategy['query_strategy_map'] = query_map - _strategy_debug_log(f"Query {query_id} registered with strategy: {normalized_strategy}") + session_info = f", session: {session_id}" if session_id else "" + _strategy_debug_log(f"Query {query_id} registered with strategy: {normalized_strategy}{session_info}") def _get_query_strategy(query_id): @@ -269,7 +280,7 @@ def _get_query_strategy(query_id): return query_map.get(query_id, current_active_strategy) -def _cleanup_query_strategy(query_id): +def _cleanup_query_strategy(query_id, session_id=None): """Remove the strategy mapping for a completed query.""" if not query_id: return @@ -281,7 +292,8 @@ def _cleanup_query_strategy(query_id): del query_map[query_id] shared_strategy['query_strategy_map'] = query_map remaining_queries = len(query_map) - _strategy_debug_log(f"Query {query_id} completed (was using {strategy}). Remaining active queries: {remaining_queries}") + session_info = f", session: {session_id}" if session_id else "" + _strategy_debug_log(f"Query {query_id} completed (was using {strategy}{session_info}). Remaining active queries: {remaining_queries}") def _get_strategy_debug_info(): @@ -298,10 +310,19 @@ def _get_strategy_debug_info(): } -def _get_grpc_header(engine_ip=None, cluster=None, strategy=None): +def _get_grpc_header(engine_ip=None, cluster=None, strategy=None, session_id=None, request_type=None, trace_id=None): """Generate gRPC metadata headers for the request.""" # Use the strategy module's implementation - return _get_strategy_header(engine_ip=engine_ip, cluster=cluster, strategy=strategy) + headers = _get_strategy_header(engine_ip=engine_ip, cluster=cluster, strategy=strategy) + + # Log headers being sent if debug connections exist + if _debug_connections: + session_info = f", session: {session_id}" if session_id else "" + request_info = f" [{request_type}]" if request_type else "" + trace_info = f", trace: {trace_id}" if trace_id else "" + _strategy_debug_log(f"Sending headers{request_info}: strategy={strategy}, cluster={cluster}, engine_ip={engine_ip}{session_info}{trace_info}", trace_id) + + return headers def connect(*args, **kwargs): @@ -372,6 +393,8 @@ def __init__( self.database = database self.cluster_name = cluster_name self._session_id = None + self._session_strategy = None # Track which strategy was used for current session + self._trace_id = _generate_trace_id() # Generate trace ID for this connection self._host = host self._port = port @@ -396,7 +419,7 @@ def __init__( self._debug = debug if self._debug: _debug_connections.add(id(self)) - _strategy_debug_log(f"Debug mode enabled for connection {id(self)}") + _strategy_debug_log(f"Debug mode enabled for connection {id(self)}", self._trace_id) self._create_client() @@ -494,6 +517,7 @@ def get_session_id(self): # Check if session was invalidated globally if self._session_id and session_invalidated: self._session_id = None + self._session_strategy = None self.close() self._create_client() # Clear the invalidation flag @@ -507,8 +531,20 @@ def get_session_id(self): _apply_pending_strategy() # Force complete reconnection with new strategy self._session_id = None + self._session_strategy = None self.close() self._create_client() + + # Check if session strategy matches current active strategy + elif self._session_id and self._session_strategy and self._session_strategy != _get_active_strategy(): + # Session was created with different strategy, need to re-authenticate + _strategy_debug_log(f"Session strategy mismatch: session {self._session_id} created with {self._session_strategy}, current active is {_get_active_strategy()}", self._trace_id) + old_session_id = self._session_id + self._session_id = None + self._session_strategy = None + _strategy_debug_log(f"Invalidating session {old_session_id} due to strategy mismatch", self._trace_id) + self.close() + self._create_client() if not self._session_id: try: @@ -524,29 +560,40 @@ def get_session_id(self): if active_strategy and not pending_strategy: # Use cached strategy only if there's no pending strategy - _strategy_debug_log(f"Authenticating with cached strategy: {active_strategy}") + _strategy_debug_log(f"Authenticating with cached strategy: {active_strategy} (session: {self._session_id})", self._trace_id) try: authenticate_response = self._client.authenticate( authenticate_request, - metadata=_get_grpc_header(cluster=self.cluster_name, strategy=active_strategy) + metadata=_get_grpc_header(cluster=self.cluster_name, strategy=active_strategy, + session_id=self._session_id, request_type="authenticate", trace_id=self._trace_id) ) self._session_id = authenticate_response.sessionId if not self._session_id: raise ValueError("Invalid credentials.") + # Track which strategy was used for this session + self._session_strategy = active_strategy + # Generate new trace ID for this authenticated session + old_trace_id = self._trace_id + self._trace_id = _generate_trace_id() + _strategy_debug_log(f"Authentication successful with strategy: {active_strategy}, session: {self._session_id}, new trace: {self._trace_id} (was: {old_trace_id})", self._trace_id) # Check for new strategy in authenticate response if hasattr(authenticate_response, 'new_strategy') and authenticate_response.new_strategy: new_strategy = authenticate_response.new_strategy.lower() if new_strategy != active_strategy: + _strategy_debug_log(f"Strategy changed from {active_strategy} to {new_strategy} in authenticate response, session: {self._session_id}") _set_pending_strategy(new_strategy) _apply_pending_strategy() + old_session_id = self._session_id self._session_id = None + self._session_strategy = None + _strategy_debug_log(f"Resetting session {old_session_id} due to new strategy: {new_strategy}") self.close() self._create_client() return self.get_session_id except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): # Strategy changed, clear cache and retry - _strategy_debug_log(f"Got 456 error with strategy {active_strategy}, clearing cache and retrying") + _strategy_debug_log(f"Got 456 error with strategy {active_strategy}, session: {self._session_id}, clearing cache and retrying") _clear_strategy_cache() active_strategy = None else: @@ -568,33 +615,43 @@ def get_session_id(self): _strategy_debug_log(f"No cached strategy, will try strategies in order: {strategies}") last_error = None for strategy in strategies: - _strategy_debug_log(f"Attempting authentication with strategy: {strategy}") + _strategy_debug_log(f"Attempting authentication with strategy: {strategy} (no session yet)", self._trace_id) try: authenticate_response = self._client.authenticate( authenticate_request, - metadata=_get_grpc_header(cluster=self.cluster_name, strategy=strategy) + metadata=_get_grpc_header(cluster=self.cluster_name, strategy=strategy, + session_id=None, request_type="authenticate", trace_id=self._trace_id) ) self._session_id = authenticate_response.sessionId if self._session_id: # Success! Cache this strategy - _strategy_debug_log(f"Authentication successful with strategy: {strategy}") _set_active_strategy(strategy) + # Track which strategy was used for this session + self._session_strategy = strategy + # Generate new trace ID for this authenticated session + old_trace_id = self._trace_id + self._trace_id = _generate_trace_id() + _strategy_debug_log(f"Authentication successful with strategy: {strategy}, session: {self._session_id}, new trace: {self._trace_id} (was: {old_trace_id})", self._trace_id) # Check for new strategy in authenticate response if hasattr(authenticate_response, 'new_strategy') and authenticate_response.new_strategy: new_strategy = authenticate_response.new_strategy.lower() if new_strategy != strategy: + _strategy_debug_log(f"Strategy changed from {strategy} to {new_strategy} in authenticate response, session: {self._session_id}") _set_pending_strategy(new_strategy) _apply_pending_strategy() + old_session_id = self._session_id self.close() + _strategy_debug_log(f"Resetting session {old_session_id} due to new strategy: {new_strategy}") self._create_client() self._session_id = None + self._session_strategy = None return self.get_session_id break except _InactiveRpcError as e: if e.code() == grpc.StatusCode.UNKNOWN and 'status: 456' in e.details(): # Wrong strategy, try the next one - _strategy_debug_log(f"Strategy {strategy} failed with 456 error, trying next") + _strategy_debug_log(f"Strategy {strategy} failed with 456 error (session attempt failed), trying next") last_error = e continue else: @@ -662,15 +719,22 @@ def close(self): This method ensures that the gRPC channel is properly closed and the session ID is reset to None. """ + old_session = self._session_id + old_trace_id = self._trace_id if self._channel is not None: self._channel.close() self._channel = None self._session_id = None + self._session_strategy = None # Remove from debug connections if debug was enabled if self._debug: _debug_connections.discard(id(self)) - _strategy_debug_log(f"Debug mode disabled for connection {id(self)}") + session_info = f", session was: {old_session}" if old_session else "" + _strategy_debug_log(f"Debug mode disabled for connection {id(self)}{session_info}, trace: {old_trace_id}", old_trace_id) + + # Reset trace ID to None after connection close + self._trace_id = None def check_connection(self): """ @@ -733,7 +797,8 @@ def clear(self, query_id, engine_ip=None): ) clear_response = self._client.clear( clear_request, - metadata=_get_grpc_header(engine_ip=engine_ip, cluster=self.cluster_name, strategy=_get_active_strategy()) + metadata=_get_grpc_header(engine_ip=engine_ip, cluster=self.cluster_name, strategy=_get_active_strategy(), + session_id=self.connection.get_session_id, request_type="clear", trace_id=self.connection._trace_id) ) # Check for new strategy in clear response @@ -764,7 +829,8 @@ def query_cancel(self, engine_ip, query_id): ) cancel_response = self._client.cancelQuery( cancel_query_request, - metadata=_get_grpc_header(engine_ip=engine_ip, cluster=self.cluster_name, strategy=_get_active_strategy()) + metadata=_get_grpc_header(engine_ip=engine_ip, cluster=self.cluster_name, strategy=_get_active_strategy(), + session_id=self._session_id, request_type="query_cancel", trace_id=self._trace_id) ) # Check for new strategy in cancel response @@ -788,7 +854,8 @@ def dry_run(self, query): ) dry_run_response = self._client.dryRun( dry_run_request, - metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy()) + metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy(), + session_id=self.get_session_id, request_type="dry_run", trace_id=self._trace_id) ) return dry_run_response.dryrunValue @@ -810,7 +877,8 @@ def get_tables(self, catalog, database): ) get_table_response = self._client.getTablesV2( get_table_request, - metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy()) + metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy(), + session_id=self.get_session_id, request_type="get_tables", trace_id=self._trace_id) ) # Check for new strategy in get tables response @@ -838,7 +906,8 @@ def get_columns(self, catalog, database, table): ) get_columns_response = self._client.getColumnsV2( get_columns_request, - metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy()) + metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy(), + session_id=self.get_session_id, request_type="get_columns", trace_id=self._trace_id) ) # Check for new strategy in get columns response @@ -862,7 +931,8 @@ def get_schema_names(self, catalog): ) get_schema_response = self._client.getSchemaNamesV2( get_schema_request, - metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy()) + metadata=_get_grpc_header(cluster=self.cluster_name, strategy=_get_active_strategy(), + session_id=self.get_session_id, request_type="get_schema", trace_id=self._trace_id) ) # Check for new strategy in get schema names response @@ -949,6 +1019,27 @@ def _reset_state(self): """Reset state about the previous query in preparation for running another query""" pass + def _get_metadata(self, request_type="query_operation"): + """ + Get the gRPC metadata for the current query with specific request type. + + Args: + request_type (str): The type of request being made + + Returns: + list: A list of tuples containing gRPC metadata. + """ + # Use query-specific strategy if available, otherwise use session strategy, otherwise use active strategy + if self._query_id: + strategy = _get_query_strategy(self._query_id) + elif self.connection._session_strategy: + # Use the strategy that was used to create the current session + strategy = self.connection._session_strategy + else: + strategy = _get_active_strategy() + return _get_grpc_header(engine_ip=self._engine_ip, cluster=self.connection.cluster_name, strategy=strategy, + session_id=self.connection.get_session_id, request_type=request_type, trace_id=self.connection._trace_id) + @property def metadata(self): """ @@ -957,9 +1048,7 @@ def metadata(self): Returns: list: A list of tuples containing gRPC metadata. """ - # Use query-specific strategy if available, otherwise use active strategy - strategy = _get_query_strategy(self._query_id) if self._query_id else _get_active_strategy() - return _get_grpc_header(engine_ip=self._engine_ip, cluster=self.connection.cluster_name, strategy=strategy) + return self._get_metadata("query_operation") @property def arraysize(self): @@ -1101,7 +1190,7 @@ def clear(self, query_id=None): ) # Get fresh client after session access (may have been invalidated) client = self.connection.client - clear_response = client.clearOrCancelQuery(clear_request, metadata=self.metadata) + clear_response = client.clearOrCancelQuery(clear_request, metadata=self._get_metadata("clear_or_cancel")) # Check for new strategy in clear response if hasattr(clear_response, 'new_strategy') and clear_response.new_strategy: @@ -1109,7 +1198,7 @@ def clear(self, query_id=None): # Clean up query strategy mapping if query_id: - _cleanup_query_strategy(query_id) + _cleanup_query_strategy(query_id, self.connection.get_session_id) # Check if this was the last query and we have a pending strategy shared_strategy = _get_shared_strategy() @@ -1117,7 +1206,7 @@ def clear(self, query_id=None): query_map = shared_strategy.get('query_strategy_map', {}) if pending_strategy and len(query_map) == 0: - _strategy_debug_log(f"Last query cleared, triggering pending strategy transition") + _strategy_debug_log(f"Last query cleared for session {self.connection.get_session_id}, triggering pending strategy transition from {_get_active_strategy()} to {pending_strategy}") _apply_pending_strategy() return clear_response @@ -1133,7 +1222,7 @@ def cancel(self, query_id): self.connection.query_cancel(engine_ip=self._engine_ip, query_id=query_id) if query_id: - _cleanup_query_strategy(query_id) + _cleanup_query_strategy(query_id, self.connection.get_session_id) # Check if this was the last query and we have a pending strategy shared_strategy = _get_shared_strategy() @@ -1141,7 +1230,7 @@ def cancel(self, query_id): query_map = shared_strategy.get('query_strategy_map', {}) if pending_strategy and len(query_map) == 0: - _strategy_debug_log(f"Last query cleared, triggering pending strategy transition") + _strategy_debug_log(f"Last query cleared for session {self.connection.get_session_id}, triggering pending strategy transition from {_get_active_strategy()} to {pending_strategy}") _apply_pending_strategy() def status(self, query_id): @@ -1159,7 +1248,7 @@ def status(self, query_id): queryId=query_id, engineIP=self._engine_ip ) - status_response = self.connection.client.status(status_request, metadata=self.metadata) + status_response = self.connection.client.status(status_request, metadata=self._get_metadata("status")) # Check for new strategy in status response if hasattr(status_response, 'new_strategy') and status_response.new_strategy: @@ -1200,7 +1289,7 @@ def execute(self, operation, parameters=None, **kwargs): client = self.connection.client prepare_statement_response = client.prepareStatement( prepare_statement_request, - metadata=self.metadata + metadata=self._get_metadata("prepare_statement") ) self._query_id = prepare_statement_response.queryId @@ -1215,7 +1304,7 @@ def execute(self, operation, parameters=None, **kwargs): # Register this query with the current strategy current_strategy = _get_active_strategy() if current_strategy: - _register_query_strategy(self._query_id, current_strategy) + _register_query_strategy(self._query_id, current_strategy, self.connection.get_session_id) execute_statement_request = e6x_engine_pb2.ExecuteStatementRequest( engineIP=self._engine_ip, @@ -1226,7 +1315,7 @@ def execute(self, operation, parameters=None, **kwargs): client = self.connection.client execute_response = client.executeStatement( execute_statement_request, - metadata=self.metadata + metadata=self._get_metadata("execute_statement") ) # Check for new strategy in execute response @@ -1245,7 +1334,7 @@ def execute(self, operation, parameters=None, **kwargs): client = self.connection.client prepare_statement_response = client.prepareStatementV2( prepare_statement_request, - metadata=self.metadata, + metadata=self._get_metadata("prepare_statement_v2"), timeout=self.connection.grpc_prepare_timeout ) @@ -1262,7 +1351,7 @@ def execute(self, operation, parameters=None, **kwargs): current_strategy = _get_active_strategy() if current_strategy: - _register_query_strategy(self._query_id, current_strategy) + _register_query_strategy(self._query_id, current_strategy, self.connection.get_session_id) execute_statement_request = e6x_engine_pb2.ExecuteStatementV2Request( engineIP=self._engine_ip, @@ -1273,7 +1362,7 @@ def execute(self, operation, parameters=None, **kwargs): client = self.connection.client execute_response = client.executeStatementV2( execute_statement_request, - metadata=self.metadata + metadata=self._get_metadata("execute_statement_v2") ) # Check for new strategy in execute response diff --git a/setup.py b/setup.py index 051ca53..3323958 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ import setuptools -VERSION = (2, 3, 9,) +VERSION = (2, 3, 10, 'rc5') def get_long_desc():