Skip to content

Commit

Permalink
Add timeout / cancellation support to Store apis (#535)
Browse files Browse the repository at this point in the history
* Add cancellation token and timed store

* Execute until cancelled changes

* Fix build

* Cleanup code..

* Adding tests

* Remove private

* Fix InMemoryDbStore

* Fix setup

* Don't dispose the tasks..

* Fix ExecuteUntilCancelled

* Fix formatting
  • Loading branch information
varunpuranik authored Dec 1, 2018
1 parent df85d3f commit 0eb279b
Show file tree
Hide file tree
Showing 29 changed files with 703 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.1.1" />
<PackageReference Include="System.Collections.Immutable" Version="1.3.1" />
<PackageReference Include="System.Collections.Immutable" Version="1.5.0" />
<PackageReference Include="System.IO.FileSystem.Watcher" Version="4.3.0" />
<PackageReference Include="System.Reactive" Version="3.1.1" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Storage
{
using System;
Expand All @@ -20,10 +20,9 @@ public class CheckpointStore : ICheckpointStore
this.underlyingStore = underlyingStore;
}

public static CheckpointStore Create(IDbStoreProvider dbStoreProvider)
public static CheckpointStore Create(IStoreProvider storeProvider)
{
IDbStore dbStore = Preconditions.CheckNotNull(dbStoreProvider, nameof(dbStoreProvider)).GetDbStore(Constants.CheckpointStorePartitionKey);
IEntityStore<string, CheckpointEntity> underlyingStore = new EntityStore<string, CheckpointEntity>(dbStore, nameof(CheckpointEntity), 12);
IEntityStore<string, CheckpointEntity> underlyingStore = Preconditions.CheckNotNull(storeProvider, nameof(storeProvider)).GetEntityStore<string, CheckpointEntity>(Constants.CheckpointStorePartitionKey);
return new CheckpointStore(underlyingStore);
}

Expand Down Expand Up @@ -89,4 +88,4 @@ public CheckpointEntity(long offset, DateTime? lastFailedRevivalTime, DateTime?
public DateTime? UnhealthySince { get; }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,12 @@ protected override void Load(ContainerBuilder builder)
.SingleInstance();

// ICheckpointStore
builder.Register(c => CheckpointStore.Create(c.Resolve<IDbStoreProvider>()))
builder.Register(c =>
{
var dbStoreProvider = c.Resolve<IDbStoreProvider>();
IStoreProvider storeProvider = new StoreProvider(dbStoreProvider);
return CheckpointStore.Create(storeProvider);
})
.As<ICheckpointStore>()
.SingleInstance();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public async Task TestRoutingTwinChangeNotificationFromModule()
var routerConfig = new RouterConfig(endpoints, routesList);
IDbStoreProvider dbStoreProvider = new InMemoryDbStoreProvider();
IStoreProvider storeProvider = new StoreProvider(dbStoreProvider);
IMessageStore messageStore = new MessageStore(storeProvider, CheckpointStore.Create(dbStoreProvider), TimeSpan.MaxValue);
IMessageStore messageStore = new MessageStore(storeProvider, CheckpointStore.Create(storeProvider), TimeSpan.MaxValue);
IEndpointExecutorFactory endpointExecutorFactory = new StoringAsyncEndpointExecutorFactory(endpointExecutorConfig, new AsyncEndpointExecutorOptions(1, TimeSpan.FromMilliseconds(10)), messageStore);
Router router = await Router.CreateAsync(Guid.NewGuid().ToString(), iotHubName, routerConfig, endpointExecutorFactory);
ITwinManager twinManager = new TwinManager(connectionManager, new TwinCollectionMessageConverter(), new TwinMessageConverter(), Option.None<IEntityStore<string, TwinInfo>>());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test.Storage
{
using System;
Expand All @@ -18,7 +18,7 @@ public class CheckpointStoreTest
[Fact]
public async Task CheckpointStoreBasicTest()
{
ICheckpointStore checkpointStore = CheckpointStore.Create(new InMemoryDbStoreProvider());
ICheckpointStore checkpointStore = CheckpointStore.Create(new StoreProvider(new InMemoryDbStoreProvider()));

for (long i = 0; i < 10; i++)
{
Expand Down Expand Up @@ -88,4 +88,4 @@ public void GetCheckpointDataTest()
Assert.Equal(unhealthySinceTime, checkpointData2.UnhealthySince.OrDefault());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public async Task MessageStoreAddRemoveEndpointTest()
// Arrange
var dbStoreProvider = new InMemoryDbStoreProvider();
IStoreProvider storeProvider = new StoreProvider(dbStoreProvider);
ICheckpointStore checkpointStore = CheckpointStore.Create(dbStoreProvider);
ICheckpointStore checkpointStore = CheckpointStore.Create(storeProvider);
IMessageStore messageStore = new MessageStore(storeProvider, checkpointStore, TimeSpan.FromHours(1));

// Act
Expand Down Expand Up @@ -278,7 +278,7 @@ IMessage GetMessage(int i)
{
var dbStoreProvider = new InMemoryDbStoreProvider();
IStoreProvider storeProvider = new StoreProvider(dbStoreProvider);
ICheckpointStore checkpointStore = CheckpointStore.Create(dbStoreProvider);
ICheckpointStore checkpointStore = CheckpointStore.Create(storeProvider);
IMessageStore messageStore = new MessageStore(storeProvider, checkpointStore, TimeSpan.FromSeconds(ttlSecs));
await messageStore.AddEndpoint("module1");
await messageStore.AddEndpoint("module2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb
{
using System;
using System.Threading;
using System.Threading.Tasks;
using App.Metrics;
using App.Metrics.Timer;
Expand All @@ -13,108 +14,129 @@ namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb
class ColumnFamilyDbStore : IDbStore
{
readonly IRocksDb db;

public ColumnFamilyDbStore(IRocksDb db, ColumnFamilyHandle handle)
{
this.db = Preconditions.CheckNotNull(db, nameof(db));
this.Handle = Preconditions.CheckNotNull(handle, nameof(handle));
}

internal ColumnFamilyHandle Handle { get; }
internal ColumnFamilyHandle Handle { get; }

public Task Put(byte[] key, byte[] value) => this.Put(key, value, CancellationToken.None);

public Task<Option<byte[]>> Get(byte[] key) => this.Get(key, CancellationToken.None);

public Task Remove(byte[] key) => this.Remove(key, CancellationToken.None);

public Task<bool> Contains(byte[] key) => this.Contains(key, CancellationToken.None);

public Task<Option<(byte[] key, byte[] value)>> GetFirstEntry() => this.GetFirstEntry(CancellationToken.None);

public Task<Option<byte[]>> Get(byte[] key)
public Task<Option<(byte[] key, byte[] value)>> GetLastEntry() => this.GetLastEntry(CancellationToken.None);

public Task IterateBatch(int batchSize, Func<byte[], byte[], Task> perEntityCallback) => this.IterateBatch(batchSize, perEntityCallback, CancellationToken.None);

public Task IterateBatch(byte[] startKey, int batchSize, Func<byte[], byte[], Task> perEntityCallback) => this.IterateBatch(startKey, batchSize, perEntityCallback, CancellationToken.None);

public async Task<Option<byte[]>> Get(byte[] key, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(key, nameof(key));

Option<byte[]> returnValue;
using (Metrics.DbGetLatency("all"))
{
byte[] value = this.db.Get(key, this.Handle);
Func<byte[]> operation = () => this.db.Get(key, this.Handle);
byte[] value = await operation.ExecuteUntilCancelled(cancellationToken);
returnValue = value != null ? Option.Some(value) : Option.None<byte[]>();
}
return Task.FromResult(returnValue);

return returnValue;
}

public Task Put(byte[] key, byte[] value)
public Task Put(byte[] key, byte[] value, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(key, nameof(key));
Preconditions.CheckNotNull(value, nameof(value));

using (Metrics.DbPutLatency("all"))
{
this.db.Put(key, value, this.Handle);
Action operation = () => this.db.Put(key, value, this.Handle);
return operation.ExecuteUntilCancelled(cancellationToken);
}
return Task.CompletedTask;
}

public Task Remove(byte[] key)
public Task Remove(byte[] key, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(key, nameof(key));
this.db.Remove(key, this.Handle);
return Task.CompletedTask;
}
Action operation = () => this.db.Remove(key, this.Handle);
return operation.ExecuteUntilCancelled(cancellationToken);
}

public Task<Option<(byte[] key, byte[] value)>> GetLastEntry()
public async Task<Option<(byte[] key, byte[] value)>> GetLastEntry(CancellationToken cancellationToken)
{
using (Iterator iterator = this.db.NewIterator(this.Handle))
{
iterator.SeekToLast();
Action operation = () => iterator.SeekToLast();
await operation.ExecuteUntilCancelled(cancellationToken);
if (iterator.Valid())
{
byte[] key = iterator.Key();
byte[] value = iterator.Value();
return Task.FromResult(Option.Some((key, value)));
return Option.Some((key, value));
}
else
{
return Task.FromResult(Option.None<(byte[], byte[])>());
return Option.None<(byte[], byte[])>();
}
}
}

public Task<Option<(byte[] key, byte[] value)>> GetFirstEntry()
public async Task<Option<(byte[] key, byte[] value)>> GetFirstEntry(CancellationToken cancellationToken)
{
using (Iterator iterator = this.db.NewIterator(this.Handle))
{
iterator.SeekToFirst();
Action operation = () => iterator.SeekToFirst();
await operation.ExecuteUntilCancelled(cancellationToken);
if (iterator.Valid())
{
byte[] key = iterator.Key();
byte[] value = iterator.Value();
return Task.FromResult(Option.Some((key, value)));
return Option.Some((key, value));
}
else
{
return Task.FromResult(Option.None<(byte[], byte[])>());
return Option.None<(byte[], byte[])>();
}
}
}

public Task<bool> Contains(byte[] key)
public async Task<bool> Contains(byte[] key, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(key, nameof(key));
byte[] value = this.db.Get(key, this.Handle);
return Task.FromResult(value != null);
Func<byte[]> operation = () => this.db.Get(key, this.Handle);
byte[] value = await operation.ExecuteUntilCancelled(cancellationToken);
return value != null;
}

public Task IterateBatch(byte[] startKey, int batchSize, Func<byte[], byte[], Task> callback)
public Task IterateBatch(byte[] startKey, int batchSize, Func<byte[], byte[], Task> callback, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(startKey, nameof(startKey));
Preconditions.CheckRange(batchSize, 1, nameof(batchSize));
Preconditions.CheckNotNull(callback, nameof(callback));

return this.IterateBatch(iterator => iterator.Seek(startKey), batchSize, callback);
return this.IterateBatch(iterator => iterator.Seek(startKey), batchSize, callback, cancellationToken);
}

public Task IterateBatch(int batchSize, Func<byte[], byte[], Task> callback)
public Task IterateBatch(int batchSize, Func<byte[], byte[], Task> callback, CancellationToken cancellationToken)
{
Preconditions.CheckRange(batchSize, 1, nameof(batchSize));
Preconditions.CheckNotNull(callback, nameof(callback));

return this.IterateBatch(iterator => iterator.SeekToFirst(), batchSize, callback);
return this.IterateBatch(iterator => iterator.SeekToFirst(), batchSize, callback, cancellationToken);
}

async Task IterateBatch(Action<Iterator> seeker, int batchSize, Func<byte[], byte[], Task> callback)
async Task IterateBatch(Action<Iterator> seeker, int batchSize, Func<byte[], byte[], Task> callback, CancellationToken cancellationToken)
{
// Use tailing iterator to prevent creating a snapshot.
var readOptions = new ReadOptions();
Expand All @@ -128,7 +150,7 @@ async Task IterateBatch(Action<Iterator> seeker, int batchSize, Func<byte[], byt
byte[] key = iterator.Key();
byte[] value = iterator.Value();
await callback(key, value);
}
}
}
}

Expand All @@ -150,7 +172,7 @@ static class Metrics
RateUnit = TimeUnit.Seconds
};

internal static MetricTags GetTags(string id)
static MetricTags GetTags(string id)
{
return new MetricTags("EndpointId", id);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb
{
using System;
Expand All @@ -21,6 +22,7 @@ sealed class ColumnFamilyStorageRocksDbWrapper : IRocksDb
{
const string DbFolderName = "db";
const string ColumnFamiliesFileName = "columnfamilies";

static readonly DbOptions Options = new DbOptions()
.SetCreateIfMissing()
.SetCreateMissingColumnFamilies();
Expand Down Expand Up @@ -69,7 +71,7 @@ public IEnumerable<string> ListColumnFamilies()
{
return this.columnFamiliesProvider.ListColumnFamilies();
}
}
}

public ColumnFamilyHandle GetColumnFamily(string columnFamilyName) => this.db.GetColumnFamily(columnFamilyName);

Expand All @@ -78,10 +80,10 @@ public ColumnFamilyHandle CreateColumnFamily(ColumnFamilyOptions columnFamilyOpt
lock (ColumnFamiliesLock)
{
this.columnFamiliesProvider.AddColumnFamily(entityName);
ColumnFamilyHandle handle = this.db.CreateColumnFamily(columnFamilyOptions, entityName);
ColumnFamilyHandle handle = this.db.CreateColumnFamily(columnFamilyOptions, entityName);
return handle;
}
}
}

public void DropColumnFamily(string columnFamilyName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public class DbStoreProvider : IDbStoreProvider
this.compactionTimer = new Timer(this.RunCompaction, null, CompactionPeriod, CompactionPeriod);
}

private void RunCompaction(object state)
void RunCompaction(object state)
{
Events.StartingCompaction();
foreach (KeyValuePair<string, IDbStore> entityDbStore in this.entityDbStoreDictionary)
{
if(entityDbStore.Value is ColumnFamilyDbStore cfDbStore)
if (entityDbStore.Value is ColumnFamilyDbStore cfDbStore)
{
Events.CompactingStore(entityDbStore.Key);
this.db.Compact(cfDbStore.Handle);
Expand All @@ -53,6 +53,7 @@ public static DbStoreProvider Create(IRocksDbOptionsProvider optionsProvider, st
var dbStorePartition = new ColumnFamilyDbStore(db, handle);
entityDbStoreDictionary[columnFamilyName] = dbStorePartition;
}

var dbStore = new DbStoreProvider(optionsProvider, db, entityDbStoreDictionary);
return dbStore;
}
Expand All @@ -61,11 +62,12 @@ public IDbStore GetDbStore(string partitionName)
{
Preconditions.CheckNonWhiteSpace(partitionName, nameof(partitionName));
if (!this.entityDbStoreDictionary.TryGetValue(partitionName, out IDbStore entityDbStore))
{
{
ColumnFamilyHandle handle = this.db.CreateColumnFamily(this.optionsProvider.GetColumnFamilyOptions(), partitionName);
entityDbStore = new ColumnFamilyDbStore(this.db, handle);
entityDbStore = this.entityDbStoreDictionary.GetOrAdd(partitionName, entityDbStore);
}

return entityDbStore;
}

Expand Down Expand Up @@ -102,6 +104,7 @@ public void Dispose()
static class Events
{
static readonly ILogger Log = Logger.Factory.CreateLogger<DbStoreProvider>();

// Use an ID not used by other components
const int IdStart = 4000;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb
{
using System;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb
{
using System;
Expand All @@ -14,7 +15,6 @@ namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb
/// </summary>
sealed class RocksDbWrapper : IRocksDb
{

readonly AtomicBoolean isDisposed = new AtomicBoolean(false);
readonly RocksDb db;
readonly string path;
Expand All @@ -27,10 +27,8 @@ sealed class RocksDbWrapper : IRocksDb
this.dbOptions = dbOptions;
}


public static RocksDbWrapper Create(IRocksDbOptionsProvider optionsProvider, string path, IEnumerable<string> partitionsList)
{

Preconditions.CheckNonWhiteSpace(path, nameof(path));
Preconditions.CheckNotNull(optionsProvider, nameof(optionsProvider));
DbOptions dbOptions = Preconditions.CheckNotNull(optionsProvider.GetDbOptions());
Expand Down
Loading

0 comments on commit 0eb279b

Please sign in to comment.