This post describes how to implement two-way (TCP for instance) traffic control in F# by modifying slightly a Trolltech article available at http://doc.trolltech.com/qq/qq17-ratecontrol.html.
Remote Peer
type RemotePeer = interface abstract NumCanRead : int abstract NumCanWrite : int abstract Read : int -> int abstract Write : int -> int end let canReadFrom (peer:RemotePeer) = peer.NumCanRead > 0 let canWriteTo (peer:RemotePeer) = peer.NumCanWrite > 0 let canTransferMore peer = canReadFrom peer || canWriteTo peer
NumCanRead and NumCanWrite return the number of bytes the peer can (respectively) send and receive from the client. We chose to use abstract definitions so that we can add some filter conditions. For instance, as in Trolltech’s article, we might not want to return a positive NumCanWrite if the peer’s buffer is already n-byte long where n would be, in this example, twice the upload limit.
Traffic control
open System.Diagnostics let rec allocate (stopwatch:Stopwatch) bytesToWrite bytesToRead peers = //can we exchange anything ? if bytesToWrite > 0 || bytesToRead > 0 then //are there some peers who can exchange ? let pool = Seq.filter canTransferMore peers let ncandidates = Seq.length pool if ncandidates > 0 then //restart the timer to reset the computation of the number of //bytes the client can exchange, see later stopwatch.Reset() //number of bytes per peer if we were to split it evenly let toReadPerPeer = max 1 (bytesToRead / ncandidates) let mutable alreadyRead = 0 let toWritePerPeer = max 1 (bytesToWrite / ncandidates) let mutable alreadyWritten = 0 for peer in pool do //the client reads the minimum og : // a) number fo bytes allocated per peer (from the bytes the client can exchange) // b) number of bytes the peer can exchange // c) number of bytes left // e.g. : we have 1 byte to allocate, and 2 peers which can exchange 5 bytes each // First peer : // a) max 1 (1 / 10) = 1 --- b) 5 --- c) 1 // Second peer : // a) max 1 (1 / 10) = 1 --- b) 5 --- c) 0 (because the first peer read the byte) if canReadFrom peer then let toRead = min toReadPerPeer peer.NumCanRead |> min (bytesToRead - alreadyRead) if toRead > 0 then alreadyRead <- alreadyRead + peer.Read toRead if canWriteTo peer then let toWrite = min toWritePerPeer peer.NumCanWrite |> min (bytesToWrite - alreadyWritten) if toWrite > 0 then alreadyWritten <- alreadyWritten + peer.Write toWrite //if we have managed to exchange data and there is more to exchange, we go on if (alreadyRead > 0 || alreadyWritten > 0) && (bytesToWrite > 0 || bytesToRead > 0) then allocate stopwatch (bytesToWrite - alreadyWritten) (bytesToRead - alreadyRead) peers let create() = MailboxProcessor.Start(fun inbox -> let stopwatch = let sw = new Stopwatch() sw.Start() sw let rec loop() = async { let! (maxWriteRate, maxReadRate, peers) = inbox.Receive() if not <| Seq.isEmpty peers then let msecs = ref 1000 if stopwatch.IsRunning then let elapsed = stopwatch.ElapsedMilliseconds if elapsed < 1000L then msecs := int stopwatch.ElapsedMilliseconds let bytesToWrite = max 1 (maxWriteRate / 1000 * !msecs) let bytesToRead = max 1 (maxReadRate / 1000 * !msecs) allocate stopwatch bytesToWrite bytesToRead peers else stopwatch.Start() do! System.Threading.Thread.AsyncSleep 50 return! loop() } loop() )
An example
let printPeer (x:RemotePeer) = printfn "RemotePeer {NumCanRead :%d; NumCanWrite : %d}" x.NumCanRead x.NumCanWrite let myController = create() let createRemotePeer i = let r = 750 * i |> ref let w = max 500 (3000 - 10 * i) |> ref { new RemotePeer with override x.NumCanRead = !r override x.NumCanWrite = !w override x.Read n = let tmp = min !r n r := !r - tmp tmp override x.Write n = let tmp = min !w n w := !w - tmp tmp } let nIterations = 3 let peers = List.init 5 createRemotePeer let rec transfer i = async { printfn "Iteration #%d" i let maxUploadRate = 2000 let maxDownloadRate = 1000 myController.Post (maxUploadRate, maxDownloadRate, peers) do! System.Threading.Thread.AsyncSleep 3000 List.iter printPeer peers printfn "========================================" if i < nIterations then return! transfer(i+1) } List.iter printPeer peers transfer 0 |> Async.Start