5 people like it.

Receive-only SMTP server

One agent acts as an SMTP server and receives emails. A second agent collects the emails received by the SMTP agent and also answers requests for the list of all emails received so far. A public type wraps this behaviour, exposing a single method that returns the list of emails.

  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
module SMTP

open System.Net.Sockets
open System.Net
open System.IO

/// Shorthand for MailboxProcessor, used for the agents in this module.
type private Agent<'T> = MailboxProcessor<'T>

/// A received message. Subject, From and To hold the values of the matching
/// header lines found in the DATA section; Body holds the remaining non-empty lines.
type EMail = { Body:string list; Subject:string; From:string; To:string; }

/// Blank message (no body lines, empty header fields) used as the starting
/// value when a new SMTP session is read.
let private emptyEmail =
    { Body = []
      Subject = ""
      From = ""
      To = "" }

/// Messages understood by the caching agent.
type private CheckInbox = 
    /// Request the emails stored so far; the agent replies on the supplied channel.
    | Get of AsyncReplyChannel<EMail list>
    /// Store a newly received email.
    | Add of EMail

/// Classifies one line of the DATA section of an SMTP session.
///
/// A line starting with "From: ", "To: " or "Subject: " (case-sensitive) is
/// reported as the matching case carrying the text after that prefix; any
/// other line is reported as Body carrying the whole line unchanged.
/// `input` must not be null.
let private (|From|To|Subject|Body|) (input:string) =
    let hasPrefix (prefix:string) =
        input.StartsWith(prefix, System.StringComparison.Ordinal)
    // Remove exactly the prefix. TrimStart with the prefix's characters would
    // also eat leading characters of the value that happen to occur in the
    // prefix (e.g. "From: Fred" would become "ed").
    let afterPrefix (prefix:string) = input.Substring(prefix.Length)
    if hasPrefix "From: " then From (afterPrefix "From: ")
    elif hasPrefix "To: " then To (afterPrefix "To: ")
    elif hasPrefix "Subject: " then Subject (afterPrefix "Subject: ")
    else Body input

/// Accepts a single SMTP client on port 25 (all interfaces) and reads one
/// session from it.
///
/// The client is greeted with a 220 banner and every command is answered
/// with "250 OK". After DATA, lines up to the terminating "." are parsed:
/// From/To/Subject lines set the matching field and every other non-empty
/// line is prepended to Body, so Body holds those lines in reverse order of
/// arrival. The session ends when the client sends QUIT or disconnects, and
/// the EMail accumulated so far is returned (a blank EMail if no DATA was sent).
///
/// The listener is stopped before the computation finishes, including when
/// the session fails with an exception, so the port is not left bound.
let private receiveEmail() = async {
    let endPoint = new IPEndPoint(IPAddress.Any, 25)
    let listener = new TcpListener(endPoint)
    listener.Start()

    try
        use! client = listener.AcceptTcpClientAsync() |> Async.AwaitTask

        let stream = client.GetStream()
        use sr = new StreamReader(stream)
        use wr = new StreamWriter(stream)
        wr.NewLine <- "\r\n"
        wr.AutoFlush <- true

        wr.WriteLine("220 localhost -- Fake proxy server")

        // Reads the DATA section. Returns the updated email together with a
        // flag telling whether the terminating "." line was seen; false means
        // ReadLine returned null, i.e. the client went away mid-message.
        let rec readdata email =
            match sr.ReadLine() with
            | null -> email, false
            | "." -> email, true
            | line ->
                let email =
                    match line with
                    | From l -> {email with From=l}
                    | To l -> {email with To=l}
                    | Subject l -> {email with Subject=l}
                    | Body l when l <> "" -> {email with Body=l::email.Body}
                    | _ -> email
                readdata email

        // Command loop: runs until QUIT or until the client disconnects.
        let rec readlines email =
            match sr.ReadLine() with
            | null ->
                // End of stream: nobody is left to answer, so just finish.
                email
            | "DATA" ->
                wr.WriteLine("354 Start input, end data with <CRLF>.<CRLF>")
                let received, completed = readdata email
                if completed then
                    wr.WriteLine("250 OK")
                    readlines received
                else
                    // Client disconnected inside DATA; do not write to the
                    // dead connection, return what was received.
                    received
            | "QUIT" ->
                // NOTE(review): RFC 5321 specifies a 221 reply to QUIT; the
                // original "250 OK" reply is kept unchanged here.
                wr.WriteLine("250 OK")
                email
            | _ ->
                wr.WriteLine("250 OK")
                readlines email

        return readlines emptyEmail
    finally
        listener.Stop() }

/// Starts an agent that repeatedly receives one email via receiveEmail and
/// posts it to `cachingAgent` as an Add message. The agent's own inbox is
/// never read.
let private smtpAgent (cachingAgent: Agent<CheckInbox>) =
    let rec pump () : Async<unit> =
        async {
            let! received = receiveEmail ()
            cachingAgent.Post(Add received)
            return! pump ()
        }
    Agent<unit>.Start(fun _ -> pump ())

/// Starts an agent that keeps the list of received emails, newest first.
/// Add prepends an email to the list; Get replies with the current list.
let private cachingAgent() =
    let rec serve (inbox: Agent<CheckInbox>) (stored: EMail list) : Async<unit> =
        async {
            let! request = inbox.Receive()
            let next =
                match request with
                | Get replyTo ->
                    replyTo.Reply stored
                    stored
                | Add email -> email :: stored
            return! serve inbox next
        }
    Agent<CheckInbox>.Start(fun inbox -> serve inbox [])

/// Public entry point: constructing an instance starts the caching agent and
/// the SMTP agent that feeds it.
type SmtpServer() =
    let store = cachingAgent ()
    // Bound only to start the receiving agent; it is not used afterwards.
    let receiver = smtpAgent store

    /// Returns the emails currently held by the caching agent, blocking until
    /// the agent replies.
    member self.GetEmails() =
        store.PostAndReply(fun replyChannel -> Get replyChannel)
module SMTP
namespace System
namespace System.Net
namespace System.Net.Sockets
namespace System.IO
type private Agent<'T> = MailboxProcessor<'T>

Full name: SMTP.Agent<_>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
type EMail =
  {Body: string list;
   Subject: string;
   From: string;
   To: string;}

Full name: SMTP.EMail
EMail.Body: string list
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = System.String

Full name: Microsoft.FSharp.Core.string
type 'T list = List<'T>

Full name: Microsoft.FSharp.Collections.list<_>
EMail.Subject: string
EMail.From: string
EMail.To: string
val private emptyEmail : EMail

Full name: SMTP.emptyEmail
type private CheckInbox =
  | Get of AsyncReplyChannel<EMail list>
  | Add of EMail

Full name: SMTP.CheckInbox
union case CheckInbox.Get: AsyncReplyChannel<EMail list> -> CheckInbox
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
union case CheckInbox.Add: EMail -> CheckInbox
val input : string
val regexes : string list
val matches : (bool * string) list
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
  interface IEnumerable
  interface IEnumerable<'T>
  member Head : 'T
  member IsEmpty : bool
  member Item : index:int -> 'T with get
  member Length : int
  member Tail : 'T list
  static member Cons : head:'T * tail:'T list -> 'T list
  static member Empty : 'T list

Full name: Microsoft.FSharp.Collections.List<_>
val map : mapping:('T -> 'U) -> list:'T list -> 'U list

Full name: Microsoft.FSharp.Collections.List.map
val r : string
System.String.StartsWith(value: string) : bool
System.String.StartsWith(value: string, comparisonType: System.StringComparison) : bool
System.String.StartsWith(value: string, ignoreCase: bool, culture: System.Globalization.CultureInfo) : bool
val trimStart : (string -> string)
val start : string
System.String.TrimStart([<System.ParamArray>] trimChars: char []) : string
System.String.ToCharArray() : char []
System.String.ToCharArray(startIndex: int, length: int) : char []
val private receiveEmail : unit -> Async<EMail>

Full name: SMTP.receiveEmail
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val endPoint : IPEndPoint
Multiple items
type IPEndPoint =
  inherit EndPoint
  new : address:int64 * port:int -> IPEndPoint + 1 overload
  member Address : IPAddress with get, set
  member AddressFamily : AddressFamily
  member Create : socketAddress:SocketAddress -> EndPoint
  member Equals : comparand:obj -> bool
  member GetHashCode : unit -> int
  member Port : int with get, set
  member Serialize : unit -> SocketAddress
  member ToString : unit -> string
  static val MinPort : int
  ...

Full name: System.Net.IPEndPoint

--------------------
IPEndPoint(address: int64, port: int) : unit
IPEndPoint(address: IPAddress, port: int) : unit
Multiple items
type IPAddress =
  new : newAddress:int64 -> IPAddress + 2 overloads
  member Address : int64 with get, set
  member AddressFamily : AddressFamily
  member Equals : comparand:obj -> bool
  member GetAddressBytes : unit -> byte[]
  member GetHashCode : unit -> int
  member IsIPv6LinkLocal : bool
  member IsIPv6Multicast : bool
  member IsIPv6SiteLocal : bool
  member IsIPv6Teredo : bool
  ...

Full name: System.Net.IPAddress

--------------------
IPAddress(newAddress: int64) : unit
IPAddress(address: byte []) : unit
IPAddress(address: byte [], scopeid: int64) : unit
field IPAddress.Any
val listener : TcpListener
Multiple items
type TcpListener =
  new : localEP:IPEndPoint -> TcpListener + 2 overloads
  member AcceptSocket : unit -> Socket
  member AcceptTcpClient : unit -> TcpClient
  member AllowNatTraversal : allowed:bool -> unit
  member BeginAcceptSocket : callback:AsyncCallback * state:obj -> IAsyncResult
  member BeginAcceptTcpClient : callback:AsyncCallback * state:obj -> IAsyncResult
  member EndAcceptSocket : asyncResult:IAsyncResult -> Socket
  member EndAcceptTcpClient : asyncResult:IAsyncResult -> TcpClient
  member ExclusiveAddressUse : bool with get, set
  member LocalEndpoint : EndPoint
  ...

Full name: System.Net.Sockets.TcpListener

--------------------
TcpListener(localEP: IPEndPoint) : unit
TcpListener(localaddr: IPAddress, port: int) : unit
TcpListener.Start() : unit
TcpListener.Start(backlog: int) : unit
val client : System.IDisposable
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.AwaitTask : task:System.Threading.Tasks.Task<'T> -> Async<'T>
val stream : obj
val sr : StreamReader
Multiple items
type StreamReader =
  inherit TextReader
  new : stream:Stream -> StreamReader + 9 overloads
  member BaseStream : Stream
  member Close : unit -> unit
  member CurrentEncoding : Encoding
  member DiscardBufferedData : unit -> unit
  member EndOfStream : bool
  member Peek : unit -> int
  member Read : unit -> int + 1 overload
  member ReadLine : unit -> string
  member ReadToEnd : unit -> string
  ...

Full name: System.IO.StreamReader

--------------------
StreamReader(stream: Stream) : unit
StreamReader(path: string) : unit
StreamReader(stream: Stream, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(stream: Stream, encoding: System.Text.Encoding) : unit
StreamReader(path: string, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(path: string, encoding: System.Text.Encoding) : unit
StreamReader(stream: Stream, encoding: System.Text.Encoding, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(path: string, encoding: System.Text.Encoding, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(stream: Stream, encoding: System.Text.Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
StreamReader(path: string, encoding: System.Text.Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
val wr : StreamWriter
Multiple items
type StreamWriter =
  inherit TextWriter
  new : stream:Stream -> StreamWriter + 6 overloads
  member AutoFlush : bool with get, set
  member BaseStream : Stream
  member Close : unit -> unit
  member Encoding : Encoding
  member Flush : unit -> unit
  member Write : value:char -> unit + 3 overloads
  static val Null : StreamWriter

Full name: System.IO.StreamWriter

--------------------
StreamWriter(stream: Stream) : unit
StreamWriter(path: string) : unit
StreamWriter(stream: Stream, encoding: System.Text.Encoding) : unit
StreamWriter(path: string, append: bool) : unit
StreamWriter(stream: Stream, encoding: System.Text.Encoding, bufferSize: int) : unit
StreamWriter(path: string, append: bool, encoding: System.Text.Encoding) : unit
StreamWriter(path: string, append: bool, encoding: System.Text.Encoding, bufferSize: int) : unit
property TextWriter.NewLine: string
property StreamWriter.AutoFlush: bool
TextWriter.WriteLine() : unit
   (+0 other overloads)
TextWriter.WriteLine(value: obj) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: string) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: decimal) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: float) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: float32) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: uint64) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: int64) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: uint32) : unit
   (+0 other overloads)
TextWriter.WriteLine(value: int) : unit
   (+0 other overloads)
val readlines : (EMail -> EMail)
val email : EMail
val line : string
StreamReader.ReadLine() : string
val readdata : (EMail -> EMail)
active recognizer From: string -> Choice<string,string,string,string>

Full name: SMTP.( |From|To|Subject|Body| )
val l : string
active recognizer To: string -> Choice<string,string,string,string>

Full name: SMTP.( |From|To|Subject|Body| )
active recognizer Subject: string -> Choice<string,string,string,string>

Full name: SMTP.( |From|To|Subject|Body| )
active recognizer Body: string -> Choice<string,string,string,string>

Full name: SMTP.( |From|To|Subject|Body| )
val newlines : EMail
val newMessage : EMail
TcpListener.Stop() : unit
val private smtpAgent : cachingAgent:Agent<CheckInbox> -> MailboxProcessor<unit>

Full name: SMTP.smtpAgent
val cachingAgent : Agent<CheckInbox>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val loop : (unit -> Async<'a>)
member MailboxProcessor.Post : message:'Msg -> unit
val private cachingAgent : unit -> MailboxProcessor<CheckInbox>

Full name: SMTP.cachingAgent
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<CheckInbox>
val loop : (EMail list -> Async<'a>)
val messages : EMail list
val newMessage : CheckInbox
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val channel : AsyncReplyChannel<EMail list>
member AsyncReplyChannel.Reply : value:'Reply -> unit
val message : EMail
Multiple items
type SmtpServer =
  new : unit -> SmtpServer
  member GetEmails : unit -> EMail list

Full name: SMTP.SmtpServer

--------------------
new : unit -> SmtpServer
val cache : MailboxProcessor<CheckInbox>
val server : MailboxProcessor<unit>
val this : SmtpServer
member SmtpServer.GetEmails : unit -> EMail list

Full name: SMTP.SmtpServer.GetEmails
member MailboxProcessor.PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
Raw view Test code New version

More information

Link:http://fssnip.net/tn
Posted:8 years ago
Author:Nick Lydon
Tags: smtp , email , agents , asynchronous