In this post, we shall issue read/write requests against a group of files considered as a single entity. We will also look at verifying the data. In our model, the group of files is described by the files themselves together with pieces. A piece is a chunk of data with a standardized size, so that read/write requests are consistent. Of course, the last piece of the file stream may differ in size from the others if the cumulative size of the files is not a multiple of the piece length.

General helpers

open System
open System.Security.Cryptography
open System.IO
open System.Net.NetworkInformation
open System.Threading

module Application =

  /// Directory containing the currently executing assembly, or "" when the
  /// assembly location is unavailable (e.g. dynamically emitted assemblies).
  let directory() =
    try
      let location = System.Reflection.Assembly.GetExecutingAssembly().Location
      Path.GetDirectoryName(location)
    with :? System.NotSupportedException -> ""

  /// Drive prefix of [directory], up to and including the first directory
  /// separator (e.g. "C:\" on Windows, "/" on Unix).
  /// Raises ArgumentException when no separator is present.
  let drive (directory:string) =
    match directory.IndexOf(Path.DirectorySeparatorChar) with
    | -1  -> invalidArg "directory" "drive : invalid spelling"
    | idx -> directory.[0..idx]

  /// Total free space (in bytes) on [drive]; fails when the drive is not ready.
  let diskFreeSpace (drive:string) =
    let driveInfo = new DriveInfo(drive)
    if not driveInfo.IsReady then
      failwith "diskFreeSpace : drive is not ready"
    driveInfo.TotalFreeSpace

// (c) Microsoft Corporation 2005-2009.
// Taken from PowerPack
// Removed OCaml warnings
//changed the dirname function to be consistent with other functions

module Filename =
  /// Directory portion of path [s]: "." for the empty string or a bare file
  /// name, [s] itself for a root path, otherwise Path.GetDirectoryName's result.
  let dirName (s:string) =
    if s = "" then "."
    else
      let dir = Path.GetDirectoryName(s)
      if dir = null then
        if Path.IsPathRooted(s) then s else "."
      elif dir = "" then "."
      else dir

//Additional functions to the System.IO namespace
module IO =

  module Directory =
    /// Creates directory [dir] (including intermediates) when it does not
    /// already exist; raises ArgumentException when creation silently fails.
    let create dir =
      if not <| Directory.Exists(dir) then
        let dirInfo = Directory.CreateDirectory(dir)
        if not dirInfo.Exists then
          // invalidArg takes (paramName, message): the message must be built
          // with sprintf, otherwise "%s" is emitted literally and the extra
          // argument is never used.
          invalidArg "dir" (sprintf "Directory [%s] could not be created" dir)

  module File =
    /// Pre-allocates a file of [len] bytes at [fullPath], creating parent
    /// directories as needed and checking free disk space first.
    /// When [overwrite] is true an existing file is deleted and recreated.
    /// Allocation works by seeking to len-1 and writing one byte.
    let rec create fullPath len overwrite =
      if not <| File.Exists(fullPath) then
        let dir = Filename.dirName fullPath
        Directory.create dir
        let drive = Application.drive fullPath
        let freeSpace = Application.diskFreeSpace drive
        if freeSpace >= len then
          if len > 0L then
            use fs = File.Create fullPath
            fs.Seek(len - 1L, SeekOrigin.Begin) |> ignore
            fs.WriteByte 0uy
        else
          failwith "createFile : not enough space available"
      elif overwrite then
        File.Delete(fullPath)
        if File.Exists(fullPath) then
          failwith "createFile : file could not be deleted to be overwritten"
        // recurse once with overwrite=false: the file is now gone, so this
        // takes the creation branch
        create fullPath len false

    /// Copies [fileFrom] to [fileTo]; a no-op when the target exists and
    /// [overwrite] is false.
    let copy fileFrom fileTo overwrite =
      if (overwrite || not <| File.Exists(fileTo)) then
        File.Copy(fileFrom, fileTo, overwrite)

    /// Length in bytes of the file at [path]; raises ArgumentException when
    /// the file does not exist.
    let length path =
      if not <| File.Exists(path) then
        invalidArg "path" "length : file not found"
      let info = new System.IO.FileInfo(path)
      info.Length

    // Per-path binary semaphores used to serialize file access.
    type LockStore = Map<string, Semaphore>

    /// Registers a fresh semaphore for [path] if none exists yet.
    /// NOTE(review): the check-then-add on the ref is not atomic — concurrent
    /// callers could race here; confirm callers serialize registration.
    let addLock (store:LockStore ref) path =
      if not <| Map.contains path !store then
        store := Map.add path (new Semaphore(1, 1)) !store

    /// Asynchronously acquires the lock for [path], registering it on demand.
    let asyncAcquireLock (store:LockStore ref) path =
      async
        { do addLock store path
          return! (!store).[path].AsyncWaitOne(Timeout.Infinite)
        }

    /// Releases the lock for [path]; a no-op when no lock is registered.
    let releaseLock (store:LockStore) path =
      if Map.contains path store then
        ignore <| store.[path].Release()

//alias for the sha1 functionality already available
module Digest =
  module SHA1 =
    /// Raw 20-byte SHA-1 digest of [bytes].
    /// NOTE: despite the name, this returns raw bytes, not a hex string;
    /// callers compare it directly against stored byte[] hashes.
    let hexdigest (bytes : byte[]) =
      // use: the crypto provider holds a native handle and must be disposed
      use csp = new SHA1CryptoServiceProvider()
      csp.ComputeHash(bytes)

File Manager


open IO

//---------------------------------------------------------------
// FILE MANAGER
//---------------------------------------------------------------

/// One file of the managed group: its path on disk and its expected length.
type FileEntry =
  { Path : string
    Length : int64
  } 

/// A group of files treated as one contiguous stream, split into pieces.
/// SHA1s holds one expected digest per piece; Length is the total size
/// of all files. The last piece may be shorter than PieceLength.
type FileManager =
    { Files : FileEntry[]
      SHA1s : byte[][]
      PieceLength : int64
      Length : int64
    }  

/// A request to read [Length] bytes at byte [Offset] within piece [Piece].
type ReadRequest =
  { Piece : int
    Offset : int
    Length : int
  }
  /// Shorthand constructor: piece, offset, length.
  static member create p o l =
    { Piece = p
      Offset  = o
      Length  = l
    }

/// A request to write [Data] starting at byte [Offset] within piece [Piece].
type WriteRequest =
  { Piece : int
    Offset : int
    Data : byte[]
  }
  /// Shorthand constructor: piece, offset, data.
  static member create p o d =
    { Piece = p
      Offset  = o
      Data  = d
    }

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module FileManager =

  type t = FileManager

  //-------------------------------------------------------------
  // Events
  //-------------------------------------------------------------
  // Each event follows the same triple: private Event cell, a trigger
  // function used internally, and a Publish handle for subscribers.

  // Fired once generateFiles has (re)created every backing file.
  let filesCreated = new Event<FileManager>()
  let triggerFilesCreated = filesCreated.Trigger
  let FilesCreated = filesCreated.Publish

  // Fired after a read completes, with the request and the bytes read.
  let dataRead = new Event<FileManager * ReadRequest * byte[]>()
  let triggerDataRead = dataRead.Trigger
  let DataRead = dataRead.Publish

  // Fired after a write request has been fully flushed to disk.
  let dataWritten = new Event<FileManager * WriteRequest>()
  let triggerDataWritten = dataWritten.Trigger
  let DataWritten = dataWritten.Publish

  // Fired after a single piece has been hash-checked (index, pass/fail).
  let verifiedPiece = new Event<FileManager * int * bool>()
  let triggerVerifiedPiece = verifiedPiece.Trigger
  let VerifiedPiece = verifiedPiece.Publish

  // Fired after each piece during a multi-piece verification, carrying the
  // running count of verified pieces.
  let piecesVerificationProgressed = new Event<FileManager * int>()
  let triggerPiecesVerificationProgressed = piecesVerificationProgressed.Trigger
  let PiecesVerificationProgressed = piecesVerificationProgressed.Publish

  // Fired when a multi-piece verification finishes, with per-piece results.
  let piecesVerificationDone = new Event<FileManager * bool[]>()
  let triggerPiecesVerificationDone = piecesVerificationDone.Trigger
  let PiecesVerificationDone = piecesVerificationDone.Publish

  //-------------------------------------------------------------
  // I/O operations
  //-------------------------------------------------------------

  /// Builds a FileManager; total Length is the sum of all file lengths.
  let create files sha1s pieceLength =
    let totalLength = files |> Array.sumBy (fun file -> file.Length)
    { Files = files
      SHA1s = sha1s
      PieceLength = pieceLength
      Length = totalLength
    }

  /// Length of the final piece. When the total length is an exact multiple
  /// of the piece length the remainder is 0, but the last piece is then a
  /// full piece — return PieceLength in that case instead of 0.
  let lastPieceLength (fileManager:t) =
    let remainder = fileManager.Length % fileManager.PieceLength
    if remainder = 0L && fileManager.Length > 0L then fileManager.PieceLength
    else remainder

  /// Length of piece [piece]: PieceLength for all but the final piece,
  /// which may be shorter. Raises ArgumentException for an out-of-range index.
  let pieceLength piece fileManager =
    let numPieces = fileManager.SHA1s.Length
    if piece < 0 || piece >= numPieces then
        // parameter is named "piece", not "n" — keep the message consistent
        invalidArg "piece" "piece is too large or negative"
    elif piece = numPieces - 1 then
      lastPieceLength fileManager
    else
      fileManager.PieceLength

  /// Creates (or recreates at the right size) every file of the group,
  /// then fires FilesCreated.
  let generateFiles fileManager =
    for i in 0 .. fileManager.Files.Length - 1 do
      let path = fileManager.Files.[i].Path
      Filename.dirName path |> Directory.create
      let len = fileManager.Files.[i].Length
      // File.length raises when the file is missing, so guard with Exists:
      // a missing file just needs creation, not an overwrite.
      let overwrite = File.Exists(path) && File.length path <> len
      File.create path len overwrite
    triggerFilesCreated fileManager

  /// Absolute byte offset in the file stream of (piece, offset).
  /// Every piece starts at a multiple of the *nominal* PieceLength; only the
  /// final piece may be shorter, which never affects start offsets. Using the
  /// per-piece length here would misplace reads/writes on the last piece.
  let coordinatesToStartIdx piece offset fileManager =
    int64 piece * fileManager.PieceLength + int64 offset

  // Shared per-path lock store used by the read and write paths below.
  let locks : ref<File.LockStore> = ref Map.empty

  //
  // Reading
  //

  /// Validates a ReadRequest against [fileManager]; raises ArgumentException
  /// at the first coordinate that is negative or out of bounds.
  let testReadRequest (req:ReadRequest) fileManager =
    let pieceCount = fileManager.SHA1s.Length
    if req.Piece < 0 then invalidArg "readRequest" "ReadRequest : piece was negative"
    elif req.Piece >= pieceCount then
      invalidArg "readRequest" "ReadRequest : piece was out of bounds"
    if req.Offset < 0 then invalidArg "readRequest" "ReadRequest : offset was negative"
    if req.Length < 0 then invalidArg "readRequest" "ReadRequest : length was negative"
    let bound = pieceLength req.Piece fileManager
    if int64 (req.Offset + req.Length) > bound then
      invalidArg "readRequest" "ReadRequest : request was out of bounds"

  // Recursively walks the file list, copying the byte range starting at
  // absolute offset [start] into [memoryStream] until [toRead] bytes are
  // gathered or the files are exhausted; returns the accumulated bytes.
  // [currentPos] is the absolute offset of [currentFile]'s first byte.
  // NOTE(review): the per-path lock is released inside a swallowing
  // try/with, and is NOT released if an exception escapes between acquire
  // and release — confirm whether a finally is needed here.
  let rec asyncReadRange fileManager (memoryStream:MemoryStream) start toRead currentFile currentPos =
    async
      { if toRead > 0 && currentFile < fileManager.Files.Length then
          let file = fileManager.Files.[currentFile]
          let nextFile = currentFile + 1
          let nextPos = currentPos + file.Length
          // only files overlapping [start ...] contribute data
          if currentPos + file.Length > start then
            let path = fileManager.Files.[currentFile].Path
            let! ok = File.asyncAcquireLock locks path
            // File.AsyncOpenRead — presumably a PowerPack async extension; TODO confirm
            use! fs = File.AsyncOpenRead path
            fs.Seek(start - currentPos, SeekOrigin.Begin) |> ignore
            let nToEndOfFile = file.Length - fs.Position
            // read up to the end of this file, at most toRead bytes
            let n = min toRead (int nToEndOfFile)
            if n > 0 then
              let! dataRead = fs.AsyncRead(n)
              if dataRead.Length <> n then
                failwith "asyncReadRange : could not read enough data from "
              do! memoryStream.AsyncWrite(dataRead, 0, dataRead.Length)
            try File.releaseLock !locks path with _ -> ()
            // advance past the n bytes just read and move to the next file
            return! asyncReadRange fileManager memoryStream (start + int64 n) (toRead - n) nextFile nextPos
          else
            return! asyncReadRange fileManager memoryStream start toRead nextFile nextPos
        else
          return memoryStream.ToArray()
      }

  /// Validates [req], reads the requested range from the file group,
  /// fires DataRead, and returns the bytes.
  let asyncRead req fileManager =
    async
      { testReadRequest req fileManager
        let start = coordinatesToStartIdx req.Piece req.Offset fileManager
        use buffer = new MemoryStream()
        let! bytes = asyncReadRange fileManager buffer start req.Length 0 0L
        triggerDataRead (fileManager, req, bytes)
        return bytes
      }

  //
  // Writing
  //

  /// Validates a WriteRequest against [fileManager]; raises ArgumentException
  /// for a bad piece index, negative offset, or a write that would run past
  /// the end of the piece (mirrors the bounds check in testReadRequest).
  let testWriteRequest (req:WriteRequest) fileManager =
    if req.Piece < 0 then invalidArg "writeRequest" "WriteRequest : piece was negative"
    if req.Piece >= fileManager.SHA1s.Length then
      invalidArg "writeRequest" "WriteRequest : piece was out of bounds"
    if req.Offset < 0 then invalidArg "writeRequest" "WriteRequest : offset was negative"
    // previously unchecked: a write ending past the piece boundary would
    // silently spill into the next piece
    if int64 req.Offset + int64 req.Data.Length > pieceLength req.Piece fileManager then
      invalidArg "writeRequest" "WriteRequest : request was out of bounds"

  // Recursively walks the file list, writing [toWrite] bytes of [data]
  // (starting at data index [written]) at absolute stream offset [start].
  // [currentPos] is the absolute offset of [currentFile]'s first byte.
  // When toWrite reaches 0 or the files are exhausted, the workflow ends
  // (the outer `if` has no else: async returns unit).
  // NOTE(review): as in asyncReadRange, the lock release is inside a
  // swallowing try/with and skipped if an exception escapes earlier —
  // confirm whether a finally is needed.
  let rec asyncWriteRange fileManager data start toWrite written currentFile currentPos =
    async
      { if toWrite > 0 && currentFile < fileManager.Files.Length then
          let file = fileManager.Files.[currentFile]
          let nextFile = currentFile + 1
          let nextPos = currentPos + file.Length

          // only files overlapping [start ...] receive data
          if currentPos + file.Length > start then
            let path = fileManager.Files.[currentFile].Path

            let! ok = File.asyncAcquireLock locks path
            // File.AsyncOpenWrite — presumably a PowerPack async extension; TODO confirm
            use! fs = File.AsyncOpenWrite path
            fs.Seek(start - currentPos, SeekOrigin.Begin) |> ignore
            let nToEndOfFile = file.Length - fs.Position
            // write up to the end of this file, at most toWrite bytes
            let n = min toWrite (int nToEndOfFile)
            if n > 0 then
              try do! fs.AsyncWrite(data, written, n)
              with _ -> failwith "asyncWriteRange : data could not be written"
            try File.releaseLock !locks path with _ -> ()
            // advance the stream offset and the data cursor by n bytes
            return! asyncWriteRange fileManager data (start + int64 n) (toWrite - n) (written + n) nextFile nextPos
          else
            return! asyncWriteRange fileManager data start toWrite written nextFile nextPos
      }

  /// Validates [req], writes its data into the file group at the piece
  /// coordinates, then fires DataWritten.
  let asyncWrite req fileManager =
    async
      { testWriteRequest req fileManager
        let startIdx = coordinatesToStartIdx req.Piece req.Offset fileManager
        do! asyncWriteRange fileManager req.Data startIdx req.Data.Length 0 0 0L
        triggerDataWritten (fileManager, req)
      }

  //
  // Verifying
  //

  /// Reads piece [piece] in full and compares its SHA-1 digest against the
  /// expected hash; returns true on a match. Raises ArgumentException for an
  /// index past the last piece.
  let asyncVerifySinglePieceAux piece fileManager =
    async
      { if piece >= fileManager.SHA1s.Length then
            invalidArg "piece" "asyncVerifySinglePiece : piece index is out of bounds"
        let wholePiece = pieceLength piece fileManager |> int
        let req = ReadRequest.create piece 0 wholePiece
        let! pieceData = asyncRead req fileManager
        let actual = Digest.SHA1.hexdigest pieceData
        // byte[] structural equality compares element-wise
        return actual = fileManager.SHA1s.[piece]
      }

  /// Verifies one piece and fires VerifiedPiece with the result.
  let asyncVerifySinglePiece piece fileManager =
    async
      { let! verified = asyncVerifySinglePieceAux piece fileManager
        triggerVerifiedPiece (fileManager, piece, verified)
        return verified
      }

  // Verifies [pieces] with Async.Parallel, reporting progress after each
  // piece via PiecesVerificationProgressed; results come back as an array
  // in the same order as [pieces].
  let asyncVerifyMultiplePiecesAux pieces fileManager =
    //the lock may make the advantage of parallelism (via Async.Parallel) vanish
    //but the goal is to have results returned in an array, and parallelism
    //when using SHA1 may yield false results.
    // Shared progress counter; mutated only inside the lock below.
    let nVerifiedPieces = ref 0
    pieces |> Seq.map (fun piece ->
      async
        { let! res = asyncVerifySinglePieceAux piece fileManager
          // lock on the fileManager value serializes counter updates and
          // progress events across the parallel verifications
          lock fileManager (fun _ ->
            nVerifiedPieces := !nVerifiedPieces + 1
            triggerPiecesVerificationProgressed (fileManager, !nVerifiedPieces)
          )
          return res
        }
    ) |> Async.Parallel 

  /// Verifies [pieces] and fires PiecesVerificationDone with the per-piece
  /// results before returning them.
  let asyncVerifyMultiplePieces pieces fileManager =
    async
      { let! results = asyncVerifyMultiplePiecesAux pieces fileManager
        triggerPiecesVerificationDone (fileManager, results)
        return results
      }

  /// Verifies every piece of the file group.
  let asyncVerifyAllPieces fileManager =
    let allPieces = [ 0 .. fileManager.SHA1s.Length - 1 ]
    asyncVerifyMultiplePieces allPieces fileManager
Share :
  • email
  • Facebook
  • Google Buzz
  • Yahoo! Buzz
  • Digg
  • del.icio.us
  • Add to favorites
  • Print

Leave a Reply

You must be logged in to post a comment.