Skip to content

Commit

Permalink
Add support for .net core and better thread syncronization.
Browse files Browse the repository at this point in the history
  • Loading branch information
saleem-mirza committed May 29, 2017
1 parent eeb9b46 commit e77eae1
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 48 deletions.
10 changes: 5 additions & 5 deletions src/Serilog.Sinks.SQLite/Serilog.Sinks.SQLite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@
<PackageProjectUrl>http://serilog.net</PackageProjectUrl>
<PackageLicenseUrl>http://www.apache.org/licenses/LICENSE-2.0</PackageLicenseUrl>
<Copyright>Copyright © Zethian Inc. 2013-2017</Copyright>
<AssemblyVersion>3.8.3.0</AssemblyVersion>
<Version>3.8.3</Version>
<AssemblyVersion>3.8.5.0</AssemblyVersion>
<Version>3.8.5</Version>
<SignAssembly>True</SignAssembly>
<AssemblyOriginatorKeyFile>Serilog.snk</AssemblyOriginatorKeyFile>
<TargetFrameworks>netstandard1.6;net451</TargetFrameworks>
<FileVersion>3.8.3.0</FileVersion>
<TargetFrameworks>netcoreapp1.0;net451</TargetFrameworks>
<FileVersion>3.8.5.0</FileVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(VersionSuffix)'!='' ">
<Version>$(Version)-$(VersionSuffix)</Version>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Data.Sqlite" Version="1.1.0" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="1.1.1" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
<PackageReference Include="Serilog" Version="2.4.0" />
<PackageReference Include="SQLite" Version="3.13.0" />
Expand Down
121 changes: 78 additions & 43 deletions src/Serilog.Sinks.SQLite/Sinks/Batch/BatchProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,29 @@ internal abstract class BatchProvider : IDisposable
{
private readonly uint _batchSize;
private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();
private readonly List<LogEvent> _logEventBatch;
private readonly BlockingCollection<IList<LogEvent>> _messageQueue;
private readonly CancellationTokenSource _eventCancellationToken = new CancellationTokenSource();
private readonly ConcurrentQueue<LogEvent> _logEventBatch;
private readonly BlockingCollection<IList<LogEvent>> _batchEventsCollection;
private readonly BlockingCollection<LogEvent> _eventsCollection;
private readonly TimeSpan _thresholdTimeSpan = TimeSpan.FromSeconds(10);
private readonly AutoResetEvent _timerResetEvent = new AutoResetEvent(false);
private readonly Task _timerTask;
private readonly Task _batchTask;
private readonly Task _eventPumpTask;
private readonly Task _cleanupTask;
private readonly List<Task> _workerTasks = new List<Task>();

private bool _canStop;

protected BatchProvider(uint batchSize = 100, int nThreads = 1)
{
_batchSize = batchSize;
_logEventBatch = new List<LogEvent>();
_messageQueue = new BlockingCollection<IList<LogEvent>>();
_batchSize = batchSize == 0 ? 1: batchSize;
_logEventBatch = new ConcurrentQueue<LogEvent>();
_batchEventsCollection = new BlockingCollection<IList<LogEvent>>();
_eventsCollection = new BlockingCollection<LogEvent>();

_eventPumpTask = Task.Factory.StartNew(Pump, TaskCreationOptions.LongRunning);
_batchTask = Task.Factory.StartNew(Pump, TaskCreationOptions.LongRunning);
_timerTask = Task.Factory.StartNew(TimerPump, TaskCreationOptions.LongRunning);
_cleanupTask = new Task(() =>
{
_canStop = true;
_timerResetEvent.Set();
_timerTask.Wait();

IList<LogEvent> eventBatch;
while (_messageQueue.TryTake(out eventBatch))
WriteLogEvent(eventBatch);

FlushLogEventBatch();
});
_eventPumpTask = Task.Factory.StartNew(EventPump, TaskCreationOptions.LongRunning);
}

private void Pump()
Expand All @@ -66,7 +58,7 @@ private void Pump()
{
while (true)
{
var logEvents = _messageQueue.Take(_cancellationToken.Token);
var logEvents = _batchEventsCollection.Take(_cancellationToken.Token);
var task = Task.Factory.StartNew((x) => { WriteLogEvent(x as IList<LogEvent>); }, logEvents);

_workerTasks.Add(task);
Expand All @@ -78,7 +70,7 @@ private void Pump()
}
catch (OperationCanceledException)
{
FluchAndCloseEvents();
SelfLog.WriteLine("Shutting down batch processing");
}
catch (Exception e)
{
Expand All @@ -90,30 +82,59 @@ private void TimerPump()
{
while (!_canStop)
{
_timerResetEvent.WaitOne(_thresholdTimeSpan);
_timerResetEvent.WaitOne(_thresholdTimeSpan);
FlushLogEventBatch();
}
}

private void EventPump()
{
try
{
while (true)
{
var logEvent = _eventsCollection.Take(_eventCancellationToken.Token);
_logEventBatch.Enqueue(logEvent);

if(_logEventBatch.Count >= _batchSize)
{
FlushLogEventBatch();
}
}
}
catch (OperationCanceledException)
{
SelfLog.WriteLine("Shutting down event pump");
}
catch (Exception e)
{
SelfLog.WriteLine(e.Message);
}
}

private void FlushLogEventBatch()
{
if (!_logEventBatch.Any())
{
return;
}

lock (this)
var logEventBatchSize = _logEventBatch.Count >= _batchSize ? (int)_batchSize : _logEventBatch.Count;
var logEventList = new List<LogEvent>();

for (int i = 0; i < logEventBatchSize; i++)
{
_messageQueue.Add(_logEventBatch.ToArray());
_logEventBatch.Clear();
if(_logEventBatch.TryDequeue(out LogEvent logEvent))
{
logEventList.Add(logEvent);
}
}
_batchEventsCollection.Add(logEventList);
}

protected void PushEvent(LogEvent logEvent)
{
_logEventBatch.Add(logEvent);
if (_logEventBatch.Count >= _batchSize)
FlushLogEventBatch();
_eventsCollection.Add(logEvent);
}

protected abstract void WriteLogEvent(ICollection<LogEvent> logEventsBatch);
Expand All @@ -128,12 +149,7 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
SelfLog.WriteLine("Halting sink...");
_cancellationToken.Cancel();

Task.WaitAll(_workerTasks.ToArray());

FluchAndCloseEvents();
FluchAndCloseEventHandlers();

SelfLog.WriteLine("Sink halted successfully.");
}
Expand All @@ -142,19 +158,38 @@ protected virtual void Dispose(bool disposing)
}
}

private void FluchAndCloseEvents()
private void FluchAndCloseEventHandlers()
{
if (_cleanupTask.Status != TaskStatus.Created)
{
return;
}

try
{
_cleanupTask.RunSynchronously();
_cleanupTask.Wait(TimeSpan.FromSeconds(30));
SelfLog.WriteLine("Halting sink...");

_eventCancellationToken.Cancel();
_cancellationToken.Cancel();

Task.WaitAll(_workerTasks.ToArray());

_canStop = true;
_timerResetEvent.Set();

// Flush events collection
while (_eventsCollection.TryTake(out LogEvent logEvent))
{
_logEventBatch.Enqueue(logEvent);

if (_logEventBatch.Count >= _batchSize)
{
FlushLogEventBatch();
}
}

FlushLogEventBatch();

// Flush events batch
while (_batchEventsCollection.TryTake(out IList<LogEvent> eventBatch))
WriteLogEvent(eventBatch);

Task.WaitAll(new[] {_eventPumpTask, _timerTask}, TimeSpan.FromSeconds(30));
Task.WaitAll(new[] { _eventPumpTask, _batchTask, _timerTask}, TimeSpan.FromSeconds(30));
}
catch (Exception ex)
{
Expand Down

0 comments on commit e77eae1

Please sign in to comment.