In this post, we shall use read/write requests from a group of files considered as a single entity. We will also look at verifying the data. In our model, the group of files is defined by the files, and pieces. A piece is a chunk of data with a standardized size so that read/write requests are consistent. Of course, the last piece of the files stream could see its size differ from others since if the cumulative size of the files is not a multiple of the piece length.
General helpers
open System
open System.Security.Cryptography
open System.IO
open System.Net.NetworkInformation
open System.Threading
module Application =
let directory() =
try Path.GetDirectoryName(System.Reflection.Assembly.GetExecutingAssembly().Location)
with :? System.NotSupportedException -> ""
let drive (directory:string) =
let idx = directory.IndexOf(Path.DirectorySeparatorChar)
if idx = -1 then invalidArg "directory" "drive : invalid spelling"
directory.[0..idx]
let diskFreeSpace (drive:string) =
let driveInfo = new DriveInfo(drive)
if driveInfo.IsReady then
driveInfo.TotalFreeSpace
else
failwith "diskFreeSpace : drive is not ready"
// (c) Microsoft Corporation 2005-2009.
// Taken from PowerPack
// Removed OCaml warnings
//changed the dirname function to be consistent with other functions
module Filename =
let dirName (s:string) =
if s = "" then "."
else
match Path.GetDirectoryName(s) with
| null -> if Path.IsPathRooted(s) then s else "."
| res -> if res = "" then "." else res
//Additional functions to the System.IO namespace
module IO =
module Directory =
let create dir =
if not <| Directory.Exists(dir) then
let dirInfo = Directory.CreateDirectory(dir)
if not dirInfo.Exists then
invalidArg "dir" "Directory [%s] could not be created" dir
module File =
let rec create fullPath len overwrite =
if not <| File.Exists(fullPath) then
let dir = Filename.dirName fullPath
Directory.create dir
let drive = Application.drive fullPath
let freeSpace = Application.diskFreeSpace drive
if freeSpace >= len then
if len > 0L then
use fs = File.Create fullPath
fs.Seek(len - 1L, SeekOrigin.Begin) |> ignore
fs.WriteByte 0uy
else
failwith "createFile : not enough space available"
elif overwrite then
File.Delete(fullPath)
if File.Exists(fullPath) then
failwith "createFile : file could not be deleted to be overwritten"
create fullPath len false
let copy fileFrom fileTo overwrite =
if (overwrite || not <| File.Exists(fileTo)) then
File.Copy(fileFrom, fileTo, overwrite)
let length path =
if not <| File.Exists(path) then
invalidArg "path" "length : file not found"
let info = new System.IO.FileInfo(path)
info.Length
type LockStore = Map<string, Semaphore>
let addLock (store:LockStore ref) path =
if not <| Map.contains path !store then
store := Map.add path (new Semaphore(1, 1)) !store
let asyncAcquireLock (store:LockStore ref) path =
async
{ do addLock store path
return! (!store).[path].AsyncWaitOne(Timeout.Infinite)
}
let releaseLock (store:LockStore) path =
if Map.contains path store then
ignore <| store.[path].Release()
//alias for the sha1 functionality already available
module Digest =
module SHA1 =
let hexdigest (bytes : byte[]) =
let csp = new SHA1CryptoServiceProvider()
csp.ComputeHash(bytes)
File Manager
open IO
//---------------------------------------------------------------
// FILE MANAGER
//---------------------------------------------------------------
type FileEntry =
{ Path : string
Length : int64
}
type FileManager =
{ Files : FileEntry[]
SHA1s : byte[][]
PieceLength : int64
Length : int64
}
type ReadRequest =
{ Piece : int
Offset : int
Length : int
}
static member create p o l =
{ Piece = p
Offset = o
Length = l
}
type WriteRequest =
{ Piece : int
Offset : int
Data : byte[]
}
static member create p o d =
{ Piece = p
Offset = o
Data = d
}
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module FileManager =
type t = FileManager
//-------------------------------------------------------------
// Events
//-------------------------------------------------------------
let filesCreated = new Event<FileManager>()
let triggerFilesCreated = filesCreated.Trigger
let FilesCreated = filesCreated.Publish
let dataRead = new Event<FileManager * ReadRequest * byte[]>()
let triggerDataRead = dataRead.Trigger
let DataRead = dataRead.Publish
let dataWritten = new Event<FileManager * WriteRequest>()
let triggerDataWritten = dataWritten.Trigger
let DataWritten = dataWritten.Publish
let verifiedPiece = new Event<FileManager * int * bool>()
let triggerVerifiedPiece = verifiedPiece.Trigger
let VerifiedPiece = verifiedPiece.Publish
let piecesVerificationProgressed = new Event<FileManager * int>()
let triggerPiecesVerificationProgressed = piecesVerificationProgressed.Trigger
let PiecesVerificationProgressed = piecesVerificationProgressed.Publish
let piecesVerificationDone = new Event<FileManager * bool[]>()
let triggerPiecesVerificationDone = piecesVerificationDone.Trigger
let PiecesVerificationDone = piecesVerificationDone.Publish
//-------------------------------------------------------------
// I/O operations
//-------------------------------------------------------------
let create files sha1s pieceLength =
{ Files = files
SHA1s = sha1s
PieceLength = pieceLength
Length = files |> Array.fold (fun acc file -> acc + file.Length) 0L
}
let lastPieceLength (fileManager:t) =
fileManager.Length % fileManager.PieceLength
let pieceLength piece fileManager =
let numPieces = fileManager.SHA1s.Length
if piece < 0 || piece >= numPieces then
invalidArg "n" "n is too large or negative"
elif piece = numPieces - 1 then
lastPieceLength fileManager
else
fileManager.PieceLength
let generateFiles fileManager =
for i in 0 .. fileManager.Files.Length - 1 do
let path = fileManager.Files.[i].Path
Filename.dirName path |> Directory.create
let len = fileManager.Files.[i].Length
let overwrite = File.length path <> len
File.create path len overwrite
triggerFilesCreated fileManager
let coordinatesToStartIdx piece offset fileManager =
int64 piece * (pieceLength piece fileManager) + int64 offset
let locks : ref<File.LockStore> = ref Map.empty
//
// Reading
//
let testReadRequest (req:ReadRequest) fileManager =
if req.Piece < 0 then invalidArg "readRequest" "ReadRequest : piece was negative"
if req.Piece >= fileManager.SHA1s.Length then
invalidArg "readRequest" "ReadRequest : piece was out of bounds"
if req.Offset < 0 then invalidArg "readRequest" "ReadRequest : offset was negative"
if req.Length < 0 then invalidArg "readRequest" "ReadRequest : length was negative"
if int64 (req.Offset + req.Length) > pieceLength req.Piece fileManager then
invalidArg "readRequest" "ReadRequest : request was out of bounds"
let rec asyncReadRange fileManager (memoryStream:MemoryStream) start toRead currentFile currentPos =
async
{ if toRead > 0 && currentFile < fileManager.Files.Length then
let file = fileManager.Files.[currentFile]
let nextFile = currentFile + 1
let nextPos = currentPos + file.Length
if currentPos + file.Length > start then
let path = fileManager.Files.[currentFile].Path
let! ok = File.asyncAcquireLock locks path
use! fs = File.AsyncOpenRead path
fs.Seek(start - currentPos, SeekOrigin.Begin) |> ignore
let nToEndOfFile = file.Length - fs.Position
let n = min toRead (int nToEndOfFile)
if n > 0 then
let! dataRead = fs.AsyncRead(n)
if dataRead.Length <> n then
failwith "asyncReadRange : could not read enough data from "
do! memoryStream.AsyncWrite(dataRead, 0, dataRead.Length)
try File.releaseLock !locks path with _ -> ()
return! asyncReadRange fileManager memoryStream (start + int64 n) (toRead - n) nextFile nextPos
else
return! asyncReadRange fileManager memoryStream start toRead nextFile nextPos
else
return memoryStream.ToArray()
}
let asyncRead req fileManager =
async
{ testReadRequest req fileManager
use memoryStream = new MemoryStream()
let start = coordinatesToStartIdx req.Piece req.Offset fileManager
let! readBytes = asyncReadRange fileManager memoryStream start req.Length 0 0L
triggerDataRead (fileManager, req, readBytes)
return readBytes
}
//
// Writing
//
let testWriteRequest (req:WriteRequest) fileManager =
if req.Piece < 0 then invalidArg "writeRequest" "WriteRequest : piece was negative"
if req.Piece >= fileManager.SHA1s.Length then
invalidArg "writeRequest" "WriteRequest : piece was out of bounds"
if req.Offset < 0 then invalidArg "writeRequest" "WriteRequest : offset was negative"
let rec asyncWriteRange fileManager data start toWrite written currentFile currentPos =
async
{ if toWrite > 0 && currentFile < fileManager.Files.Length then
let file = fileManager.Files.[currentFile]
let nextFile = currentFile + 1
let nextPos = currentPos + file.Length
if currentPos + file.Length > start then
let path = fileManager.Files.[currentFile].Path
let! ok = File.asyncAcquireLock locks path
use! fs = File.AsyncOpenWrite path
fs.Seek(start - currentPos, SeekOrigin.Begin) |> ignore
let nToEndOfFile = file.Length - fs.Position
let n = min toWrite (int nToEndOfFile)
if n > 0 then
try do! fs.AsyncWrite(data, written, n)
with _ -> failwith "asyncWriteRange : data could not be written"
try File.releaseLock !locks path with _ -> ()
return! asyncWriteRange fileManager data (start + int64 n) (toWrite - n) (written + n) nextFile nextPos
else
return! asyncWriteRange fileManager data start toWrite written nextFile nextPos
}
let asyncWrite req fileManager =
async
{ testWriteRequest req fileManager
let start = coordinatesToStartIdx req.Piece req.Offset fileManager
do! asyncWriteRange fileManager req.Data start req.Data.Length 0 0 0L
triggerDataWritten (fileManager, req)
}
//
// Verifying
//
let asyncVerifySinglePieceAux piece fileManager =
async
{ if piece >= fileManager.SHA1s.Length then
invalidArg "piece" "asyncVerifySinglePiece : piece index is out of bounds"
let len = pieceLength piece fileManager |> int
let req : ReadRequest =
{ Piece = piece
Offset = 0
Length = len
}
let! pieceToCheck = asyncRead req fileManager
return Digest.SHA1.hexdigest pieceToCheck = fileManager.SHA1s.[piece]
}
let asyncVerifySinglePiece piece fileManager =
async
{ let! ok = asyncVerifySinglePieceAux piece fileManager
triggerVerifiedPiece(fileManager, piece, ok)
return ok
}
let asyncVerifyMultiplePiecesAux pieces fileManager =
//the lock may make the advantage of parallelism (via Async.Parallel) vanish
//but the goal is to have results returned in an array, and parallelism
//when using SHA1 may yield false results.
let nVerifiedPieces = ref 0
pieces |> Seq.map (fun piece ->
async
{ let! res = asyncVerifySinglePieceAux piece fileManager
lock fileManager (fun _ ->
nVerifiedPieces := !nVerifiedPieces + 1
triggerPiecesVerificationProgressed (fileManager, !nVerifiedPieces)
)
return res
}
) |> Async.Parallel
let asyncVerifyMultiplePieces pieces fileManager =
async
{ let! res = asyncVerifyMultiplePiecesAux pieces fileManager
triggerPiecesVerificationDone(fileManager, res)
return res
}
let asyncVerifyAllPieces fileManager =
asyncVerifyMultiplePieces [0..fileManager.SHA1s.Length - 1] fileManager