Skip to content

Commit fa5ca50

Browse files
authored
Add failure handling for BatchingSqlJournal SelectCurrentPersistenceIds message batching (#5094)
* Add failure handling for BatchingSqlJournal SelectCurrentPersistenceIds message batching * Do not throw an exception inside a failure handler, throwing will swallow the original failure cause, create a confusing error log, and blow up BatchingSqlJournal failure handling. * Add missing using to make sure reader is properly disposed
1 parent 0210e21 commit fa5ca50

File tree

1 file changed

+14
-4
lines changed

1 file changed

+14
-4
lines changed

src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs

+14-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using System.Diagnostics;
1414
using System.Linq;
1515
using System.Runtime.CompilerServices;
16+
using System.Runtime.ExceptionServices;
1617
using System.Runtime.Serialization;
1718
using System.Text;
1819
using System.Threading.Tasks;
@@ -800,8 +801,15 @@ private void FailChunkExecution(ChunkExecutionFailure message)
800801
replayAll.ReplyTo.Tell(new EventReplayFailure(cause));
801802
break;
802803

804+
case SelectCurrentPersistenceIds select:
805+
// SqlJournal handled this failure case by using the default PipeTo failure
806+
// handler which sends a Status.Failure message back to the sender.
807+
select.ReplyTo.Tell(new Status.Failure(cause));
808+
break;
809+
803810
default:
804-
throw new Exception($"Unknown persistence journal request type [{request.GetType()}]");
811+
Log.Error(cause, $"Batching failure not reported to original sender. Unknown batched persistence journal request type [{request.GetType()}].");
812+
break;
805813
}
806814
}
807815

@@ -1096,10 +1104,12 @@ protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPers
10961104
command.Parameters.Clear();
10971105
AddParameter(command, "@Ordering", DbType.Int64, message.Offset);
10981106

1099-
var reader = await command.ExecuteReaderAsync();
1100-
while (await reader.ReadAsync())
1107+
using (var reader = await command.ExecuteReaderAsync())
11011108
{
1102-
result.Add(reader.GetString(0));
1109+
while (await reader.ReadAsync())
1110+
{
1111+
result.Add(reader.GetString(0));
1112+
}
11031113
}
11041114

11051115
message.ReplyTo.Tell(new CurrentPersistenceIds(result, highestOrderingNumber));

0 commit comments

Comments
 (0)