We had previously given a way to implement two-way traffic control on a strict basis using a function which looped constantly to see if any data was available o read/write, and which processed it accordingly.

The following approach is based on a per-request basis. That is, an actors asks for n tokens to read (or write), and the manager sends it an answer when the actor is allowed perform the action. In effect, if the actor asks for 100 bytes and only 50 bytes can be exchanged per second, it will get a green light only after two seconds have elapsed.


open System

type RateControllerRequest = R of (AsyncReplyChannel<bool> * int)

type RateController(?rate, ?areTokensValid) =
  let mutable rate = defaultArg rate 10240
  let mutable lastTime = DateTime.Now
  let mutable tokensInBucket = 0
  let areTokensValid = defaultArg areTokensValid (fun tokens -> true)

  let tokensToAddPerRound rate elapsed =
    let elapsed = min 1000 elapsed
    rate / 1000 * elapsed

  let controller =
    MailboxProcessor.Start(fun inbox ->
      let rec loop() =
        async {
          let! R (replyChannel, tokens) = inbox.Receive()
          let canHandle = areTokensValid tokens
          if canHandle then
            //Add new tokens to the bucket based on elapsed time
            let elapsed = (System.DateTime.Now - lastTime).TotalMilliseconds |> int
            if tokensInBucket < rate then
              let toAdd = tokensToAddPerRound rate elapsed |> min rate
              tokensInBucket <- tokensInBucket + toAdd
            //if there aren't enough tokens, wait
            let tokensNeeded = tokens - tokensInBucket
            if tokensNeeded > 0  then
              let toWait = tokensNeeded * 1000 / rate
              do! Async.Sleep toWait
              tokensInBucket <- 0
            else
              tokensInBucket <- tokensInBucket - tokens
            lastTime <- System.DateTime.Now
          replyChannel.Reply(canHandle)
          return! loop()
        }
      loop()
    )

  member x.Rate
    with get() = rate
    and set(v) = rate <- max v 0

  member x.AsyncRequest(n) =
    controller.PostAndAsyncReply (fun replyChannel -> R (replyChannel, n))

  member x.Reset() =
    tokensInBucket <- 0
    lastTime <- DateTime.Now

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module RateController =   

  type t = RateController

  let create() = RateController()

  let reset (controller:t) = controller.Reset()

  let asyncRequest tokens (controller:t) = controller.AsyncRequest(tokens)

Comments are closed.