2 people like it.

Async workflow with asynchronous "finally" clause

The F# Core library offers async.TryFinally, in which a synchronous compensation function (of type unit -> unit) is run after an error or cancellation. However, it offers no way to start an asynchronous compensation. The TryFinallyAsync method defined below offers a way around this.

  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
104: 
105: 
106: 
107: 
108: 
109: 
110: 
111: 
112: 
113: 
114: 
115: 
116: 
117: 
118: 
119: 
120: 
121: 
122: 
123: 
124: 
125: 
126: 
127: 
128: 
129: 
130: 
131: 
132: 
133: 
134: 
135: 
136: 
137: 
138: 
139: 
140: 
141: 
142: 
143: 
144: 
145: 
146: 
147: 
148: 
149: 
150: 
151: 
152: 
153: 
154: 
155: 
156: 
157: 
158: 
159: 
160: 
161: 
162: 
163: 
164: 
165: 
166: 
167: 
module Application

open System
open System.Threading

type Microsoft.FSharp.Control.Async with 
    /// Runs `comp` and then, whether it succeeded, failed or was cancelled,
    /// runs the asynchronous compensation `deferred` before reporting a
    /// single outcome to the caller.
    ///
    /// Outcome of the combined workflow:
    ///  - both succeed: the result produced by `comp`;
    ///  - `comp` fails or is cancelled and `deferred` succeeds: the error or
    ///    cancellation of `comp` is passed on unchanged;
    ///  - `deferred` fails: its exception is reported; when `comp` failed as
    ///    well, a new Exception is raised that carries the message of the
    ///    `deferred` failure and the `comp` failure as its InnerException;
    ///  - `deferred` is cancelled: an "Unexpected cancellation." Exception
    ///    wrapping that cancellation is reported as an error.
    ///
    /// `comp` is started with the cancellation token of the calling workflow.
    /// `deferred` is started without an explicit token, so it is not tied to
    /// the caller's token.
    static member TryFinallyAsync (comp : Async<'T>) (deferred : Async<unit>) : Async<'T> =

        // Maps the pair of outcomes onto exactly one of the caller's continuations.
        let finish (compResult : Choice<'T, exn, OperationCanceledException>, deferredResult : Choice<unit, exn, OperationCanceledException>) (cont : 'T -> unit, econt : exn -> unit, ccont : OperationCanceledException -> unit) =
            match (compResult, deferredResult) with
            | (Choice1Of3 result,  Choice1Of3 ())          -> cont result
            | (Choice2Of3 compExn, Choice1Of3 ())          -> econt compExn
            | (Choice3Of3 compExn, Choice1Of3 ())          -> ccont compExn
            | (Choice1Of3 _,       Choice2Of3 deferredExn) -> econt deferredExn
            | (Choice2Of3 compExn, Choice2Of3 deferredExn) -> econt (new Exception(deferredExn.Message, compExn))
            | (Choice3Of3 _,       Choice2Of3 deferredExn) -> econt deferredExn
            | (_,                  Choice3Of3 deferredExn) -> econt (new Exception("Unexpected cancellation.", deferredExn))

        // Starts the compensation once the outcome of `comp` is known.
        // No cancellation token is passed on purpose: see the member summary.
        let startDeferred compResult conts =
            Async.StartWithContinuations(deferred,
                (fun ()       -> finish (compResult, Choice1Of3 ())       conts),
                (fun error    -> finish (compResult, Choice2Of3 error)    conts),
                (fun canceled -> finish (compResult, Choice3Of3 canceled) conts))

        // Starts `comp` under the caller's token; every possible outcome leads to `deferred`.
        let startComp (ct : CancellationToken) conts =
            Async.StartWithContinuations(comp,
                (fun result   -> startDeferred (Choice1Of3 result)   conts),
                (fun error    -> startDeferred (Choice2Of3 error)    conts),
                (fun canceled -> startDeferred (Choice3Of3 canceled) conts),
                ct)

        async { let! ct = Async.CancellationToken
                return! Async.FromContinuations (startComp ct) }

/// Prompt appended to the outcome message of every demo; the demos block on
/// Console.ReadLine afterwards, so this tells the user how to move on.
let continueMsg = "\nPress enter to continue..."

/// Console walkthrough of Async.TryFinallyAsync. Seven demos are run one after
/// another (success, error in the main workflow, cancellation during and after
/// the main workflow, error in the 'finally' clause, error in both, and
/// cancellation combined with a failing 'finally' clause). After each demo the
/// program waits for the user to press enter. Always returns exit code 0.
[<EntryPoint>]
let main _ =

    // Starts `workflow` and prints which of the three outcomes it ended with.
    // `token = None` leaves the optional argument unset, exactly like omitting it.
    let report (workflow : Async<unit>) (token : CancellationToken option) =
        Async.StartWithContinuations(workflow,
            (fun ()  -> printfn "Success!%s" continueMsg),
            (fun exn -> printfn "Error: %s%s" exn.Message continueMsg),
            (fun exn -> printfn "Cancelled: %s%s" exn.Message continueMsg),
            ?cancellationToken = token)

    // Runs a demo that is never cancelled, then waits for the user.
    let runToCompletion workflow =
        report workflow None
        Console.ReadLine() |> ignore

    // Runs a demo under its own token source, requests cancellation after
    // `delayMs` milliseconds, then waits for the user. The source is disposed
    // when the helper returns, i.e. after the user has acknowledged the demo.
    let runThenCancel workflow (delayMs : int) =
        use cancellationCapability = new CancellationTokenSource()
        report workflow (Some cancellationCapability.Token)
        Thread.Sleep delayMs
        cancellationCapability.Cancel()
        Console.ReadLine() |> ignore

    printfn "Demo - no cancellation or error"
    let workflow1 =
        async { printfn "Starting work..."
                do! Async.Sleep 1000
                printfn "Finished work." }
        |> Async.TryFinallyAsync <|
        async { printfn "Starting 'finally' clause."
                do! Async.Sleep 1000
                printfn "Completed 'finally' clause." }
    runToCompletion workflow1

    printfn "Demo - error in main workflow"
    let workflow2 =
        async { do! Async.Sleep 1000
                printfn "Starting work..."
                failwith "Failed to do the work." }
        |> Async.TryFinallyAsync <|
        async { printfn "Starting 'finally' clause."
                do! Async.Sleep 1000
                printfn "Completed with 'finally' clause." }
    runToCompletion workflow2

    printfn "Demo - cancellation in main workflow:"
    let workflow3 =
        async { printfn "Starting work...."
                do! Async.Sleep 1000
                printfn "Finished the work." }
        |> Async.TryFinallyAsync <|
        async { printfn "Starting 'finally' clause."
                do! Async.Sleep 1000
                printfn "Completed with 'finally' clause." }
    // Cancel half-way through the main workflow's one second sleep.
    runThenCancel workflow3 500

    printfn "Demo - cancellation after main workflow:"
    let workflow4 =
        async { printfn "Starting work...."
                do! Async.Sleep 1000
                printfn "Finished the work." }
        |> Async.TryFinallyAsync <|
        async { printfn "Starting 'finally' clause."
                do! Async.Sleep 1000
                printfn "Completed with 'finally' clause." }
    // Cancel while the 'finally' clause is still sleeping.
    runThenCancel workflow4 1900

    printfn "Demo - error during finally clause:"
    let workflow5 =
        async { printfn "Starting work...."
                do! Async.Sleep 2000
                printfn "Finished the work." }
        |> Async.TryFinallyAsync <|
        async { printfn "Starting 'finally' clause."
                failwith "Failed during 'finally' clause." }
    runToCompletion workflow5

    printfn "Demo - error during both clauses:"
    let workflow6 =
        async { printfn "Starting work...."
                failwith "Failed to do the work." }
        |> Async.TryFinallyAsync <|
        async { printfn "Starting 'finally' clause."
                failwith "Failed during 'finally' clause." }
    runToCompletion workflow6

    printfn "Demo - cancellation during main workflow and error in 'finally' clause:"
    let workflow7 =
        async { printfn "Starting work...."
                do! Async.Sleep 1000
                printfn "Finished the work." }
        |> Async.TryFinallyAsync <|
        async { printfn "Starting 'finally' clause."
                failwith "Failed during 'finally' clause." }
    runThenCancel workflow7 500

    0
module Application
namespace System
namespace System.Threading
namespace Microsoft
namespace Microsoft.FSharp
namespace Microsoft.FSharp.Control
Multiple items
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>

--------------------
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async
static member Async.TryFinallyAsync : comp:Async<unit> -> deferred:Async<unit> -> Async<unit>

Full name: Application.TryFinallyAsync
val comp : Async<unit>
val deferred : Async<unit>
val finish : (Choice<unit,Exception,'a> * Choice<unit,Exception,#exn> -> (unit -> 'c) * (Exception -> 'c) * ('a -> 'c) -> 'c)
val compResult : Choice<unit,Exception,'a>
val deferredResult : Choice<unit,Exception,#exn>
val cont : (unit -> 'c)
val econt : (Exception -> 'c)
val ccont : ('a -> 'c)
union case Choice.Choice1Of3: 'T1 -> Choice<'T1,'T2,'T3>
union case Choice.Choice2Of3: 'T2 -> Choice<'T1,'T2,'T3>
val compExn : Exception
union case Choice.Choice3Of3: 'T3 -> Choice<'T1,'T2,'T3>
val compExn : 'a
val deferredExn : Exception
Multiple items
type Exception =
  new : unit -> Exception + 2 overloads
  member Data : IDictionary
  member GetBaseException : unit -> Exception
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  member GetType : unit -> Type
  member HelpLink : string with get, set
  member InnerException : Exception
  member Message : string
  member Source : string with get, set
  member StackTrace : string
  ...

Full name: System.Exception

--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
property Exception.Message: string
val deferredExn : #exn
val startDeferred : (Choice<unit,Exception,'a> -> (unit -> unit) * (Exception -> unit) * ('a -> unit) -> unit)
val cont : (unit -> unit)
val econt : (Exception -> unit)
val ccont : ('a -> unit)
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
Multiple items
val exn : exn

--------------------
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
Multiple items
val exn : OperationCanceledException

--------------------
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
val startComp : (CancellationToken -> (unit -> unit) * (Exception -> unit) * (OperationCanceledException -> unit) -> unit)
val ct : CancellationToken
val ccont : (OperationCanceledException -> unit)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
property Async.CancellationToken: Async<CancellationToken>
static member Async.FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
val continueMsg : string

Full name: Application.continueMsg
Multiple items
type EntryPointAttribute =
  inherit Attribute
  new : unit -> EntryPointAttribute

Full name: Microsoft.FSharp.Core.EntryPointAttribute

--------------------
new : unit -> EntryPointAttribute
val main : string [] -> int

Full name: Application.main
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
val workflow1 : Async<unit>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
static member Async.TryFinallyAsync : comp:Async<unit> -> deferred:Async<unit> -> Async<unit>
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.ReadLine() : string
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val workflow2 : Async<unit>
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
val workflow3 : Async<unit>
val cancellationCapability1 : CancellationTokenSource
Multiple items
type CancellationTokenSource =
  new : unit -> CancellationTokenSource
  member Cancel : unit -> unit + 1 overload
  member Dispose : unit -> unit
  member IsCancellationRequested : bool
  member Token : CancellationToken
  static member CreateLinkedTokenSource : [<ParamArray>] tokens:CancellationToken[] -> CancellationTokenSource + 1 overload

Full name: System.Threading.CancellationTokenSource

--------------------
CancellationTokenSource() : unit
property CancellationTokenSource.Token: CancellationToken
Multiple items
type Thread =
  inherit CriticalFinalizerObject
  new : start:ThreadStart -> Thread + 3 overloads
  member Abort : unit -> unit + 1 overload
  member ApartmentState : ApartmentState with get, set
  member CurrentCulture : CultureInfo with get, set
  member CurrentUICulture : CultureInfo with get, set
  member DisableComObjectEagerCleanup : unit -> unit
  member ExecutionContext : ExecutionContext
  member GetApartmentState : unit -> ApartmentState
  member GetCompressedStack : unit -> CompressedStack
  member GetHashCode : unit -> int
  ...

Full name: System.Threading.Thread

--------------------
Thread(start: ThreadStart) : unit
Thread(start: ParameterizedThreadStart) : unit
Thread(start: ThreadStart, maxStackSize: int) : unit
Thread(start: ParameterizedThreadStart, maxStackSize: int) : unit
Thread.Sleep(timeout: TimeSpan) : unit
Thread.Sleep(millisecondsTimeout: int) : unit
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
val workflow4 : Async<unit>
val cancellationCapability2 : CancellationTokenSource
val workflow5 : Async<unit>
val workflow6 : Async<unit>
val workflow7 : Async<unit>
Raw view Test code New version

More information

Link:http://fssnip.net/ru
Posted:8 years ago
Author:Anton Tcholakov
Tags: asynchronous programming , cancellation , error handling