This post describes how to implement two-way traffic control (for TCP, for instance) in F#, by slightly adapting the rate controller described in a Trolltech article available at http://doc.trolltech.com/qq/qq17-ratecontrol.html.
Remote Peer
/// A remote endpoint the client exchanges bytes with.
type RemotePeer =
  /// Number of bytes that can currently be read from the peer.
  abstract NumCanRead : int
  /// Number of bytes that can currently be written to the peer.
  abstract NumCanWrite : int
  /// Asked to read the given number of bytes; the returned int is what
  /// callers add to their running "already read" total.
  abstract Read : int -> int
  /// Asked to write the given number of bytes; the returned int is what
  /// callers add to their running "already written" total.
  abstract Write : int -> int

/// True when at least one byte can be read from the peer.
let canReadFrom (peer:RemotePeer) =
  0 < peer.NumCanRead

/// True when at least one byte can be written to the peer.
let canWriteTo (peer:RemotePeer) =
  0 < peer.NumCanWrite

/// True when the peer can still exchange data in either direction.
/// The write side is only queried when nothing can be read.
let canTransferMore (peer:RemotePeer) =
  if canReadFrom peer then true else canWriteTo peer
NumCanRead and NumCanWrite return the number of bytes the client can, respectively, read from and write to the peer. We chose to use abstract definitions so that implementations can add their own filter conditions. For instance, as in Trolltech’s article, we might not want to return a positive NumCanWrite if the peer’s buffer is already n bytes long, where n would be, in this example, twice the upload limit.
Traffic control
open System.Diagnostics
/// Spreads the client's byte budget over the peers that can still exchange data.
///
/// stopwatch    - restarted whenever at least one peer can exchange, so that
///                the caller can measure the time elapsed since this allocation.
/// bytesToWrite - number of bytes the client may still write in this round.
/// bytesToRead  - number of bytes the client may still read in this round.
/// peers        - sequence of RemotePeer candidates.
///
/// The function recurses with the remaining budgets for as long as a pass
/// manages to move at least one byte.
let rec allocate (stopwatch:Stopwatch) bytesToWrite bytesToRead peers =
  // can we exchange anything?
  if bytesToWrite > 0 || bytesToRead > 0 then
    // are there some peers who can exchange?
    // The filtered peers are materialised once so that the sequence is not
    // enumerated a first time for the count and a second time for the loop.
    let pool = Seq.filter canTransferMore peers |> Seq.toArray
    let ncandidates = pool.Length
    if ncandidates > 0 then
      // Restart the timer used to compute the number of bytes the client can
      // exchange. Reset() alone stops the stopwatch and zeroes it, so it has
      // to be started again explicitly.
      stopwatch.Reset()
      stopwatch.Start()
      // number of bytes per peer if we were to split the budget evenly
      let toReadPerPeer = max 1 (bytesToRead / ncandidates)
      let toWritePerPeer = max 1 (bytesToWrite / ncandidates)
      let mutable alreadyRead = 0
      let mutable alreadyWritten = 0
      for peer in pool do
        // the client reads the minimum of:
        //  a) number of bytes allocated per peer (from the bytes the client can exchange)
        //  b) number of bytes the peer can exchange
        //  c) number of bytes left
        // e.g.: we have 1 byte to allocate, and 2 peers which can exchange 5 bytes each
        //  First peer :
        //   a) max 1 (1 / 2) = 1 --- b) 5 --- c) 1
        //  Second peer :
        //   a) max 1 (1 / 2) = 1 --- b) 5 --- c) 0 (because the first peer read the byte)
        if canReadFrom peer then
          let toRead = min (min toReadPerPeer peer.NumCanRead) (bytesToRead - alreadyRead)
          if toRead > 0 then
            alreadyRead <- alreadyRead + peer.Read toRead
        // the same rule applies to the write direction
        if canWriteTo peer then
          let toWrite = min (min toWritePerPeer peer.NumCanWrite) (bytesToWrite - alreadyWritten)
          if toWrite > 0 then
            alreadyWritten <- alreadyWritten + peer.Write toWrite
      // If this pass moved some data we go on with what is left; the guard at
      // the top of the function ends the recursion once both budgets are spent.
      if alreadyRead > 0 || alreadyWritten > 0 then
        allocate stopwatch (bytesToWrite - alreadyWritten) (bytesToRead - alreadyRead) peers
/// Starts the rate-controller agent.
///
/// Each message is a tuple (maxWriteRate, maxReadRate, peers) where the rates
/// are expressed in bytes per second. For every message with a non-empty peer
/// collection, the agent converts the rates into a byte budget for the time
/// elapsed since the previous allocation (capped at one second, and never less
/// than one byte) and hands that budget to `allocate`. It then sleeps 50 ms
/// before waiting for the next message.
let create() = MailboxProcessor.Start(fun inbox ->
  let stopwatch =
    let sw = new Stopwatch()
    sw.Start()
    sw
  // Bytes allowed for `msecs` milliseconds at `rate` bytes per second.
  // The multiplication is done first (in 64 bits) so that rates which are not
  // a multiple of 1000 are not truncated and large rates cannot overflow.
  let budget (rate:int) (msecs:int64) =
    max 1 (int (int64 rate * msecs / 1000L))
  let rec loop() =
    async
      { let! (maxWriteRate, maxReadRate, peers) = inbox.Receive()
        if not <| Seq.isEmpty peers then
          // Length of the time slice: elapsed time, at most one second.
          // A stopped stopwatch gives no measurement, so a full second is assumed.
          let msecs =
            if stopwatch.IsRunning then min 1000L stopwatch.ElapsedMilliseconds
            else 1000L
          allocate stopwatch (budget maxWriteRate msecs) (budget maxReadRate msecs) peers
        // Whatever happened above, the stopwatch must be running so that the
        // next message measures the time since this one.
        if not stopwatch.IsRunning then stopwatch.Start()
        // NOTE(review): Thread.AsyncSleep is kept as in the original; on current
        // FSharp.Core the equivalent is presumably Async.Sleep - confirm against
        // the targeted toolchain.
        do! System.Threading.Thread.AsyncSleep 50
        return! loop()
      }
  loop()
)
An example
/// Prints the read and write capacities currently reported by a peer.
let printPeer (x:RemotePeer) =
  let canRead, canWrite = x.NumCanRead, x.NumCanWrite
  printfn "RemotePeer {NumCanRead :%d; NumCanWrite : %d}" canRead canWrite
/// Rate-controller agent used by the example below.
let myController = create ()
/// Builds an in-memory peer for the example.
/// Peer `i` starts with 750 * i readable bytes and
/// max 500 (3000 - 10 * i) writable bytes. Read and Write consume at most
/// what is left and return the number of bytes actually consumed.
let createRemotePeer i =
  let readable = ref (750 * i)
  let writable = ref (max 500 (3000 - 10 * i))
  { new RemotePeer with
      member x.NumCanRead = readable.Value
      member x.NumCanWrite = writable.Value
      member x.Read n =
        let taken = min readable.Value n
        readable.Value <- readable.Value - taken
        taken
      member x.Write n =
        let taken = min writable.Value n
        writable.Value <- writable.Value - taken
        taken
  }
/// Highest iteration index reached by `transfer`.
let nIterations = 3
/// The five example peers, built from indices 0 to 4.
let peers =
  [ for i in 0 .. 4 -> createRemotePeer i ]
/// One step of the example: posts the rate limits and the peers to the
/// controller, waits three seconds, prints every peer, then moves on to the
/// next step for as long as `i` is below `nIterations`.
let rec transfer i =
  let maxUploadRate = 2000
  let maxDownloadRate = 1000
  async
    { printfn "Iteration #%d" i
      myController.Post (maxUploadRate, maxDownloadRate, peers)
      do! System.Threading.Thread.AsyncSleep 3000
      peers |> List.iter printPeer
      printfn "========================================"
      if i < nIterations then
        return! transfer (i + 1)
    }
// Show the initial state of the peers, then launch the example in the background.
peers |> List.iter printPeer
Async.Start (transfer 0)