This post describes how to implement two-way traffic control (for TCP, for instance) in F# by slightly adapting the rate controller presented in a Trolltech article available at http://doc.trolltech.com/qq/qq17-ratecontrol.html.

Remote Peer

/// Contract through which the rate controller talks to one remote peer.
type RemotePeer =
  /// Number of bytes that can currently be read from the peer.
  abstract NumCanRead : int
  /// Number of bytes that can currently be written to the peer.
  abstract NumCanWrite : int
  /// Reads at most the requested number of bytes; the result is the
  /// number of bytes counted as read by the caller.
  abstract Read : int -> int
  /// Writes at most the requested number of bytes; the result is the
  /// number of bytes counted as written by the caller.
  abstract Write : int -> int
   
/// True when the peer reports at least one byte that can be read from it.
let canReadFrom (peer:RemotePeer) = peer.NumCanRead >= 1
/// True when the peer reports at least one byte that can be written to it.
let canWriteTo (peer:RemotePeer) = peer.NumCanWrite >= 1
/// True when data can flow in at least one direction with the peer.
let canTransferMore peer =
  if canReadFrom peer then true else canWriteTo peer

NumCanRead and NumCanWrite return the number of bytes the peer can, respectively, send to and receive from the client. We chose abstract definitions so that implementations can add filter conditions. For instance, as in Trolltech’s article, we might not want to return a positive NumCanWrite if the peer’s buffer is already n bytes long, where n would be, in this example, twice the upload limit.

Traffic control

open System.Diagnostics

/// Shares a write budget and a read budget between the peers that can
/// currently transfer data, and repeats until a budget is exhausted or a
/// whole round moves no byte.
///
/// stopwatch    : timer read by the controller loop to size the next budget;
///                it is restarted whenever a round has at least one candidate.
/// bytesToWrite : number of bytes the client may still write.
/// bytesToRead  : number of bytes the client may still read.
/// peers        : all known peers; only those accepted by canTransferMore
///                take part in a round.
let rec allocate (stopwatch:Stopwatch) bytesToWrite bytesToRead peers =
  //can we exchange anything ?
  if bytesToWrite > 0 || bytesToRead > 0 then

    //are there some peers who can exchange ?
    //the filtered sequence is materialised once so that the candidate count
    //and the peers visited below come from the same snapshot
    let pool = Seq.toList (Seq.filter canTransferMore peers)
    let ncandidates = List.length pool
    if ncandidates > 0 then
      //restart the timer to reset the computation of the number of
      //bytes the client can exchange, see later.
      //Reset() alone stops the stopwatch, so it has to be started again.
      stopwatch.Reset()
      stopwatch.Start()

      //number of bytes per peer if we were to split it evenly
      //(at least 1, so that a budget smaller than the pool is still handed out)
      let toReadPerPeer = max 1 (bytesToRead / ncandidates)
      let mutable alreadyRead = 0

      let toWritePerPeer = max 1 (bytesToWrite / ncandidates)
      let mutable alreadyWritten = 0

      for peer in pool do
        //the client reads the minimum of :
        // a) number of bytes allocated per peer (from the bytes the client can exchange)
        // b) number of bytes the peer can exchange
        // c) number of bytes left
        // e.g. : we have 1 byte to allocate, and 2 peers which can exchange 5 bytes each
        // First peer :
        // a) max 1 (1 / 2) = 1 --- b) 5 --- c) 1
        // Second peer :
        // a) max 1 (1 / 2) = 1 --- b) 5 --- c) 0 (because the first peer read the byte)

        if canReadFrom peer then
          let toRead = min toReadPerPeer peer.NumCanRead |> min (bytesToRead - alreadyRead)
          if toRead > 0 then
            alreadyRead <- alreadyRead + peer.Read toRead

        //the same three bounds apply to the write direction
        if canWriteTo peer then
          let toWrite = min toWritePerPeer peer.NumCanWrite |> min (bytesToWrite - alreadyWritten)
          if toWrite > 0 then
            alreadyWritten <- alreadyWritten + peer.Write toWrite

      //if we have managed to exchange data and there is more to exchange, we go on
      let remainingToWrite = bytesToWrite - alreadyWritten
      let remainingToRead = bytesToRead - alreadyRead
      if (alreadyRead > 0 || alreadyWritten > 0) && (remainingToWrite > 0 || remainingToRead > 0) then
        allocate stopwatch remainingToWrite remainingToRead peers

/// Starts the rate-controller agent.
///
/// Each message is a triple (maxWriteRate, maxReadRate, peers), with both
/// rates in bytes per second. For a non-empty peer collection the agent turns
/// the rates into byte budgets for the time elapsed on its stopwatch (capped
/// at one second) and hands them to allocate. It then sleeps 50 ms before
/// waiting for the next message.
let create() = MailboxProcessor.Start(fun inbox ->
  let stopwatch =
    let sw = new Stopwatch()
    sw.Start()
    sw
  //bytes allowed during msecs milliseconds at rate bytes per second, never
  //less than 1. The multiplication comes first so that rates which are not
  //multiples of 1000 are not truncated, and it is done in 64 bits so that it
  //cannot overflow.
  let budget (rate:int) (msecs:int) =
    max 1 (int (int64 rate * int64 msecs / 1000L))
  let rec loop() =
    async
      { let! (maxWriteRate, maxReadRate, peers) = inbox.Receive()
        if not <| Seq.isEmpty peers then
          //elapsed time is read once and capped at one second; a stopped
          //stopwatch counts as a full second
          let msecs =
            if stopwatch.IsRunning then int (min 1000L stopwatch.ElapsedMilliseconds)
            else 1000
          allocate stopwatch (budget maxWriteRate msecs) (budget maxReadRate msecs) peers
        else
          stopwatch.Start()
        //NOTE(review): Thread.AsyncSleep comes from early F# releases; later
        //releases expose Async.Sleep instead - confirm against the targeted version
        do! System.Threading.Thread.AsyncSleep 50
        return! loop()
      }
  loop()
 )

An example

 

/// Prints the current read and write capacities of a peer on one line.
let printPeer (x:RemotePeer) =
  let readable = x.NumCanRead
  let writable = x.NumCanWrite
  printfn "RemotePeer {NumCanRead :%d; NumCanWrite : %d}" readable writable
 
//the controller agent that every demo round posts its request to
let myController = create()

/// Builds an in-memory demo peer for index i. It starts with 750 * i readable
/// bytes and max 500 (3000 - 10 * i) writable bytes; Read and Write consume
/// those counters and return the number of bytes actually taken.
let createRemotePeer i =
  let readable = ref (750 * i)
  let writable = ref (max 500 (3000 - 10 * i))
  //takes at most n bytes out of the counter and returns how many were taken
  let take (counter:int ref) n =
    let granted = min counter.Value n
    counter.Value <- counter.Value - granted
    granted
  { new RemotePeer with
      member x.NumCanRead = readable.Value
      member x.NumCanWrite = writable.Value
      member x.Read n = take readable n
      member x.Write n = take writable n
  }

//bound that transfer compares its round index against
let nIterations = 3

//five demo peers, built by createRemotePeer for the indices 0 to 4
let peers =
  [ for i in 0 .. 4 -> createRemotePeer i ]

/// Runs the demo rounds, i being the zero-based index of the current round.
/// A round posts the rate limits and the peers to the controller, waits
/// three seconds, then prints the state of every peer. Started at 0, exactly
/// nIterations rounds are run (#0 to #(nIterations - 1)).
let rec transfer i  =
  async
    { printfn "Iteration #%d" i
      let maxUploadRate = 2000
      let maxDownloadRate = 1000
      myController.Post (maxUploadRate, maxDownloadRate, peers)
      //leave the controller some time to work before looking at the peers
      do! System.Threading.Thread.AsyncSleep 3000
      List.iter printPeer peers
      printfn "========================================"
      //the next round is only scheduled if its index is still below the bound;
      //testing i itself would run one round too many
      if i + 1 < nIterations then return! transfer(i+1)
    }

//show the state of every peer before any transfer takes place
peers |> List.iter printPeer

//start the demo rounds without waiting for them to finish
Async.Start (transfer 0)

Comments are closed.