Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<MongoDbVersion>3.0.0</MongoDbVersion>
<AkkaVersion>1.5.40</AkkaVersion>
<AkkaHostingVersion>1.5.40</AkkaHostingVersion>
<AkkaVersion>1.5.42</AkkaVersion>
<AkkaHostingVersion>1.5.42</AkkaHostingVersion>
</PropertyGroup>
<!-- App dependencies -->
<ItemGroup>
Expand Down
19 changes: 11 additions & 8 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ await _metadataCollection_DoNotUseDirectly.Indexes
return _metadataCollection_DoNotUseDirectly;
}

private CancellationTokenSource CreatePerCallCts()
private CancellationTokenSource CreatePerCallCts(CancellationToken? token = null)
{
var unitedCts = CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token);
var unitedCts = token is null
? CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token)
: CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token, token.Value);
unitedCts.CancelAfter(_settings.CallTimeout);
return unitedCts;
}
Expand Down Expand Up @@ -295,10 +297,11 @@ await cursor.ForEachAsync(entry =>
/// </summary>
/// <param name="persistenceId">TBD</param>
/// <param name="fromSequenceNr">TBD</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns>long</returns>
public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken)
{
using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var token = unitedCts.Token;

return await MaybeReadWithTransaction(
Expand Down Expand Up @@ -331,7 +334,7 @@ private async Task<long> ReadHighestSequenceNrOperation(
return Math.Max(journalHighestSequenceNr, metadataHighestSequenceNr);
}

protected override async Task<IImmutableList<Exception?>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
protected override async Task<IImmutableList<Exception?>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
{
var writeMessages = messages.Select(message => ((IImmutableList<IPersistentRepresentation>)message.Payload)
.Select(ToJournalEntry).ToArray()
Expand All @@ -341,7 +344,7 @@ private async Task<long> ReadHighestSequenceNrOperation(
if(writeMessages.Length == 0)
return ImmutableList<Exception?>.Empty;

using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var journalCollection = await GetJournalCollection(unitedCts.Token);

if (writeMessages.Length == 1 && writeMessages[0].Length == 1)
Expand Down Expand Up @@ -397,9 +400,9 @@ private async Task<long> ReadHighestSequenceNrOperation(
}, unitedCts.Token);
}

protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken)
{
using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var journalCollection = await GetJournalCollection(unitedCts.Token);
var metadataCollection = await GetMetadataCollection(unitedCts.Token);

Expand Down
22 changes: 12 additions & 10 deletions src/Akka.Persistence.MongoDb/Snapshot/MongoDbGridFSSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ public MongoDbGridFsSnapshotStore(MongoDbSnapshotSettings settings)
};
}

private CancellationTokenSource CreatePerCallCts()
private CancellationTokenSource CreatePerCallCts(CancellationToken? token = null)
{
var unitedCts = CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token);
var unitedCts = token is null
? CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token)
: CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token, token.Value);
unitedCts.CancelAfter(_settings.CallTimeout);
return unitedCts;
}
Expand Down Expand Up @@ -108,9 +110,9 @@ protected override void PostStop()
base.PostStop();
}

protected override async Task<SelectedSnapshot?> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task<SelectedSnapshot?> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken)
{
using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var token = unitedCts.Token;

var filter = CreateRangeFilter(persistenceId, criteria);
Expand All @@ -129,9 +131,9 @@ protected override void PostStop()
return ToSelectedSnapshot(info.Metadata, data);
}

protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken)
{
using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var token = unitedCts.Token;

var (fileName, option, bytes) = ToSnapshotFileMetadata(metadata, snapshot);
Expand All @@ -149,7 +151,7 @@ protected override async Task SaveAsync(SnapshotMetadata metadata, object snapsh
await bucket.UploadFromBytesAsync(fileName, bytes, option, token);
}

protected override async Task DeleteAsync(SnapshotMetadata metadata)
protected override async Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken)
{
var builder = Builders<GridFSFileInfo>.Filter;
var filters = new List<FilterDefinition<GridFSFileInfo>>
Expand All @@ -165,13 +167,13 @@ protected override async Task DeleteAsync(SnapshotMetadata metadata)

var filter = builder.And(filters);

using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
await DeleteFileAsync(filter, GetFilesCollection(), GetGridFSBucket(), unitedCts.Token);
}

protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken)
{
using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var token = unitedCts.Token;

var filter = CreateRangeFilter(persistenceId, criteria);
Expand Down
23 changes: 12 additions & 11 deletions src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ public MongoDbSnapshotStore(MongoDbSnapshotSettings settings)
_serialization = Context.System.Serialization;
}

private CancellationTokenSource CreatePerCallCts()
private CancellationTokenSource CreatePerCallCts(CancellationToken? token = null)
{
var unitedCts =
CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token);
var unitedCts = token is null
? CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token)
: CancellationTokenSource.CreateLinkedTokenSource(_pendingCommandsCancellation.Token, token.Value);
unitedCts.CancelAfter(_settings.CallTimeout);
return unitedCts;
}
Expand Down Expand Up @@ -135,9 +136,9 @@ protected override void PostStop()
base.PostStop();
}

protected override async Task<SelectedSnapshot?> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task<SelectedSnapshot?> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken)
{
using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var snapshotCollection = await GetSnapshotCollection(unitedCts.Token);

return await MaybeReadWithTransaction(async (session, token) =>
Expand All @@ -152,9 +153,9 @@ protected override void PostStop()
}, unitedCts.Token);
}

protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot, CancellationToken cancellationToken)
{
using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var snapshotCollection = await GetSnapshotCollection(unitedCts.Token);

var snapshotEntry = ToSnapshotEntry(metadata, snapshot);
Expand All @@ -170,9 +171,9 @@ await snapshotCollection.ReplaceOneAsync(
cancellationToken: unitedCts.Token);
}

protected override async Task DeleteAsync(SnapshotMetadata metadata)
protected override async Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken)
{
using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var snapshotCollection = await GetSnapshotCollection(unitedCts.Token);

var builder = Builders<SnapshotEntry>.Filter;
Expand All @@ -191,9 +192,9 @@ protected override async Task DeleteAsync(SnapshotMetadata metadata)
await snapshotCollection.FindOneAndDeleteAsync(filter, cancellationToken: unitedCts.Token);
}

protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria, CancellationToken cancellationToken)
{
using var unitedCts = CreatePerCallCts();
using var unitedCts = CreatePerCallCts(cancellationToken);
var snapshotCollection = await GetSnapshotCollection(unitedCts.Token);

await MaybeWriteWithTransaction(async (session, token) =>
Expand Down
Loading