Once the Metainfo file can be read, the client ought to know which files compose the transfer, and how to connect to sources for the transfer. In this post, we will define the elements we need to perform the I/O operations, that is, to read data from (and write it to) the disk.
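One important thing is that a BitTorrent transfer is treated as a single stream of pieces that spans all the files. So, if you have a 100-byte file followed by a 400-byte one, and the piece size is 200 bytes, the data of a piece needs to be split appropriately across the files (both when reading and when writing): here, the first piece covers the whole first file plus the first 100 bytes of the second one.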
In this library, we use a reference to the AsyncWorker described on Don Syme’s blog.
namespace Common

open System
open System.Threading
open System.IO
open Microsoft.FSharp.Control.WebExtensions

/// Runs a sequence of asynchronous jobs in parallel and reports progress,
/// completion, failure and cancellation through events.
type AsyncWorker<'T>(jobs: seq<Async<'T>>) =

    // The synchronization context current at construction time is remembered so
    // that events can be posted back to it (e.g. a GUI thread); when there is
    // none, a fresh default context is used instead.
    let syncContext =
        match System.Threading.SynchronizationContext.Current with
        | null -> new System.Threading.SynchronizationContext()
        | current -> current

    // Posts the triggering of [event] with [args] to the captured context.
    let raiseEventOnGuiThread (event:Event<_>) args =
        syncContext.Post((fun _ -> event.Trigger args), state=null)

    // Events exposed by the worker.
    let jobCompleted = new Event<int * 'T>()
    let allCompleted = new Event<'T[]>()
    let error = new Event<System.Exception>()
    let canceled = new Event<System.OperationCanceledException>()

    let cancellationCapability = new CancellationTokenSource()

    /// Start an instance of the work
    member x.Start() =
        // Wrap every job so that it reports its own 1-based number on completion.
        let numberedJobs =
            jobs
            |> Seq.mapi (fun index job ->
                async {
                    let! outcome = job
                    raiseEventOnGuiThread jobCompleted (index + 1, outcome)
                    return outcome
                })
            |> Seq.toList

        Async.StartWithContinuations(
            Async.Parallel numberedJobs,
            (fun results -> raiseEventOnGuiThread allCompleted results),
            (fun failure -> raiseEventOnGuiThread error failure),
            (fun cancellation -> raiseEventOnGuiThread canceled cancellation),
            cancellationCapability.Token)

    /// Requests the cancellation of the running work
    member x.CancelAsync() =
        cancellationCapability.Cancel()

    /// Raised when a particular job completes
    member x.JobCompleted = jobCompleted.Publish

    /// Raised when all jobs complete
    member x.AllCompleted = allCompleted.Publish

    /// Raised when the composition is cancelled successfully
    member x.Canceled = canceled.Publish

    /// Raised when the composition exhibits an error
    member x.Error = error.Publish
Signature
namespace BitTorrent

open Common

///Read data from the disk: [Length] bytes starting [Offset] bytes into the piece number [Piece].
type ReadRequest =
  { Piece : int
    Offset : int
    Length : int }
  static member create : piece:int -> offset:int -> length:int -> ReadRequest

///Write data to the disk: the bytes of [Data], starting [Offset] bytes into the piece number [Piece].
type WriteRequest =
  { Piece : int
    Offset : int
    Data : byte[] }
  static member create : piece:int -> offset:int -> data:byte[] -> WriteRequest

///A FileManager is used to compute and perform file I/O operations
///and to check the hashes of the pieces.
type FileManager =
  new : metainfo:Metainfo * ?destinationFolder:string -> FileManager
  ///The metainfo given at construction.
  member Metainfo : Metainfo
  ///The folder the transfer files live in: the destination folder given at construction
  ///(or the application downloads directory when none was given) combined with
  ///the base directory of the metainfo.
  member DestinationFolder: string

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module FileManager =
  type t = FileManager

  //-------------------------------------------------------------
  // Helper
  //-------------------------------------------------------------

  ///[fullPath fileManager nthFile] returns the full path to the [nthFile]
  ///of the [fileManager].
  val fullPath : fileManager:t -> nthFile:int -> string

  ///[asyncGenerateFile fileManager i] creates the directory and the [i]-th
  ///file of the [fileManager], and returns the path of that file. An existing file
  ///whose length differs from the expected one is flagged to be overwritten.
  val asyncGenerateFile : fileManager:t -> i:int -> Async<string>

  ///[filesGenerator fileManager] returns an async worker which tracks
  ///the progress of the files generation process (one job per file).
  val filesGenerator : fileManager:t -> AsyncWorker<string>

  //-------------------------------------------------------------
  // I/O operations
  //-------------------------------------------------------------

  ///[asyncRead fileManager request] asynchronously returns a byte array containing
  ///the bytes read from the [fileManager] files considered as a single stream. That is,
  ///position [0] refers to the first byte of the first file while the last position
  ///refers to the last byte of the last file of the [fileManager]. The piece and offset
  ///of the [request] are translated into a position in that stream.
  ///An empty array is returned when the request is invalid or when reading fails.
  val asyncRead : fileManager:t -> request:ReadRequest -> Async<byte[]>

  ///[requestsReader fileManager readRequests] returns an async worker which tracks
  ///the progress of the processing of the read requests.
  val requestsReader : fileManager:t -> readRequests:seq<ReadRequest> -> AsyncWorker<byte[]>

  ///[asyncWrite fileManager request] asynchronously writes the data from the [request]
  ///to the files of the [fileManager], which are considered as a single stream. That is,
  ///position [0] refers to the first byte of the first file while the last position
  ///refers to the last byte of the last file of the [fileManager].
  ///Invalid requests and write failures are ignored.
  val asyncWrite : fileManager:t -> request:WriteRequest -> Async<unit>

  ///[requestsWriter fileManager writeRequests] returns an async worker which tracks
  ///the progress of the processing of the write requests.
  val requestsWriter : fileManager:t -> writeRequests:seq<WriteRequest> -> AsyncWorker<unit>

  ///[asyncVerifyPiece fileManager piece] asynchronously checks
  ///whether the SHA1 checksum stored for [piece] in the metainfo of the [fileManager]
  ///matches the SHA1 checksum of that piece as read from the files on the hard drive.
  val asyncVerifyPiece : fileManager:t -> piece:int -> Async<bool>

  ///[piecesVerifier fileManager pieces] returns an async worker which tracks
  ///the progress of the verification of the given [pieces].
  val piecesVerifier : fileManager:t -> pieces:seq<int> -> AsyncWorker<bool>

  ///[transferVerifier fileManager] returns an async worker which tracks
  ///the progress of the verification of every piece of the transfer.
  val transferVerifier : fileManager:t -> AsyncWorker<bool>
Implementation
For each action, we define both a function and an async worker, so that we can either perform a single operation when we need it or run a batch of them while tracking the progress. This might be particularly useful when checking the hashes of the transfer files.
To perform I/O operations, we use one MailboxProcessor to read from the disk and another one to write the data. This is meant to avoid the increase in CPU usage that could result from starting a separate asynchronous operation for each read or write request.
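We also test the validity of each request to make sure that its parameters are consistent with the file information. If they are not, an exception is raised and, in the following implementation, swallowed by the I/O agents: the reading agent replies with an empty array, and the writing agent replies without writing anything.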
namespace BitTorrent

//---------------------------------------------------------------
// LIBRARIES
//---------------------------------------------------------------
open System
open System.IO
open System.Threading
open Common
open Common.IO
open BitTorrent
open BitTorrent.BValue

//---------------------------------------------------------------
// FILE MANAGER
//---------------------------------------------------------------

/// A request to read [Length] bytes starting [Offset] bytes into the piece number [Piece].
type ReadRequest =
    { Piece : int
      Offset : int
      Length : int }
    static member create p o l =
        { Piece = p
          Offset = o
          Length = l }

/// A request to write the bytes of [Data] starting [Offset] bytes into the piece number [Piece].
type WriteRequest =
    { Piece : int
      Offset : int
      Data : byte[] }
    static member create p o d =
        { Piece = p
          Offset = o
          Data = d }

/// Gives access to the files of a transfer described by [metainfo].
/// [destinationFolder] is the folder the transfer is stored in; when it is omitted,
/// the application downloads directory is used.
type FileManager(metainfo:Metainfo, ?destinationFolder:string) =

    member x.Metainfo = metainfo

    /// The chosen (or default) folder combined with the base directory of the metainfo.
    member x.DestinationFolder
        with get() =
            let baseDir =
                match destinationFolder with
                | Some folder -> folder
                | None -> App.Settings.DownloadsDirectory
            Filename.concat baseDir x.Metainfo.BaseDirectory

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module FileManager =

    type t = FileManager

    /// Full path of the file number [filePos] of the [fileManager].
    let fullPath (fileManager:t) filePos =
        Filename.concat fileManager.DestinationFolder fileManager.Metainfo.Files.[filePos].Path

    /// Creates the directory and the file number [i] of the [fileManager] and returns
    /// the path of the file. An existing file is flagged for overwriting only when
    /// its length differs from the length announced by the metainfo.
    let asyncGenerateFile (fileManager:t) i =
        async {
            let path = fullPath fileManager i
            Path.GetDirectoryName path |> Directory.create
            let len = fileManager.Metainfo.Files.[i].Length
            let overwrite = File.Exists path && File.length path <> len
            File.create path len overwrite
            return path
        }

    /// Async worker generating every file of the [fileManager] (one job per file).
    let filesGenerator (fileManager:t) =
        new AsyncWorker<_>(
            Array.init fileManager.Metainfo.Files.Length (asyncGenerateFile fileManager)
        )

    //-------------------------------------------------------------
    // I/O operations
    //-------------------------------------------------------------

    /// Position, in the stream made of all the files, of the byte located
    /// [offset] bytes into the piece number [piece].
    let coordinatesToStartIdx (fileManager:t) piece offset =
        // All the pieces located before [piece] have the nominal piece length, even
        // when [piece] itself is the final piece, which may be shorter. The length of
        // piece 0 is therefore used rather than the length of [piece].
        // NOTE(review): Metainfo.pieceLength is defined elsewhere; if it returns the
        // same value for every piece this is equivalent to the previous computation.
        int64 piece * (Metainfo.pieceLength 0 fileManager.Metainfo) + int64 offset

    /// Per-file locks shared by the reading and the writing code.
    let locks : ref<File.LockStore> = ref Map.empty

    //
    // reading
    //

    /// Raises an ArgumentException when [req] is not consistent with the metainfo
    /// of the [fileManager].
    let testReadRequest (fileManager:t) (req:ReadRequest) =
        if req.Piece < 0 then
            invalidArg "readRequest" "piece was negative"
        if req.Piece >= fileManager.Metainfo.SHA1s.Length then
            invalidArg "readRequest" "piece was out of bounds"
        if req.Offset < 0 then
            invalidArg "readRequest" "offset was negative"
        if req.Length < 0 then
            invalidArg "readRequest" "length was negative"
        // Widen before adding so that a huge offset + length cannot wrap around.
        if int64 req.Offset + int64 req.Length > Metainfo.pieceLength req.Piece fileManager.Metainfo then
            invalidArg "readRequest" "out of bounds request"

    /// Reads at most [toRead] bytes from the file at [path], starting at [offsetInFile]
    /// and never going past [fileLength], appends them to [ms] and returns the number
    /// of bytes read. The lock of the file is held for the duration of the operation
    /// and is released even when reading fails.
    let asyncReadChunk (ms:MemoryStream) (path:string) (fileLength:int64) (offsetInFile:int64) (toRead:int) =
        async {
            let! _ = File.asyncAcquireLock locks path
            try
                use! fs = File.AsyncOpenRead path
                fs.Seek(offsetInFile, SeekOrigin.Begin) |> ignore
                let nToEndOfFile = fileLength - fs.Position
                // Compare as int64: the remainder of a big file may not fit in an int.
                let n = int (min (int64 toRead) nToEndOfFile)
                if n > 0 then
                    let! dataRead = fs.AsyncRead(n)
                    if dataRead.Length <> n then
                        invalidArg "file" ("could not read enough data from " + path)
                    do! ms.AsyncWrite(dataRead, 0, dataRead.Length)
                return n
            finally
                try File.releaseLock !locks path with _ -> ()
        }

    /// Appends to [ms] the [toRead] bytes found at position [start] of the stream made
    /// of all the files, and returns the content of [ms]. [currentFile] is the file being
    /// examined and [currentPos] the position of its first byte in the stream.
    /// Fewer bytes are returned when the files end before [toRead] bytes were read.
    let rec asyncReadRange (fileManager:t) (ms:MemoryStream) start toRead currentFile currentPos =
        async {
            if toRead > 0 && currentFile < fileManager.Metainfo.Files.Length then
                let file = fileManager.Metainfo.Files.[currentFile]
                let nextFile = currentFile + 1
                let nextPos = currentPos + file.Length
                if nextPos > start then
                    // The range starts (or continues) in this file.
                    let path = fullPath fileManager currentFile
                    let! n = asyncReadChunk ms path file.Length (start - currentPos) toRead
                    return! asyncReadRange fileManager ms (start + int64 n) (toRead - n) nextFile nextPos
                else
                    // The range starts after the end of this file: skip it.
                    return! asyncReadRange fileManager ms start toRead nextFile nextPos
            else
                return ms.ToArray()
        }

    /// Agent processing the read requests one at a time. An invalid request or a
    /// failed read is answered with an empty array.
    let readerAgent =
        MailboxProcessor<_>.Start(fun inbox ->
            async {
                while true do
                    let! ((replyChannel:AsyncReplyChannel<_>), fileManager, req) = inbox.Receive()
                    try
                        testReadRequest fileManager req
                        use memoryStream = new MemoryStream()
                        let start = coordinatesToStartIdx fileManager req.Piece req.Offset
                        let! readBytes = asyncReadRange fileManager memoryStream start req.Length 0 0L
                        replyChannel.Reply readBytes
                    with _ ->
                        replyChannel.Reply [||]
                    do! Async.Sleep 10
            }
        )

    /// Asynchronously reads the bytes described by [req]; see [readerAgent] for failures.
    let asyncRead (fileManager:t) req =
        readerAgent.PostAndAsyncReply (fun replyChannel -> (replyChannel, fileManager, req))

    /// Async worker processing the read requests [reqs].
    let requestsReader fileManager reqs =
        new AsyncWorker<_>(reqs |> Seq.map (asyncRead fileManager))

    //
    // writing
    //

    /// Raises an ArgumentException when [req] is not consistent with the metainfo
    /// of the [fileManager].
    let testWriteRequest (fileManager:t) (req:WriteRequest) =
        if req.Piece < 0 then
            invalidArg "writeRequest" "piece was negative"
        if req.Piece >= fileManager.Metainfo.SHA1s.Length then
            invalidArg "writeRequest" "piece was out of bounds"
        if req.Offset < 0 then
            invalidArg "writeRequest" "offset was negative"
        // Same bound as for read requests: data must not spill over the next piece.
        if int64 req.Offset + int64 req.Data.Length > Metainfo.pieceLength req.Piece fileManager.Metainfo then
            invalidArg "writeRequest" "out of bounds request"

    /// Writes at most [toWrite] bytes of [data], taken from index [written], to the file
    /// at [path], starting at [offsetInFile] and never going past [fileLength]. Returns
    /// the number of bytes written. The lock of the file is held for the duration of
    /// the operation and is released even when writing fails.
    let asyncWriteChunk (data:byte[]) (path:string) (fileLength:int64) (offsetInFile:int64) (written:int) (toWrite:int) =
        async {
            let! _ = File.asyncAcquireLock locks path
            try
                use! fs = File.AsyncOpenWrite path
                fs.Seek(offsetInFile, SeekOrigin.Begin) |> ignore
                let nToEndOfFile = fileLength - fs.Position
                // Compare as int64: the remainder of a big file may not fit in an int.
                let n = int (min (int64 toWrite) nToEndOfFile)
                if n > 0 then
                    try
                        do! fs.AsyncWrite(data, written, n)
                    with _ ->
                        invalidArg "file" ("data could not be written to " + path)
                return n
            finally
                try File.releaseLock !locks path with _ -> ()
        }

    /// Writes [toWrite] bytes of [data], taken from index [written], at position [start]
    /// of the stream made of all the files. [currentFile] is the file being examined and
    /// [currentPos] the position of its first byte in the stream. Data that would go
    /// past the end of the last file is not written.
    let rec asyncWriteRange (fileManager:t) data start toWrite written currentFile currentPos =
        async {
            if toWrite > 0 && currentFile < fileManager.Metainfo.Files.Length then
                let file = fileManager.Metainfo.Files.[currentFile]
                let nextFile = currentFile + 1
                let nextPos = currentPos + file.Length
                if nextPos > start then
                    // The range starts (or continues) in this file.
                    let path = fullPath fileManager currentFile
                    let! n = asyncWriteChunk data path file.Length (start - currentPos) written toWrite
                    return! asyncWriteRange fileManager data (start + int64 n) (toWrite - n) (written + n) nextFile nextPos
                else
                    // The range starts after the end of this file: skip it.
                    return! asyncWriteRange fileManager data start toWrite written nextFile nextPos
            else
                return ()
        }

    /// Agent processing the write requests one at a time. Invalid requests and
    /// write failures are ignored; a reply is always sent.
    let writerAgent =
        MailboxProcessor<_>.Start(fun inbox ->
            async {
                while true do
                    let! ((replyChannel:AsyncReplyChannel<_>), fileManager, req) = inbox.Receive()
                    try
                        testWriteRequest fileManager req
                        let start = coordinatesToStartIdx fileManager req.Piece req.Offset
                        do! asyncWriteRange fileManager req.Data start req.Data.Length 0 0 0L
                    with _ -> ()
                    replyChannel.Reply ()
                    do! Async.Sleep 10
            }
        )

    /// Asynchronously writes the data described by [req]; see [writerAgent] for failures.
    let asyncWrite (fileManager:t) req =
        writerAgent.PostAndAsyncReply (fun replyChannel -> (replyChannel, fileManager, req))

    /// Async worker processing the write requests [reqs].
    let requestsWriter fileManager reqs =
        new AsyncWorker<_>(reqs |> Seq.map (asyncWrite fileManager))

    //
    // hash checking
    //

    /// Reads the whole piece number [piece] from the disk and tells whether its SHA1
    /// digest equals the one stored in the metainfo. Raises an ArgumentException when
    /// [piece] is not a valid piece index.
    let asyncVerifyPiece (fileManager:t) piece =
        async {
            if piece < 0 || piece >= fileManager.Metainfo.SHA1s.Length then
                invalidArg "piece" "piece index is out of bounds"
            let len = Metainfo.pieceLength piece fileManager.Metainfo |> int
            let req : ReadRequest =
                { Piece = piece
                  Offset = 0
                  Length = len }
            let! pieceToCheck = asyncRead fileManager req
            return Digest.SHA1.hexdigest pieceToCheck = fileManager.Metainfo.SHA1s.[piece]
        }

    /// Async worker verifying the given [pieces], in the order they are given.
    let piecesVerifier fileManager pieces =
        let tasks = [for piece in pieces -> asyncVerifyPiece fileManager piece]
        new AsyncWorker<_>(tasks)

    /// Async worker verifying every piece of the transfer.
    let transferVerifier fileManager =
        piecesVerifier fileManager [0..fileManager.Metainfo.SHA1s.Length - 1]
Example
// Example: list the files of a transfer, generate them on the disk, then read,
// write and verify some pieces.
let destinationFolder = @"C:\test"
let torrentFile = System.IO.Path.Combine(destinationFolder, "openofficefr.torrent")
let metainfo = Metainfo.fromFile torrentFile
let fileManager = FileManager(metainfo, destinationFolder)

// Print every file of the transfer along with its full path.
for i in 0 .. fileManager.Metainfo.Files.Length - 1 do
    let file = fileManager.Metainfo.Files.[i]
    let path = FileManager.fullPath fileManager i
    printfn "file : %A" file
    printfn "path : %A" path
    printfn "================"

// Create the files of the transfer.
// NOTE(review): Start() does not wait for the jobs to finish, so the requests
// below may run before every file has been created - confirm this is acceptable.
let gen = FileManager.filesGenerator fileManager
gen.JobCompleted.Add(fun (_, path) -> printfn "created file with path : %A" path)
gen.AllCompleted.Add(fun paths -> printfn "all files created : %A" paths)
gen.Start()

// Read a block, overwrite it, read it again and verify a piece.
let task =
    async {
        let! bytes = FileManager.asyncRead fileManager {Piece = 7; Offset = 0 ; Length = 34840}
        printfn "before :%A" bytes
        do! FileManager.asyncWrite fileManager {Piece = 7; Offset = 0 ; Data = [|for i in 1 .. 34840 -> i &&& 255 |> byte|] }
        let! bytes = FileManager.asyncRead fileManager {Piece = 7; Offset = 0 ; Length = 34840}
        printfn "after : %A" bytes
        let! ok = FileManager.asyncVerifyPiece fileManager 0
        printfn "is ok ? %A" ok
    }
Async.RunSynchronously task

// Verify a selection of pieces. Job numbers are 1-based positions in the list
// of pieces, so they are mapped back to the piece index before printing.
let piecesToVerify = [|0..3..17|]
let pv = FileManager.piecesVerifier fileManager piecesToVerify
pv.JobCompleted.Add(fun (jobNumber, res) -> printfn "is piece %d ok ? %A" piecesToVerify.[jobNumber - 1] res)
pv.AllCompleted.Add(fun ress -> printfn "all pieces verified : %A" ress)
pv.Start()

// Verify the whole transfer: job number n checks the piece n - 1.
let tv = FileManager.transferVerifier fileManager
tv.JobCompleted.Add(fun (jobNumber, res) -> printfn "is piece %d ok ? %A" (jobNumber - 1) res)
tv.AllCompleted.Add(fun ress -> printfn "all pieces verified : %A" ress)
tv.Start()