Skip to content

STE: Provide indicators of progress #517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DotNetty.Common.Concurrency
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Internal.Logging;
Expand Down Expand Up @@ -46,6 +47,11 @@ protected AbstractEventExecutor(IEventExecutorGroup parent)
/// <inheritdoc cref="IEventExecutor"/>
public bool InEventLoop => this.IsInEventLoop(Thread.CurrentThread);

/// <inheritdoc cref="IEventExecutor" />
public IEnumerable<IEventExecutor> Items => this.GetItems();

protected abstract IEnumerable<IEventExecutor> GetItems();

/// <inheritdoc cref="IEventExecutor"/>
public abstract bool IsInEventLoop(Thread thread);

Expand Down
5 changes: 5 additions & 0 deletions src/DotNetty.Common/Concurrency/AbstractEventExecutorGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DotNetty.Common.Concurrency
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -20,6 +21,8 @@ public abstract class AbstractEventExecutorGroup : IEventExecutorGroup

public abstract Task TerminationCompletion { get; }

public IEnumerable<IEventExecutor> Items => this.GetItems();

public abstract IEventExecutor GetNext();

public void Execute(IRunnable task) => this.GetNext().Execute(task);
Expand Down Expand Up @@ -65,5 +68,7 @@ public abstract class AbstractEventExecutorGroup : IEventExecutorGroup
public Task ShutdownGracefullyAsync() => this.ShutdownGracefullyAsync(DefaultShutdownQuietPeriod, DefaultShutdownTimeout);

public abstract Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout);

protected abstract IEnumerable<IEventExecutor> GetItems();
}
}
6 changes: 6 additions & 0 deletions src/DotNetty.Common/Concurrency/IEventExecutorGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
namespace DotNetty.Common.Concurrency
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Provides an access to a set of <see cref="IEventExecutor"/>s it manages.
/// </summary>
public interface IEventExecutorGroup : IScheduledExecutorService
{
/// <summary>
/// Returns list of owned event executors.
/// </summary>
IEnumerable<IEventExecutor> Items { get; }

/// <summary>
/// Returns <c>true</c> if and only if this executor is being shut down via <see cref="ShutdownGracefullyAsync()" />.
/// </summary>
Expand Down
51 changes: 35 additions & 16 deletions src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
PreciseTimeSpan gracefulShutdownQuietPeriod;
PreciseTimeSpan gracefulShutdownTimeout;
readonly ISet<Action> shutdownHooks = new HashSet<Action>();
long progress;

/// <summary>Creates a new instance of <see cref="SingleThreadEventExecutor"/>.</summary>
public SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval)
Expand Down Expand Up @@ -86,6 +87,21 @@ protected SingleThreadEventExecutor(IEventExecutorGroup parent, string threadNam
/// </summary>
public TaskScheduler Scheduler => this.scheduler;

/// <summary>
/// Allows to track whether executor is progressing through its backlog. Useful for diagnosing / mitigating stalls due to blocking calls in conjunction with IsBacklogEmpty property.
/// </summary>
public long Progress => Volatile.Read(ref this.progress);

/// <summary>
/// Indicates whether executor's backlog is empty. Useful for diagnosing / mitigating stalls due to blocking calls in conjunction with Progress property.
/// </summary>
public bool IsBacklogEmpty => this.taskQueue.IsEmpty;

/// <summary>
/// Gets length of backlog of tasks queued for immediate execution.
/// </summary>
public int BacklogLength => this.taskQueue.Count;

void Loop()
{
this.SetCurrentExecutor(this);
Expand Down Expand Up @@ -140,6 +156,8 @@ public override void Execute(IRunnable task)
}
}

protected override IEnumerable<IEventExecutor> GetItems() => new[] { this };

protected void WakeUp(bool inEventLoop)
{
if (!inEventLoop || (this.executionState == ST_SHUTTING_DOWN))
Expand All @@ -152,12 +170,12 @@ protected void WakeUp(bool inEventLoop)
/// Adds an <see cref="Action"/> which will be executed on shutdown of this instance.
/// </summary>
/// <param name="action">The <see cref="Action"/> to run on shutdown.</param>
public void AddShutdownHook(Action action)
public void AddShutdownHook(Action action)
{
if (this.InEventLoop)
if (this.InEventLoop)
{
this.shutdownHooks.Add(action);
}
}
else
{
this.Execute(() => this.shutdownHooks.Add(action));
Expand All @@ -169,53 +187,53 @@ public void AddShutdownHook(Action action)
/// executed on shutdown of this instance.
/// </summary>
/// <param name="action">The <see cref="Action"/> to remove.</param>
public void RemoveShutdownHook(Action action)
public void RemoveShutdownHook(Action action)
{
if (this.InEventLoop)
if (this.InEventLoop)
{
this.shutdownHooks.Remove(action);
}
}
else
{
this.Execute(() => this.shutdownHooks.Remove(action));
}
}

bool RunShutdownHooks()
bool RunShutdownHooks()
{
bool ran = false;

// Note shutdown hooks can add / remove shutdown hooks.
while (this.shutdownHooks.Count > 0)
while (this.shutdownHooks.Count > 0)
{
var copy = this.shutdownHooks.ToArray();
this.shutdownHooks.Clear();

for (var i = 0; i < copy.Length; i++)
{
try
try
{
copy[i]();
}
catch (Exception ex)
}
catch (Exception ex)
{
Logger.Warn("Shutdown hook raised an exception.", ex);
}
finally
}
finally
{
ran = true;
}
}
}

if (ran)
if (ran)
{
this.lastExecutionTime = PreciseTimeSpan.FromStart;
}

return ran;
}


/// <inheritdoc cref="IEventExecutor"/>
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
Expand Down Expand Up @@ -398,6 +416,7 @@ protected bool RunAllTasks()

while (true)
{
Volatile.Write(ref this.progress, this.progress + 1); // volatile write is enough as this is the only thread ever writing
SafeExecute(task);
task = this.PollTask();
if (task == null)
Expand Down
3 changes: 3 additions & 0 deletions src/DotNetty.Transport.Libuv/DispatcherEventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -69,5 +70,7 @@ internal void Dispatch(NativeHandle handle)
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);

public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;

public new IEnumerable<IEventLoop> Items => new[] { this };
}
}
5 changes: 5 additions & 0 deletions src/DotNetty.Transport.Libuv/DispatcherEventLoopGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;
using DotNetty.Transport.Channels;
Expand All @@ -29,6 +30,10 @@ public DispatcherEventLoopGroup()

internal DispatcherEventLoop Dispatcher => this.dispatcherEventLoop;

protected override IEnumerable<IEventExecutor> GetItems() => new[] { this.dispatcherEventLoop };

public new IEnumerable<IEventLoop> Items => new[] { this.dispatcherEventLoop };

IEventLoop IEventLoopGroup.GetNext() => (IEventLoop)this.GetNext();

public override IEventExecutor GetNext() => this.dispatcherEventLoop;
Expand Down
3 changes: 3 additions & 0 deletions src/DotNetty.Transport.Libuv/EventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace DotNetty.Transport.Libuv
{
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetty.Transport.Channels;

Expand All @@ -19,5 +20,7 @@ public EventLoop(IEventLoopGroup parent, string threadName)
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);

public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;

public new IEnumerable<IEventLoop> Items => new[] { this };
}
}
5 changes: 5 additions & 0 deletions src/DotNetty.Transport.Libuv/EventLoopGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -27,6 +28,8 @@ public sealed class EventLoopGroup : AbstractEventExecutorGroup, IEventLoopGroup

public override Task TerminationCompletion { get; }

public new IEnumerable<IEventLoop> Items => this.eventLoops;

public EventLoopGroup()
: this(DefaultEventLoopCount)
{
Expand Down Expand Up @@ -119,5 +122,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
}
return this.TerminationCompletion;
}

protected override IEnumerable<IEventExecutor> GetItems() => this.eventLoops;
}
}
10 changes: 6 additions & 4 deletions src/DotNetty.Transport.Libuv/LoopExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
Expand All @@ -17,7 +18,6 @@ namespace DotNetty.Transport.Libuv
using System.Threading;
using DotNetty.Common;
using DotNetty.Transport.Libuv.Native;

using Timer = Native.Timer;

class LoopExecutor : AbstractScheduledEventExecutor
Expand Down Expand Up @@ -297,7 +297,7 @@ void RunAllTasks(long timeout)
long runTasks = 0;
long executionTime;
this.wakeUp = false;
for (;;)
for (; ; )
{
SafeExecute(task);

Expand Down Expand Up @@ -402,7 +402,7 @@ static bool RunAllTasksFrom(IQueue<IRunnable> taskQueue)
{
return false;
}
for (;;)
for (; ; )
{
SafeExecute(task);
task = PollTaskFrom(taskQueue);
Expand Down Expand Up @@ -488,7 +488,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
bool inEventLoop = this.InEventLoop;
bool wakeUpLoop;
int oldState;
for (;;)
for (; ; )
{
if (this.IsShuttingDown)
{
Expand Down Expand Up @@ -540,5 +540,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time

return this.TerminationCompletion;
}

protected override IEnumerable<IEventExecutor> GetItems() => new[] { this };
}
}
3 changes: 3 additions & 0 deletions src/DotNetty.Transport.Libuv/WorkerEventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
Expand Down Expand Up @@ -112,6 +113,8 @@ void OnRead(Pipe handle, int status)

public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;

IEnumerable<IEventLoop> IEventLoopGroup.Items => new[] { this };

sealed class PipeConnect : ConnectRequest
{
const int MaximumRetryCount = 10;
Expand Down
5 changes: 5 additions & 0 deletions src/DotNetty.Transport.Libuv/WorkerEventLoopGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Linq;
Expand Down Expand Up @@ -84,6 +85,8 @@ public WorkerEventLoopGroup(DispatcherEventLoopGroup eventLoopGroup, int eventLo

internal string PipeName { get; }

IEnumerable<IEventLoop> IEventLoopGroup.Items => this.eventLoops;

internal void Accept(NativeHandle handle)
{
Debug.Assert(this.dispatcherLoop != null);
Expand Down Expand Up @@ -126,5 +129,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
}
return this.TerminationCompletion;
}

protected override IEnumerable<IEventExecutor> GetItems() => this.eventLoops;
}
}
5 changes: 5 additions & 0 deletions src/DotNetty.Transport/Channels/AffinitizedEventLoopGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DotNetty.Transport.Channels
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;

Expand All @@ -23,6 +24,10 @@ public class AffinitizedEventLoopGroup : AbstractEventExecutorGroup, IEventLoopG
/// <inheritdoc cref="IEventExecutorGroup"/>
public override Task TerminationCompletion => this.innerGroup.TerminationCompletion;

protected override IEnumerable<IEventExecutor> GetItems() => this.innerGroup.Items;

public new IEnumerable<IEventLoop> Items => ((IEventLoopGroup)this.innerGroup).Items;

/// <summary>
/// Creates a new instance of <see cref="AffinitizedEventLoopGroup"/>.
/// </summary>
Expand Down
Loading