3 people like it.

Request throttling agent

Request throttling based on a time span and a request count. An `IDistributedCache` implementation is used which integrates nicely into ASP.NET Core. E.g. you can throttle requests from a specific IP address to allow fifty requests within a period of thirty seconds at most.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
75: 
76: 
77: 
78: 
79: 
80: 
81: 
82: 
83: 
84: 
85: 
86: 
87: 
88: 
89: 
90: 
91: 
open Microsoft.Extensions.Options
open Microsoft.Extensions.Caching.Distributed
open Microsoft.Extensions.Caching.Memory
open MBrace.FsPickler

type IDistributedCache with
    member self.SetValue(serializer, key, value, options) =
        self.Set(key, serializer value, options)
    member self.GetValue<'T>(deserializer: byte[] -> 'T, key) =
        match self.Get(key) with
        | null -> None
        | obj -> Some (deserializer obj)
    member self.SetValueAsync(serializer, key, value, options, token) =
        self.SetAsync(key, serializer value, options, token)
    member self.GetValueAsync<'T>(deserializer: byte[] -> 'T, key, token) = task {
        match! self.GetAsync(key, token) with
        | null -> return None
        | obj -> return Some (deserializer(obj))
    }
    member self.SetAsyncValue(serializer, key, value, options) = async {
        let! token = Async.CancellationToken
        return! Async.AwaitTask (self.SetValueAsync(serializer, key, value, options, token))
    }
    member self.GetAsyncValue<'T>(deserializer, key) = async {
        let! token = Async.CancellationToken
        return! Async.AwaitTask (self.GetValueAsync<'T>(deserializer, key, token))
    }

type Throttler (cache: IDistributedCache, cacheDuration, maxRequests) =
    let serializer = FsPickler.CreateBinarySerializer()
    let agent = MailboxProcessor.Start(fun agent ->
        let rec loop () = async {
            let! (cacheKey, reply: AsyncReplyChannel<bool>) = agent.Receive()
            let! value = cache.GetAsyncValue(serializer.UnPickle, cacheKey)
            let count, start, expiration =
                match value with
                | Some (start: System.DateTimeOffset, count) ->
                    let count' = count + 1
                    let expiration = start.Add(cacheDuration)
                    count', start, expiration
                | None ->
                    let now = System.DateTimeOffset.UtcNow
                    let expiration = now.Add(cacheDuration)
                    1, now, expiration
            do! cache.SetAsyncValue(
                serializer.Pickle,
                cacheKey,
                (start, count),
                DistributedCacheEntryOptions(
                    AbsoluteExpiration=expiration
                )
            )
            reply.Reply(count > maxRequests)
            return! loop ()
        }
        loop ()
    )
    member _.Get(cacheKey) =
        agent.PostAndReply(fun channel -> (cacheKey, channel))
    member _.GetAsync(cacheKey) =
        agent.PostAndAsyncReply(fun channel -> (cacheKey, channel))

(* Example *)

// The cache is normally provided by DI in ASP.NET Core
let options = Options.Create(MemoryDistributedCacheOptions())
let cache = new MemoryDistributedCache(options)

let cacheDuration = System.TimeSpan.FromSeconds(1)
let maxRequests = 4

let throttler = Throttler(cache, cacheDuration, maxRequests)

// The cache key could be the IP address or the username or anything you want to provide as the limiting factor for the request.
for _ in 0..10 do
    throttler.Get("hello")
    |> printfn "%A"
    System.Threading.Thread.Sleep 120

// Result:
//false
//false
//false
//false
//true
//true
//true
//false
//false
//false
//false
namespace Microsoft
namespace Microsoft.Extensions
namespace Microsoft.Extensions.Caching
namespace Microsoft.Extensions.Caching.Distributed
namespace Microsoft.Extensions.Caching.Memory
namespace MBrace
namespace MBrace.FsPickler
type IDistributedCache =
  member Get : key:string -> byte[]
  member GetAsync : key:string * ?token:CancellationToken -> Task<byte[]>
  member Refresh : key:string -> unit
  member RefreshAsync : key:string * ?token:CancellationToken -> Task
  member Remove : key:string -> unit
  member RemoveAsync : key:string * ?token:CancellationToken -> Task
  member Set : key:string * value:byte[] * options:DistributedCacheEntryOptions -> unit
  member SetAsync : key:string * value:byte[] * options:DistributedCacheEntryOptions * ?token:CancellationToken -> Task
val self : IDistributedCache
val serializer : ('a -> byte [])
val key : string
val value : 'a
val options : DistributedCacheEntryOptions
(extension) IDistributedCache.Set(key: string, value: byte []) : unit
IDistributedCache.Set(key: string, value: byte [], options: DistributedCacheEntryOptions) : unit
val deserializer : (byte [] -> 'T)
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)

--------------------
type byte = System.Byte
IDistributedCache.Get(key: string) : byte []
union case Option.None: Option<'T>
Multiple items
val obj : byte []

--------------------
type obj = System.Object
union case Option.Some: Value: 'T -> Option<'T>
val token : System.Threading.CancellationToken
(extension) IDistributedCache.SetAsync(key: string, value: byte [],?token: System.Threading.CancellationToken) : System.Threading.Tasks.Task
IDistributedCache.SetAsync(key: string, value: byte [], options: DistributedCacheEntryOptions,?token: System.Threading.CancellationToken) : System.Threading.Tasks.Task
IDistributedCache.GetAsync(key: string,?token: System.Threading.CancellationToken) : System.Threading.Tasks.Task<byte []>
type obj = System.Object
val async : AsyncBuilder
Multiple items
type Async =
  static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
  static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
  static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
  static member AwaitTask : task:Task -> Async<unit>
  static member AwaitTask : task:Task<'T> -> Async<'T>
  static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
  static member CancelDefaultToken : unit -> unit
  static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
  static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
  static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
  ...

--------------------
type Async<'T> =
property Async.CancellationToken: Async<System.Threading.CancellationToken> with get
static member Async.AwaitTask : task:System.Threading.Tasks.Task -> Async<unit>
static member Async.AwaitTask : task:System.Threading.Tasks.Task<'T> -> Async<'T>
member IDistributedCache.SetValueAsync : serializer:('a -> byte []) * key:string * value:'a * options:DistributedCacheEntryOptions * token:System.Threading.CancellationToken -> System.Threading.Tasks.Task
member IDistributedCache.GetValueAsync : deserializer:(byte [] -> 'T) * key:string * token:System.Threading.CancellationToken -> System.Threading.Tasks.Task<(System.DateTimeOffset * int) option>
Multiple items
type Throttler =
  new : cache:IDistributedCache * cacheDuration:TimeSpan * maxRequests:int -> Throttler
  member Get : cacheKey:string -> bool
  member GetAsync : cacheKey:string -> Async<bool>

--------------------
new : cache:IDistributedCache * cacheDuration:System.TimeSpan * maxRequests:int -> Throttler
val cache : IDistributedCache
val cacheDuration : System.TimeSpan
val maxRequests : int
val serializer : BinarySerializer
type FsPickler =
  private new : unit -> FsPickler
  static member Clone : value:'T * ?pickler:Pickler<'T> * ?streamingContext:StreamingContext -> 'T
  static member ComputeHash : value:'T * ?hashFactory:IHashStreamFactory -> HashResult
  static member ComputeSize : value:'T * ?pickler:Pickler<'T> -> int64
  static member CreateBinarySerializer : ?forceLittleEndian:bool * ?typeConverter:ITypeNameConverter * ?picklerResolver:IPicklerResolver -> BinarySerializer
  static member CreateObjectSizeCounter : ?encoding:Encoding * ?resetInterval:int64 -> ObjectSizeCounter
  static member CreateXmlSerializer : ?typeConverter:ITypeNameConverter * ?indent:bool * ?picklerResolver:IPicklerResolver -> XmlSerializer
  static member EnsureSerializable : graph:'T * ?failOnCloneableOnlyTypes:bool -> unit
  static member GatherObjectsInGraph : graph:obj -> obj []
  static member GatherTypesInObjectGraph : graph:obj -> Type []
  ...
static member FsPickler.CreateBinarySerializer : ?forceLittleEndian:bool * ?typeConverter:ITypeNameConverter * ?picklerResolver:IPicklerResolver -> BinarySerializer
val agent : MailboxProcessor<string * AsyncReplyChannel<bool>>
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val loop : (unit -> Async<'a>)
val cacheKey : string
val reply : AsyncReplyChannel<bool>
type AsyncReplyChannel<'Reply> =
  member Reply : value:'Reply -> unit
type bool = System.Boolean
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val value : (System.DateTimeOffset * int) option
member IDistributedCache.GetAsyncValue : deserializer:(byte [] -> 'T) * key:string -> Async<(System.DateTimeOffset * int) option>
member FsPicklerSerializer.UnPickle : data:byte [] * ?pickler:Pickler<'T> * ?streamingContext:System.Runtime.Serialization.StreamingContext * ?encoding:System.Text.Encoding -> 'T
val count : int
val start : System.DateTimeOffset
val expiration : System.DateTimeOffset
namespace System
Multiple items
type DateTimeOffset =
  struct
    new : dateTime:DateTime -> DateTimeOffset + 5 overloads
    member Add : timeSpan:TimeSpan -> DateTimeOffset
    member AddDays : days:float -> DateTimeOffset
    member AddHours : hours:float -> DateTimeOffset
    member AddMilliseconds : milliseconds:float -> DateTimeOffset
    member AddMinutes : minutes:float -> DateTimeOffset
    member AddMonths : months:int -> DateTimeOffset
    member AddSeconds : seconds:float -> DateTimeOffset
    member AddTicks : ticks:int64 -> DateTimeOffset
    member AddYears : years:int -> DateTimeOffset
    ...
  end

--------------------
System.DateTimeOffset ()
System.DateTimeOffset(dateTime: System.DateTime) : System.DateTimeOffset
System.DateTimeOffset(ticks: int64, offset: System.TimeSpan) : System.DateTimeOffset
System.DateTimeOffset(dateTime: System.DateTime, offset: System.TimeSpan) : System.DateTimeOffset
System.DateTimeOffset(year: int, month: int, day: int, hour: int, minute: int, second: int, offset: System.TimeSpan) : System.DateTimeOffset
System.DateTimeOffset(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, offset: System.TimeSpan) : System.DateTimeOffset
System.DateTimeOffset(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, calendar: System.Globalization.Calendar, offset: System.TimeSpan) : System.DateTimeOffset
val count' : int
System.DateTimeOffset.Add(timeSpan: System.TimeSpan) : System.DateTimeOffset
val now : System.DateTimeOffset
property System.DateTimeOffset.UtcNow: System.DateTimeOffset with get
member IDistributedCache.SetAsyncValue : serializer:('a -> byte []) * key:string * value:'a * options:DistributedCacheEntryOptions -> Async<unit>
member FsPicklerSerializer.Pickle : value:'T * ?pickler:Pickler<'T> * ?streamingContext:System.Runtime.Serialization.StreamingContext * ?encoding:System.Text.Encoding -> byte []
Multiple items
type DistributedCacheEntryOptions =
  new : unit -> DistributedCacheEntryOptions
  member AbsoluteExpiration : Nullable<DateTimeOffset> with get, set
  member AbsoluteExpirationRelativeToNow : Nullable<TimeSpan> with get, set
  member SlidingExpiration : Nullable<TimeSpan> with get, set

--------------------
DistributedCacheEntryOptions() : DistributedCacheEntryOptions
member AsyncReplyChannel.Reply : value:'Reply -> unit
member MailboxProcessor.PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
val channel : AsyncReplyChannel<bool>
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
val options : obj
Multiple items
type MemoryDistributedCacheOptions =
  inherit MemoryCacheOptions
  new : unit -> MemoryDistributedCacheOptions

--------------------
MemoryDistributedCacheOptions() : MemoryDistributedCacheOptions
val cache : MemoryDistributedCache
Multiple items
type MemoryDistributedCache =
  new : optionsAccessor:IOptions<MemoryDistributedCacheOptions> -> MemoryDistributedCache + 1 overload
  member Get : key:string -> byte[]
  member GetAsync : key:string * ?token:CancellationToken -> Task<byte[]>
  member Refresh : key:string -> unit
  member RefreshAsync : key:string * ?token:CancellationToken -> Task
  member Remove : key:string -> unit
  member RemoveAsync : key:string * ?token:CancellationToken -> Task
  member Set : key:string * value:byte[] * options:DistributedCacheEntryOptions -> unit
  member SetAsync : key:string * value:byte[] * options:DistributedCacheEntryOptions * ?token:CancellationToken -> Task

--------------------
Multiple items
type TimeSpan =
  struct
    new : ticks:int64 -> TimeSpan + 3 overloads
    member Add : ts:TimeSpan -> TimeSpan
    member CompareTo : value:obj -> int + 1 overload
    member Days : int
    member Divide : divisor:float -> TimeSpan + 1 overload
    member Duration : unit -> TimeSpan
    member Equals : value:obj -> bool + 1 overload
    member GetHashCode : unit -> int
    member Hours : int
    member Milliseconds : int
    ...
  end

--------------------
System.TimeSpan ()
System.TimeSpan(ticks: int64) : System.TimeSpan
System.TimeSpan(hours: int, minutes: int, seconds: int) : System.TimeSpan
System.TimeSpan(days: int, hours: int, minutes: int, seconds: int) : System.TimeSpan
System.TimeSpan(days: int, hours: int, minutes: int, seconds: int, milliseconds: int) : System.TimeSpan
System.TimeSpan.FromSeconds(value: float) : System.TimeSpan
val throttler : Throttler
member Throttler.Get : cacheKey:string -> bool
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
namespace System.Threading
Multiple items
type Thread =
  inherit CriticalFinalizerObject
  new : start:ThreadStart -> Thread + 3 overloads
  member Abort : unit -> unit + 1 overload
  member ApartmentState : ApartmentState with get, set
  member CurrentCulture : CultureInfo with get, set
  member CurrentUICulture : CultureInfo with get, set
  member DisableComObjectEagerCleanup : unit -> unit
  member ExecutionContext : ExecutionContext
  member GetApartmentState : unit -> ApartmentState
  member GetCompressedStack : unit -> CompressedStack
  member GetHashCode : unit -> int
  ...

--------------------
System.Threading.Thread(start: System.Threading.ThreadStart) : System.Threading.Thread
System.Threading.Thread(start: System.Threading.ParameterizedThreadStart) : System.Threading.Thread
System.Threading.Thread(start: System.Threading.ThreadStart, maxStackSize: int) : System.Threading.Thread
System.Threading.Thread(start: System.Threading.ParameterizedThreadStart, maxStackSize: int) : System.Threading.Thread
System.Threading.Thread.Sleep(timeout: System.TimeSpan) : unit
System.Threading.Thread.Sleep(millisecondsTimeout: int) : unit
Next Version Raw view Test code New version

More information

Link:http://fssnip.net/86p
Posted:2 years ago
Author:toburger
Tags: asp.net , #cache , #throttling