Once the Metainfo file can be read, the client knows which files compose the transfer and how to connect to sources for the transfer. In this post, we define the elements we need to perform the I/O operations, that is, to read data from the disk and to write data to it.

One important thing is that a BitTorrent transfer is treated as a single stream of pieces laid over the files. So, if you have a 100-byte file followed by a 400-byte one, and the piece size is 200 bytes, the first piece covers the whole first file plus the first 100 bytes of the second one: the data of a piece needs to be split appropriately across files (both when reading and when writing).

In this library, we use a reference to the AsyncWorker described on Don Syme’s blog.

namespace Common

open System
open System.Threading
open System.IO
open Microsoft.FSharp.Control.WebExtensions

/// Runs a collection of asynchronous jobs in parallel and reports progress
/// (per-job completion, overall completion, error, cancellation) through events.
type AsyncWorker<'T>(jobs: seq<Async<'T>>) =

    // Synchronization context captured at construction time; events are posted
    // to it. When there is no current context, a default one is created.
    let syncContext =
      match System.Threading.SynchronizationContext.Current with
      | null -> new System.Threading.SynchronizationContext()
      | current -> current

    // Posts the triggering of [event] with [args] to the captured context.
    let postEvent (event:Event<_>) args =
      syncContext.Post((fun _ -> event.Trigger args),state=null)

    // Events exposed by the worker.
    let allCompleted  = new Event<'T[]>()
    let error         = new Event<System.Exception>()
    let canceled      = new Event<System.OperationCanceledException>()
    let jobCompleted  = new Event<int * 'T>()

    // Source of the token used to cancel the whole composition.
    let cancellationCapability = new CancellationTokenSource()

    /// Starts the jobs in parallel. [JobCompleted] is raised with the
    /// 1-based job number and its result each time a job finishes.
    member x.Start() =
        // Wraps a job so that it reports its own completion.
        let reportingJob index job =
          async
            { let! result = job
              postEvent jobCompleted (index + 1, result)
              return result
            }
        let work =
          jobs
          |> Seq.mapi reportingJob
          |> Seq.toList
          |> Async.Parallel
        Async.StartWithContinuations
            ( work,
              (fun results -> postEvent allCompleted results),
              (fun failure -> postEvent error failure),
              (fun cancellation -> postEvent canceled cancellation),
              cancellationCapability.Token
            )

    /// Requests the cancellation of the running jobs.
    member x.CancelAsync() = cancellationCapability.Cancel()

    /// Raised when a particular job completes
    member x.JobCompleted  = jobCompleted.Publish

    /// Raised when all jobs complete
    member x.AllCompleted  = allCompleted.Publish

    /// Raised when the composition is cancelled successfully
    member x.Canceled   = canceled.Publish

    /// Raised when the composition exhibits an error
    member x.Error      = error.Publish

Signature

namespace BitTorrent

open Common

///A request to read data from the disk.
///The requested range is addressed relative to a piece of the transfer.
type ReadRequest =
  { ///Index of the piece to read from.
    Piece : int
    ///Offset of the first byte to read, relative to the start of the piece.
    Offset : int
    ///Number of bytes to read.
    Length : int
  }
  ///[create piece offset length] builds a read request from its three fields.
  static member create : piece:int -> offset:int -> length:int -> ReadRequest

///A request to write data to the disk.
///The destination is addressed relative to a piece of the transfer.
type WriteRequest =
  { ///Index of the piece to write into.
    Piece : int
    ///Offset of the first byte to write, relative to the start of the piece.
    Offset : int
    ///Bytes to write; the whole array is written.
    Data : byte[]
  }
  ///[create piece offset data] builds a write request from its three fields.
  static member create : piece:int -> offset:int -> data:byte[] -> WriteRequest

///A FileManager is used to compute and perform file I/O operations
///and to check the hashes of the pieces.
type FileManager =
  ///Creates a manager for the files described by [metainfo].
  ///[destinationFolder] is the folder under which the transfer's base directory
  ///is placed; when omitted, the application's downloads directory is used.
  new : metainfo:Metainfo * ?destinationFolder:string -> FileManager
  ///The metainfo describing the transfer.
  member Metainfo : Metainfo
  ///The folder that contains the files of the transfer: the chosen (or default)
  ///folder combined with the base directory of the metainfo.
  member DestinationFolder: string

///Functions operating on a [FileManager]: file generation, piece-addressed
///reads and writes, and hash verification.
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module FileManager =
  ///Alias for the [FileManager] type.
  type t = FileManager

  //-------------------------------------------------------------
  // Helper
  //-------------------------------------------------------------

  ///[fullPath fileManager nthFile] returns the full path to the [nthFile]
  ///of the [fileManager], i.e. its path under [fileManager.DestinationFolder].
  val fullPath : fileManager:t -> nthFile:int -> string

  ///[asyncGenerateFile fileManager i] creates the directory and the [i]-th
  ///file of the [fileManager] and returns the path of that file.
  ///An existing file whose length differs from the expected one is flagged
  ///for overwriting.
  val asyncGenerateFile : fileManager:t -> i:int -> Async<string>

  ///[filesGenerator fileManager] returns an async worker which tracks
  ///the progress of the files generation process (one job per file).
  val filesGenerator : fileManager:t -> AsyncWorker<string>

  //-------------------------------------------------------------
  // I/O operations
  //-------------------------------------------------------------

  ///[asyncRead fileManager request] asynchronously returns a byte array containing
  ///the bytes read from the [fileManager] files considered as one continuous stream:
  ///index [0] refers to the first byte of the first file and the last index
  ///refers to the last byte of the last file of the [fileManager].
  ///If the request is invalid or the read fails, an empty array is returned.
  val asyncRead : fileManager:t -> request:ReadRequest -> Async<byte[]>

  ///[requestsReader fileManager readRequests] returns an async worker which tracks
  ///the progress of the processing of the read requests.
  val requestsReader : fileManager:t -> readRequests:seq<ReadRequest> -> AsyncWorker<byte[]>

  ///[asyncWrite fileManager request] asynchronously writes the data from the [request]
  ///to the files of the [fileManager], considered as one continuous stream:
  ///index [0] refers to the first byte of the first file and the last index
  ///refers to the last byte of the last file of the [fileManager].
  ///If the request is invalid or the write fails, the error is silently ignored.
  val asyncWrite : fileManager:t -> request:WriteRequest -> Async<unit>

  ///[requestsWriter fileManager writeRequests] returns an async worker which tracks
  ///the progress of the processing of the write requests.
  val requestsWriter : fileManager:t -> writeRequests:seq<WriteRequest> -> AsyncWorker<unit>

  ///[asyncVerifyPiece fileManager piece] asynchronously checks
  ///whether the SHA1 checksum at index [piece] of [fileManager.Metainfo.SHA1s]
  ///matches the SHA1 checksum of that piece as read from the files
  ///on the hard drive.
  val asyncVerifyPiece : fileManager:t -> piece:int -> Async<bool>

  ///[piecesVerifier fileManager pieces] returns an async worker which tracks
  ///the progress of the verification of the given [pieces].
  val piecesVerifier : fileManager:t -> pieces:seq<int> -> AsyncWorker<bool>

  ///[transferVerifier fileManager] returns an async worker which tracks
  ///the progress of the verification of every piece of the transfer.
  val transferVerifier : fileManager:t -> AsyncWorker<bool>

Implementation

For each action, we define both a function and an async worker, so that we can either perform a single operation when we need it or run a batch of them while tracking the progress. This is particularly useful when checking the hashes of the transfer files.

To perform I/O operations, we use one MailboxProcessor to read from the disk and another one to write to it. This is meant to avoid the increase in CPU usage which could result from starting a separate asynchronous operation for every read or write request.

We also test the validity of each request to make sure that its parameters are consistent with the file information. If they are not, an exception is raised and, in the following implementation, swallowed by the I/O agents: the reader replies with an empty array and the writer simply acknowledges the request.

namespace BitTorrent

//---------------------------------------------------------------
// LIBRARIES
//---------------------------------------------------------------

open System
open System.IO
open System.Threading

open Common
open Common.IO

open BitTorrent
open BitTorrent.BValue

//---------------------------------------------------------------
// FILE MANAGER
//---------------------------------------------------------------

/// A request to read [Length] bytes starting at [Offset] inside piece [Piece].
type ReadRequest =
  { Piece : int
    Offset : int
    Length : int
  }
  /// Builds a read request from a piece index, an offset and a length.
  static member create piece offset length =
    { Piece = piece; Offset = offset; Length = length }

/// A request to write [Data] starting at [Offset] inside piece [Piece].
type WriteRequest =
  { Piece : int
    Offset : int
    Data : byte[]
  }
  /// Builds a write request from a piece index, an offset and the bytes to write.
  static member create piece offset data =
    { Piece = piece; Offset = offset; Data = data }

/// Holds the metainfo of a transfer together with the folder in which
/// the files of the transfer are stored.
///
/// [destinationFolder] is optional; when it is omitted, the application's
/// downloads directory is used instead.
type FileManager( metainfo:Metainfo, ?destinationFolder ) =

  /// The metainfo describing the transfer.
  member x.Metainfo = metainfo

  /// The folder containing the files of the transfer: the destination folder
  /// (or the downloads directory when none was given) combined with the base
  /// directory of the metainfo.
  member x.DestinationFolder
    with get() =
      let baseDir =
        match destinationFolder with
        | Some folder -> folder
        // The default is looked up lazily, only when no folder was supplied.
        | None -> App.Settings.DownloadsDirectory
      Filename.concat baseDir x.Metainfo.BaseDirectory

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module FileManager =

  /// Alias for the file manager type handled by this module.
  type t = FileManager

  /// [fullPath fileManager filePos] returns the path of the file at index
  /// [filePos] in the metainfo, placed under the manager's destination folder.
  let fullPath (fileManager:t) filePos =
    let root = fileManager.DestinationFolder
    let relativePath = fileManager.Metainfo.Files.[filePos].Path
    Filename.concat root relativePath

  /// [asyncGenerateFile fileManager i] creates the directory of the [i]-th file
  /// and then the file itself with the length announced by the metainfo.
  /// The overwrite flag handed to [File.create] is set only when a file already
  /// exists at that path with a different length. Returns the path of the file.
  let asyncGenerateFile (fileManager:t) i =
    async
      { let target = fullPath fileManager i
        Directory.create (Path.GetDirectoryName target)
        let expectedLength = fileManager.Metainfo.Files.[i].Length
        let mustOverwrite =
          if File.Exists target then File.length target <> expectedLength
          else false
        File.create target expectedLength mustOverwrite
        return target
      }

  /// [filesGenerator fileManager] returns a worker holding one generation job
  /// per file of the metainfo, in file order.
  let filesGenerator (fileManager:t) =
    let fileCount = fileManager.Metainfo.Files.Length
    let generationJobs =
      [| for i in 0 .. fileCount - 1 -> asyncGenerateFile fileManager i |]
    new AsyncWorker<_>(generationJobs)

  //-------------------------------------------------------------
  // I/O operations
  //-------------------------------------------------------------

  /// [coordinatesToStartIdx fileManager piece offset] converts a position given
  /// as (piece index, offset inside the piece) into an absolute index in the
  /// stream formed by all the files of the transfer.
  ///
  /// The stride between two consecutive pieces is the length of piece 0, not the
  /// length of the addressed piece: the other functions of this module treat
  /// [Metainfo.pieceLength] as a per-piece length, and in a BitTorrent transfer
  /// the last piece may be shorter than the others. Multiplying by the addressed
  /// piece's own length would then misplace the last piece.
  /// NOTE(review): assumes [Metainfo.pieceLength 0] is the nominal piece length;
  /// if [pieceLength] ignores its index the result is unchanged.
  let coordinatesToStartIdx (fileManager:t) (piece:int) offset =
    let nominalPieceLength = Metainfo.pieceLength 0 fileManager.Metainfo
    int64 piece * nominalPieceLength + int64 offset

  // Store of file locks, initially empty. The same reference is handed to
  // [File.asyncAcquireLock] / [File.releaseLock] by both the read and the
  // write paths below.
  // NOTE(review): presumably keyed by file path — confirm against File.LockStore.
  let locks : ref<File.LockStore> = ref Map.empty

  //
  // reading
  //

  /// [testReadRequest fileManager req] checks that [req] is consistent with the
  /// metainfo of [fileManager]: the piece index must designate an existing piece,
  /// the offset and the length must not be negative, and the requested range must
  /// end inside the piece.
  /// Raises [System.ArgumentException] (through [invalidArg]) otherwise.
  let testReadRequest (fileManager:t) (req:ReadRequest) =
    if req.Piece < 0 then invalidArg "readRequest" "piece was negative"
    if req.Piece >= fileManager.Metainfo.SHA1s.Length then
      invalidArg "readRequest" "piece was out of bounds"
    if req.Offset < 0 then invalidArg "readRequest" "offset was negative"
    if req.Length < 0 then invalidArg "readRequest" "length was negative"
    // Widen before adding: the sum of two large Int32 values would otherwise
    // wrap to a negative number and slip through the bounds check.
    if int64 req.Offset + int64 req.Length > Metainfo.pieceLength req.Piece fileManager.Metainfo then
      invalidArg "readRequest" "out of bounds request"

  /// [asyncReadRange fileManager ms start toRead currentFile currentPos] reads
  /// [toRead] bytes, beginning at the absolute stream index [start], from the
  /// files of [fileManager] and appends them to [ms].
  ///
  /// [currentFile] is the index of the file being examined and [currentPos] the
  /// absolute stream index of its first byte; callers start with [0] and [0L].
  /// Files that end at or before [start] are skipped. The function returns the
  /// content of [ms] once [toRead] bytes have been read or the files are exhausted.
  ///
  /// Raises [System.ArgumentException] when fewer bytes than expected are read.
  let rec asyncReadRange (fileManager:t) (ms:MemoryStream) start toRead currentFile currentPos =
    async
      { if toRead > 0 && currentFile < fileManager.Metainfo.Files.Length then
          let file = fileManager.Metainfo.Files.[currentFile]
          let nextFile = currentFile + 1
          let nextPos = currentPos + file.Length
          if nextPos > start then
            let path = fullPath fileManager currentFile
            let! _ = File.asyncAcquireLock locks path
            // The file is opened, read and closed inside its own computation so
            // that it is closed, and the lock released, before moving on to the
            // next file and even when the read fails.
            let! n =
              async
                { try
                    use! fs = File.AsyncOpenRead path
                    fs.Seek(start - currentPos, SeekOrigin.Begin) |> ignore
                    let nToEndOfFile = file.Length - fs.Position
                    // Compare as int64: the remainder of a large file does not
                    // fit in an Int32 and would wrap if converted first.
                    let n = int (min (int64 toRead) nToEndOfFile)
                    if n > 0 then
                      let! dataRead = fs.AsyncRead(n)
                      if dataRead.Length <> n then
                        invalidArg "file" ("could not read enough data from " + path)
                      do! ms.AsyncWrite(dataRead, 0, dataRead.Length)
                    return n
                  finally
                    try File.releaseLock !locks path with _ -> ()
                }
            return! asyncReadRange fileManager ms (start + int64 n) (toRead - n) nextFile nextPos
          else
            return! asyncReadRange fileManager ms start toRead nextFile nextPos
        else
          return ms.ToArray()
      }

  /// Agent processing the read requests one at a time. Each message carries the
  /// reply channel, the file manager and the request. The agent replies with the
  /// bytes read, or with an empty array when the request is rejected or the read
  /// fails, and pauses for 10 ms between two requests.
  let readerAgent = MailboxProcessor<_>.Start(fun inbox ->
    let rec loop () =
      async
        { let! ((replyChannel:AsyncReplyChannel<_>), fileManager, req) = inbox.Receive()
          try
            testReadRequest fileManager req
            use buffer = new MemoryStream()
            let firstByte = coordinatesToStartIdx fileManager req.Piece req.Offset
            let! bytes = asyncReadRange fileManager buffer firstByte req.Length 0 0L
            replyChannel.Reply bytes
          with _ -> replyChannel.Reply [||]
          do! Async.Sleep 10
          return! loop ()
        }
    loop ()
  )

  /// [asyncRead fileManager req] posts [req] to the reader agent and
  /// asynchronously returns the agent's reply.
  let asyncRead (fileManager:t) req =
    let buildMessage replyChannel = (replyChannel, fileManager, req)
    readerAgent.PostAndAsyncReply buildMessage

  /// [requestsReader fileManager reqs] returns a worker with one read job per
  /// request of [reqs].
  let requestsReader fileManager reqs =
    let readJobs = Seq.map (fun req -> asyncRead fileManager req) reqs
    new AsyncWorker<_>(readJobs)

  //
  // writing
  //

  /// [testWriteRequest fileManager req] checks that [req] is consistent with the
  /// metainfo of [fileManager]: the piece index must designate an existing piece,
  /// the offset must not be negative, the data must not be null and, as for read
  /// requests, the written range must end inside the piece.
  /// Raises [System.ArgumentException] (through [invalidArg]) otherwise.
  let testWriteRequest (fileManager:t) (req:WriteRequest) =
    if req.Piece < 0 then invalidArg "writeRequest" "piece was negative"
    if req.Piece >= fileManager.Metainfo.SHA1s.Length then
      invalidArg "writeRequest" "piece was out of bounds"
    if req.Offset < 0 then invalidArg "writeRequest" "offset was negative"
    if obj.ReferenceEquals(req.Data, null) then
      invalidArg "writeRequest" "data was null"
    // Same bounds rule as for read requests; the sum is computed as int64 so that
    // it cannot wrap around.
    if int64 req.Offset + int64 req.Data.Length > Metainfo.pieceLength req.Piece fileManager.Metainfo then
      invalidArg "writeRequest" "out of bounds request"

  /// [asyncWriteRange fileManager data start toWrite written currentFile currentPos]
  /// writes [toWrite] bytes of [data], taken from index [written] onwards, to the
  /// files of [fileManager], beginning at the absolute stream index [start].
  ///
  /// [currentFile] is the index of the file being examined and [currentPos] the
  /// absolute stream index of its first byte; callers start with [0] and [0L].
  /// Files that end at or before [start] are skipped. The computation ends once
  /// everything has been written or the files are exhausted.
  ///
  /// Raises [System.ArgumentException] when the data cannot be written to a file.
  let rec asyncWriteRange (fileManager:t) data start toWrite written currentFile currentPos =
    async
      { if toWrite > 0 && currentFile < fileManager.Metainfo.Files.Length then
          let file = fileManager.Metainfo.Files.[currentFile]
          let nextFile = currentFile + 1
          let nextPos = currentPos + file.Length

          if nextPos > start then
            let path = fullPath fileManager currentFile

            let! _ = File.asyncAcquireLock locks path
            // The file is opened, written and closed inside its own computation
            // so that it is closed, and the lock released, before moving on to
            // the next file and even when the write fails.
            let! n =
              async
                { try
                    use! fs = File.AsyncOpenWrite path
                    fs.Seek(start - currentPos, SeekOrigin.Begin) |> ignore
                    let nToEndOfFile = file.Length - fs.Position
                    // Compare as int64: the remainder of a large file does not
                    // fit in an Int32 and would wrap if converted first.
                    let n = int (min (int64 toWrite) nToEndOfFile)
                    if n > 0 then
                      try do! fs.AsyncWrite(data, written, n)
                      with _ -> invalidArg "file" ("data could not be written to " + path)
                    return n
                  finally
                    try File.releaseLock !locks path with _ -> ()
                }
            return! asyncWriteRange fileManager data (start + int64 n) (toWrite - n) (written + n) nextFile nextPos
          else
            return! asyncWriteRange fileManager data start toWrite written nextFile nextPos
      }

  /// Agent processing the write requests one at a time. Each message carries the
  /// reply channel, the file manager and the request. Failures (rejected request
  /// or write error) are ignored: the agent always acknowledges the request, then
  /// pauses for 10 ms before handling the next one.
  let writerAgent = MailboxProcessor<_>.Start(fun inbox ->
    let rec loop () =
      async
        { let! ((replyChannel:AsyncReplyChannel<_>), fileManager, req) = inbox.Receive()
          try
            testWriteRequest fileManager req
            let firstByte = coordinatesToStartIdx fileManager req.Piece req.Offset
            do! asyncWriteRange fileManager req.Data firstByte req.Data.Length 0 0 0L
          with _ -> ()
          replyChannel.Reply ()
          do! Async.Sleep 10
          return! loop ()
        }
    loop ()
  )

  /// [asyncWrite fileManager req] posts [req] to the writer agent and
  /// asynchronously waits for the agent's acknowledgement.
  let asyncWrite (fileManager:t) req =
    let buildMessage replyChannel = (replyChannel, fileManager, req)
    writerAgent.PostAndAsyncReply buildMessage

  /// [requestsWriter fileManager reqs] returns a worker with one write job per
  /// request of [reqs].
  let requestsWriter fileManager reqs =
    let writeJobs = Seq.map (fun req -> asyncWrite fileManager req) reqs
    new AsyncWorker<_>(writeJobs)

  //
  // hash checking
  //

  /// [asyncVerifyPiece fileManager piece] reads the whole [piece] from the files
  /// and asynchronously returns whether its SHA1 hex digest equals the checksum
  /// stored at index [piece] of [fileManager.Metainfo.SHA1s].
  ///
  /// Because the reader agent replies with an empty array when a read fails,
  /// a piece that cannot be read is compared through the digest of no data.
  ///
  /// Raises [System.ArgumentException] when [piece] is negative or is not
  /// smaller than the number of checksums.
  let asyncVerifyPiece (fileManager:t) piece =
    async
      { if piece < 0 || piece >= fileManager.Metainfo.SHA1s.Length then
          invalidArg "piece" "piece index is out of bounds"
        let len = Metainfo.pieceLength piece fileManager.Metainfo |> int
        let req : ReadRequest =
          { Piece = piece
            Offset = 0
            Length = len
          }
        let! pieceToCheck = asyncRead fileManager req
        return Digest.SHA1.hexdigest pieceToCheck  = fileManager.Metainfo.SHA1s.[piece]
      }

  /// [piecesVerifier fileManager pieces] returns a worker with one verification
  /// job per piece index of [pieces], in the same order.
  let piecesVerifier fileManager pieces =
    let verificationJobs =
      pieces
      |> Seq.map (fun piece -> asyncVerifyPiece fileManager piece)
      |> Seq.toList
    new AsyncWorker<_>(verificationJobs)

  /// [transferVerifier fileManager] returns a worker verifying every piece of
  /// the transfer, from piece 0 to the last checksum of the metainfo.
  let transferVerifier (fileManager:t) =
    let lastPiece = fileManager.Metainfo.SHA1s.Length - 1
    piecesVerifier fileManager [0 .. lastPiece]

Example

// Load the metainfo and build a file manager storing the files under C:\test.
let destinationFolder = @"C:\test"
let torrentFile = System.IO.Path.Combine(destinationFolder, "openofficefr.torrent")
let metainfo = Metainfo.fromFile torrentFile
let fileManager = FileManager(metainfo, destinationFolder)

// List the files of the transfer together with their full path on disk.
for i in 0 .. fileManager.Metainfo.Files.Length - 1 do
  let file = fileManager.Metainfo.Files.[i]
  let path = FileManager.fullPath fileManager i
  printfn "file : %A" file
  printfn "path : %A"  path
  printfn "================" 

// Create the files, reporting progress through the worker's events.
// NOTE(review): Start() does not wait for AllCompleted, so the I/O below may
// run before every file has been created — confirm this is acceptable here.
let gen = FileManager.filesGenerator fileManager
gen.JobCompleted.Add(fun (jobNumber, path) -> printfn "created file with path : %A" path)
gen.AllCompleted.Add(fun paths -> printfn "all files created : %A" paths)
gen.Start()

// Read a piece, overwrite it, read it back, then verify a piece.
let task =
  async
    { let! bytes = FileManager.asyncRead fileManager {Piece = 7; Offset = 0 ; Length = 34840}
      printfn "before  :%A" bytes

      do! FileManager.asyncWrite fileManager {Piece = 7; Offset = 0 ; Data = [|for i in 1 .. 34840 -> i &&& 255 |> byte|] } 

      let! bytes = FileManager.asyncRead fileManager {Piece = 7; Offset = 0 ; Length = 34840}
      printfn "after : %A" bytes

      let! ok = FileManager.asyncVerifyPiece fileManager 0
      printfn "is ok ? %A" ok
    }
Async.RunSynchronously task    

// Verify a selection of pieces. JobCompleted carries the 1-based job number,
// not the piece index, so the piece is looked up in the list of requested pieces.
let piecesToCheck = [|0..3..17|]
let pv = FileManager.piecesVerifier fileManager piecesToCheck
pv.JobCompleted.Add(fun (jobNumber, res) -> printfn "is piece %d ok ? %A" (piecesToCheck.[jobNumber - 1]) res)
pv.AllCompleted.Add(fun ress -> printfn "all pieces verified : %A" ress)
pv.Start()

// Verify the whole transfer: job number k (1-based) checks piece k - 1.
let tv = FileManager.transferVerifier fileManager
tv.JobCompleted.Add(fun (jobNumber, res) -> printfn "is piece %d ok ? %A" (jobNumber - 1) res)
tv.AllCompleted.Add(fun ress -> printfn "all pieces verified : %A" ress)
tv.Start()

Comments are closed.