Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement storage events #46648

Merged
merged 7 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@
# ServiceOwners: @sunilagarwal @lfittl-msft @sr-msft @niklarin

# PRLabel: %Provisioning
/sdk/provisioning/ @JoshLove-msft @tg-msft
/sdk/provisioning/ @JoshLove-msft @tg-msft @christothes @KrzysztofCwalina

# ServiceLabel: %Provisioning
# AzureSdkOwners: @JoshLove-msft
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,33 @@ public readonly partial struct MessagingServices
public void SendMessage(object serializable) { }
public void WhenMessageReceived(System.Action<string> received) { }
}
public partial class StorageFile
{
internal StorageFile() { }
public System.Threading.CancellationToken CancellationToken { get { throw null; } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public System.Type GetType { get { throw null; } }
public string Path { get { throw null; } }
public string RequestId { get { throw null; } }
public void Delete() { }
public System.BinaryData Download() { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override int GetHashCode() { throw null; }
public static implicit operator Azure.Response (Azure.CloudMachine.StorageFile result) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
}
[System.Runtime.InteropServices.StructLayoutAttribute(System.Runtime.InteropServices.LayoutKind.Sequential)]
public readonly partial struct StorageServices
{
private readonly object _dummy;
private readonly int _dummyPrimitive;
public System.BinaryData DownloadBlob(string name) { throw null; }
public void DeleteBlob(string path) { }
public System.BinaryData DownloadBlob(string path) { throw null; }
public string UploadBlob(object json, string? name = null) { throw null; }
public void WhenBlobCreated(System.Func<string, System.Threading.Tasks.Task> function) { }
public void WhenBlobUploaded(System.Action<string> function) { }
public void WhenBlobUploaded(System.Action<Azure.CloudMachine.StorageFile> function) { }
}
}
namespace Azure.Core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<PackageReference Include="Azure.AI.OpenAI" />
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Azure.Messaging.ServiceBus" />
<PackageReference Include="Azure.Messaging.EventGrid" />
<PackageReference Include="Azure.Security.KeyVault.Secrets" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" VersionOverride="8.0.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ namespace Azure.Provisioning.CloudMachine;

public class CloudMachineInfrastructure
{
internal const string SB_PRIVATE_TOPIC = "cm_servicebus_topic_private";
internal const string SB_PRIVATE_SUB = "cm_servicebus_subscription_private";
private readonly string _cmid;

private Infrastructure _infrastructure = new Infrastructure("cm");
Expand Down Expand Up @@ -114,9 +116,9 @@ public CloudMachineInfrastructure(string cmId)
SupportOrdering = true,
Status = ServiceBusMessagingEntityStatus.Active
};
_serviceBusSubscription_private = new("cm_servicebus_subscription_private", "2021-11-01")
_serviceBusSubscription_private = new(SB_PRIVATE_SUB, "2021-11-01")
{
Name = "cm_servicebus_subscription_private",
Name = SB_PRIVATE_SUB,
Parent = _serviceBusTopic_private,
IsClientAffine = false,
LockDuration = new StringLiteral("PT30S"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Diagnostics.CodeAnalysis;
using Azure.Core;
using Azure.Identity;
using Azure.Provisioning.CloudMachine;
using Microsoft.Extensions.Configuration;

namespace Azure.CloudMachine;
Expand Down Expand Up @@ -52,9 +53,9 @@ public override ClientConnectionOptions GetConnectionOptions(Type clientType, st
case "Azure.Security.KeyVault.Secrets.SecretClient":
return new ClientConnectionOptions(new($"https://{this.Id}.vault.azure.net/"), Credential);
case "Azure.Messaging.ServiceBus.ServiceBusClient":
return new ClientConnectionOptions(new($"{this.Id}.servicebus.windows.net"), Credential);
return new ClientConnectionOptions(new($"https://{this.Id}.servicebus.windows.net"), Credential);
case "Azure.Messaging.ServiceBus.ServiceBusSender":
if (instanceId == default) instanceId = "cm_default_topic_sender";
if (instanceId == default) instanceId = CloudMachineInfrastructure.SB_PRIVATE_TOPIC;
return new ClientConnectionOptions(instanceId);
case "Azure.Storage.Blobs.BlobContainerClient":
if (instanceId == default) instanceId = "default";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// Licensed under the MIT License.

using System;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Messaging.ServiceBus;
using Azure.Provisioning.CloudMachine;

namespace Azure.CloudMachine;

Expand All @@ -25,7 +27,20 @@ public void SendMessage(object serializable)

public void WhenMessageReceived(Action<string> received)
{
throw new NotImplementedException();
var processor = _cm.Messaging.GetServiceBusProcessor();
var cm = _cm;

// TODO: How to unsubscribe?
// TODO: Use a subscription filter to ignore Event Grid system events
processor.ProcessMessageAsync += async (args) =>
{
received(args.Message.Body.ToString());
await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
await Task.CompletedTask.ConfigureAwait(false);
};
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult().
processor.StartProcessingAsync().GetAwaiter().GetResult();
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().
}

private ServiceBusClient GetServiceBusClient()
Expand All @@ -42,6 +57,13 @@ private ServiceBusSender GetServiceBusSender()
return sender;
}

internal ServiceBusProcessor GetServiceBusProcessor()
{
MessagingServices messagingServices = this;
ServiceBusProcessor sender = _cm.Subclients.Get(() => messagingServices.CreateProcessor());
return sender;
}

private ServiceBusSender CreateSender()
{
ServiceBusClient client = GetServiceBusClient();
Expand All @@ -53,7 +75,19 @@ private ServiceBusSender CreateSender()
private ServiceBusClient CreateClient()
{
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient));
ServiceBusClient client = new(connection.Id, connection.TokenCredential);
ServiceBusClient client = new(connection.Endpoint!.AbsoluteUri, connection.TokenCredential);
return client;
}
private ServiceBusProcessor CreateProcessor()
{
ServiceBusClient client = GetServiceBusClient();

ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusSender));
ServiceBusProcessor processor = client.CreateProcessor(
connection.Id,
CloudMachineInfrastructure.SB_PRIVATE_SUB,
new() { ReceiveMode = ServiceBusReceiveMode.PeekLock, MaxConcurrentCalls = 5 });
processor.ProcessErrorAsync += (args) => throw new Exception("error processing event", args.Exception);
return processor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.ComponentModel;
using System.Threading;

namespace Azure.CloudMachine;

public class StorageFile
{
private readonly Response? _response;

private StorageServices _storage;
public string Path { get; internal set; }

/// <summary>
/// The requestId for the storage operation that triggered this event
/// </summary>
public string RequestId { get; internal set; }

/// <summary>
///
/// </summary>
/// <param name="result"></param>
/// <remarks>returns null if the file is not created as a return value of a service method call.</remarks>
public static implicit operator Response?(StorageFile result) => result._response;

public CancellationToken CancellationToken { get; internal set; }

public BinaryData Download()
=> _storage.DownloadBlob(Path);

// public async Task<BinaryData> DownloadAsync()
// => await _storage.DownloadBlobAsync(Path).ConfigureAwait(false);

public void Delete()
=> _storage.DeleteBlob(Path);

// public async Task DeleteAsync()
// => await _storage.DeleteBlobAsync(Path).ConfigureAwait(false);

// public Uri ShareFolder(AccessPermissions permissions, TimeSpan expiresAfter)
// => _storage.ShareFolder(Path, permissions, expiresAfter);

// public Uri ShareFile(AccessPermissions permissions, TimeSpan expiresAfter)
// => _storage.ShareFile(Path, permissions, expiresAfter);

internal StorageFile(StorageServices storage, string path, string requestId, Response? response = default)
{
_storage = storage;
Path = path;
RequestId = requestId;
_response = response;
}

[EditorBrowsable(EditorBrowsableState.Never)]
public override bool Equals(object obj) => base.Equals(obj);

[EditorBrowsable(EditorBrowsableState.Never)]
public override int GetHashCode() => base.GetHashCode();

[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString() => $"{Path}";

[EditorBrowsable(EditorBrowsableState.Never)]
public new Type GetType => base.GetType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;
using Azure.Core;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
Expand All @@ -26,31 +29,105 @@ private BlobContainerClient GetDefaultContainer()
return container;
}

private BlobContainerClient GetContainer(string containerName)
{
string blobContainerClientId = typeof(BlobContainerClient).FullName;
CloudMachineClient cm = _cm;
BlobContainerClient container = cm.Subclients.Get(() =>
{
ClientConnectionOptions connection = cm.GetConnectionOptions(typeof(BlobContainerClient), containerName);
BlobContainerClient container = new(connection.Endpoint, connection.TokenCredential);
return container;
});
return container;
}

public string UploadBlob(object json, string? name = default)
{
BlobContainerClient container = GetDefaultContainer();

if (name == default) name = $"b{Guid.NewGuid()}";
if (name == default)
name = $"b{Guid.NewGuid()}";

container.UploadBlob(name, BinaryData.FromObjectAsJson(json));

return name;
}

public BinaryData DownloadBlob(string name)
public BinaryData DownloadBlob(string path)
{
BlobContainerClient container = GetDefaultContainer();
BlobClient blob = container.GetBlobClient(name);
BlobClient blob = GetBlobClientFromPath(path, null);
BlobDownloadResult result = blob.DownloadContent();
return result.Content;
}

public void WhenBlobUploaded(Action<string> function)
public void DeleteBlob(string path)
{
BlobClient blob = GetBlobClientFromPath(path, null);
blob.DeleteIfExists();
}

private BlobClient GetBlobClientFromPath(string path, string? containerName)
{
var _blobContainer = GetDefaultContainer();
var blobPath = ConvertPathToBlobPath(path, _blobContainer);
if (containerName is null)
{
return _blobContainer.GetBlobClient(blobPath);
}
else
{
var container = GetContainer(containerName);
container.CreateIfNotExists();
return container.GetBlobClient(blobPath);
}
}

private static string ConvertPathToBlobPath(string path, BlobContainerClient container)
{
throw new NotImplementedException();
if (Uri.TryCreate(path, UriKind.Absolute, out Uri blobUri))
{
if (blobUri.Host == container.Uri.Host)
return blobUri.AbsoluteUri.Substring(container.Uri.AbsoluteUri.Length);
if (!string.IsNullOrEmpty(blobUri.LocalPath))
{
return blobUri.LocalPath.Substring(Path.GetPathRoot(path).Length).Replace('\\', '/');
}
}
return path.Substring(Path.GetPathRoot(path).Length).Replace('\\', '/');
}
public void WhenBlobCreated(Func<string, Task> function)

public void WhenBlobUploaded(Action<StorageFile> function)
{
throw new NotImplementedException();
var processor = _cm.Messaging.GetServiceBusProcessor();
var cm = _cm;

// TODO: How to unsubscribe?
processor.ProcessMessageAsync += async (args) =>
{
EventGridEvent e = EventGridEvent.Parse(args.Message.Body);
if (e.TryGetSystemEventData(out object systemEvent))
{
switch (systemEvent)
{
case StorageBlobCreatedEventData bc:
var blobUri = bc.Url;
var requestId = bc.ClientRequestId;
// _logger.Log.EventReceived(nameof(OnProcessMessage), $"StorageBlobCreatedEventData: blobUri='{blobUri}' requestId='{requestId}'");

var eventArgs = new StorageFile(cm.Storage, blobUri, requestId, default);
function(eventArgs);
await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
break;
default:
break;
}
}
await Task.CompletedTask.ConfigureAwait(false);
};
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult().
processor.StartProcessingAsync().GetAwaiter().GetResult();
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().

}
}
Loading