5 people like it.

Sliding window for Observable

Implements the Observable.windowed function, which creates an observable that returns sliding windows over the values of a source observable. The function is an observable version of Seq.windowed. The implementation uses a simple F# agent that keeps partial windows and sends each one to the observer once it is full.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
open System

module Observable =

  /// Returns an observable that yields sliding windows of 'count'
  /// consecutive elements drawn from the input observable.
  /// Each window is returned as a fresh array, oldest element first.
  ///
  /// count  - size of every window; must be positive, otherwise an
  ///          ArgumentException is raised (same contract as Seq.windowed).
  /// source - the observable whose values are grouped into windows.
  ///
  /// Errors and completion of the source are forwarded to the observer
  /// through the same agent, so they arrive after every window that was
  /// produced before them. Disposing the returned subscription
  /// unsubscribes from the source and cancels the agent.
  let windowed (count:int) (source:IObservable<_>) =
    // A partial window can never reach a non-positive length, so without
    // this check the agent would pile up partial lists forever and never
    // emit anything.
    if count <= 0 then
      invalidArg "count" "The window size must be positive."

    { new IObservable<_> with
        member x.Subscribe(observer) =
          // Cancelling this token stops the agent once the subscriber
          // disposes the subscription.
          let cancellation = new System.Threading.CancellationTokenSource()

          // Body of the agent (a new agent is started for every observer).
          // Messages are: Choice1Of3 = next value, Choice2Of3 = error,
          // Choice3Of3 = completion.
          let body (inbox:MailboxProcessor<_>) =
            // The parameter 'lists' contains partial lists and their lengths
            let rec loop lists = async {
              let! message = inbox.Receive()
              match message with
              | Choice1Of3 value ->
                  // Add new empty list and then the new element to all
                  // lists. Then split the lists into 'full' that should be
                  // sent to the observer and 'partial' which need more
                  // elements.
                  let full, partial =
                    ((0, []) :: lists)
                    |> List.map (fun (length, l) -> length + 1, value::l)
                    |> List.partition (fun (length, _) -> length = count)

                  // Send all full lists to the observer (as arrays); the
                  // lists are built newest-first, hence the reversal.
                  for (_, l) in full do
                    observer.OnNext(l |> Array.ofSeq |> Array.rev)
                  // Continue looping with incomplete lists
                  return! loop partial
              | Choice2Of3 (error:exn) ->
                  // Terminal notification: forward it and stop looping
                  observer.OnError(error)
              | Choice3Of3 () ->
                  // Terminal notification: forward it and stop looping
                  observer.OnCompleted() }

            // Start with an empty list of partial lists
            loop []

          let agent = MailboxProcessor.Start(body, cancellation.Token)

          // Route every kind of notification through the agent so that
          // the ordering relative to emitted windows is preserved.
          let forwarder =
            { new IObserver<_> with
                member o.OnNext(value) = agent.Post(Choice1Of3 value)
                member o.OnError(error) = agent.Post(Choice2Of3 error)
                member o.OnCompleted() = agent.Post(Choice3Of3 ()) }

          let subscription = source.Subscribe(forwarder)

          // Unsubscribe from the source and stop the agent together
          { new IDisposable with
              member d.Dispose() =
                subscription.Dispose()
                cancellation.Cancel() } }
namespace System
Multiple items
module Observable

from Script

--------------------
module Observable

from Microsoft.FSharp.Control
val windowed : count:int -> source:IObservable<'a> -> IObservable<'a []>

Full name: Script.Observable.windowed


 Returns an observable that yields sliding windows of
 containing elements drawn from the input observable.
 Each window is returned as a fresh array.
val count : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val source : IObservable<'a>
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
val x : IObservable<'a []>
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'a []>) : IDisposable
val observer : IObserver<'a []>
val agent : MailboxProcessor<'a>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val loop : ((int * 'a list) list -> Async<'b>)
val lists : (int * 'a list) list
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val value : 'a
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val full : (int * 'a list) list
val partial : (int * 'a list) list
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
  interface IEnumerable
  interface IEnumerable<'T>
  member Head : 'T
  member IsEmpty : bool
  member Item : index:int -> 'T with get
  member Length : int
  member Tail : 'T list
  static member Cons : head:'T * tail:'T list -> 'T list
  static member Empty : 'T list

Full name: Microsoft.FSharp.Collections.List<_>
val map : mapping:('T -> 'U) -> list:'T list -> 'U list

Full name: Microsoft.FSharp.Collections.List.map
val length : int
val l : 'a list
val partition : predicate:('T -> bool) -> list:'T list -> 'T list * 'T list

Full name: Microsoft.FSharp.Collections.List.partition
IObserver.OnNext(value: 'a []) : unit
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val ofSeq : source:seq<'T> -> 'T []

Full name: Microsoft.FSharp.Collections.Array.ofSeq
val rev : array:'T [] -> 'T []

Full name: Microsoft.FSharp.Collections.Array.rev
member IObservable.Subscribe : callback:('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<'a>) : IDisposable
member MailboxProcessor.Post : message:'Msg -> unit
Raw view Test code New version

More information

Link:http://fssnip.net/7Z
Posted:12 years ago
Author:Tomas Petricek
Tags: observable , agent , windowed , sliding window