Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add nullable analysis to blob manager #1153

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ namespace DurableTask.AzureStorage
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Data.Tables;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Queues.Models;
using DurableTask.AzureStorage.Messaging;
using DurableTask.AzureStorage.Monitoring;
Expand All @@ -35,7 +33,6 @@ namespace DurableTask.AzureStorage
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Newtonsoft.Json;

/// <summary>
/// Orchestration service provider for the Durable Task Framework which uses Azure Storage as the durable store.
Expand Down
32 changes: 20 additions & 12 deletions src/DurableTask.AzureStorage/Partitioning/AppLeaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.AzureStorage.Partitioning
{
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Storage;
using Newtonsoft.Json;

/// <summary>
/// Class responsible for starting and stopping the partition manager. Also implements the app lease feature to ensure a single app's partition manager is started at a time.
Expand All @@ -45,10 +44,10 @@ sealed class AppLeaseManager

bool isLeaseOwner;
int appLeaseIsStarted;
Task renewTask;
Task acquireTask;
CancellationTokenSource starterTokenSource;
CancellationTokenSource leaseRenewerCancellationTokenSource;
Task? renewTask;
Task? acquireTask;
CancellationTokenSource? starterTokenSource;
CancellationTokenSource? leaseRenewerCancellationTokenSource;

public AppLeaseManager(
AzureStorageClient azureStorageClient,
Expand Down Expand Up @@ -271,7 +270,7 @@ async Task StopAppLeaseAsync()

if (this.renewTask != null)
{
this.leaseRenewerCancellationTokenSource.Cancel();
this.leaseRenewerCancellationTokenSource!.Cancel();
await this.renewTask;
}

Expand All @@ -289,6 +288,10 @@ async Task<bool> TryAcquireAppLeaseAsync()
bool leaseAcquired;
if (appLeaseInfo.DesiredSwapId == this.appLeaseId)
{
if (appLeaseInfo.OwnerId == null)
{
throw new Exception("App lease info is in an invalid state. DesiredSwapId is set but OwnerId is null.");
}
leaseAcquired = await this.ChangeLeaseAsync(appLeaseInfo.OwnerId);
}
else
Expand Down Expand Up @@ -428,7 +431,7 @@ async Task LeaseRenewer(CancellationToken cancellationToken)
break;
}

await Task.Delay(this.options.RenewInterval, this.leaseRenewerCancellationTokenSource.Token);
await Task.Delay(this.options.RenewInterval, this.leaseRenewerCancellationTokenSource!.Token);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -564,19 +567,24 @@ async Task UpdateAppLeaseInfoBlob(AppLeaseInfo appLeaseInfo)

async Task<AppLeaseInfo> GetAppLeaseInfoAsync()
{
AppLeaseInfo? appLeaseInfo = null;
if (await this.appLeaseInfoBlob.ExistsAsync())
{
string serializedEventHubInfo = await this.appLeaseInfoBlob.DownloadTextAsync();
return Utils.DeserializeFromJson<AppLeaseInfo>(serializedEventHubInfo);
appLeaseInfo = Utils.DeserializeFromJson<AppLeaseInfo>(serializedEventHubInfo);
}

return null;
if (appLeaseInfo == null)
{
throw new Exception("App lease info blob does not exist.");
}
return appLeaseInfo;
}

private class AppLeaseInfo
{
public string OwnerId { get; set; }
public string DesiredSwapId { get; set; }
public string? OwnerId { get; set; }
public string? DesiredSwapId { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.AzureStorage.Partitioning
{
using System;
Expand Down Expand Up @@ -55,7 +55,8 @@ public BlobPartitionLeaseManager(
this.blobDirectoryName = leaseType;
this.leaseInterval = this.settings.LeaseInterval;

this.Initialize();
this.taskHubContainer = this.azureStorageClient.GetBlobContainerReference(this.leaseContainerName);
this.taskHubInfoBlob = this.taskHubContainer.GetBlobReference(TaskHubInfoBlobName);
}

public Task<bool> LeaseStoreExistsAsync(CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -129,7 +130,7 @@ public async Task CreateLeaseIfNotExistAsync(string partitionId, CancellationTok
}
}

public async Task<BlobPartitionLease> GetLeaseAsync(string partitionId, CancellationToken cancellationToken = default)
public async Task<BlobPartitionLease?> GetLeaseAsync(string partitionId, CancellationToken cancellationToken = default)
{
Blob leaseBlob = this.taskHubContainer.GetBlobReference(partitionId, this.blobDirectoryName);
if (await leaseBlob.ExistsAsync(cancellationToken))
Expand Down Expand Up @@ -260,7 +261,7 @@ public async Task CreateTaskHubInfoIfNotExistAsync(TaskHubInfo taskHubInfo, Canc

internal async Task<TaskHubInfo> GetOrCreateTaskHubInfoAsync(TaskHubInfo newTaskHubInfo, bool checkIfStale, CancellationToken cancellationToken = default)
{
TaskHubInfo currentTaskHubInfo = await this.GetTaskHubInfoAsync(cancellationToken);
TaskHubInfo? currentTaskHubInfo = await this.GetTaskHubInfoAsync(cancellationToken);
if (currentTaskHubInfo != null)
{
if (checkIfStale && IsStale(currentTaskHubInfo, newTaskHubInfo))
Expand Down Expand Up @@ -297,14 +298,7 @@ private bool IsStale(TaskHubInfo currentTaskHubInfo, TaskHubInfo newTaskHubInfo)
|| !currentTaskHubInfo.PartitionCount.Equals(newTaskHubInfo.PartitionCount);
}

void Initialize()
{
this.taskHubContainer = this.azureStorageClient.GetBlobContainerReference(this.leaseContainerName);

this.taskHubInfoBlob = this.taskHubContainer.GetBlobReference(TaskHubInfoBlobName);
}

async Task<TaskHubInfo> GetTaskHubInfoAsync(CancellationToken cancellationToken)
async Task<TaskHubInfo?> GetTaskHubInfoAsync(CancellationToken cancellationToken)
{
if (await this.taskHubInfoBlob.ExistsAsync(cancellationToken))
{
Expand All @@ -318,10 +312,17 @@ async Task<TaskHubInfo> GetTaskHubInfoAsync(CancellationToken cancellationToken)
async Task<BlobPartitionLease> DownloadLeaseBlob(Blob blob, CancellationToken cancellationToken)
{
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken);
BlobPartitionLease deserializedLease = Utils.DeserializeFromJson<BlobPartitionLease>(result.Content);
deserializedLease.Blob = blob;
BlobPartitionLease? deserializedLease = Utils.DeserializeFromJson<BlobPartitionLease>(result.Content);

return deserializedLease;
if (deserializedLease == null)
{
throw new Exception($"Failed to deserialize lease blob: {blob.Name}");
}
else
{
deserializedLease.Blob = blob;
return deserializedLease;
}
}

static Exception HandleStorageException(Lease lease, DurableTaskStorageException storageException, bool ignoreLeaseLost = false)
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.AzureStorage.Partitioning
{
using System.Collections.Generic;
Expand All @@ -27,7 +27,7 @@ interface ILeaseManager<T> where T : Lease

Task CreateLeaseIfNotExistAsync(string partitionId, CancellationToken cancellationToken = default);

Task<T> GetLeaseAsync(string partitionId, CancellationToken cancellationToken = default);
Task<T?> GetLeaseAsync(string partitionId, CancellationToken cancellationToken = default);

Task<bool> RenewAsync(T lease, CancellationToken cancellationToken = default);

Expand Down
20 changes: 10 additions & 10 deletions src/DurableTask.AzureStorage/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.AzureStorage
{
using System;
Expand Down Expand Up @@ -143,7 +143,7 @@ public static bool TryGetTaskScheduledId(HistoryEvent historyEvent, out int task
/// should be "SayHelloActivity"
/// </summary>
/// <param name="s"></param>
public static string GetTargetClassName(this string s)
public static string? GetTargetClassName(this string s)
{
if (s == null)
{
Expand Down Expand Up @@ -191,9 +191,9 @@ public static string SerializeToJson(JsonSerializer serializer, object payload)
/// <param name="serializer">The serializer whose config will guide the deserialization.</param>
/// <param name="jsonString">The JSON-string to deserialize.</param>
/// <returns></returns>
public static T DeserializeFromJson<T>(JsonSerializer serializer, string jsonString)
public static T? DeserializeFromJson<T>(JsonSerializer serializer, string jsonString)
{
T obj;
T? obj;
using (var reader = new StringReader(jsonString))
using (var jsonReader = new JsonTextReader(reader))
{
Expand All @@ -209,7 +209,7 @@ public static T DeserializeFromJson<T>(JsonSerializer serializer, string jsonStr
/// <typeparam name="T">The type to deserialize the JSON string into.</typeparam>
/// <param name="stream">A stream of UTF-8 JSON.</param>
/// <returns>The deserialized value.</returns>
public static T DeserializeFromJson<T>(Stream stream)
public static T? DeserializeFromJson<T>(Stream stream)
{
return DeserializeFromJson<T>(DefaultJsonSerializer, stream);
}
Expand All @@ -222,7 +222,7 @@ public static T DeserializeFromJson<T>(Stream stream)
/// <param name="serializer">The serializer whose config will guide the deserialization.</param>
/// <param name="stream">A stream of UTF-8 JSON.</param>
/// <returns>The deserialized value.</returns>
public static T DeserializeFromJson<T>(JsonSerializer serializer, Stream stream)
public static T? DeserializeFromJson<T>(JsonSerializer serializer, Stream stream)
{
using var reader = new StreamReader(stream, Encoding.UTF8);
using var jsonReader = new JsonTextReader(reader);
Expand All @@ -237,7 +237,7 @@ public static T DeserializeFromJson<T>(JsonSerializer serializer, Stream stream)
/// <typeparam name="T">The type to deserialize the JSON string into.</typeparam>
/// <param name="jsonString">The JSON-string to deserialize.</param>
/// <returns></returns>
public static T DeserializeFromJson<T>(string jsonString)
public static T? DeserializeFromJson<T>(string jsonString)
{
return DeserializeFromJson<T>(DefaultJsonSerializer, jsonString);
}
Expand All @@ -249,7 +249,7 @@ public static T DeserializeFromJson<T>(string jsonString)
/// <param name="jsonString">The JSON-string to deserialize.</param>
/// <param name="type">The expected de-serialization type.</param>
/// <returns></returns>
public static object DeserializeFromJson(string jsonString, Type type)
public static object? DeserializeFromJson(string jsonString, Type type)
{
return DeserializeFromJson(DefaultJsonSerializer, jsonString, type);
}
Expand All @@ -262,9 +262,9 @@ public static object DeserializeFromJson(string jsonString, Type type)
/// <param name="jsonString">The JSON-string to deserialize.</param>
/// <param name="type">The expected de-serialization type.</param>
/// <returns></returns>
public static object DeserializeFromJson(JsonSerializer serializer, string jsonString, Type type)
public static object? DeserializeFromJson(JsonSerializer serializer, string jsonString, Type type)
{
object obj;
object? obj;
using (var reader = new StringReader(jsonString))
using (var jsonReader = new JsonTextReader(reader))
{
Expand Down