#r "System.Messaging"
#r "System.Transactions"

open FSharp.Control
open System
open System.IO
open System.Messaging
open System.Text
open System.Threading
open System.Transactions

/// Selects which message properties are populated when messages are read
/// from a queue (applied through `MessageQueue.MessageReadPropertyFilter`).
type MsmqPropertyFilter =
    | NoProperties
    | AllProperties
    | IdOnly
    | BodyOnly
    | IdAndBody
    | CustomFilter of MessagePropertyFilter

/// Address of a queue. Only direct (OS path) addressing is modelled.
type MsmqFormatName =
    | MsmqDirect of string

/// A connected queue. The representation is private; instances are created
/// by `Msmq.connect`.
type MsmqQueue =
    private
        { MsmqFormatName: MsmqFormatName
          MessageQueue: MessageQueue }

    /// The format name in `DIRECT=OS:<path>` form.
    member queue.FormatName =
        match queue.MsmqFormatName with
        | MsmqDirect queuePath -> sprintf "DIRECT=OS:%s" queuePath

    /// The underlying `System.Messaging.MessageQueue`.
    member queue.Queue = queue.MessageQueue

/// Payload accepted by `Msmq.send`; every case is sent as the message's
/// body stream.
type MsmqMessageBody =
    | Stream of Stream
    | Binary of byte []
    | Text of string

/// How a transactional operation obtains its transaction.
type MsmqTransaction =
    | Local of MessageQueueTransaction
    | Distributed
    | Ambient
    | AutoCommit

type MsmqMode =
    | NonTransactional
    | Transactional of MsmqTransaction

/// Failure cases returned (rather than raised) by the `Msmq` functions.
type MsmqError =
    | MsmqReceiveTimedOut
    | MsmqException of MessageQueueException
    | NonMsmqException of exn

/// Handle that lets the consumer of an uncommitted message finish or roll
/// back the transaction the message was received under.
type ICommittable =
    abstract member Commit: unit -> Result<unit, MsmqError>
    abstract member Abort: unit -> Result<unit, MsmqError>

/// A received message, together with its transaction handle when the
/// receive has not been committed yet.
type MsmqMessage =
    | Committed of Message
    | Uncommitted of Message * ICommittable

// NOTE(review): the attribute contents on both modules were lost in the
// source (only `[] []` remained). They are restored here as the conventional
// pair, consistent with the qualified usage in the example below -- confirm.
[<RequireQualifiedAccess>]
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module FormatName =

    /// Wraps a queue path (e.g. `.\private$\name`) as a direct format name.
    let direct queue = MsmqDirect queue

[<RequireQualifiedAccess>]
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Msmq =

    /// Builds the path string handed to the `MessageQueue` constructor.
    let private getDirectFormatName queuePath =
        sprintf "FORMATNAME:DIRECT=OS:%s" queuePath

    /// Opens the queue addressed by the given format name, reading messages
    /// with the property filter selected by `filter`.
    let connect filter = function
        | MsmqDirect queuePath ->
            let readFilter =
                match filter with
                | NoProperties ->
                    // The original created a MessageQueue with no path at
                    // all, which cannot be sent to or received from. Bind
                    // the path and clear every property instead.
                    let pf = MessagePropertyFilter()
                    pf.ClearAll()
                    pf
                | AllProperties ->
                    let pf = MessagePropertyFilter()
                    pf.SetAll()
                    pf
                | IdOnly -> MessagePropertyFilter(Id = true, LookupId = true)
                | BodyOnly -> MessagePropertyFilter(Body = true)
                | IdAndBody -> MessagePropertyFilter(Id = true, LookupId = true, Body = true)
                | CustomFilter propertyFilter -> propertyFilter

            let queue =
                new MessageQueue(getDirectFormatName queuePath, MessageReadPropertyFilter = readFilter)

            { MsmqFormatName = MsmqDirect queuePath
              MessageQueue = queue }

    /// Converts any body representation to a stream. Text is encoded as
    /// UTF-8 without a byte-order mark.
    let private bodyToStream = function
        | Stream stream -> stream
        | Binary bytes -> new MemoryStream(bytes) :> Stream
        | Text str -> new MemoryStream(UTF8Encoding(false).GetBytes str) :> Stream

    /// Builds a message from `stream` with `f`, hands it to `g`, and disposes
    /// the stream afterwards (including a caller-supplied `Stream` body).
    let private sendStream (f: Stream -> Message) (g: Message -> unit) (stream: Stream) =
        use s = stream
        s |> f |> g

    /// Sends `message` to the queue under the given transaction mode.
    /// `Distributed` wraps the send in its own `TransactionScope` and
    /// completes it; `Ambient` enlists via
    /// `MessageQueueTransactionType.Automatic` without creating a scope.
    /// Exceptions from the underlying `Send` propagate to the caller.
    let send mode message (msmq: MsmqQueue) =
        let queue = msmq.Queue

        let sendMessage f =
            message
            |> bodyToStream
            |> sendStream (fun s -> new Message(BodyStream = s)) f

        match mode with
        | NonTransactional -> sendMessage (fun msg -> queue.Send msg)
        | Transactional (Local tx) -> sendMessage (fun msg -> queue.Send(msg, tx))
        | Transactional Distributed ->
            use tx =
                new TransactionScope(TransactionScopeOption.Required, TransactionScopeAsyncFlowOption.Enabled)

            sendMessage (fun msg -> queue.Send(msg, MessageQueueTransactionType.Automatic))
            tx.Complete()
        | Transactional Ambient ->
            sendMessage (fun msg -> queue.Send(msg, MessageQueueTransactionType.Automatic))
        | Transactional AutoCommit ->
            sendMessage (fun msg -> queue.Send(msg, MessageQueueTransactionType.Single))

    /// Classifies an exception as an `MsmqError`. Shared by `tryAsync` and
    /// `tryMsmqOp` so both agree on what counts as a timeout.
    let private toMsmqError (ex: exn) =
        match ex with
        | :? MessageQueueException as mqEx ->
            if mqEx.MessageQueueErrorCode = MessageQueueErrorCode.IOTimeout then
                MsmqReceiveTimedOut
            else
                MsmqException mqEx
        | other -> NonMsmqException other

    /// Runs `f`, returning `Ok` with its result or `Error` with the
    /// classified exception. (The original compared the error code with
    /// `<>`, so timeouts and real failures were reported the wrong way
    /// round.)
    let private tryAsync f =
        async {
            try
                let! message = f
                return Ok message
            with ex ->
                return Error (toMsmqError ex)
        }

    /// Receives one message, waiting at most `timeout`. Failures, including
    /// the timeout, are returned as `Error` rather than raised.
    let receive (timeout: TimeSpan) mode (msmq: MsmqQueue) =
        let queue = msmq.Queue

        let receiveOp =
            match mode with
            | NonTransactional ->
                // Async.FromBeginEnd resumes the computation from the
                // AsyncCallback it supplies, so the callback has to be passed
                // on to BeginReceive; dropping it leaves the async waiting
                // forever.
                Async.FromBeginEnd(
                    (fun (callback: AsyncCallback, state: obj) -> queue.BeginReceive(timeout, state, callback)),
                    queue.EndReceive
                )
            | Transactional (Local transaction) ->
                async { return queue.Receive(timeout, transaction) }
            | Transactional Distributed ->
                async {
                    use scope =
                        new TransactionScope(TransactionScopeOption.Required, TransactionScopeAsyncFlowOption.Enabled)

                    let message = queue.Receive(timeout, MessageQueueTransactionType.Automatic)
                    scope.Complete()
                    return message
                }
            | Transactional Ambient ->
                async { return queue.Receive(timeout, MessageQueueTransactionType.Automatic) }
            | Transactional AutoCommit ->
                async { return queue.Receive(timeout, MessageQueueTransactionType.Single) }

        tryAsync receiveOp

    /// Peeks at the next message without removing it, waiting at most
    /// `timeout`. Failures are returned as `Error`.
    let peek (timeout: TimeSpan) (msmq: MsmqQueue) =
        let queue = msmq.Queue

        // The callback must be forwarded for the same reason as in `receive`.
        Async.FromBeginEnd(
            (fun (callback: AsyncCallback, state: obj) -> queue.BeginPeek(timeout, state, callback)),
            queue.EndPeek
        )
        |> tryAsync

    /// Synchronous counterpart of `tryAsync`: runs `f ()` and captures any
    /// exception as an `MsmqError`.
    let private tryMsmqOp f =
        try
            f () |> Ok
        with ex ->
            Error (toMsmqError ex)

    /// Produces an asynchronous sequence of messages from the queue. Each
    /// receive waits up to ten seconds; the sequence ends once `cancellation`
    /// is observed as requested after a receive attempt. Non-transactional
    /// and auto-commit messages are yielded as `Committed`; the other modes
    /// yield `Uncommitted` with a handle to finish the transaction.
    /// Receive errors (timeouts included) are dropped and the loop continues.
    let subscribe (cancellation: CancellationToken) mode msmq =
        let timeout = TimeSpan.FromSeconds 10.0

        // Receives under `receiveMode` and wraps a successful message.
        let receiveAs wrap receiveMode =
            async {
                let! result = receive timeout receiveMode msmq
                return Result.map wrap result
            }

        let rec getMessages () =
            asyncSeq {
                let! message =
                    match mode with
                    | NonTransactional
                    | Transactional AutoCommit -> receiveAs Committed mode
                    | Transactional (Local transaction) ->
                        let committable =
                            { new ICommittable with
                                member __.Commit () = tryMsmqOp transaction.Commit
                                member __.Abort () = tryMsmqOp transaction.Abort }

                        receiveAs (fun received -> Uncommitted (received, committable)) mode
                    | Transactional Distributed ->
                        async {
                            // Not `use`: on success, ownership of the scope
                            // passes to the committable handed to the consumer.
                            let scope =
                                new TransactionScope(
                                    TransactionScopeOption.RequiresNew,
                                    TransactionScopeAsyncFlowOption.Enabled
                                )

                            let! result = receive timeout (Transactional Ambient) msmq

                            match result with
                            | Ok received ->
                                let committable =
                                    { new ICommittable with
                                        member __.Commit () = tryMsmqOp (scope.Complete >> scope.Dispose)
                                        member __.Abort () = tryMsmqOp scope.Dispose }

                                return Ok (Uncommitted (received, committable))
                            | Error e ->
                                // Nobody else will ever see this scope, so
                                // release it here (best effort) instead of
                                // leaking one scope per failed receive.
                                tryMsmqOp scope.Dispose |> ignore
                                return Error e
                        }
                    | Transactional Ambient ->
                        async {
                            // NOTE(review): this branch looks wrong but the
                            // intent is unclear, so it is left as written:
                            // `use` disposes the transaction when this block
                            // returns, i.e. before the consumer can call
                            // Commit/Abort, and the receive below enlists via
                            // Automatic rather than in this transaction.
                            use transaction =
                                let tx = new MessageQueueTransaction()
                                tx.Begin()
                                tx

                            let! result = receive timeout (Transactional Ambient) msmq

                            match result with
                            | Ok received ->
                                let committable =
                                    { new ICommittable with
                                        member __.Commit () = tryMsmqOp transaction.Commit
                                        member __.Abort () = tryMsmqOp transaction.Dispose }

                                return Ok (Uncommitted (received, committable))
                            | Error e -> return Error e
                        }

                match message with
                | Ok msg -> yield msg
                | Error _ -> () // Note: Swallows Errors

                if not cancellation.IsCancellationRequested then
                    yield! getMessages ()
            }

        getMessages ()

// Example Usage:
// The pipeline below only builds an Async<unit>; nothing is read from the
// queue until that computation is started.
let cancellation = new CancellationTokenSource()

""".\private$\test.msmq"""
|> FormatName.direct
|> Msmq.connect IdAndBody
|> Msmq.subscribe cancellation.Token (Transactional Distributed)
|> AsyncSeq.iterAsyncParallel (fun message ->
    async {
        match message with
        | Committed msg ->
            printfn "%A: Received Committed Message %A" DateTime.Now msg.Id
        | Uncommitted (msg, tx) ->
            printfn "%A: Received Unommitted Message %A" DateTime.Now msg.Id

            match tx.Commit() with
            | Ok _ -> printfn "%A: Committed Message %A" DateTime.Now msg.Id
            | Error e -> printfn "%A: Error Committing Message %A -- %A" DateTime.Now msg.Id e
    })