6 people like it.

Heterogeneous Parallel Async

Perform parallel Async returning heterogeneous types. (The solution presented here is based on a gist sent to me by [Anton Tayanovskyy](http://t0yv0.blogspot.com/ "Anton Tayanovskyy"), Twitter: [@t0yv0](https://twitter.com/t0yv0).)

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
type Parallel<'T> =
    private {
        Compute : Async<obj>[]
        Unpack : obj [] -> int -> 'T
    }

    static member ( <*> ) (f: Parallel<'A -> 'B>, x: Parallel<'A>) : Parallel<'B> =
        {
            Compute = Array.append f.Compute x.Compute
            Unpack = fun xs pos ->
                let fv = f.Unpack xs pos
                let xv = x.Unpack xs (pos + f.Compute.Length)
                fv xv
        }

    static member ( <*> ) (f: Parallel<'A ->' B>, x: Async<'A>) : Parallel<'B> =
        f <*> Parallel.Await(x)

and Parallel =

    static member Run<'T> (p: Parallel<'T>) : Async<'T> =
        async {
            let! results =
                match p.Compute.Length with
                | 0 -> async.Return [|box ()|]
                | 1 -> async { let! r = p.Compute.[0] 
                               return [| r |] }
                | _ -> Async.Parallel p.Compute
            return p.Unpack results 0
        }

    static member Await<'T> (x: Async<'T>) : Parallel<'T> =
        {
            Compute =
                [|
                    async {
                        let! v = x
                        return box v
                    }
                |]
            Unpack = fun xs pos -> unbox xs.[pos]
        }

    static member Pure<'T>(x: 'T) : Parallel<'T> =
        {
            Compute = [||]
            Unpack = fun _ _ -> x
        }

let myInt, myChar, myBool, myString =
    Parallel.Pure (fun w x y z -> (w, x, y, z))
    <*> async { return 1 }
    <*> async { return 'b' }
    <*> async { return true }
    <*> async { return "abc" }
    |> Parallel.Run
    |> Async.RunSynchronously

let test1 () =
    Parallel.Pure (fun x y z -> (x, y, z))
    <*> async { return 1 }
        // unit -> Parallel<('b -> 'c -> int * 'b * 'c)>

    <*> async { return 'b' }
        // unit -> Parallel<('c -> int * char * 'c)>

    <*> async { return true }
        // unit -> Parallel<int * char * bool>

    |> Parallel.Run
        // unit -> Async<int * char * bool>

    |> Async.RunSynchronously
        // unit -> int * char * bool
Parallel.Compute: Async<obj> []
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
type obj = System.Object

Full name: Microsoft.FSharp.Core.obj
Parallel.Unpack: obj [] -> int -> 'T
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val f : Parallel<('A -> 'B)>
Multiple items
type Parallel =
  static member Await : x:Async<'T> -> Parallel<'T>
  static member Pure : x:'T -> Parallel<'T>
  static member Run : p:Parallel<'T> -> Async<'T>

Full name: Script.Parallel

--------------------
type Parallel<'T> =
  private {Compute: Async<obj> [];
           Unpack: obj [] -> int -> 'T;}
  static member ( <*> ) : f:Parallel<('A -> 'B)> * x:Parallel<'A> -> Parallel<'B>
  static member ( <*> ) : f:Parallel<('A -> 'B)> * x:Async<'A> -> Parallel<'B>

Full name: Script.Parallel<_>
val x : Parallel<'A>
module Array

from Microsoft.FSharp.Collections
val append : array1:'T [] -> array2:'T [] -> 'T []

Full name: Microsoft.FSharp.Collections.Array.append
val xs : obj []
val pos : int
val fv : ('A -> 'B)
Parallel.Unpack: obj [] -> int -> 'A -> 'B
val xv : 'A
Parallel.Unpack: obj [] -> int -> 'A
property System.Array.Length: int
val x : Async<'A>
static member Parallel.Await : x:Async<'T> -> Parallel<'T>
static member Parallel.Run : p:Parallel<'T> -> Async<'T>

Full name: Script.Parallel.Run
val p : Parallel<'T>
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val results : obj []
member AsyncBuilder.Return : value:'T -> Async<'T>
val box : value:'T -> obj

Full name: Microsoft.FSharp.Core.Operators.box
val r : obj
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member Parallel.Await : x:Async<'T> -> Parallel<'T>

Full name: Script.Parallel.Await
val x : Async<'T>
val v : 'T
val unbox : value:obj -> 'T

Full name: Microsoft.FSharp.Core.Operators.unbox
static member Parallel.Pure : x:'T -> Parallel<'T>

Full name: Script.Parallel.Pure
val x : 'T
val myInt : int

Full name: Script.myInt
val myChar : char

Full name: Script.myChar
val myBool : bool

Full name: Script.myBool
val myString : string

Full name: Script.myString
static member Parallel.Pure : x:'T -> Parallel<'T>
val w : int
val x : char
val y : bool
val z : string
static member Parallel.Run : p:Parallel<'T> -> Async<'T>
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:System.Threading.CancellationToken -> 'T
val test1 : unit -> int * char * bool

Full name: Script.test1
val x : int
val y : char
val z : bool
Raw view Test code New version

More information

Link:http://fssnip.net/lh
Posted:10 years ago
Author:Jack Fox
Tags: async