@@ -144,104 +144,104 @@ class KyuubiSessionImpl(
144144
145145 private [kyuubi] def openEngineSession (extraEngineLog : Option [OperationLog ] = None ): Unit =
146146 handleSessionException {
147- withWriteLockAcquired {
148- withDiscoveryClient(sessionConf) { discoveryClient =>
149- var openEngineSessionConf =
150- optimizedConf ++ Map (KYUUBI_SESSION_HANDLE_KEY -> handle.identifier.toString)
151- if (engineCredentials.nonEmpty) {
152- sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY , engineCredentials)
153- openEngineSessionConf =
154- openEngineSessionConf ++ Map (KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
155- }
147+ withDiscoveryClient(sessionConf) { discoveryClient =>
148+ var openEngineSessionConf =
149+ optimizedConf ++ Map (KYUUBI_SESSION_HANDLE_KEY -> handle.identifier.toString)
150+ if (engineCredentials.nonEmpty) {
151+ sessionConf.set(KYUUBI_ENGINE_CREDENTIALS_KEY , engineCredentials)
152+ openEngineSessionConf =
153+ openEngineSessionConf ++ Map (KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
154+ }
156155
157- if (sessionConf.get(SESSION_USER_SIGN_ENABLED )) {
158- openEngineSessionConf = openEngineSessionConf +
159- (SESSION_USER_SIGN_ENABLED .key ->
160- sessionConf.get(SESSION_USER_SIGN_ENABLED ).toString) +
161- (KYUUBI_SESSION_SIGN_PUBLICKEY ->
162- Base64 .getEncoder.encodeToString(
163- sessionManager.signingPublicKey.getEncoded)) +
164- (KYUUBI_SESSION_USER_SIGN -> sessionUserSignBase64)
165- }
156+ if (sessionConf.get(SESSION_USER_SIGN_ENABLED )) {
157+ openEngineSessionConf = openEngineSessionConf +
158+ (SESSION_USER_SIGN_ENABLED .key ->
159+ sessionConf.get(SESSION_USER_SIGN_ENABLED ).toString) +
160+ (KYUUBI_SESSION_SIGN_PUBLICKEY ->
161+ Base64 .getEncoder.encodeToString(
162+ sessionManager.signingPublicKey.getEncoded)) +
163+ (KYUUBI_SESSION_USER_SIGN -> sessionUserSignBase64)
164+ }
166165
167- val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS )
168- val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT )
169- val openOnFailure =
170- EngineOpenOnFailure .withName(sessionManager.getConf.get(ENGINE_OPEN_ON_FAILURE ))
171- var attempt = 0
172- var shouldRetry = true
173- while (attempt <= maxAttempts && shouldRetry) {
174- val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
175-
176- def deregisterEngine (): Unit =
177- try {
178- engine.deregister(discoveryClient, (host, port))
179- } catch {
180- case e : Throwable =>
181- warn(s " Error on de-registering engine [ ${engine.engineSpace} $host: $port] " , e)
182- }
166+ val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS )
167+ val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT )
168+ val openOnFailure =
169+ EngineOpenOnFailure .withName(sessionManager.getConf.get(ENGINE_OPEN_ON_FAILURE ))
170+ var attempt = 0
171+ var shouldRetry = true
172+ while (attempt <= maxAttempts && shouldRetry) {
173+ val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
183174
175+ def deregisterEngine (): Unit =
184176 try {
185- val passwd = {
186- if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED )) {
187- InternalSecurityAccessor .get().issueToken()
188- } else {
189- Option (password).filter(_.nonEmpty).getOrElse(" anonymous" )
190- }
177+ engine.deregister(discoveryClient, (host, port))
178+ } catch {
179+ case e : Throwable =>
180+ warn(s " Error on de-registering engine [ ${engine.engineSpace} $host: $port] " , e)
181+ }
182+
183+ try {
184+ val passwd = {
185+ if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED )) {
186+ InternalSecurityAccessor .get().issueToken()
187+ } else {
188+ Option (password).filter(_.nonEmpty).getOrElse(" anonymous" )
191189 }
190+ }
191+ withWriteLockAcquired{
192192 if (_client != null ) _client.closeSession()
193193 _client = KyuubiSyncThriftClient .createClient(user, passwd, host, port, sessionConf)
194194 _engineSessionHandle =
195195 _client.openSession(protocol, user, passwd, openEngineSessionConf)
196196 logSessionInfo(
197197 s " Connected to engine [ $host: $port]/[ ${client.engineId.getOrElse(" " )}] " +
198198 s " with ${_engineSessionHandle}] " )
199- shouldRetry = false
200- } catch {
201- case e : TTransportException
202- if attempt < maxAttempts && e.getCause. isInstanceOf [java.net. ConnectException ] &&
203- e.getCause.getMessage.contains( " Connection refused " ) =>
204- warn(
205- s " Failed to open [ ${engine.defaultEngineName} $host : $port ] after " +
206- s " $attempt / $maxAttempts times, retrying " ,
207- e.getCause)
208- Thread .sleep(retryWait )
209- openOnFailure match {
210- case DEREGISTER_IMMEDIATELY => deregisterEngine()
211- case _ =>
212- }
213- shouldRetry = true
214- case e : Throwable =>
215- error(
216- s " Opening engine [ ${engine.defaultEngineName} $host : $port ] " +
217- s " for $user session failed " ,
218- e)
219- openSessionError = Some ( e)
220- openOnFailure match {
221- case DEREGISTER_IMMEDIATELY | DEREGISTER_AFTER_RETRY => deregisterEngine()
222- case _ =>
223- }
224- throw e
225- } finally {
226- attempt += 1
227- if (shouldRetry && _client != null ) {
228- try {
229- _client.closeSession()
230- } catch {
231- case e : Throwable =>
232- warn(
233- " Error on closing broken client of engine " +
234- s " [ ${ engine.defaultEngineName} $host : $port ] " ,
235- e)
236- }
199+ }
200+ shouldRetry = false
201+ } catch {
202+ case e : TTransportException
203+ if attempt < maxAttempts && e.getCause.isInstanceOf [java.net. ConnectException ] &&
204+ e.getCause.getMessage.contains( " Connection refused " ) =>
205+ warn(
206+ s " Failed to open [ ${engine.defaultEngineName} $host : $port ] after " +
207+ s " $attempt / $maxAttempts times, retrying " ,
208+ e.getCause )
209+ Thread .sleep(retryWait)
210+ openOnFailure match {
211+ case DEREGISTER_IMMEDIATELY => deregisterEngine()
212+ case _ =>
213+ }
214+ shouldRetry = true
215+ case e : Throwable =>
216+ error(
217+ s " Opening engine [ ${engine.defaultEngineName} $host : $port ] " +
218+ s " for $user session failed " ,
219+ e)
220+ openSessionError = Some (e)
221+ openOnFailure match {
222+ case DEREGISTER_IMMEDIATELY | DEREGISTER_AFTER_RETRY => deregisterEngine()
223+ case _ =>
224+ }
225+ throw e
226+ } finally {
227+ attempt += 1
228+ if (shouldRetry && _client != null ) {
229+ try {
230+ withWriteLockAcquired{ _client.closeSession() }
231+ } catch {
232+ case e : Throwable =>
233+ warn(
234+ " Error on closing broken client of engine " +
235+ s " [ ${engine.defaultEngineName} $host : $port ] " ,
236+ e)
237237 }
238238 }
239239 }
240- sessionEvent.openedTime = System .currentTimeMillis()
241- sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
242- _client.engineId.foreach(e => sessionEvent.engineId = e)
243- EventBus .post(sessionEvent)
244240 }
241+ sessionEvent.openedTime = System .currentTimeMillis()
242+ sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
243+ _client.engineId.foreach(e => sessionEvent.engineId = e)
244+ EventBus .post(sessionEvent)
245245 }
246246 }
247247
0 commit comments