37
37
import java .util .ArrayDeque ;
38
38
import java .util .Locale ;
39
39
import java .util .Queue ;
40
+ import java .util .Set ;
40
41
import java .util .concurrent .ConcurrentHashMap ;
41
42
import java .util .concurrent .ConcurrentMap ;
42
43
import java .util .concurrent .ThreadLocalRandom ;
43
44
import javax .security .sasl .SaslException ;
44
45
import org .apache .hadoop .conf .Configuration ;
45
46
import org .apache .hadoop .hbase .DoNotRetryIOException ;
47
+ import org .apache .hadoop .hbase .client .ConnectionUtils ;
46
48
import org .apache .hadoop .hbase .exceptions .ConnectionClosingException ;
47
49
import org .apache .hadoop .hbase .io .ByteArrayOutputStream ;
48
50
import org .apache .hadoop .hbase .ipc .HBaseRpcController .CancellationCallback ;
@@ -352,13 +354,13 @@ private void disposeSasl() {
352
354
}
353
355
}
354
356
355
- private boolean setupSaslConnection (final InputStream in2 , final OutputStream out2 )
356
- throws IOException {
357
+ private boolean setupSaslConnection (final InputStream in2 , final OutputStream out2 ,
358
+ String serverPrincipal ) throws IOException {
357
359
if (this .metrics != null ) {
358
360
this .metrics .incrNsLookups ();
359
361
}
360
362
saslRpcClient = new HBaseSaslRpcClient (this .rpcClient .conf , provider , token ,
361
- socket .getInetAddress (), securityInfo , this .rpcClient .fallbackAllowed ,
363
+ socket .getInetAddress (), serverPrincipal , this .rpcClient .fallbackAllowed ,
362
364
this .rpcClient .conf .get ("hbase.rpc.protection" ,
363
365
QualityOfProtection .AUTHENTICATION .name ().toLowerCase (Locale .ROOT )),
364
366
this .rpcClient .conf .getBoolean (CRYPTO_AES_ENABLED_KEY , CRYPTO_AES_ENABLED_DEFAULT ));
@@ -379,7 +381,8 @@ private boolean setupSaslConnection(final InputStream in2, final OutputStream ou
379
381
* </p>
380
382
*/
381
383
private void handleSaslConnectionFailure (final int currRetries , final int maxRetries ,
382
- final Exception ex , final UserGroupInformation user ) throws IOException , InterruptedException {
384
+ final Exception ex , final UserGroupInformation user , final String serverPrincipal )
385
+ throws IOException , InterruptedException {
383
386
closeSocket ();
384
387
user .doAs (new PrivilegedExceptionAction <Object >() {
385
388
@ Override
@@ -419,25 +422,75 @@ public Object run() throws IOException, InterruptedException {
419
422
Thread .sleep (ThreadLocalRandom .current ().nextInt (reloginMaxBackoff ) + 1 );
420
423
return null ;
421
424
} else {
422
- String msg =
423
- "Failed to initiate connection for " + UserGroupInformation .getLoginUser ().getUserName ()
424
- + " to " + securityInfo .getServerPrincipal ();
425
+ String msg = "Failed to initiate connection for "
426
+ + UserGroupInformation .getLoginUser ().getUserName () + " to " + serverPrincipal ;
425
427
throw new IOException (msg , ex );
426
428
}
427
429
}
428
430
});
429
431
}
430
432
431
- private void getConnectionRegistry (OutputStream outStream ) throws IOException {
433
+ private void getConnectionRegistry (InputStream inStream , OutputStream outStream ,
434
+ Call connectionRegistryCall ) throws IOException {
432
435
outStream .write (RpcClient .REGISTRY_PREAMBLE_HEADER );
436
+ readResponse (new DataInputStream (inStream ), calls , connectionRegistryCall , remoteExc -> {
437
+ synchronized (this ) {
438
+ closeConn (remoteExc );
439
+ }
440
+ });
433
441
}
434
442
435
443
private void createStreams (InputStream inStream , OutputStream outStream ) {
436
444
this .in = new DataInputStream (new BufferedInputStream (inStream ));
437
445
this .out = new DataOutputStream (new BufferedOutputStream (outStream ));
438
446
}
439
447
440
- private void setupIOstreams () throws IOException {
448
+ // choose the server principal to use
449
+ private String chooseServerPrincipal (InputStream inStream , OutputStream outStream )
450
+ throws IOException {
451
+ Set <String > serverPrincipals = getServerPrincipals ();
452
+ if (serverPrincipals .size () == 1 ) {
453
+ return serverPrincipals .iterator ().next ();
454
+ }
455
+ // this means we use kerberos authentication and there are multiple server principal candidates,
456
+ // in this way we need to send a special preamble header to get server principal from server
457
+ Call securityPreambleCall = createSecurityPreambleCall (r -> {
458
+ });
459
+ outStream .write (RpcClient .SECURITY_PREAMBLE_HEADER );
460
+ readResponse (new DataInputStream (inStream ), calls , securityPreambleCall , remoteExc -> {
461
+ synchronized (this ) {
462
+ closeConn (remoteExc );
463
+ }
464
+ });
465
+ if (securityPreambleCall .error != null ) {
466
+ LOG .debug ("Error when trying to do a security preamble call to {}" , remoteId .address ,
467
+ securityPreambleCall .error );
468
+ if (ConnectionUtils .isUnexpectedPreambleHeaderException (securityPreambleCall .error )) {
469
+ // this means we are connecting to an old server which does not support the security
470
+ // preamble call, so we should fallback to randomly select a principal to use
471
+ // TODO: find a way to reconnect without failing all the pending calls, for now, when we
472
+ // reach here, shutdown should have already been scheduled
473
+ throw securityPreambleCall .error ;
474
+ }
475
+ if (IPCUtil .isSecurityNotEnabledException (securityPreambleCall .error )) {
476
+ // server tells us security is not enabled, then we should check whether fallback to
477
+ // simple is allowed, if so we just go without security, otherwise we should fail the
478
+ // negotiation immediately
479
+ if (rpcClient .fallbackAllowed ) {
480
+ // TODO: just change the preamble and skip the fallback to simple logic, for now, just
481
+ // select the first principal can finish the connection setup, but waste one client
482
+ // message
483
+ return serverPrincipals .iterator ().next ();
484
+ } else {
485
+ throw new FallbackDisallowedException ();
486
+ }
487
+ }
488
+ return randomSelect (serverPrincipals );
489
+ }
490
+ return chooseServerPrincipal (serverPrincipals , securityPreambleCall );
491
+ }
492
+
493
+ private void setupIOstreams (Call connectionRegistryCall ) throws IOException {
441
494
if (socket != null ) {
442
495
// The connection is already available. Perfect.
443
496
return ;
@@ -465,32 +518,37 @@ private void setupIOstreams() throws IOException {
465
518
// This creates a socket with a write timeout. This timeout cannot be changed.
466
519
OutputStream outStream = NetUtils .getOutputStream (socket , this .rpcClient .writeTO );
467
520
if (connectionRegistryCall != null ) {
468
- getConnectionRegistry (outStream );
469
- createStreams ( inStream , outStream );
470
- break ;
521
+ getConnectionRegistry (inStream , outStream , connectionRegistryCall );
522
+ closeSocket ( );
523
+ return ;
471
524
}
472
- // Write out the preamble -- MAGIC, version, and auth to use.
473
- writeConnectionHeaderPreamble (outStream );
525
+
474
526
if (useSasl ) {
475
- final InputStream in2 = inStream ;
476
- final OutputStream out2 = outStream ;
477
527
UserGroupInformation ticket = provider .getRealUser (remoteId .ticket );
478
528
boolean continueSasl ;
479
529
if (ticket == null ) {
480
530
throw new FatalConnectionException ("ticket/user is null" );
481
531
}
532
+ String serverPrincipal = chooseServerPrincipal (inStream , outStream );
533
+ // Write out the preamble -- MAGIC, version, and auth to use.
534
+ writeConnectionHeaderPreamble (outStream );
482
535
try {
536
+ final InputStream in2 = inStream ;
537
+ final OutputStream out2 = outStream ;
483
538
continueSasl = ticket .doAs (new PrivilegedExceptionAction <Boolean >() {
484
539
@ Override
485
540
public Boolean run () throws IOException {
486
- return setupSaslConnection (in2 , out2 );
541
+ return setupSaslConnection (in2 , out2 , serverPrincipal );
487
542
}
488
543
});
489
544
} catch (Exception ex ) {
490
545
ExceptionUtil .rethrowIfInterrupt (ex );
491
- handleSaslConnectionFailure (numRetries ++, reloginMaxRetries , ex , ticket );
546
+ saslNegotiationDone (serverPrincipal , false );
547
+ handleSaslConnectionFailure (numRetries ++, reloginMaxRetries , ex , ticket ,
548
+ serverPrincipal );
492
549
continue ;
493
550
}
551
+ saslNegotiationDone (serverPrincipal , true );
494
552
if (continueSasl ) {
495
553
// Sasl connect is successful. Let's set up Sasl i/o streams.
496
554
inStream = saslRpcClient .getInputStream ();
@@ -501,6 +559,9 @@ public Boolean run() throws IOException {
501
559
// reconnecting because regionserver may change its sasl config after restart.
502
560
saslRpcClient = null ;
503
561
}
562
+ } else {
563
+ // Write out the preamble -- MAGIC, version, and auth to use.
564
+ writeConnectionHeaderPreamble (outStream );
504
565
}
505
566
createStreams (inStream , outStream );
506
567
// Now write out the connection header
@@ -618,9 +679,10 @@ private void writeRequest(Call call) throws IOException {
618
679
}
619
680
RequestHeader requestHeader = buildRequestHeader (call , cellBlockMeta );
620
681
if (call .isConnectionRegistryCall ()) {
621
- connectionRegistryCall = call ;
682
+ setupIOstreams (call );
683
+ return ;
622
684
}
623
- setupIOstreams ();
685
+ setupIOstreams (null );
624
686
625
687
// Now we're going to write the call. We take the lock, then check that the connection
626
688
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
@@ -655,7 +717,7 @@ private void writeRequest(Call call) throws IOException {
655
717
*/
656
718
private void readResponse () {
657
719
try {
658
- readResponse (in , calls , remoteExc -> {
720
+ readResponse (in , calls , null , remoteExc -> {
659
721
synchronized (this ) {
660
722
closeConn (remoteExc );
661
723
}
0 commit comments