Skip to content

Commit

Permalink
Allowing SingletonAttribute to specify a StorageAccount for the lock
Browse files Browse the repository at this point in the history
  • Loading branch information
mathewc committed Oct 16, 2015
1 parent 830d2ad commit 8cd539b
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
Expand All @@ -25,7 +24,6 @@
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Newtonsoft.Json.Serialization;

namespace Microsoft.Azure.WebJobs.Host.Executors
{
Expand Down Expand Up @@ -117,8 +115,7 @@ public static async Task<JobHostContext> CreateAndLogHostStartedAsync(

if (singletonManager == null)
{
IStorageAccount storageAccount = await storageAccountProvider.GetStorageAccountAsync(cancellationToken);
singletonManager = new SingletonManager(storageAccount.CreateBlobClient(), backgroundExceptionDispatcher, config.Singleton, trace, config.NameResolver);
singletonManager = new SingletonManager(storageAccountProvider, backgroundExceptionDispatcher, config.Singleton, trace, config.NameResolver);
}

using (CancellationTokenSource combinedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, shutdownToken))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public async Task ReleaseAsync(CancellationToken cancellationToken)
/// <returns></returns>
public async Task<string> GetOwnerAsync(CancellationToken cancellationToken)
{
return await _singletonManager.GetLockOwnerAsync(_lockId, cancellationToken);
return await _singletonManager.GetLockOwnerAsync(_attribute, _lockId, cancellationToken);
}
}
}
39 changes: 32 additions & 7 deletions src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
Expand All @@ -11,6 +12,7 @@
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Bindings.Path;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Timers;
Expand All @@ -28,7 +30,8 @@ internal class SingletonManager
private readonly INameResolver _nameResolver;
private readonly IBackgroundExceptionDispatcher _backgroundExceptionDispatcher;
private readonly SingletonConfiguration _config;
private IStorageBlobDirectory _directory;
private readonly IStorageAccountProvider _accountProvider;
private ConcurrentDictionary<string, IStorageBlobDirectory> _lockDirectoryMap = new ConcurrentDictionary<string, IStorageBlobDirectory>(StringComparer.OrdinalIgnoreCase);
private TimeSpan _minimumLeaseRenewalInterval = TimeSpan.FromSeconds(1);
private TraceWriter _trace;

Expand All @@ -37,12 +40,11 @@ internal SingletonManager()
{
}

public SingletonManager(IStorageBlobClient blobClient, IBackgroundExceptionDispatcher backgroundExceptionDispatcher, SingletonConfiguration config, TraceWriter trace, INameResolver nameResolver = null)
public SingletonManager(IStorageAccountProvider accountProvider, IBackgroundExceptionDispatcher backgroundExceptionDispatcher, SingletonConfiguration config, TraceWriter trace, INameResolver nameResolver = null)
{
_accountProvider = accountProvider;
_nameResolver = nameResolver;
_backgroundExceptionDispatcher = backgroundExceptionDispatcher;
_directory = blobClient.GetContainerReference(HostContainerNames.Hosts)
.GetDirectoryReference(HostDirectoryNames.SingletonLocks);
_config = config;
_trace = trace;
}
Expand Down Expand Up @@ -85,7 +87,8 @@ public async virtual Task<object> LockAsync(string lockId, string functionInstan

public async virtual Task<object> TryLockAsync(string lockId, string functionInstanceId, SingletonAttribute attribute, CancellationToken cancellationToken)
{
IStorageBlockBlob lockBlob = _directory.GetBlockBlobReference(lockId);
IStorageBlobDirectory lockDirectory = GetLockDirectory(attribute.Account);
IStorageBlockBlob lockBlob = lockDirectory.GetBlockBlobReference(lockId);
await TryCreateAsync(lockBlob, cancellationToken);

_trace.Verbose(string.Format(CultureInfo.InvariantCulture, "Waiting for Singleton lock ({0})", lockId), source: TraceSource.Execution);
Expand Down Expand Up @@ -220,9 +223,10 @@ public static SingletonAttribute GetListenerSingletonOrNull(Type listenerType, M
return singletonAttribute;
}

public async virtual Task<string> GetLockOwnerAsync(string lockId, CancellationToken cancellationToken)
public async virtual Task<string> GetLockOwnerAsync(SingletonAttribute attribute, string lockId, CancellationToken cancellationToken)
{
IStorageBlockBlob lockBlob = _directory.GetBlockBlobReference(lockId);
IStorageBlobDirectory lockDirectory = GetLockDirectory(attribute.Account);
IStorageBlockBlob lockBlob = lockDirectory.GetBlockBlobReference(lockId);

await ReadLeaseBlobMetadata(lockBlob, cancellationToken);

Expand All @@ -240,6 +244,27 @@ public async virtual Task<string> GetLockOwnerAsync(string lockId, CancellationT
return owner;
}

internal IStorageBlobDirectory GetLockDirectory(string accountName)
{
if (string.IsNullOrEmpty(accountName))
{
accountName = ConnectionStringNames.Storage;
}

IStorageBlobDirectory storageDirectory = null;
if (!_lockDirectoryMap.TryGetValue(accountName, out storageDirectory))
{
Task<IStorageAccount> task = _accountProvider.GetAccountAsync(accountName, CancellationToken.None);
IStorageAccount storageAccount = task.Result;
IStorageBlobClient blobClient = storageAccount.CreateBlobClient();
storageDirectory = blobClient.GetContainerReference(HostContainerNames.Hosts)
.GetDirectoryReference(HostDirectoryNames.SingletonLocks);
_lockDirectoryMap[accountName] = storageDirectory;
}

return storageDirectory;
}

private ITaskSeriesTimer CreateLeaseRenewalTimer(IStorageBlockBlob leaseBlob, string leaseId, string lockId, TimeSpan leasePeriod,
IBackgroundExceptionDispatcher backgroundExceptionDispatcher)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace Microsoft.Azure.WebJobs
{
/// <summary>
/// Attribute used to override the default ServiceBus account used.
/// Attribute used to override the default ServiceBus account used by triggers and binders.
/// </summary>
/// <remarks>
/// This attribute can be applied at the parameter/method/class level, and the precedence
Expand Down
36 changes: 20 additions & 16 deletions src/Microsoft.Azure.WebJobs/SingletonAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.Azure.WebJobs
/// <summary>
/// This attribute can be applied to a job functions to ensure that only a single
/// instance of the function is executed at any given time (even across host instances).
/// Distributed locks are used to serialize invocations across all host instances.
/// A blob lease is used behind the scenes to implement the lock.
/// <remarks>
/// This attribute can also be used in <see cref="SingletonMode.Listener"/> mode to ensure that
/// the listener for a triggered function is only running on a single instance. Trigger bindings
Expand All @@ -25,11 +25,8 @@ public sealed class SingletonAttribute : Attribute
/// <summary>
/// Constructs a new instance.
/// </summary>
/// <param name="mode">The <see cref="SingletonMode"/> this singleton should use.
/// Defaults to <see cref="SingletonMode.Function"/> if not explicitly specified.
/// </param>
public SingletonAttribute(SingletonMode mode = SingletonMode.Function)
: this(string.Empty, mode)
public SingletonAttribute()
: this(string.Empty)
{
}

Expand All @@ -38,20 +35,12 @@ public SingletonAttribute(SingletonMode mode = SingletonMode.Function)
/// </summary>
/// <param name="scope">The scope for the singleton lock. When applied to triggered
/// job functions, this value can include binding parameters.</param>
/// <param name="mode">The <see cref="SingletonMode"/> this singleton should use.
/// Defaults to <see cref="SingletonMode.Function"/> if not explicitly specified.
/// </param>
public SingletonAttribute(string scope, SingletonMode mode = SingletonMode.Function)
public SingletonAttribute(string scope)
{
Scope = scope;
Mode = mode;
Mode = SingletonMode.Function;
}

/// <summary>
/// Gets the <see cref="SingletonMode"/>.
/// </summary>
public SingletonMode Mode { get; private set; }

/// <summary>
/// Gets the scope identifier for the singleton lock.
/// </summary>
Expand All @@ -61,6 +50,21 @@ public string Scope
private set;
}

/// <summary>
/// Gets or sets the <see cref="SingletonMode"/> this singleton should use.
/// Defaults to <see cref="SingletonMode.Function"/> if not explicitly specified.
/// </summary>
public SingletonMode Mode { get; set; }

/// <summary>
/// Gets the name of the Azure Storage account that the blob lease should be
/// created in.
/// </summary>
/// <remarks>
/// If not specified, the default AzureWebJobs storage account will be used.
/// </remarks>
public string Account { get; set; }

/// <summary>
/// Gets or sets the timeout value in seconds for lock acquisition.
/// If the lock is not obtained within this interval, the invocation will fail.
Expand Down
4 changes: 2 additions & 2 deletions src/Microsoft.Azure.WebJobs/StorageAccountAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace Microsoft.Azure.WebJobs
{
/// <summary>
/// Attribute used to override the default Azure Storage account used.
/// Attribute used to override the default Azure Storage account used by triggers and binders.
/// </summary>
/// <remarks>
/// This attribute can be applied at the parameter/method/class level, and the precedence
Expand All @@ -29,7 +29,7 @@ public StorageAccountAttribute(string account)
}

/// <summary>
/// Gets the Azure Storage account name to use.
/// Gets the name of the Azure Storage account to use.
/// </summary>
public string Account { get; private set; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public async Task AsyncChainEndToEnd_CustomFactories()
Assert.True(queueProcessorFactory.CustomQueueProcessors.Sum(p => p.BeginProcessingCount) >= 2);
Assert.True(queueProcessorFactory.CustomQueueProcessors.Sum(p => p.CompleteProcessingCount) >= 2);

Assert.Equal(14, storageClientFactory.TotalBlobClientCount);
Assert.Equal(13, storageClientFactory.TotalBlobClientCount);
Assert.Equal(6, storageClientFactory.TotalQueueClientCount);
Assert.Equal(0, storageClientFactory.TotalTableClientCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,24 @@ public class SingletonEndToEndTests : IClassFixture<SingletonEndToEndTests.TestF
private const string TestArtifactsPrefix = "singletone2e";
private const string Queue1Name = TestArtifactsPrefix + "q1%rnd%";
private const string Queue2Name = TestArtifactsPrefix + "q2%rnd%";
private const string Secondary = "SecondaryStorage";
private Random _rand = new Random(314159);

private static RandomNameResolver _resolver = new TestNameResolver();
private static CloudBlobDirectory _lockDirectory;
private static CloudBlobDirectory _secondaryLockDirectory;

static SingletonEndToEndTests()
{
JobHostConfiguration config = new JobHostConfiguration();
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(config.StorageConnectionString);
CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
_lockDirectory = blobClient.GetContainerReference("azure-webjobs-hosts").GetDirectoryReference("locks");

string secondaryConnectionString = AmbientConnectionStringProvider.Instance.GetConnectionString(Secondary);
storageAccount = CloudStorageAccount.Parse(secondaryConnectionString);
blobClient = storageAccount.CreateCloudBlobClient();
_secondaryLockDirectory = blobClient.GetContainerReference("azure-webjobs-hosts").GetDirectoryReference("locks");
}

public SingletonEndToEndTests()
Expand Down Expand Up @@ -221,7 +228,7 @@ public async Task SingletonFunction_Exception_LeaseReleasedImmediately()
}

Assert.Equal("Exception while executing function: TestJobs.SingletonJob", exception.Message);
VerifyLeaseState(method, null, LeaseState.Available, LeaseStatus.Unlocked);
VerifyLeaseState(method, "TestValue", LeaseState.Available, LeaseStatus.Unlocked);

host.Stop();
host.Dispose();
Expand All @@ -244,18 +251,52 @@ public async Task QueueFunction_SingletonListener()
VerifyLeaseState(method, "Listener", LeaseState.Available, LeaseStatus.Unlocked);
}

internal static void VerifyLeaseState(MethodInfo method, string scope, LeaseState leaseState, LeaseStatus leaseStatus)
[Fact]
public async Task SingletonFunction_StorageAccountOverride()
{
JobHost host = CreateTestJobHost(1);
await host.StartAsync();

MethodInfo method = typeof(TestJobs).GetMethod("SingletonJob_StorageAccountOverride");

await host.CallAsync(method, new { message = "{}" });

await host.StopAsync();
host.Dispose();

// make sure the lease blob was only created in the secondary account
VerifyLeaseDoesNotExist(method, null);
VerifyLeaseState(method, null, LeaseState.Available, LeaseStatus.Unlocked, directory: _secondaryLockDirectory);
}

internal static void VerifyLeaseState(MethodInfo method, string scope, LeaseState leaseState, LeaseStatus leaseStatus, CloudBlobDirectory directory = null)
{
string lockId = FormatLockId(method, scope);

CloudBlobDirectory lockDirectory = directory ?? _lockDirectory;
CloudBlockBlob lockBlob = lockDirectory.GetBlockBlobReference(lockId);
lockBlob.FetchAttributes();
Assert.Equal(leaseState, lockBlob.Properties.LeaseState);
Assert.Equal(leaseStatus, lockBlob.Properties.LeaseStatus);
}

internal static void VerifyLeaseDoesNotExist(MethodInfo method, string scope, CloudBlobDirectory directory = null)
{
string lockId = FormatLockId(method, scope);

CloudBlobDirectory lockDirectory = directory ?? _lockDirectory;
CloudBlockBlob lockBlob = lockDirectory.GetBlockBlobReference(lockId);
Assert.False(lockBlob.Exists());
}

private static string FormatLockId(MethodInfo method, string scope)
{
string lockId = string.Format("{0}.{1}", method.DeclaringType.FullName, method.Name);
if (!string.IsNullOrEmpty(scope))
{
lockId += "." + scope;
}

CloudBlockBlob lockBlob = _lockDirectory.GetBlockBlobReference(lockId);
lockBlob.FetchAttributes();
Assert.Equal(leaseState, lockBlob.Properties.LeaseState);
Assert.Equal(leaseStatus, lockBlob.Properties.LeaseStatus);
return lockId;
}

public class WorkItem
Expand All @@ -269,6 +310,7 @@ public class WorkItem

public class TestJobs
{
private const string Secondary = "SecondaryStorage";
public const string LeaseBlobRootPath = "Microsoft.Azure.WebJobs.Host.EndToEndTests.SingletonEndToEndTests+TestJobs";
public static int Queue1MessageCount = 0;
public static int Queue2MessageCount = 0;
Expand Down Expand Up @@ -326,6 +368,20 @@ public async Task SingletonJob(WorkItem workItem)
UpdateScopeLock(scope, false);
}

[Singleton(Account = Secondary)]
[NoAutomaticTrigger]
public async Task SingletonJob_StorageAccountOverride()
{
VerifyLeaseState(
GetType().GetMethod("SingletonJob_StorageAccountOverride"),
null,
LeaseState.Leased, LeaseStatus.Locked,
_secondaryLockDirectory);

await Task.Delay(50);
IncrementJobInvocationCount();
}

// Job with an implicit Singleton lock on the trigger listener
public async Task TriggerJob_SingletonListener([TestTrigger] string test)
{
Expand Down Expand Up @@ -362,7 +418,7 @@ public async Task SingletonTriggerJob_SingletonListener([TestTrigger] string tes
// Override the implicit listener Singleton by providing our own
// Singleton using Mode = Listener.
[Singleton("TestScope")]
[Singleton("TestScope%test%", SingletonMode.Listener)]
[Singleton("TestScope%test%", Mode = SingletonMode.Listener)]
public async Task SingletonTriggerJob_SingletonListener_ListenerSingletonOverride([TestTrigger] string test)
{
VerifyLeaseState(
Expand All @@ -379,7 +435,7 @@ public async Task SingletonTriggerJob_SingletonListener_ListenerSingletonOverrid
IncrementJobInvocationCount();
}

[Singleton(SingletonMode.Listener)]
[Singleton(Mode = SingletonMode.Listener)]
public async Task QueueFunction_SingletonListener([QueueTrigger("xyz123")] string message)
{
VerifyLeaseState(
Expand Down Expand Up @@ -530,7 +586,7 @@ public ParameterDescriptor ToParameterDescriptor()
return new ParameterDescriptor();
}

[Singleton(SingletonMode.Listener)]
[Singleton(Mode = SingletonMode.Listener)]
public class TestTriggerListener : IListener
{
public static int StartCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,9 @@ private static IServiceProvider CreateServiceProvider<TResult>(IStorageAccount s
IFunctionOutputLogger functionOutputLogger = task.Result;
FunctionExecutor executor = new FunctionExecutor(functionInstanceLogger, functionOutputLogger, backgroundExceptionDispatcher, new TestTraceWriter(TraceLevel.Verbose));

Task<IStorageAccount> storageAccountTask = storageAccountProvider.GetStorageAccountAsync(CancellationToken.None);
storageAccountTask.Wait();
SingletonConfiguration singletonConfig = new SingletonConfiguration();
TestTraceWriter trace = new TestTraceWriter(TraceLevel.Verbose);
SingletonManager singletonManager = new SingletonManager(storageAccountTask.Result.CreateBlobClient(), backgroundExceptionDispatcher, singletonConfig, trace);
SingletonManager singletonManager = new SingletonManager(storageAccountProvider, backgroundExceptionDispatcher, singletonConfig, trace);

ITypeLocator typeLocator = new FakeTypeLocator(programType);
FunctionIndexProvider functionIndexProvider = new FunctionIndexProvider(
Expand Down
Loading

0 comments on commit 8cd539b

Please sign in to comment.