open System

module Observable =

  /// Notification forwarded from the source observer to the windowing agent.
  /// Values, errors and completion all travel through the same mailbox so the
  /// downstream observer sees them in the order the source produced them.
  type private WindowMessage<'T> =
    | Push of 'T
    | Fail of exn
    | Finish

  /// Creates an observable that returns windows of size 'n' (or smaller at the start)
  /// containing the last 'n' values produced by observable 'source'. A new window is
  /// emitted for every value received. The order of items in the returned arrays is
  /// not guaranteed to be chronological (the window is backed by a circular buffer
  /// and is copied out in physical order).
  ///
  /// Errors and completion of 'source' are delivered after all windows that were
  /// queued before them. Disposing the subscription detaches from 'source' and stops
  /// delivery of any windows that are still queued.
  ///
  /// Raises ArgumentException when 'n' is not positive.
  let windowed n (source:IObservable<'T>) =
    // A zero or negative capacity would otherwise fail inside the agent, where the
    // exception is invisible to the caller and the observable silently never emits.
    if n <= 0 then invalidArg "n" "The window size must be positive."
    { new IObservable<'T[]> with
        member x.Subscribe(obs) =
          // Cancelled on unsubscribe so that queued values stop reaching 'obs'.
          let cts = new System.Threading.CancellationTokenSource()
          let agent =
            MailboxProcessor<WindowMessage<'T>>.Start(
              (fun inbox ->
                let buffer = Array.zeroCreate<'T> n
                // 'index' is the next slot to overwrite; 'filled' is the number of
                // valid slots, capped at 'n' so the counter cannot overflow.
                let rec loop index filled = async {
                  let! msg = inbox.Receive()
                  match msg with
                  | Push v ->
                      buffer.[index] <- v
                      let filled = min (filled + 1) n
                      // Emit a copy so subscribers never observe later overwrites.
                      obs.OnNext(Array.sub buffer 0 filled)
                      return! loop ((index + 1) % n) filled
                  | Fail e -> obs.OnError(e)        // terminal: stop the loop
                  | Finish -> obs.OnCompleted() }   // terminal: stop the loop
                loop 0 0),
              cts.Token)
          let upstream =
            { new IObserver<'T> with
                member x.OnNext(v) = agent.Post(Push v)
                member x.OnError(e) = agent.Post(Fail e)
                member x.OnCompleted() = agent.Post(Finish) }
            |> source.Subscribe
          { new IDisposable with
              member x.Dispose() =
                upstream.Dispose()
                cts.Cancel() } }