Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ public SimpleMessageQueue Connect(string queueURL, out GXBaseCollection<SdtMessa

{
MessageQueueProvider messageQueueProvider = new MessageQueueProvider();
GXProperties properties = new GXProperties();
properties.Add("QUEUE_AWSSQS_QUEUE_URL", queueURL);
GXProperties properties = new GXProperties
{
{ "QUEUE_AWSSQS_QUEUE_URL", queueURL }
};
SimpleMessageQueue simpleMessageQueue = messageQueueProvider.Connect(AWS_SQS, properties, out GXBaseCollection<SdtMessages_Message> errorMessagesConnect, out bool successConnect);
errorMessages = errorMessagesConnect;
success = successConnect;
Expand All @@ -31,10 +33,12 @@ public SimpleMessageQueue Connect(string queueURL, out GXBaseCollection<SdtMessa

public GXProperties TransformAWSCredentials(GxUserType awsCredentials)
{
GXProperties properties = new GXProperties();
properties.Add("QUEUE_AWSSQS_ACCESS_KEY", awsCredentials.GetPropertyValue<string>("Accesskey"));
properties.Add("QUEUE_AWSSQS_SECRET_KEY", awsCredentials.GetPropertyValue<string>("Secretkey"));
properties.Add("QUEUE_AWSSQS_REGION", awsCredentials.GetPropertyValue<string>("Region"));
GXProperties properties = new GXProperties
{
{ "QUEUE_AWSSQS_ACCESS_KEY", awsCredentials.GetPropertyValue<string>("Accesskey") },
{ "QUEUE_AWSSQS_SECRET_KEY", awsCredentials.GetPropertyValue<string>("Secretkey") },
{ "QUEUE_AWSSQS_REGION", awsCredentials.GetPropertyValue<string>("Region") }
};
return properties;
}
}
Expand Down
41 changes: 22 additions & 19 deletions dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/AWSQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace GeneXus.Messaging.Queue
public class AWSQueue : QueueBase, IQueue
{

public static String Name = "AWSSQS";
public static string Name = "AWSSQS";
const string ACCESS_KEY = "ACCESS_KEY";
const string SECRET_ACCESS_KEY = "SECRET_KEY";
const string REGION = "REGION";
Expand Down Expand Up @@ -313,13 +313,14 @@ private MessageQueueResult SetupMessageQueueResult(SendMessageResponse response)
messageQueueResult.MessageId = response.MessageId;
messageQueueResult.MessageStatus = MessageQueueResultStatus.Sent;

messageQueueResult.MessageAttributes = new GXProperties();

messageQueueResult.MessageAttributes.Add("MD5OfMessageSystemAttributes", response.MD5OfMessageSystemAttributes);
messageQueueResult.MessageAttributes.Add("MD5OfMessageAttributes", response.MD5OfMessageAttributes);
messageQueueResult.MessageAttributes.Add("ContentLength", response.ContentLength.ToString());
messageQueueResult.MessageAttributes.Add("MD5OfMessageBody", response.MD5OfMessageBody);
messageQueueResult.MessageAttributes.Add("SequenceNumber", response.SequenceNumber);
messageQueueResult.MessageAttributes = new GXProperties
{
{ "MD5OfMessageSystemAttributes", response.MD5OfMessageSystemAttributes },
{ "MD5OfMessageAttributes", response.MD5OfMessageAttributes },
{ "ContentLength", response.ContentLength.ToString() },
{ "MD5OfMessageBody", response.MD5OfMessageBody },
{ "SequenceNumber", response.SequenceNumber }
};

Type t = response.ResponseMetadata.GetType();
PropertyInfo[] props = t.GetProperties();
Expand All @@ -342,12 +343,13 @@ private MessageQueueResult SetupMessageQueueResult(SendMessageBatchResultEntry r
messageQueueResult.MessageId = response.Id;
messageQueueResult.MessageStatus = MessageQueueResultStatus.Sent;

messageQueueResult.MessageAttributes = new GXProperties();

messageQueueResult.MessageAttributes.Add("MD5OfMessageSystemAttributes", response.MD5OfMessageSystemAttributes);
messageQueueResult.MessageAttributes.Add("MD5OfMessageAttributes", response.MD5OfMessageAttributes);
messageQueueResult.MessageAttributes.Add("MD5OfMessageBody", response.MD5OfMessageBody);
messageQueueResult.MessageAttributes.Add("SequenceNumber", response.SequenceNumber);
messageQueueResult.MessageAttributes = new GXProperties
{
{ "MD5OfMessageSystemAttributes", response.MD5OfMessageSystemAttributes },
{ "MD5OfMessageAttributes", response.MD5OfMessageAttributes },
{ "MD5OfMessageBody", response.MD5OfMessageBody },
{ "SequenceNumber", response.SequenceNumber }
};
return messageQueueResult;
}

Expand Down Expand Up @@ -378,11 +380,12 @@ private SimpleQueueMessage SetupSimpleQueueMessage(Message response)
simpleQueueMessage.MessageBody = response.Body;
simpleQueueMessage.MessageHandleId = response.ReceiptHandle;

simpleQueueMessage.MessageAttributes = new GXProperties();

simpleQueueMessage.MessageAttributes.Add("MD5OfMessageAttributes", response.MD5OfMessageAttributes);
simpleQueueMessage.MessageAttributes.Add("MD5OfBody", response.MD5OfBody);

simpleQueueMessage.MessageAttributes = new GXProperties
{
{ "MD5OfMessageAttributes", response.MD5OfMessageAttributes },
{ "MD5OfBody", response.MD5OfBody }
};

foreach (var messageAttribute in response.MessageAttributes)
{
MessageAttributeValue messageAttributeValue = messageAttribute.Value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ private void Initialize(GXService providerService)
else
//Try authenticating using AD
{
ChainedTokenCredential credential = new ChainedTokenCredential(new ManagedIdentityCredential(), new ManagedIdentityCredential(Environment.GetEnvironmentVariable("AZURE_CLIENT_ID")), new EnvironmentCredential(), new AzureCliCredential());
GXLogging.Debug(logger,"Authentication using Oauth 2.0.");
_client = new EventGridPublisherClient(
new Uri(_endpoint),
new DefaultAzureCredential());
credential);
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,35 @@

namespace GeneXus.Messaging.Queue
{
/// <summary>
/// Implementation of AzureQueue.MessageQueueProvider external object.
/// </summary>
///
public class AzureMessageQueueProvider
{
private const string AZUREQUEUE = "AZUREQUEUE";
public SimpleMessageQueue Connect(string queueName, string queueURL, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
public SimpleMessageQueue Connect(string queueName, string connectionString, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
{
MessageQueueProvider messageQueueProvider = new MessageQueueProvider();
GXProperties properties = new GXProperties();
properties.Add("QUEUE_AZUREQUEUE_QUEUENAME", queueName);
properties.Add("QUEUE_AZUREQUEUE_CONNECTIONSTRING", queueURL);
SimpleMessageQueue simpleMessageQueue = messageQueueProvider.Connect(AZUREQUEUE, properties, out GXBaseCollection<SdtMessages_Message> errorMessagesConnect, out bool successConnect);
GXProperties properties = new GXProperties
{
{ PropertyConstants.QUEUE_AZUREQUEUE_QUEUENAME, queueName },
{ PropertyConstants.QUEUE_AZUREQUEUE_CONNECTIONSTRING, connectionString },
{ PropertyConstants.AUTHENTICATION_METHOD, AuthenticationMethod.Password.ToString()}
};
SimpleMessageQueue simpleMessageQueue = messageQueueProvider.Connect(PropertyConstants.AZURE_QUEUE_PROVIDERTYPENAME, properties, out GXBaseCollection<SdtMessages_Message> errorMessagesConnect, out bool successConnect);
errorMessages = errorMessagesConnect;
success = successConnect;
return simpleMessageQueue;
}
public SimpleMessageQueue Authenticate(string queueURI, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
{
MessageQueueProvider messageQueueProvider = new MessageQueueProvider();
GXProperties properties = new GXProperties
{
{ PropertyConstants.QUEUE_AZUREQUEUE_QUEUEURI, queueURI },
{ PropertyConstants.AUTHENTICATION_METHOD, AuthenticationMethod.ActiveDirectory.ToString()}
};
SimpleMessageQueue simpleMessageQueue = messageQueueProvider.Connect(PropertyConstants.AZURE_QUEUE_PROVIDERTYPENAME, properties, out GXBaseCollection<SdtMessages_Message> errorMessagesConnect, out bool successConnect);
errorMessages = errorMessagesConnect;
success = successConnect;
return simpleMessageQueue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using Azure.Identity;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using GeneXus.Messaging.Common;
using GeneXus.Services;
using GeneXus.Utils;
using log4net;

namespace GeneXus.Messaging.Queue
{
public class AzureQueue : QueueBase, IQueue
{
public static string Name = "AZUREQUEUE";

public static String Name = "AZUREQUEUE";
const string QUEUE_NAME = "QUEUENAME";
const string QUEUE_CONNECTION_STRING = "CONNECTIONSTRING";

static readonly ILog logger = LogManager.GetLogger(typeof(AzureQueue));
QueueClient _queueClient { get; set; }
private string _queueName { get; set; }
private string _connectionString { get; set; }
private string _queueURI { get; set; }

public AzureQueue() : this(null)
{
}
{}

public AzureQueue(GXService providerService) : base(providerService)
{
Expand All @@ -31,16 +31,28 @@ public AzureQueue(GXService providerService) : base(providerService)

private void Initialize(GXService providerService)
{
ServiceSettings serviceSettings = new("QUEUE", Name, providerService);
_queueName = serviceSettings.GetEncryptedPropertyValue(QUEUE_NAME);
_connectionString = serviceSettings.GetEncryptedPropertyValue(QUEUE_CONNECTION_STRING);
ServiceSettings serviceSettings = new(PropertyConstants.QUEUE_SERVICE_NAME, Name, providerService);
_queueName = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.QUEUENAME);
_connectionString = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.CONNECTIONSTRING);
_queueURI = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.QUEUEURI);
string authenticationMethod = serviceSettings.GetPropertiesValue(PropertyConstants.AUTHENTICATION_METHOD);

QueueClientOptions queueClientOptions = new QueueClientOptions()
{
MessageEncoding = QueueMessageEncoding.Base64
};

_queueClient = new QueueClient(_connectionString, _queueName, queueClientOptions);
if (authenticationMethod.Equals(AuthenticationMethod.ActiveDirectory.ToString()))
{
ChainedTokenCredential credential = new ChainedTokenCredential(new ManagedIdentityCredential(), new ManagedIdentityCredential(Environment.GetEnvironmentVariable("AZURE_CLIENT_ID")), new EnvironmentCredential(), new AzureCliCredential());
_queueClient = new QueueClient(new Uri(_queueURI), credential,queueClientOptions);
GXLogging.Debug(logger, "Authenticate to Azure Storage Queue using Active Directory authentication.");
}
else
{
_queueClient = new QueueClient(_connectionString, _queueName, queueClientOptions);
GXLogging.Debug(logger, "Authenticate to Azure Storage Queue using Access Keys.");
}
}

QueueClient QueueClient
Expand All @@ -59,11 +71,6 @@ public AzureQueue(string connectionString, string queueName)
_connectionString = connectionString;
}

//public AzureQueue(Uri uri, TokenCredential tokenCredential)
//{
//_queueClient = new QueueClient(uri, tokenCredential);
//}

public bool GetMessageFromException(Exception ex, SdtMessages_Message msg)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.9.0" />
<PackageReference Include="Azure.Storage.Queues" Version="12.15.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,27 @@
using System.Reflection;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
using GeneXus.Messaging.Common;
using GeneXus.Services;
using GeneXus.Utils;
using log4net;

namespace GeneXus.Messaging.GXAzureServiceBus
{
public class AzureServiceBus : MessageBrokerBase, IMessageBroker
{
private const int MAX_MESSAGES_DEFAULT = 10;
private const short LOCK_DURATION = 5;
public static String Name = "AZURESB";
public static string Name = "AZURESB";
static readonly ILog logger = LogManager.GetLogger(typeof(AzureServiceBus));

private ConcurrentDictionary<string, Tuple<DateTime, ServiceBusReceivedMessage>> m_messages = new ConcurrentDictionary<string, Tuple<DateTime, ServiceBusReceivedMessage>>();
ServiceBusClient _serviceBusClient { get; set; }
private string _queueOrTopicName { get; set; }
private string _connectionString { get; set; }
private string _fullyqualifiedNamespace { get; set; }
private string _subscriptionName { get; set; }
private ServiceBusSender _sender { get; set; }
private ServiceBusReceiver _receiver { get; set; }
Expand All @@ -41,15 +45,17 @@ private void Initialize(GXService providerService)
_queueOrTopicName = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.QUEUE_NAME);
_connectionString = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.QUEUE_CONNECTION_STRING);
_subscriptionName = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.TOPIC_SUBSCRIPTION);
_fullyqualifiedNamespace = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.FULLYQUALIFIEDNAMESPACE);
string authenticationMethod = serviceSettings.GetPropertiesValue(PropertyConstants.AUTHENTICATION_METHOD);

string sessionEnabled = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.SESSION_ENABLED);
string sessionEnabled = serviceSettings.GetPropertiesValue(PropertyConstants.SESSION_ENABLED);

if (!string.IsNullOrEmpty(sessionEnabled))
_sessionEnabled = Convert.ToBoolean(sessionEnabled);
else
_sessionEnabled = false;

string senderIdentifier = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.SENDER_IDENTIFIER);
string senderIdentifier = serviceSettings.GetPropertiesValue(PropertyConstants.SENDER_IDENTIFIER);

ServiceBusSenderOptions serviceBusSenderOptions = new ServiceBusSenderOptions();
if (!string.IsNullOrEmpty(senderIdentifier))
Expand All @@ -58,17 +64,28 @@ private void Initialize(GXService providerService)
//TO DO Consider connection options here
//https://docs.microsoft.com/en-us/javascript/api/@azure/service-bus/servicebusclientoptions?view=azure-node-latest#@azure-service-bus-servicebusclientoptions-websocketoptions

_serviceBusClient = new ServiceBusClient(_connectionString);
if (authenticationMethod.Equals(AuthenticationMethod.ActiveDirectory.ToString()))
{
ChainedTokenCredential credential = new ChainedTokenCredential(new ManagedIdentityCredential(), new ManagedIdentityCredential(Environment.GetEnvironmentVariable("AZURE_CLIENT_ID")), new EnvironmentCredential(), new AzureCliCredential());
_serviceBusClient = new ServiceBusClient(_fullyqualifiedNamespace, credential);
GXLogging.Debug(logger, "Authenticate to Azure Service Bus using Active Directory authentication.");
}
else
{
_serviceBusClient = new ServiceBusClient(_connectionString);
GXLogging.Debug(logger, "Authenticate to Azure Service Bus using SAS authentication.");
}

if (_serviceBusClient != null)
{
_sender = _serviceBusClient.CreateSender(_queueOrTopicName, serviceBusSenderOptions);
if (!_sessionEnabled && _sender != null)
{
_serviceBusReceiverOptions = new ServiceBusReceiverOptions();

string receiveMode = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVE_MODE);
string prefetchCount = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.PREFETCH_COUNT);
string receiverIdentifier = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVER_IDENTIFIER);
string receiveMode = serviceSettings.GetPropertiesValue(PropertyConstants.RECEIVE_MODE);
string prefetchCount = serviceSettings.GetPropertiesValue(PropertyConstants.PREFETCH_COUNT);
string receiverIdentifier = serviceSettings.GetPropertiesValue(PropertyConstants.RECEIVER_IDENTIFIER);

if (!string.IsNullOrEmpty(receiveMode))
_serviceBusReceiverOptions.ReceiveMode = (ServiceBusReceiveMode)Convert.ToInt16(receiveMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.9.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.15.0" />
</ItemGroup>

Expand Down
Loading