Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store absolute blob URIs instead of just the blob name #929

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions src/DurableTask.AzureStorage/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ public Task<bool> DeleteBlobAsync(string blobName)
return blob.DeleteIfExistsAsync();
}

public Task<bool> DeleteBlobAsync(Uri blobUri)
{
Blob blob = this.azureStorageClient.GetBlobReference(blobUri);
return blob.DeleteIfExistsAsync();
}

private async Task<string> DownloadAndDecompressAsBytesAsync(Blob blob)
{
using (MemoryStream memory = new MemoryStream(MaxStorageQueuePayloadSizeInBytes * 2))
Expand Down
48 changes: 43 additions & 5 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,17 @@ public override async Task<string> UpdateStateAsync(
var tasks = new List<Task>(blobsToDelete.Count);
foreach (var blobName in blobsToDelete)
{
tasks.Add(this.messageManager.DeleteBlobAsync(blobName));
Task task;
if (Uri.TryCreate(blobName, UriKind.Absolute, out Uri blobUri))
{
task = this.messageManager.DeleteBlobAsync(blobUri);
}
else
{
// backwards compatibility path: construct blob URI from blob name and delete
task = this.messageManager.DeleteBlobAsync(blobName);
}
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
Expand Down Expand Up @@ -1190,8 +1200,21 @@ void SetInstancesTablePropertyFromHistoryProperty(
if (historyEntity.Properties.TryGetValue(blobPropertyName, out EntityProperty blobProperty))
{
// This is a large message
string blobName = blobProperty.StringValue;
string blobUrl = this.messageManager.GetBlobUrl(blobName);
string blobData = blobProperty.StringValue;

// We now store blobs as absolute URIs to minimize chance of 'blob not found' errors
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is too vague. I think it needs to be more specific to help future maintainers understand the implications:

Suggested change
// We now store blobs as absolute URIs to minimize chance of 'blob not found' errors
// As of 1.15.1, we store blobs as absolute URIs so that we don't have to assume what task hub they were created for.
// This is necessary in cases where a common tracking store is shared across multiple task hubs.

// For backwards compatibility to versions where we only stored the blobname, we check if the blob is a URI
string blobUrl;
if (!Uri.TryCreate(blobData, UriKind.Absolute, out Uri _))
{
// backwards compatibility path: we construct the URL from the blobName
blobUrl = this.messageManager.GetBlobUrl(blobData);
}
else
{
blobUrl = blobData;
}

instanceEntity.Properties[instancePropertyName] = new EntityProperty(blobUrl);
}
else
Expand All @@ -1216,7 +1239,10 @@ async Task CompressLargeMessageAsync(DynamicTableEntity entity, List<string> lis
// Clear out the original property value and create a new "*BlobName"-suffixed property.
// The runtime will look for the new "*BlobName"-suffixed column to know if a property is stored in a blob.
string blobPropertyName = GetBlobPropertyName(propertyName);
entity.Properties.Add(blobPropertyName, new EntityProperty(blobName));

// Store blob URL so the blob can always be downloaded from the absolute path
string bloburl = this.messageManager.GetBlobUrl(blobName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C# naming nit: should be blobUrl

entity.Properties.Add(blobPropertyName, new EntityProperty(bloburl));
entity.Properties[propertyName].StringValue = string.Empty;

// if necessary, keep track of all the blobs associated with this execution
Expand All @@ -1234,7 +1260,19 @@ async Task DecompressLargeEntityProperties(DynamicTableEntity entity, List<strin
if (entity.Properties.TryGetValue(blobPropertyName, out EntityProperty property))
{
string blobName = property.StringValue;
string decompressedMessage = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobName);

// We now store blob URIs instead of just blob names to prevent blob not found errors
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment about making the code comment more specific. I think the important point here is that older versions store just the blob name, but newer versions store blobs as full URLs. I don't think it's necessary to talk about "blob not found" here because that assumes very specific context of a bug fix which future maintainers won't know about.

string decompressedMessage;
if (Uri.TryCreate(blobName, UriKind.Absolute, out Uri blobUri))
{
decompressedMessage = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUri);
}
else
{
// backwards compatibility path: construct blob URI from blob name and download
decompressedMessage = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobName);
}

entity.Properties[propertyName] = new EntityProperty(decompressedMessage);
entity.Properties.Remove(blobPropertyName);

Expand Down