We had previously given a way to implement two-way traffic control on a strict basis using a function which looped constantly to see if any data was available o read/write, and which processed it accordingly.
The following approach is based on a per-request basis. That is, an actors asks for n tokens to read (or write), and the manager sends it an answer when the actor is allowed perform the action. In effect, if the actor asks for 100 bytes and only 50 bytes can be exchanged per second, it will get a green light only after two seconds have elapsed.
open System type RateControllerRequest = R of (AsyncReplyChannel<bool> * int) type RateController(?rate, ?areTokensValid) = let mutable rate = defaultArg rate 10240 let mutable lastTime = DateTime.Now let mutable tokensInBucket = 0 let areTokensValid = defaultArg areTokensValid (fun tokens -> true) let tokensToAddPerRound rate elapsed = let elapsed = min 1000 elapsed rate / 1000 * elapsed let controller = MailboxProcessor.Start(fun inbox -> let rec loop() = async { let! R (replyChannel, tokens) = inbox.Receive() let canHandle = areTokensValid tokens if canHandle then //Add new tokens to the bucket based on elapsed time let elapsed = (System.DateTime.Now - lastTime).TotalMilliseconds |> int if tokensInBucket < rate then let toAdd = tokensToAddPerRound rate elapsed |> min rate tokensInBucket <- tokensInBucket + toAdd //if there aren't enough tokens, wait let tokensNeeded = tokens - tokensInBucket if tokensNeeded > 0 then let toWait = tokensNeeded * 1000 / rate do! Async.Sleep toWait tokensInBucket <- 0 else tokensInBucket <- tokensInBucket - tokens lastTime <- System.DateTime.Now replyChannel.Reply(canHandle) return! loop() } loop() ) member x.Rate with get() = rate and set(v) = rate <- max v 0 member x.AsyncRequest(n) = controller.PostAndAsyncReply (fun replyChannel -> R (replyChannel, n)) member x.Reset() = tokensInBucket <- 0 lastTime <- DateTime.Now [<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>] module RateController = type t = RateController let create() = RateController() let reset (controller:t) = controller.Reset() let asyncRequest tokens (controller:t) = controller.AsyncRequest(tokens)