8 people like it.

Alternative Actor Implementation for Android

Actor implementation (much simplified) intended for use on mobile devices. It seems to provide better memory-usage behavior than the native F# MailboxProcessor (MBP) on Android, though it is not as 'smooth' as the native F# MBP. Smoothness here refers to how processing is balanced between consumers and producers (most relevant on single-core machines).

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
75: 
76: 
77: 
78: 
79: 
80: 
81: 
82: 
83: 
84: 
85: 
86: 
87: 
88: 
89: 
90: 
91: 
92: 
namespace ActorModel
open System.Threading
open System

/// Reply handle passed to an actor inside a message; wraps the callback
/// that delivers the actor's answer back to whoever posted the message.
type ActorReplyChannel<'Reply>(replyf: 'Reply->unit) =
    /// Hands `reply` to the callback supplied at construction time.
    member this.Reply(reply) =
        reply |> replyf

/// Simplified actor mailbox: a FIFO queue guarded by a lock, plus an
/// auto-reset event used to wake a receiver that is waiting in `Receive`.
type ActorMailbox<'a > () =
    let queue = System.Collections.Generic.Queue<'a>()
    let gotMsg = new AutoResetEvent(false)
    // Message count maintained with Interlocked; it is read (in Receive and
    // CurrentQueueLength) without taking the queue lock.
    let mutable currentLength = 0

    let incr() = Interlocked.Increment(&currentLength) |> ignore
    let decr() = Interlocked.Decrement(&currentLength) |> ignore

    // Removes the oldest message under the queue lock, then lowers the counter.
    let dequeue() = 
        let v = lock queue (fun () -> queue.Dequeue())
        decr()
        v

    // Appends a message under the queue lock, raises the counter, and signals
    // the receiver. The counter is raised before the event is set, so a woken
    // receiver observes a non-zero length.
    let enqueue v = 
        lock queue (fun () -> queue.Enqueue(v))
        incr()
        gotMsg.Set() |> ignore
        let sp = new System.Threading.SpinWait()
        sp.SpinOnce()

    // Signals a reply event, tolerating the case where the waiting caller has
    // already timed out (or been cancelled) and disposed the event. Without
    // this guard a late Reply would raise ObjectDisposedException inside the
    // actor's own code.
    let signalReply (ev: ManualResetEvent) =
        try ev.Set() |> ignore
        with :? ObjectDisposedException -> ()
          
    /// Asynchronously waits for the next message and returns it.
    /// NOTE(review): the length check and the dequeue are separate steps, so
    /// two concurrent Receive callers could both pass the check with a single
    /// message queued; this looks designed for a single consumer - confirm.
    member x.Receive() =
        let rec loop() = 
            async {
                let sp = new SpinWait()
                sp.SpinOnce()
                if currentLength > 0 then
                    let v = dequeue()
                    return v
                else
                    // Nothing queued: wait until a producer signals, then re-check.
                    let! _ = Async.AwaitWaitHandle gotMsg
                    return! loop()}
        loop()

    /// Enqueues `msg` and signals the receiver.
    member x.Post msg = enqueue msg

    /// Current value of the message counter.
    member x.CurrentQueueLength = currentLength

    /// Posts the message built by `mConstructor` (which is given a reply
    /// channel) and blocks the calling thread until the actor replies.
    /// `timeout` is in milliseconds and defaults to Timeout.Infinite.
    /// Raises TimeoutException if no reply arrives within `timeout`.
    member x.PostAndReply (mConstructor, ?timeout:int) = 
        let timeout = defaultArg timeout Timeout.Infinite
        let v = ref Unchecked.defaultof<_>
        use gotReply = new ManualResetEvent(false)
        let msg = mConstructor (new ActorReplyChannel<_>(fun reply ->
            v := reply
            signalReply gotReply))
        x.Post(msg) 
        // Timeout.Infinite (-1) makes WaitOne wait indefinitely, so a single
        // call covers both the bounded and the unbounded case.
        if gotReply.WaitOne(timeout) then !v 
        else raise (TimeoutException("actor timed out"))

    /// Posts the message built by `mConstructor` immediately and returns an
    /// async computation that completes with the actor's reply.
    /// `timeout` is in milliseconds and defaults to Timeout.Infinite.
    /// The computation raises TimeoutException if no reply arrives in time.
    /// It disposes its reply event when it finishes, so it should be run at
    /// most once.
    member x.PostAndAsyncReply (mConstructor, ? timeout:int) =
        let timeout = defaultArg timeout Timeout.Infinite
        let v = ref Unchecked.defaultof<_>
        let gotReply = new ManualResetEvent(false)
        let msg = mConstructor (new ActorReplyChannel<_>(fun reply ->
            v := reply
            signalReply gotReply))
        x.Post(msg) 
        async {
            try
                // Timeout.Infinite (-1) means "no timeout" here as well.
                let! ok = Async.AwaitWaitHandle(gotReply, timeout)
                if ok then return !v 
                else return raise (TimeoutException("actor timed out"))
            finally
                // Release the event on success, timeout, failure and cancellation.
                gotReply.Dispose() }

    /// Creates a mailbox and starts the actor body `f` on it with
    /// cancellation token `t`; returns the mailbox.
    static member Start(f,t) =
        let mailbox = new ActorMailbox<'a>()
        Async.Start(f mailbox,t)
        mailbox //:> ActorMailbox<'a>

    interface IDisposable with
        /// Releases the event used to wake the receiver.
        member x.Dispose() = gotMsg.Dispose()


           
namespace System
namespace System.Threading
Multiple items
type ActorReplyChannel<'Reply> =
  new : replyf:('Reply -> unit) -> ActorReplyChannel<'Reply>
  member Reply : reply:'Reply -> unit

Full name: ActorModel.ActorReplyChannel<_>

--------------------
new : replyf:('Reply -> unit) -> ActorReplyChannel<'Reply>
val replyf : ('Reply -> unit)
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val x : ActorReplyChannel<'Reply>
member ActorReplyChannel.Reply : reply:'Reply -> unit

Full name: ActorModel.ActorReplyChannel`1.Reply
val reply : 'Reply
Multiple items
type ActorMailbox<'a> =
  interface IDisposable
  new : unit -> ActorMailbox<'a>
  member Post : msg:'a -> unit
  member PostAndAsyncReply : mConstructor:(ActorReplyChannel<'a0> -> 'a) * ?timeout:int -> Async<'a0>
  member PostAndReply : mConstructor:(ActorReplyChannel<'a0> -> 'a) * ?timeout:int -> 'a0
  member Receive : unit -> Async<'a>
  member CurrentQueueLength : int
  static member Start : f:(ActorMailbox<'a> -> Async<unit>) * t:CancellationToken -> ActorMailbox<'a>

Full name: ActorModel.ActorMailbox<_>

--------------------
new : unit -> ActorMailbox<'a>
val queue : Collections.Generic.Queue<'a>
namespace System.Collections
namespace System.Collections.Generic
Multiple items
type Queue<'T> =
  new : unit -> Queue<'T> + 2 overloads
  member Clear : unit -> unit
  member Contains : item:'T -> bool
  member CopyTo : array:'T[] * arrayIndex:int -> unit
  member Count : int
  member Dequeue : unit -> 'T
  member Enqueue : item:'T -> unit
  member GetEnumerator : unit -> Enumerator<'T>
  member Peek : unit -> 'T
  member ToArray : unit -> 'T[]
  ...
  nested type Enumerator

Full name: System.Collections.Generic.Queue<_>

--------------------
Collections.Generic.Queue() : unit
Collections.Generic.Queue(capacity: int) : unit
Collections.Generic.Queue(collection: Collections.Generic.IEnumerable<'T>) : unit
val gotMsg : AutoResetEvent
Multiple items
type AutoResetEvent =
  inherit EventWaitHandle
  new : initialState:bool -> AutoResetEvent

Full name: System.Threading.AutoResetEvent

--------------------
AutoResetEvent(initialState: bool) : unit
val mutable currentLength : int
val incr : (unit -> unit)
type Interlocked =
  static member Add : location1:int * value:int -> int + 1 overload
  static member CompareExchange : location1:int * value:int * comparand:int -> int + 6 overloads
  static member Decrement : location:int -> int + 1 overload
  static member Exchange : location1:int * value:int -> int + 6 overloads
  static member Increment : location:int -> int + 1 overload
  static member Read : location:int64 -> int64

Full name: System.Threading.Interlocked
Interlocked.Increment(location: byref<int64>) : int64
Interlocked.Increment(location: byref<int>) : int
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val decr : (unit -> unit)
Interlocked.Decrement(location: byref<int64>) : int64
Interlocked.Decrement(location: byref<int>) : int
val dequeue : (unit -> 'a)
val v : 'a
val lock : lockObject:'Lock -> action:(unit -> 'T) -> 'T (requires reference type)

Full name: Microsoft.FSharp.Core.Operators.lock
Collections.Generic.Queue.Dequeue() : 'a
val enqueue : ('a -> unit)
Collections.Generic.Queue.Enqueue(item: 'a) : unit
EventWaitHandle.Set() : bool
val sp : SpinWait
type SpinWait =
  struct
    member Count : int
    member NextSpinWillYield : bool
    member Reset : unit -> unit
    member SpinOnce : unit -> unit
    static member SpinUntil : condition:Func<bool> -> unit + 2 overloads
  end

Full name: System.Threading.SpinWait
SpinWait.SpinOnce() : unit
val x : ActorMailbox<'a>
member ActorMailbox.Receive : unit -> Async<'a>

Full name: ActorModel.ActorMailbox`1.Receive
val loop : (unit -> Async<'a>)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val b : bool
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
member ActorMailbox.Post : msg:'a -> unit

Full name: ActorModel.ActorMailbox`1.Post
val msg : 'a
member ActorMailbox.CurrentQueueLength : int

Full name: ActorModel.ActorMailbox`1.CurrentQueueLength
member ActorMailbox.PostAndReply : mConstructor:(ActorReplyChannel<'a0> -> 'a) * ?timeout:int -> 'a0

Full name: ActorModel.ActorMailbox`1.PostAndReply
val mConstructor : (ActorReplyChannel<'a> -> 'a0)
val timeout : int option
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val timeout : int
val defaultArg : arg:'T option -> defaultValue:'T -> 'T

Full name: Microsoft.FSharp.Core.Operators.defaultArg
type Timeout =
  static val Infinite : int

Full name: System.Threading.Timeout
field Timeout.Infinite = -1
val v : 'a ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
module Unchecked

from Microsoft.FSharp.Core.Operators
val defaultof<'T> : 'T

Full name: Microsoft.FSharp.Core.Operators.Unchecked.defaultof
val gotReply : ManualResetEvent
Multiple items
type ManualResetEvent =
  inherit EventWaitHandle
  new : initialState:bool -> ManualResetEvent

Full name: System.Threading.ManualResetEvent

--------------------
ManualResetEvent(initialState: bool) : unit
val reply : 'a
member ActorMailbox.Post : msg:'a -> unit
WaitHandle.WaitOne() : bool
WaitHandle.WaitOne(timeout: TimeSpan) : bool
WaitHandle.WaitOne(millisecondsTimeout: int) : bool
WaitHandle.WaitOne(timeout: TimeSpan, exitContext: bool) : bool
WaitHandle.WaitOne(millisecondsTimeout: int, exitContext: bool) : bool
val ok : bool
val raise : exn:Exception -> 'T

Full name: Microsoft.FSharp.Core.Operators.raise
Multiple items
type TimeoutException =
  inherit SystemException
  new : unit -> TimeoutException + 2 overloads

Full name: System.TimeoutException

--------------------
TimeoutException() : unit
TimeoutException(message: string) : unit
TimeoutException(message: string, innerException: exn) : unit
member ActorMailbox.PostAndAsyncReply : mConstructor:(ActorReplyChannel<'a0> -> 'a) * ?timeout:int -> Async<'a0>

Full name: ActorModel.ActorMailbox`1.PostAndAsyncReply
WaitHandle.Dispose() : unit
static member ActorMailbox.Start : f:(ActorMailbox<'a> -> Async<unit>) * t:CancellationToken -> ActorMailbox<'a>

Full name: ActorModel.ActorMailbox`1.Start
val f : (ActorMailbox<'a> -> Async<unit>)
val t : CancellationToken
val mailbox : ActorMailbox<'a>
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
type IDisposable =
  member Dispose : unit -> unit

Full name: System.IDisposable
override ActorMailbox.Dispose : unit -> unit

Full name: ActorModel.ActorMailbox`1.Dispose

More information

Link:http://fssnip.net/m8
Posted:10 years ago
Author:Faisal Waris
Tags: actor , mailboxprocessor , android