Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/core/Akka.FSharp/Akka.FSharp.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<Compile Include="FsApi.fs" />
<None Include="packages.config" />
<None Include="Akka.FSharp.nuspec" />
<None Include="README.md" />
</ItemGroup>
<ItemGroup>
<Reference Include="FSharp.PowerPack">
Expand Down
113 changes: 90 additions & 23 deletions src/core/Akka.FSharp/FsApi.fs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type IO<'T> =
[<Interface>]
type Actor<'Message> =
inherit ActorRefFactory
inherit ICanWatch

/// <summary>
/// Explicitly retrieves next incoming message from the mailbox.
Expand Down Expand Up @@ -218,21 +219,23 @@ type ActorBuilder() =
open Microsoft.FSharp.Quotations
open Microsoft.FSharp.Linq.QuotationEvaluation

type FunActor<'Message, 'Returned>(actor : Actor<'Message> -> Cont<'Message, 'Returned>) as self =
type FunActor<'Message, 'Returned>(actor : Actor<'Message> -> Cont<'Message, 'Returned>) as this =
inherit Actor()

let mutable state =
let self' = self.Self
let self' = this.Self
let context = UntypedActor.Context :> IActorContext
actor { new Actor<'Message> with
member __.Receive() = Input
member __.Self = self'
member __.Context = context
member __.Sender() = self.Sender()
member __.Unhandled msg = self.Unhandled msg
member __.Sender() = this.Sender()
member __.Unhandled msg = this.Unhandled msg
member __.ActorOf(props, name) = context.ActorOf(props, name)
member __.ActorSelection(path : string) = context.ActorSelection(path)
member __.ActorSelection(path : ActorPath) = context.ActorSelection(path)
member __.Watch(aref:ActorRef) = context.Watch aref
member __.Unwatch(aref:ActorRef) = context.Unwatch aref
member __.Log = lazy (Akka.Event.Logging.GetLogger(context)) }

new(actor : Expr<Actor<'Message> -> Cont<'Message, 'Returned>>) = FunActor(actor.Compile () ())
Expand Down Expand Up @@ -294,10 +297,19 @@ module Serialization =
use stream = new System.IO.MemoryStream(bytes)
fsp.Deserialize(t, stream)

[<RequireQualifiedAccess>]
module Configuration =

/// Parses provided HOCON string into a valid Akka configuration object.
let parse = Akka.Configuration.ConfigurationFactory.ParseString

/// Returns default Akka configuration.
let defaultConfig = Akka.Configuration.ConfigurationFactory.Default

/// Loads Akka configuration from the project's .config file.
let load = Akka.Configuration.ConfigurationFactory.Load

[<RequireQualifiedAccess>]
module Strategy =
/// <summary>
/// Returns a supervisor strategy appliable only to child actor which faulted during execution.
Expand All @@ -316,21 +328,21 @@ module Strategy =
upcast OneForOneStrategy(Nullable retries, Nullable timeout, System.Func<_, _>(decider))

/// <summary>
/// Returns a supervisor strategy appliable only each supervised actor when any of them had faulted during execution.
/// Returns a supervisor strategy appliable to each supervised actor when any of them had faulted during execution.
/// </summary>
/// <param name="decider">Used to determine a actor behavior response depending on exception occurred.</param>
let allForOne (decider : exn -> Directive) : SupervisorStrategy =
upcast AllForOneStrategy(System.Func<_, _>(decider))

/// <summary>
/// Returns a supervisor strategy appliable only each supervised actor when any of them had faulted during execution.
/// Returns a supervisor strategy appliable to each supervised actor when any of them had faulted during execution.
/// </summary>
/// <param name="retries">Defines a number of times, an actor could be restarted. If it's a negative value, there is not limit.</param>
/// <param name="timeout">Defines time window for number of retries to occur.</param>
/// <param name="decider">Used to determine a actor behavior response depending on exception occurred.</param>
let allForOne2 (retries : int) (timeout : TimeSpan) (decider : exn -> Directive) : SupervisorStrategy =
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the F# API should use static members with optional arguments in cases like this. (An F# API still looks valuable for it's use of F# function types, and removing the subtyping).

Note that F# API design can and should use members when appropriate, especially for metadata-rich APIs (think of List.map as "low metadata" and FSHarp.Charting or this API as "high metadat"). Indeed there are many elements of the language relevant to API design such as optional arguments which are only supported on members.

upcast AllForOneStrategy(Nullable retries, Nullable timeout, System.Func<_, _>(decider))

module System =
/// Creates an actor system with remote deployment serialization enabled.
let create (name : string) (config : Configuration.Config) : ActorSystem =
Expand All @@ -339,20 +351,7 @@ module System =
system.Serialization.AddSerializer(serializer)
system.Serialization.AddSerializationMap(typeof<Expr>, serializer)
system

type SpawnParams =
{ Deploy : Deploy option
Router : Akka.Routing.RouterConfig option
SupervisorStrategy : SupervisorStrategy option
Dispatcher : string option
Mailbox : string option }
static member empty =
{ Deploy = None
Router = None
SupervisorStrategy = None
Dispatcher = None
Mailbox = None }


type SpawnOption =
| Deploy of Deploy
| Router of Akka.Routing.RouterConfig
Expand Down Expand Up @@ -481,6 +480,74 @@ let asyncReceive (i : Inbox) : Async<'Message option> =
}

/// <summary>
/// Orders inbox to watch an actor targeted by provided <paramref name="actorRef"/>.
/// Orders a <paramref name="watcher"/> to monitor an actor targeted by provided <paramref name="subject"/>.
/// When an actor refered by subject dies, a watcher should receive a <see cref="Terminated"/> message.
/// </summary>
let monitor (subject: ActorRef) (watcher: ICanWatch) : ActorRef = watcher.Watch subject
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following typical F# library designs, I would recommend making "Akka.FSharp" a namespace, and put all these let-bindings in a "AutoOpen" module. This reduces the nesting of types in the compiled IL which can help in quite a number of situations.


/// <summary>
/// Orders a <paramref name="watcher"/> to stop monitoring an actor refered by provided <paramref name="subject"/>.
/// </summary>
let demonitor (subject: ActorRef) (watcher: ICanWatch) : ActorRef = watcher.Unwatch subject

/// <summary>
/// Subscribes an actor reference to target channel of the provided event stream.
/// </summary>
let sub (channel: System.Type) (ref: ActorRef) (eventStream: Akka.Event.EventStream) : bool = eventStream.Subscribe(ref, channel)

/// <summary>
/// Unubscribes an actor reference from target channel of the provided event stream.
/// </summary>
let unsub (channel: System.Type) (ref: ActorRef) (eventStream: Akka.Event.EventStream) : bool = eventStream.Unsubscribe(ref, channel)

/// <summary>
/// Publishes an event on the provided event stream. Event channel is resolved from event's type.
/// </summary>
let pub (event: 'Event) (eventStream: Akka.Event.EventStream) : unit = eventStream.Publish event
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of abbreviation is not normal in F# coding. There are some exceptions - e.g. the F# library function "List.iter", or "obj", but these are rare and in an upstack framework we wouldn't expect to see abbreviations like this. Just use a full name unless there's a really, really good reason not to.


let private taskContinuation (task: System.Threading.Tasks.Task) : unit =
match task.IsFaulted with
| true -> raise task.Exception
| _ -> ()

/// <summary>
/// Schedules a function to be invoked repeatedly in the provided time intervals.
/// </summary>
/// <param name="after">Initial delay to first function call.</param>
/// <param name="every">Interval.</param>
/// <param name="fn">Function called by the scheduler.</param>
/// <param name="scheduler"></param>
let schedule (after: TimeSpan) (every: TimeSpan) (fn: unit -> unit) (scheduler: Scheduler): Async<unit> =
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little sceptical about introducing new top-level functions for all of these. I'm not sure the resulting code is sweet enough - using the API from F# should be as natural and clear as from C#, or better.

A set of static members Akka.Schedule etc. may be more appropriate here, or even a wholesale wrapper type around "Scheduler" This applies especially if any arguments are optional.

let action = Action fn
Async.AwaitTask (scheduler.Schedule(after, every, action).ContinueWith taskContinuation)

/// <summary>
/// Schedules a single function call using specified sheduler.
/// </summary>
/// <param name="after">Delay before calling the function.</param>
/// <param name="fn">Function called by the scheduler.</param>
/// <param name="scheduler"></param>
let scheduleOnce (after: TimeSpan) (fn: unit -> unit) (scheduler: Scheduler): Async<unit> =
let action = Action fn
Async.AwaitTask (scheduler.ScheduleOnce(after, action).ContinueWith taskContinuation)

/// <summary>
/// Schedules a <paramref name="message"/> to be sent to the provided <paramref name="receiver"/> in specified time intervals.
/// </summary>
/// <param name="after">Initial delay to first function call.</param>
/// <param name="every">Interval.</param>
/// <param name="message">Message to be sent to the receiver by the scheduler.</param>
/// <param name="receiver">Message receiver.</param>
/// <param name="scheduler"></param>
let scheduleTell (after: TimeSpan) (every: TimeSpan) (message: 'Message) (receiver: ActorRef) (scheduler: Scheduler): Async<unit> =
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the fact that in the F# API the different schedule operations have different names, rather than lots of overloading

Async.AwaitTask (scheduler.Schedule(after, every, receiver, message).ContinueWith taskContinuation)

/// <summary>
/// Schedules a single <paramref name="message"/> send to the provided <paramref name="receiver"/>.
/// </summary>
let inline watch (actorRef : ActorRef) (i : Inbox) : unit = i.Watch actorRef
/// <param name="after">Delay before sending a message.</param>
/// <param name="message">Message to be sent to the receiver by the scheduler.</param>
/// <param name="receiver">Message receiver.</param>
/// <param name="scheduler"></param>
let scheduleTellOnce (after: TimeSpan) (message: 'Message) (receiver: ActorRef) (scheduler: Scheduler): Async<unit> =
Async.AwaitTask (scheduler.ScheduleOnce(after, receiver, message).ContinueWith taskContinuation)
Loading