Functions to extend the F# Event module.

Signature

module Event =
  val listenOnce : ('a -> unit) -> IEvent<'b,'a> -> unit

  val firstOfTwo : IEvent<'a,'b> -> IEvent<'c,'b> -> IEvent<'b>

  val listenUntil : predicate:('a -> bool) -> IEvent<'b,'a> -> IEvent<'a>

  val listenWhile : predicate:('a -> bool) -> IEvent<'b,'a> -> IEvent<'a>

  val listenN : toListen:int -> IEvent<'a,'b> -> IEvent<'b>

  val skipUntil : predicate:('a -> bool) -> IEvent<'b,'a> -> IEvent<'a>

  val skipWhile : predicate:('a -> bool) -> IEvent<'b,'a> -> IEvent<'a>

  val skipN : toSkip:int -> IEvent<'a,'b> -> IEvent<'b>

  val mapi : f:(int -> 'a -> 'c) -> IEvent<'b,'a> -> IEvent<'c>

  val iter : f:('a -> unit) -> IEvent<'b,'a> -> unit

  val iteri : f:(int -> 'a -> unit) -> IEvent<'b,'a> -> unit

  val mergeAll : seq<IEvent<'a>> -> IEvent<'a>

  val windowed : length:int -> IEvent<'a,'b> -> IEvent<Queue<'b>>

  val reduce : f:('a -> 'a -> 'a) -> IEvent<'b,'a> -> IEvent<'a>

  val fold : f:('a -> 'b -> 'a) -> state:'a -> IEvent<'c,'b> -> IEvent<'a>

Implementation


module Event =
  let listenOnce f evt =
    async {
      let! res = Async.AwaitEvent evt
      f res
    } |> Async.Start

  let firstOfTwo evt evt' =
    let firstOccurred = ref false
    let res = new Event<_>()
    let selectEvt evt args =
      lock firstOccurred (fun _ ->
        if not !firstOccurred then
          firstOccurred := true
          evt |> Event.add (fun args -> res.Trigger(args))
          //make sure the first call triggers the received args too
          res.Trigger(args)
      )
    evt |> listenOnce (selectEvt evt)
    evt' |> listenOnce (selectEvt evt')
    res.Publish

  let listenUntil p evt =
    let ok = ref true
    evt |> Event.filter (fun args -> !ok && (ok := not <| p args; !ok))

  let listenWhile p evt =
    let ok = ref true
    evt |> Event.filter (fun args -> !ok && (ok := p args; !ok))

  let listenN n evt =
    let i = ref 0
    evt |> Event.filter (fun args -> incr i; !i < n)

  let skipUntil p evt =
    let ok = ref false
    evt |> Event.filter (fun args -> !ok || (ok := p args; !ok))

  let skipWhile p evt =
    let ok = ref false
    evt |> Event.filter (fun args -> !ok || (ok := not <| p args; !ok))

  let skipN n evt =
    let i = ref 0
    evt |> Event.filter (fun args -> incr i; !i >= n)

  let mapi f evt =
    let i = ref (-1)
    evt |> Event.map (fun args -> incr i; f !i args )

  let iter f evt =
    Event.add f evt

  let iteri f evt =
    let i = ref 0
    evt |> Event.add (fun args ->f !i args; incr i)

  let mergeAll evts =
    evts |> Seq.reduce Event.merge

  let windowed n evt =
    if n <= 1 then failwith "window length is too small"
    evt
      |> Event.scan (fun acc args ->
          let incomplete =
            if Queue.length acc = n then Queue.tail acc else acc
          Queue.enqueue args incomplete
        ) Queue.empty
      |> Event.filter (fun q -> Queue.length q = n)

  let reduce f evt =
    let shouldPublish = ref false
    let state = ref Unchecked.defaultof<_>
    let res = new Event<_>()
    evt |> listenOnce (fun args ->
      state := args
      evt |> Event.add (fun args ->
        let tmp = f !state args
        res.Trigger(tmp)
        state := tmp
      )
    )
    res.Publish

  let fold f state evt =
    let shouldPublish = ref false
    let state = ref state
    let res = new Event<_>()
    evt |> Event.add (fun args ->
      let tmp = f !state args
      res.Trigger(tmp)
      state := tmp
    )
    res.Publish

Comments are closed.