Skip to content

Commit

Permalink
Merge pull request #70 from eulerfx/master
Browse files Browse the repository at this point in the history
bufferByTime
  • Loading branch information
eulerfx authored Sep 27, 2017
2 parents 2713edc + 49aab21 commit 6613af0
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 8 deletions.
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### 2.0.15 - 27.09.2017
* NEW: AsyncSeq.bufferByTime

### 2.0.14 - 27.09.2017
* BUG: Fixed head of line blocking in AsyncSeq.mapAsyncParallel

Expand Down
8 changes: 4 additions & 4 deletions src/FSharp.Control.AsyncSeq.Profile7/AssemblyInfo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ open System.Reflection
[<assembly: AssemblyTitleAttribute("FSharp.Control.AsyncSeq.Profile7")>]
[<assembly: AssemblyProductAttribute("FSharp.Control.AsyncSeq")>]
[<assembly: AssemblyDescriptionAttribute("Asynchronous sequences for F#")>]
[<assembly: AssemblyVersionAttribute("2.0.13")>]
[<assembly: AssemblyFileVersionAttribute("2.0.13")>]
[<assembly: AssemblyVersionAttribute("2.0.14")>]
[<assembly: AssemblyFileVersionAttribute("2.0.14")>]
do ()

module internal AssemblyVersionInformation =
let [<Literal>] Version = "2.0.13"
let [<Literal>] InformationalVersion = "2.0.13"
let [<Literal>] Version = "2.0.14"
let [<Literal>] InformationalVersion = "2.0.14"
8 changes: 4 additions & 4 deletions src/FSharp.Control.AsyncSeq/AssemblyInfo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ open System.Reflection
[<assembly: AssemblyTitleAttribute("FSharp.Control.AsyncSeq")>]
[<assembly: AssemblyProductAttribute("FSharp.Control.AsyncSeq")>]
[<assembly: AssemblyDescriptionAttribute("Asynchronous sequences for F#")>]
[<assembly: AssemblyVersionAttribute("2.0.13")>]
[<assembly: AssemblyFileVersionAttribute("2.0.13")>]
[<assembly: AssemblyVersionAttribute("2.0.14")>]
[<assembly: AssemblyFileVersionAttribute("2.0.14")>]
do ()

module internal AssemblyVersionInformation =
let [<Literal>] Version = "2.0.13"
let [<Literal>] InformationalVersion = "2.0.13"
let [<Literal>] Version = "2.0.14"
let [<Literal>] InformationalVersion = "2.0.14"
40 changes: 40 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ module internal Utils =
elif i = 1 then return (Choice2Of2 (b.Result, a))
else return! failwith (sprintf "unreachable, i = %d" i) }

static member internal chooseTasks2 (a:Task<'T>) (b:Task) : Async<Choice<'T * Task, Task<'T>>> =
async {
let! ct = Async.CancellationToken
let i = Task.WaitAny( [| (a :> Task);(b) |],ct)
if i = 0 then return (Choice1Of2 (a.Result, b))
elif i = 1 then return (Choice2Of2 (a))
else return! failwith (sprintf "unreachable, i = %d" i) }

type MailboxProcessor<'Msg> with
member __.PostAndAsyncReplyTask (f:TaskCompletionSource<'a> -> 'Msg) : Task<'a> =
let tcs = new TaskCompletionSource<'a>()
Expand All @@ -131,6 +139,9 @@ module internal Utils =
let chooseTask (t:Task<'a>) (a:Async<'a>) : Async<'a> =
chooseTaskAsTask t a |> Async.bind Async.awaitTaskCancellationAsError

let toUnit (t:Task) : Task<unit> =
t.ContinueWith (Func<_, _>(fun (_:Task) -> ()))

let taskFault (t:Task<'a>) : Task<'b> =
t
|> extend (fun t ->
Expand Down Expand Up @@ -1389,6 +1400,35 @@ module AsyncSeq =
yield! loop None timeoutMs
}

let bufferByTime (timeMs:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> = asyncSeq {
if (timeMs < 1) then invalidArg "timeMs" "must be positive"
let buf = new ResizeArray<_>()
use ie = source.GetEnumerator()
let rec loop (next:Task<'T option> option, waitFor:Task option) = asyncSeq {
let! next =
match next with
| Some n -> async.Return n
| None -> ie.MoveNext () |> Async.StartChildAsTask
let waitFor =
match waitFor with
| Some w -> w
| None -> Task.Delay timeMs
let! res = Async.chooseTasks2 next waitFor
match res with
| Choice1Of2 (Some a,waitFor) ->
buf.Add a
yield! loop (None,Some waitFor)
| Choice1Of2 (None,_) ->
let arr = buf.ToArray()
if arr.Length > 0 then
yield arr
| Choice2Of2 next ->
let arr = buf.ToArray()
buf.Clear()
yield arr
yield! loop (Some next, None) }
yield! loop (None, None) }

let private mergeChoiceEnum (ie1:IAsyncEnumerator<'T1>) (ie2:IAsyncEnumerator<'T2>) : AsyncSeq<Choice<'T1,'T2>> = asyncSeq {
let! move1T = Async.StartChildAsTask (ie1.MoveNext())
let! move2T = Async.StartChildAsTask (ie2.MoveNext())
Expand Down
4 changes: 4 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ module AsyncSeq =
/// Buffer items from the async sequence until a specified buffer size is reached or a specified amount of time is elapsed.
val bufferByCountAndTime : bufferSize:int -> timeoutMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>

/// Buffers items from the async sequence by the specified time interval.
/// If no items are received in an intervel and empty array is emitted.
val bufferByTime : timeMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T[]>

/// Merges two async sequences into an async sequence non-deterministically.
/// The resulting async sequence produces elements when any argument sequence produces an element.
val mergeChoice: source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<Choice<'T1,'T2>>
Expand Down
23 changes: 23 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,29 @@ let ``AsyncSeq.bufferByTimeAndCount empty``() =
let actual = AsyncSeq.bufferByCountAndTime 2 10 s |> AsyncSeq.toList
Assert.True((actual = []))

[<Test>]
let ``AsyncSeq.bufferByTime`` () =

let s = asyncSeq {
yield 1
yield 2
do! Async.Sleep 100
yield 3
yield 4
do! Async.Sleep 100
yield 5
yield 6
}

let actual =
s
|> AsyncSeq.bufferByTime 100
|> AsyncSeq.map (List.ofArray)
|> AsyncSeq.toList

let expected = [ [1;2] ; [3;4] ; [5;6] ]

Assert.True ((actual = expected))

[<Test>]
let ``try finally works no exception``() =
Expand Down

0 comments on commit 6613af0

Please sign in to comment.