Web Crawler extensions

This snippet extends the web crawler from http://fssnip.net/3K. It synchronizes all printing through an additional agent (so that printed text does not interleave), and the crawling function returns an asynchronous workflow that completes when crawling finishes.
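
The print synchronization relies on 'Printf.kprintf', which formats its arguments and passes the finished string to a continuation; posting that string to a single agent serializes all console output. A minimal, self-contained sketch of the idea (the names and example values here are illustrative, not part of the snippet):

// A single agent owns the console; every formatted line is posted to it,
// so lines written from different threads never interleave.
let consoleAgent =
    MailboxProcessor.Start(fun inbox -> async {
        while true do
            let! line = inbox.Receive()
            printfn "%s" line })

// 'kprintf' formats its arguments and hands the resulting string to the agent.
let syncPrintfn fmt = Printf.kprintf consoleAgent.Post fmt

// Behaves like 'printfn', but the output goes through the agent.
syncPrintfn "Agent %d fetched %s" 1 "http://example.com"

The full snippet below applies the same pattern inside the 'crawl' function.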

open System
open System.Collections.Concurrent
open System.Collections.Generic
open System.IO
open System.Net
open System.Text.RegularExpressions

module Helpers =

    type Message =
        | Done
        | Mailbox of MailboxProcessor<Message>
        | Stop
        | Url of string option
        | Start of AsyncReplyChannel<unit>

    // Gates the number of crawling agents.
    [<Literal>]
    let Gate = 5

    // Extracts links from HTML.
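    // 'pattern1' captures the value of href attributes while skipping in-page
    // anchors, mailto:, javascript: and location-based links; 'pattern2' then
    // keeps only absolute http(s) URLs.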
    let extractLinks html =
        let pattern1 = "(?i)href\\s*=\\s*(\"|\')/?((?!#.*|/\B|" + 
                       "mailto:|location\.|javascript:)[^\"\']+)(\"|\')"
        let pattern2 = "(?i)^https?"
 
        let links =
            [
                for x in Regex(pattern1).Matches(html) do
                    yield x.Groups.[2].Value
            ]
            |> List.filter (fun x -> Regex(pattern2).IsMatch(x))
        links
    
    // Fetches a Web page.
    let fetch (url : string) =
        try
            let req = WebRequest.Create(url) :?> HttpWebRequest
            req.UserAgent <- "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)"
            req.Timeout <- 5000
            use resp = req.GetResponse()
            let content = resp.ContentType
            let isHtml = Regex("html").IsMatch(content)
            match isHtml with
            | true -> use stream = resp.GetResponseStream()
                      use reader = new StreamReader(stream)
                      let html = reader.ReadToEnd()
                      Some html
            | false -> None
        with
        | _ -> None
    
    let collectLinks url =
        let html = fetch url
        match html with
        | Some x -> extractLinks x
        | None -> []

open Helpers

let crawl url limit = 
    // Concurrent queue for saving collected urls.
    let q = ConcurrentQueue<string>()
    
    // Holds crawled URLs.
    let set = HashSet<string>()

    // Creates a mailbox that synchronizes printing to the console (so 
    // that two calls to 'printfn' do not interleave when printing)
    let printer = 
        MailboxProcessor.Start(fun x -> async {
          while true do 
            let! str = x.Receive()
            printfn "%s" str })
    // Shadows the standard 'printfn' function (formats the string using
    // 'kprintf' and then posts the result to the printer agent).
    let printfn fmt = 
        Printf.kprintf printer.Post fmt

    let supervisor =
        MailboxProcessor.Start(fun x -> async {
            // The agent expects to receive the 'Start' message first - the message
            // carries a reply channel that is used to notify the caller
            // when the agent completes crawling.
            let! start = x.Receive()
            let repl =
              match start with
              | Start repl -> repl
              | _ -> failwith "Expected Start message!"
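
            // Main loop: a crawling agent posts its own mailbox back when it
            // is ready for more work. While the page limit has not been
            // reached, the supervisor dequeues the next URL and replies with
            // 'Url' (or 'Url None' if the queue is empty or the URL was
            // already crawled); otherwise it replies with 'Stop'. A 'Done'
            // message from the URL collector ends the crawl and notifies the
            // caller through the reply channel.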

            let rec loop run =
                async {
                    let! msg = x.Receive()
                    match msg with
                    | Mailbox(mailbox) -> 
                        let count = set.Count
                        if count < limit - 1 && run then 
                            let url = q.TryDequeue()
                            match url with
                            | true, str -> if not (set.Contains str) then
                                                set.Add str |> ignore
                                                mailbox.Post <| Url(Some str)
                                                return! loop run
                                            else
                                                mailbox.Post <| Url None
                                                return! loop run

                            | _ -> mailbox.Post <| Url None
                                   return! loop run
                        else
                            mailbox.Post Stop
                            return! loop run
                    | Stop -> return! loop false
                    | Start _ -> failwith "Unexpected start message!"
                    | Url _ -> failwith "Unexpected URL message!"
                    | Done -> printfn "Supervisor is done."
                              (x :> IDisposable).Dispose()
                              // Notify the caller that the agent has completed
                              repl.Reply(())
                }
            do! loop true })

    
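    // Gathers the links reported by the crawling agents into the shared queue
    // and counts their 'Done' notifications; once all of them have finished it
    // posts 'Done' to the supervisor. A 6-second receive timeout is reported
    // to the supervisor as 'Stop'.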
    let urlCollector =
        MailboxProcessor.Start(fun y ->
            let rec loop count =
                async {
                    let! msg = y.TryReceive(6000)
                    match msg with
                    | Some message ->
                        match message with
                        | Url u ->
                            match u with
                            | Some url -> q.Enqueue url
                                          return! loop count
                            | None -> return! loop count
                        | _ ->
                            match count with
                            | Gate -> supervisor.Post Done
                                      (y :> IDisposable).Dispose()
                                      printfn "URL collector is done."
                            | _ -> return! loop (count + 1)
                    | None -> supervisor.Post Stop
                              return! loop count
                }
            loop 1)
    
    /// Initializes a crawling agent.
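    /// A crawler fetches its URL, posts every discovered link to the URL
    /// collector and then returns its own mailbox to the supervisor to ask
    /// for more work; on 'Stop' it notifies the collector and shuts down.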
    let crawler id =
        MailboxProcessor.Start(fun inbox ->
            let rec loop() =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Url x ->
                        match x with
                        | Some url -> 
                                let links = collectLinks url
                                printfn "%s crawled by agent %d." url id
                                for link in links do
                                    urlCollector.Post <| Url (Some link)
                                supervisor.Post(Mailbox(inbox))
                                return! loop()
                        | None -> supervisor.Post(Mailbox(inbox))
                                  return! loop()
                    | _ -> urlCollector.Post Done
                           printfn "Agent %d is done." id
                           (inbox :> IDisposable).Dispose()
                    }
            loop())

    // Send the 'Start' message to the supervisor agent. The result is an
    // asynchronous workflow that completes when crawling finishes.
    let result = supervisor.PostAndAsyncReply(Start)
    // Spawn the crawlers.
    let crawlers = 
        [
            for i in 1 .. Gate do
                yield crawler i
        ]
    
    // Post the first messages.
    crawlers.Head.Post <| Url (Some url)
    crawlers.Tail |> List.iter (fun ag -> ag.Post <| Url None) 
    result

// Example:
crawl "http://news.google.com" 25
|> Async.RunSynchronously
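
Because 'crawl' returns an Async<unit> that completes only after the supervisor has finished, the workflow composes like any other async computation. A sketch of two ways a caller might use it (the URLs and page limits below are arbitrary placeholders):

// Run two independent crawls concurrently and wait for both to finish.
[ crawl "http://example.com" 10
  crawl "http://example.org" 10 ]
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously

// Or start a crawl in the background without blocking the calling thread.
crawl "http://example.net" 10 |> Async.Start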

More information

Link: http://fssnip.net/65
Posted: 1 year ago
Author: Tomas Petricek
Tags: crawling, web, web crawler, agent, mailboxprocessor