Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to specify scheduler implementation in configuration #994

Merged
merged 9 commits into from
May 26, 2015
48 changes: 48 additions & 0 deletions src/core/Akka.Tests/Actor/ActorSystemSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ public void AnActorSystem_Must_Support_Dynamically_Registered_Extensions()
Assert.Equal(Sys, otherTestExtension.System);
}

[Fact]
public void AnActorSystem_Must_Setup_The_Default_Scheduler()
{
Assert.True(Sys.Scheduler.GetType() == typeof(DedicatedThreadScheduler));
}

[Fact]
public void AnActorSystem_Must_Support_Using_A_Customer_Scheduler()
{
var actorSystem = ActorSystem.Create(Guid.NewGuid().ToString(), DefaultConfig.WithFallback("akka.scheduler.implementation = \"Akka.Tests.Actor.TestScheduler, Akka.Tests\""));
Assert.True(actorSystem.Scheduler.GetType() == typeof(TestScheduler));
}

#endregion
}

Expand Down Expand Up @@ -127,5 +140,40 @@ public TestExtensionImpl(ActorSystem system)

public ActorSystem System { get; private set; }
}

public class TestScheduler : IScheduler
{
public TestScheduler(ActorSystem system)
{

}

public void ScheduleTellOnce(TimeSpan delay, ICanTell receiver, object message, IActorRef sender)
{
throw new NotImplementedException();
}

public void ScheduleTellOnce(TimeSpan delay, ICanTell receiver, object message, IActorRef sender, ICancelable cancelable)
{
throw new NotImplementedException();
}

public void ScheduleTellRepeatedly(TimeSpan initialDelay, TimeSpan interval, ICanTell receiver, object message,
IActorRef sender)
{
throw new NotImplementedException();
}

public void ScheduleTellRepeatedly(TimeSpan initialDelay, TimeSpan interval, ICanTell receiver, object message,
IActorRef sender, ICancelable cancelable)
{
throw new NotImplementedException();
}

public DateTimeOffset Now { get; private set; }
public TimeSpan MonotonicClock { get; private set; }
public TimeSpan HighResMonotonicClock { get; private set; }
public IAdvancedScheduler Advanced { get; private set; }
}
}

5 changes: 3 additions & 2 deletions src/core/Akka/Actor/Internals/ActorSystemImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public ActorSystemImpl(string name, Config config)

_name = name;
ConfigureSettings(config);
ConfigureScheduler();
ConfigureEventStream();
ConfigureProvider();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the problem, the scheduler needs to be configured after the provider so the dedicated thread scheduler can access the provider

ConfigureScheduler();
ConfigureSerialization();
ConfigureMailboxes();
ConfigureDispatchers();
Expand Down Expand Up @@ -127,7 +127,8 @@ public override ActorSelection ActorSelection(string actorPath)

private void ConfigureScheduler()
{
_scheduler = new DedicatedThreadScheduler(this);;// new TaskBasedScheduler();
var schedulerType = Type.GetType(_settings.SchedulerClass, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw/log if scheduler type resolves to null as this would break the entire system

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm @rogeralsing the logger is not initialized until the system is Started, and that is too late to be able to log the scheduler failure.

It will throw if it cannot create the scheduler so the system should not start, it will just not log.

_scheduler = (IScheduler) Activator.CreateInstance(schedulerType, this);
}

/// <summary>
Expand Down
8 changes: 1 addition & 7 deletions src/core/Akka/Actor/Scheduler/SchedulerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ void ITellScheduler.ScheduleTellOnce(TimeSpan delay, ICanTell receiver, object m
{
ValidateDelay(delay, "delay");
InternalScheduleTellOnce(delay, receiver, message, sender, cancelable);

}

void ITellScheduler.ScheduleTellRepeatedly(TimeSpan initialDelay, TimeSpan interval, ICanTell receiver, object message, IActorRef sender)
Expand All @@ -38,7 +37,6 @@ void ITellScheduler.ScheduleTellRepeatedly(TimeSpan initialDelay, TimeSpan inter
InternalScheduleTellRepeatedly(initialDelay, interval, receiver, message, sender, cancelable);
}


void IActionScheduler.ScheduleOnce(TimeSpan delay, Action action)
{
ValidateDelay(delay, "delay");
Expand Down Expand Up @@ -66,22 +64,18 @@ void IActionScheduler.ScheduleRepeatedly(TimeSpan initialDelay, TimeSpan interva
}

IAdvancedScheduler IScheduler.Advanced { get { return this; } }

DateTimeOffset ITimeProvider.Now { get { return TimeNow; } }



protected abstract DateTimeOffset TimeNow { get; }
public abstract TimeSpan MonotonicClock { get; }
public abstract TimeSpan HighResMonotonicClock { get; }

protected abstract void InternalScheduleTellOnce(TimeSpan delay, ICanTell receiver, object message, IActorRef sender, ICancelable cancelable);

protected abstract void InternalScheduleTellRepeatedly(TimeSpan initialDelay, TimeSpan interval, ICanTell receiver, object message, IActorRef sender, ICancelable cancelable);

protected abstract void InternalScheduleOnce(TimeSpan delay, Action action, ICancelable cancelable);
protected abstract void InternalScheduleRepeatedly(TimeSpan initialDelay, TimeSpan interval, Action action, ICancelable cancelable);


protected static void ValidateInterval(TimeSpan interval, string parameterName)
{
if(interval <= TimeSpan.Zero)
Expand Down
7 changes: 7 additions & 0 deletions src/core/Akka/Actor/Settings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public Settings(ActorSystem system, Config config)
DebugRouterMisconfiguration = Config.GetBoolean("akka.actor.debug.router-misconfiguration");
Home = Config.GetString("akka.home") ?? "";
DefaultVirtualNodesFactor = Config.GetInt("akka.actor.deployment.default.virtual-nodes-factor");

SchedulerClass = Config.GetString("akka.scheduler.implementation");
//TODO: dunno.. we dont have FiniteStateMachines, dont know what the rest is
/*
final val SchedulerClass: String = getString("akka.scheduler.implementation")
Expand Down Expand Up @@ -260,6 +262,11 @@ public Settings(ActorSystem system, Config config)
/// </summary>
public int DefaultVirtualNodesFactor { get; private set; }

/// <summary>
/// Gets the scheduler implementation used by this system.
/// </summary>
public string SchedulerClass { get; private set; }

/// <summary>
/// Returns a <see cref="string" /> that represents this instance.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Configuration/Pigeon.conf
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ akka {
# 1) com.typesafe.config.Config
# 2) akka.event.LoggingAdapter
# 3) java.util.concurrent.ThreadFactory
implementation = akka.actor.LightArrayRevolverScheduler
implementation = "Akka.Actor.DedicatedThreadScheduler"

# When shutting down the scheduler, there will typically be a thread which
# needs to be stopped, and this timeout determines how long to wait for
Expand Down