Skip to content

Commit

Permalink
Implementation of Claims based authentication (Azure#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
yvgopal authored and jtaubensee committed May 2, 2017
1 parent e2a038c commit 21a0f7a
Show file tree
Hide file tree
Showing 19 changed files with 525 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

import com.microsoft.azure.servicebus.primitives.ExceptionUtil;
import com.microsoft.azure.servicebus.primitives.MessageLockLostException;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.OperationCancelledException;
Expand Down Expand Up @@ -106,7 +107,7 @@ private void receiveAndPumpMessage()
receiveMessageFuture.handleAsync((message, receiveEx) -> {
if(receiveEx != null)
{
receiveEx = Utils.extractAsyncCompletionCause(receiveEx);
receiveEx = ExceptionUtil.extractAsyncCompletionCause(receiveEx);
this.notifyExceptionToMessageHandler(receiveEx, ExceptionPhase.RECEIVE);
this.receiveAndPumpMessage();
}
Expand Down Expand Up @@ -145,7 +146,7 @@ private void receiveAndPumpMessage()
onMessageFuture.handleAsync((v, onMessageEx) -> {
if(onMessageEx != null)
{
onMessageEx = Utils.extractAsyncCompletionCause(onMessageEx);
onMessageEx = ExceptionUtil.extractAsyncCompletionCause(onMessageEx);
this.notifyExceptionToMessageHandler(onMessageEx, ExceptionPhase.USERCALLBACK);
}
if(this.innerReceiver.getReceiveMode() == ReceiveMode.PeekLock)
Expand Down Expand Up @@ -179,7 +180,7 @@ private void receiveAndPumpMessage()
updateDispositionFuture.handleAsync((u, updateDispositionEx) -> {
if(updateDispositionEx != null)
{
updateDispositionEx = Utils.extractAsyncCompletionCause(updateDispositionEx);
updateDispositionEx = ExceptionUtil.extractAsyncCompletionCause(updateDispositionEx);
this.notifyExceptionToMessageHandler(updateDispositionEx, dispositionPhase);
}
this.receiveAndPumpMessage();
Expand Down Expand Up @@ -210,7 +211,7 @@ private void acceptSessionsAndPumpMessage()
acceptSessionFuture.handleAsync((session, acceptSessionEx) -> {
if(acceptSessionEx != null)
{
acceptSessionEx = Utils.extractAsyncCompletionCause(acceptSessionEx);
acceptSessionEx = ExceptionUtil.extractAsyncCompletionCause(acceptSessionEx);

if(!(acceptSessionEx instanceof TimeoutException))
{
Expand Down Expand Up @@ -253,7 +254,7 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker)
receiverFuture.handleAsync((message, receiveEx) -> {
if(receiveEx != null)
{
receiveEx = Utils.extractAsyncCompletionCause(receiveEx);
receiveEx = ExceptionUtil.extractAsyncCompletionCause(receiveEx);
this.notifyExceptionToSessionHandler(receiveEx, ExceptionPhase.RECEIVE);
sessionTracker.shouldRetryOnNoMessageOrException().thenAcceptAsync((shouldRetry) -> {
if(shouldRetry)
Expand Down Expand Up @@ -296,7 +297,7 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker)
renewCancelTimer.cancel(true);
if(onMessageEx != null)
{
onMessageEx = Utils.extractAsyncCompletionCause(onMessageEx);
onMessageEx = ExceptionUtil.extractAsyncCompletionCause(onMessageEx);
this.notifyExceptionToSessionHandler(onMessageEx, ExceptionPhase.USERCALLBACK);
}
if(this.receiveMode == ReceiveMode.PeekLock)
Expand Down Expand Up @@ -326,7 +327,7 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker)
updateDispositionFuture.handleAsync((u, updateDispositionEx) -> {
if(updateDispositionEx != null)
{
updateDispositionEx = Utils.extractAsyncCompletionCause(updateDispositionEx);
updateDispositionEx = ExceptionUtil.extractAsyncCompletionCause(updateDispositionEx);
this.notifyExceptionToSessionHandler(updateDispositionEx, dispositionPhase);
}
this.receiveFromSessionAndPumpMessage(sessionTracker);
Expand Down Expand Up @@ -429,7 +430,7 @@ synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException()
renewCancelTimer.cancel(true);
if(onCloseEx != null)
{
onCloseEx = Utils.extractAsyncCompletionCause(onCloseEx);
onCloseEx = ExceptionUtil.extractAsyncCompletionCause(onCloseEx);
this.messageAndSessionPump.notifyExceptionToSessionHandler(onCloseEx, ExceptionPhase.USERCALLBACK);
}

Expand All @@ -438,7 +439,7 @@ synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException()
{
if(closeEx != null)
{
closeEx = Utils.extractAsyncCompletionCause(closeEx);
closeEx = ExceptionUtil.extractAsyncCompletionCause(closeEx);
this.messageAndSessionPump.notifyExceptionToSessionHandler(closeEx, ExceptionPhase.SESSIONCLOSE);
}

Expand Down Expand Up @@ -540,7 +541,7 @@ protected void loop()
{
if(renewLockEx != null)
{
renewLockEx = Utils.extractAsyncCompletionCause(renewLockEx);
renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx);
this.messageAndSessionPump.notifyExceptionToMessageHandler(renewLockEx, ExceptionPhase.RENEWMESSAGELOCK);
if(!(renewLockEx instanceof MessageLockLostException || renewLockEx instanceof OperationCancelledException))
{
Expand Down Expand Up @@ -605,7 +606,7 @@ protected void loop()
{
if(renewLockEx != null)
{
renewLockEx = Utils.extractAsyncCompletionCause(renewLockEx);
renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx);
System.out.println(this.session.getSessionId() + "-" + renewLockEx.getMessage() + ":" + Instant.now());
this.messageAndSessionPump.notifyExceptionToSessionHandler(renewLockEx, ExceptionPhase.RENEWSESSIONLOCK);
if(!(renewLockEx instanceof SessionLockLostException || renewLockEx instanceof OperationCancelledException))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.microsoft.azure.servicebus;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

import com.microsoft.azure.servicebus.primitives.ServiceBusException;
Expand Down Expand Up @@ -38,16 +37,4 @@ static void assertNonNull(String argumentName, Object argument)
if(argument == null)
throw new IllegalArgumentException("Argument '" + argumentName +"' is null.");
}

static Throwable extractAsyncCompletionCause(Throwable completionEx)
{
if(completionEx instanceof CompletionException || completionEx instanceof ExecutionException)
{
return completionEx.getCause();
}
else
{
return completionEx;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ private AmqpConstants() { }

public static final int MAX_FRAME_SIZE = 65536;

public static final String MANAGEMENT_ADDRESS_SEGMENT = "/$management";
public static final String MANAGEMENT_NODE_ADDRESS_SEGMENT = "$management";
public static final String CBS_NODE_ADDRESS_SEGMENT = "$cbs";

public static final Symbol PRODUCT = Symbol.valueOf("product");
public static final Symbol VERSION = Symbol.valueOf("version");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,11 @@ public final class ConnectionHandler extends BaseHandler
{

private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.SERVICEBUS_CLIENT_TRACE);

private final String username;
private final String password;
private final IAmqpConnection messagingFactory;

public ConnectionHandler(final IAmqpConnection messagingFactory, final String username, final String password)
public ConnectionHandler(final IAmqpConnection messagingFactory)
{
add(new Handshaker());

this.username = username;
this.password = password;
this.messagingFactory = messagingFactory;
}

Expand Down Expand Up @@ -70,7 +64,7 @@ public void onConnectionBound(Event event)
transport.ssl(domain);

Sasl sasl = transport.sasl();
sasl.plain(this.username, this.password);
sasl.setMechanisms("ANONYMOUS");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ private ClientConstants() { }
public static final String REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION = AmqpConstants.VENDOR + ":get-message-sessions";
public static final String REQUEST_RESPONSE_ADD_RULE_OPERATION = AmqpConstants.VENDOR + ":add-rule";
public static final String REQUEST_RESPONSE_REMOVE_RULE_OPERATION = AmqpConstants.VENDOR + ":remove-rule";
public static final String REQUEST_RESPONSE_PUT_TOKEN_OPERATION = "put-token";
public static final String REQUEST_RESPONSE_PUT_TOKEN_TYPE = "type";
public static final String REQUEST_RESPONSE_PUT_TOKEN_AUDIENCE = "name";
public static final String REQUEST_RESPONSE_LOCKTOKENS = "lock-tokens";
public static final String REQUEST_RESPONSE_LOCKTOKEN = "lock-token";
public static final String REQUEST_RESPONSE_EXPIRATION = "expiration";
Expand All @@ -109,6 +112,10 @@ private ClientConstants() { }
public static final String REQUEST_RESPONSE_STATUS_CODE = "statusCode";
public static final String REQUEST_RESPONSE_STATUS_DESCRIPTION = "statusDescription";
public static final String REQUEST_RESPONSE_ERROR_CONDITION = "errorCondition";
// Legacy property names are used in CBS responses
public static final String REQUEST_RESPONSE_LEGACY_STATUS_CODE = "status-code";
public static final String REQUEST_RESPONSE_LEGACY_STATUS_DESCRIPTION = "status-description";
public static final String REQUEST_RESPONSE_LEGACY_ERROR_CONDITION = "error-condition";
public static final String REQUEST_RESPONSE_DISPOSITION_STATUS = "disposition-status";
public static final String REQUEST_RESPONSE_DEADLETTER_REASON = "deadletter-reason";
public static final String REQUEST_RESPONSE_DEADLETTER_DESCRIPTION = "deadletter-description";
Expand Down Expand Up @@ -140,11 +147,16 @@ private ClientConstants() { }
// public static final String DISPOSITION_STATUS_UNLOCKED = "unlocked";

public static final int REQUEST_RESPONSE_OK_STATUS_CODE = 200;
public static final int REQUEST_RESPONSE_ACCEPTED_STATUS_CODE = 0xca;
public static final int REQUEST_RESPONSE_NOCONTENT_STATUS_CODE = 0xcc;
public static final int REQUEST_RESPONSE_NOTFOUND_STATUS_CODE = 0x194;
public static final int REQUEST_RESPONSE_UNDEFINED_STATUS_CODE = -1;
public static final int REQUEST_RESPONSE_SERVER_BUSY_STATUS_CODE = 0x1f7;

static final String SAS_TOKEN_TYPE = "servicebus.windows.net:sastoken";
static final int DEFAULT_SAS_TOKEN_VALIDITY_IN_SECONDS = 20*60; // 20 minutes
static final String SAS_TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";

private static String getPlatformInfo() {
final Package javaRuntimeClassPkg = Runtime.class.getPackage();
final StringBuilder patformInfo = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.Message;

final class MessageBrowserUtil {
public static CompletableFuture<Collection<Message>> peekMessagesAsync(RequestResponseLink requestResponseLink, Duration operationTimeout, long fromSequenceNumber, int messageCount, String sessionId)
final class CommonRequestResponseOperations {
static CompletableFuture<Collection<Message>> peekMessagesAsync(RequestResponseLink requestResponseLink, Duration operationTimeout, long fromSequenceNumber, int messageCount, String sessionId)
{
HashMap requestBodyMap = new HashMap();
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_FROM_SEQUENCE_NUMER, fromSequenceNumber);
Expand All @@ -22,7 +22,7 @@ public static CompletableFuture<Collection<Message>> peekMessagesAsync(RequestRe
{
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, sessionId);
}
Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_PEEK_OPERATION, requestBodyMap, Util.adjustServerTimeout(operationTimeout));
Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_PEEK_OPERATION, requestBodyMap, Util.adjustServerTimeout(operationTimeout));
CompletableFuture<Message> responseFuture = requestResponseLink.requestAysnc(requestMessage, operationTimeout);
return responseFuture.thenComposeAsync((responseMessage) -> {
CompletableFuture<Collection<Message>> returningFuture = new CompletableFuture<Collection<Message>>();
Expand Down Expand Up @@ -63,4 +63,26 @@ else if(statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE ||
return returningFuture;
});
}

static CompletableFuture<Void> sendCBSTokenAsync(RequestResponseLink requestResponseLink, Duration operationTimeout, String token, String tokenType, String tokenAudience)
{
Message requestMessage = RequestResponseUtils.createRequestMessageFromValueBody(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_OPERATION, token, Util.adjustServerTimeout(operationTimeout));
requestMessage.getApplicationProperties().getValue().put(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_TYPE, tokenType);
requestMessage.getApplicationProperties().getValue().put(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_AUDIENCE, tokenAudience);
CompletableFuture<Message> responseFuture = requestResponseLink.requestAysnc(requestMessage, operationTimeout);
return responseFuture.thenComposeAsync((responseMessage) -> {
CompletableFuture<Void> returningFuture = new CompletableFuture<Void>();
int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE || statusCode == ClientConstants.REQUEST_RESPONSE_ACCEPTED_STATUS_CODE)
{
returningFuture.complete(null);
}
else
{
// error response
returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage));
}
return returningFuture;
});
}
}
Loading

0 comments on commit 21a0f7a

Please sign in to comment.