This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit
wip
eulerfx committed Mar 13, 2018
1 parent 479df93 commit 091a14c
Showing 8 changed files with 100 additions and 52 deletions.
4 changes: 2 additions & 2 deletions src/kafunk/Chan.fs
@@ -219,7 +219,7 @@ module internal Chan =
socketAgent
|> Resource.injectWithRecovery
RetryPolicy.none
(fun s buf -> Socket.sendAll s buf |> Async.Catch |> Async.map (Result.mapError (ResourceErrorAction.CloseRetry)))
(fun s buf -> Socket.sendAll s buf |> Async.Catch |> Async.map (Result.mapError (Some >> ResourceErrorAction.CloseRetry)))

let receive =
let receive s buf = async {
@@ -298,7 +298,7 @@ module internal Chan =
let send session req =
Session.send session req
|> Async.map (function
| None -> Failure (CloseRetry (exn "timeout"))
| None -> Failure (CloseRetry None)
| Some res -> Success (Success res))
sessionAgent
|> Resource.injectWithRecovery config.requestRetryPolicy send
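
Both hunks above track the Resource.fs change further down: ResourceErrorAction.CloseRetry now carries an exn option rather than a bare exn, so a genuine transport failure is wrapped with Some while a session timeout, which has no underlying exception, passes None instead of the placeholder exn "timeout". A minimal sketch of the two mappings, assuming kafunk's Success/Failure result cases; sendOutcome and sessionOutcome are illustrative names:

let sendOutcome s buf =
  // a caught exception is a genuine failure: close the channel, then retry
  Socket.sendAll s buf
  |> Async.Catch
  |> Async.map (Result.mapError (fun ex -> ResourceErrorAction.CloseRetry (Some ex)))

let sessionOutcome session req =
  // a missing response is a timeout: there is no exception to carry, so pass None
  Session.send session req
  |> Async.map (function
    | Some res -> Success (Success res)
    | None -> Failure (ResourceErrorAction.CloseRetry None))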
3 changes: 2 additions & 1 deletion src/kafunk/Kafka.fs
@@ -371,7 +371,8 @@ type private RetryAction =
let ec = p.errorCode
match ec with
| ErrorCode.NoError -> None
| ErrorCode.NotLeaderForPartition -> Some (ec, RetryAction.RefreshMetadataAndRetry [|topicName|])
| ErrorCode.NotLeaderForPartition | ErrorCode.UnknownTopicOrPartition ->
Some (ec, RetryAction.RefreshMetadataAndRetry [|topicName|])
| ec ->
RetryAction.errorRetryAction ec
|> Option.map (fun action -> ec, action)))
16 changes: 9 additions & 7 deletions src/kafunk/Producer.fs
@@ -284,13 +284,15 @@ module Producer =
xs <- ResizeArray<_>()
ps.Add (b.partition, xs)
for j = 0 to b.messages.Length - 1 do
let offset = int64 xs.Count // the offset of the message with respect to the partition batch
let pm = b.messages.[j]
let m = Message(0, magicByte, 0y, 0L, pm.key, pm.value)
let ms =
if magicByte < 2y then Message.Size m
else Message.SizeRecord 0L 0 m
// TODO: what if more than byte messages?
xs.Add (MessageSetItem(0L, ms, m))
else
// the offset delta is simply the offset of this message in the in-memory batch
Message.SizeRecord 0L (int offset) m
xs.Add (MessageSetItem(offset, ms, m))
let arr = Array.zeroCreate ps.Count
let mutable i = 0
for p in ps do
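
The point of this hunk: for the v2 record format (magicByte >= 2), each record's offset delta is simply its index within the in-memory partition batch, and that delta also feeds the size calculation because it is varint-encoded (see Protocol.fs below). A stripped-down sketch of the batching step; toMessageSetItems and the ProducerMessage annotation are assumptions, while Message, Message.Size, Message.SizeRecord and MessageSetItem are the constructs used above:

let toMessageSetItems (magicByte:int8) (messages:ProducerMessage[]) =
  messages
  |> Array.mapi (fun i pm ->
    let offsetDelta = int64 i                       // position of the record in this batch
    let m = Message(0, magicByte, 0y, 0L, pm.key, pm.value)
    let size =
      if magicByte < 2y then Message.Size m         // legacy message format: fixed-size header
      else Message.SizeRecord 0L i m                // v2 record: varint-sized timestamp/offset deltas
    MessageSetItem(offsetDelta, size, m))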
@@ -595,10 +597,10 @@ module Producer =
|> Seq.partitionChoices
if recovers.Length > 0 then
let ex = Exn.ofSeq (Seq.append recovers retries)
return Failure (Resource.ResourceErrorAction.CloseRetry ex)
return Failure (Resource.ResourceErrorAction.CloseRetry (Some ex))
else
let ex = Exn.ofSeq retries
return Failure (Resource.ResourceErrorAction.Retry ex)
return Failure (Resource.ResourceErrorAction.Retry (Some ex))
else
return Success oks }

@@ -611,8 +613,8 @@ module Producer =
| Success res -> return Success res
| Failure err ->
match producerErrorToRetryAction err with
| Choice1Of2 ex -> return Failure <| Resource.ResourceErrorAction.Retry ex
| Choice2Of2 ex -> return Failure <| Resource.ResourceErrorAction.CloseRetry ex }
| Choice1Of2 ex -> return Failure <| Resource.ResourceErrorAction.Retry (Some ex)
| Choice2Of2 ex -> return Failure <| Resource.ResourceErrorAction.CloseRetry (Some ex) }

/// Creates a producer.
let createAsync (conn:KafkaConn) (config:ProducerConfig) : Async<Producer> = async {
6 changes: 3 additions & 3 deletions src/kafunk/Protocol.fs
@@ -433,7 +433,8 @@ module Protocol =
Binary.sizeBytes m.value

/// Returns the size of the record, excluding the size of the length field itself.
static member SizeRecord timestampDelta offsetDelta (m:Message) =
/// NB: varint encodings are used for the timestamp delta and offset delta
static member SizeRecord (timestampDelta:int64) (offsetDelta:int) (m:Message) =
let size =
1 + // attributes
Binary.sizeVarint64 timestampDelta +
@@ -458,7 +459,6 @@

static member Read (magicByte:int8, buf:BinaryZipper) =
let crc = buf.ReadInt32 ()
//let _magicByte = buf.ReadInt8 ()
buf.ShiftOffset 1 // magic byte
let attrs = buf.ReadInt8 ()
let timestamp =
@@ -643,7 +643,7 @@ module Protocol =
let isControl = batchAttributes &&& int16 0x20 > 0s
let isTx = batchAttributes &&& int16 0x10 > 0s
let lastOffsetDelta = buf.ReadInt32()
let lastOffset = firstOffset + int64 lastOffsetDelta
let firstTimestamp = buf.ReadInt64()
let maxTimestamp = buf.ReadInt64()
let _producerId = buf.ReadInt64()
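
SizeRecord now takes the timestamp delta and offset delta as explicit int64 and int parameters because both are varint-encoded in the v2 record format, so a record's size depends on the actual delta values. For reference, a self-contained sketch of how the byte length of a zig-zag varint can be computed; this is illustrative only, Binary.sizeVarint64 is the function kafunk actually uses:

// byte length of a 64-bit value under zig-zag + base-128 varint encoding
let varintSize64 (value:int64) : int =
  let zigzag = uint64 ((value <<< 1) ^^^ (value >>> 63))  // move the sign bit into bit 0
  let mutable v = zigzag
  let mutable size = 1
  while v >= 0x80UL do   // 7 payload bits per byte
    v <- v >>> 7
    size <- size + 1
  size

// varintSize64 0L = 1; varintSize64 64L = 2; varintSize64 -1L = 1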
6 changes: 3 additions & 3 deletions src/kafunk/Utility/Async.fs
@@ -368,16 +368,16 @@ module Async =
let! r = Task.WhenAny (t, at) |> awaitTaskCancellationAsError
return r.Result }

let cancelWithTaskThrow (t:Task<unit>) (a:Async<'a>) : Async<'a> = async {
let cancelWithTaskThrow (err:exn -> exn) (t:Task<unit>) (a:Async<'a>) : Async<'a> = async {
let! ct = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource ct
let t =
t
|> Task.extend (fun t ->
cts.Cancel () |> ignore
raise (OperationCanceledException("cancelled!", t.Exception)))
raise (err t.Exception))
let at = Async.StartAsTask (a, cancellationToken = cts.Token)
let! r = Task.WhenAny (t, at) |> awaitTaskCancellationAsError
let! r = Task.WhenAny (at, t) |> awaitTaskCancellationAsError
return r.Result }

let cancelWithTaskTimeout (timeout:TimeSpan) (t:Task<unit>) (a:Async<'a>) : Async<'a> = async {
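
cancelWithTaskThrow now lets the caller choose the exception raised when the cancellation task completes (previously it always raised OperationCanceledException), and listing the operation's task first in Task.WhenAny presumably lets its result win when both tasks have already completed. A small usage sketch; everything here other than Async.cancelWithTaskThrow is a hypothetical example:

open System
open System.Threading.Tasks

// Run a fetch, but abandon it with a TimeoutException if the shutdown task completes
// first. The mapping function receives the completed task's Exception, which may be
// null when that task finished without error.
let fetchOrShutdown (shutdown:Task<unit>) (fetch:Async<'a>) : Async<'a> =
  fetch
  |> Async.cancelWithTaskThrow
      (fun inner -> TimeoutException("operation cancelled by shutdown", inner) :> exn)
      shutdown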
53 changes: 34 additions & 19 deletions src/kafunk/Utility/Resource.fs
@@ -44,15 +44,17 @@ type ResourceState<'a> =
/// The resource is open.
| Open of resource:'a * state:Task<unit>

//type IResource<'a> =
// abstract Get : unit -> Async<ResourceEpoch<'a>>
// abstract Close : ResourceEpoch<'a> option -> exn option -> Async<unit>
/// An exception raised when an attempt is made to access a closed resource.
type ClosedResourceException () =
inherit Exception ("The resource has been closed.", null)

/// An exception raised when an attempt is made to access a faulted resource.
type FaultedResourceException (ex:exn) = inherit Exception ("FaultedResourceException", ex)
type FaultedResourceException (ex:exn) =
inherit Exception ("The resource faulted.", ex)

/// An exception raised when a resource-dependent operation fails and retries are depleted.
type FaultedResourceOperationException (msg:string,exn:exn) = inherit Exception (msg, exn)
type FaultedResourceOperationException (msg:string,exn:exn option) =
inherit Exception (msg, exn |> Option.getOr null)

/// A recoverable resource, supporting resource-dependent operations.
type Resource<'r> internal (id:string, create:Task<unit> -> ResourceEpoch<'r> option -> Async<'r * Async<unit>>, close:ResourceEpoch<'r> * exn option -> Async<unit>) =
@@ -98,11 +100,14 @@ type Resource<'r> internal (id:string, create:Task<unit> -> ResourceEpoch<'r> op
return { resource = Unchecked.defaultof<_> ; state = state ; version = version ; proc = Task.FromResult () ; cts = cts } })

/// Closes the resource, if not already closed.
member internal __.Close (callingEpoch:ResourceEpoch<'r>, ex:exn option) =
member internal __.CloseFault (callingEpoch:ResourceEpoch<'r>, ex:exn option, fault:bool) =
cell
|> SVar.updateAsync (fun currentEpoch -> async {
try
if currentEpoch.tryCloseOrFault ex then
let closed =
if fault then currentEpoch.tryCloseOrFault ex
else currentEpoch.tryCloseOrFault None
if closed then
Log.trace "closing_resource|type=%s version=%i" name currentEpoch.version
currentEpoch.cts.Dispose ()
do! close (currentEpoch, ex)
@@ -117,18 +122,24 @@ type Resource<'r> internal (id:string, create:Task<unit> -> ResourceEpoch<'r> op
return raise (exn(errMsg, ex)) })
|> Async.Ignore

member internal __.Fault (callingEpoch:ResourceEpoch<'r>, ex:exn option) =
__.CloseFault (callingEpoch, ex, true)

member internal __.Close (callingEpoch:ResourceEpoch<'r>, ex:exn option) =
__.CloseFault (callingEpoch, ex, false) |> Async.Ignore

member internal __.Close (callingEpoch:ResourceEpoch<'r>) =
__.Close (callingEpoch, None) |> Async.Ignore
__.CloseFault (callingEpoch, None, false) |> Async.Ignore

member internal __.CloseCurrent (ex:exn option) = async {
member internal __.CloseCurrent (ex:exn option, fault:bool) = async {
let ep = SVar.getFastUnsafe cell
match ep with
| None ->
return ()
| Some ep when ep.isClosed ->
return ()
| Some ep ->
return! __.Close (ep, ex) |> Async.Ignore }
return! __.CloseFault (ep, ex, fault) |> Async.Ignore }

/// Gets an instance of the resource, opening it if needed.
/// Throws: FaultedResourceException if the resource is faulted.
@@ -164,24 +175,24 @@ and ResourceErrorAction<'a, 'e> =
| CloseResume of 'e option * 'a

/// Close the resource, re-open and retry the operation.
| CloseRetry of 'e
| CloseRetry of 'e option

/// Retry without closing.
| Retry of 'e
| Retry of 'e option

/// Operations on resources.
module Resource =

let ensureOpen (r:Resource<'r>) =
r.Get () |> Async.Ignore

/// Closes the resource.
/// Permanently closes the resource.
let close (r:Resource<'r>) =
r.CloseCurrent None
r.CloseCurrent (None,true)

/// Faults the resource.
/// Faults the resource, which closes it permanently.
let fault (r:Resource<'r>) (ex:exn) =
r.CloseCurrent (Some ex)
r.CloseCurrent (Some ex,true)

let create
(name:string)
Expand All @@ -197,10 +208,14 @@ module Resource =
let getResource (r:Resource<'r>) =
r.Get () |> Async.map (fun ep -> ep.resource)

let private resourceError (ex:exn) =
if isNull ex then ClosedResourceException() :> exn
else FaultedResourceException(ex) :> exn

let injectWithRecovery (rp:RetryPolicy) (op:'r -> ('a -> Async<Result<'b, ResourceErrorAction<'b, exn>>>)) (r:Resource<'r>) (a:'a) : Async<'b> =
let rec go (rs:RetryState) = async {
let! ep = r.Get ()
let! b = Async.cancelWithTaskThrow ep.state.Task (op ep.resource a)
let! b = Async.cancelWithTaskThrow resourceError ep.state.Task (op ep.resource a)
match b with
| Success b ->
return b
@@ -218,13 +233,13 @@ module Resource =
| Failure (CloseResume (ex,b)) ->
Log.trace "closing_and_resuming_after_failure|type=%s version=%i attempt=%i error=\"%O\""
r.Name ep.version rs.attempt ex
do! r.Close (ep)
do! r.Close (ep,ex)
return b
| Failure (CloseRetry ex) ->
// TODO: collect errors?
Log.trace "closing_and_retrying_after_failure|type=%s version=%i attempt=%i error=\"%O\""
r.Name ep.version rs.attempt ex
do! r.Close (ep)
do! r.Close (ep,ex)
let! rs' = RetryPolicy.awaitNextState rp rs
match rs' with
| None ->
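
Taken together: an injected operation returns Success, or a ResourceErrorAction whose exception payload is now optional, and while it runs, a resource whose state task completes is surfaced via resourceError as ClosedResourceException (no exception) or FaultedResourceException (otherwise). A sketch of wiring an operation through injectWithRecovery; trySend is a hypothetical stand-in for any call that signals a timeout with None rather than by raising:

let sendVia (trySend:'c -> 'req -> Async<'res option>) (conn:Resource<'c>) : 'req -> Async<'res> =
  conn
  |> Resource.injectWithRecovery
      RetryPolicy.none                         // or a bounded retry policy
      (fun c req -> async {
        let! res = trySend c req
        match res with
        | Some r -> return Success r                                      // done
        | None -> return Failure (ResourceErrorAction.CloseRetry None) }) // timed out: close the epoch, retry without an exception

If retries are depleted, injectWithRecovery raises FaultedResourceOperationException carrying the optional exception, which is why that constructor now takes exn option.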
8 changes: 4 additions & 4 deletions tests/kafunk.Tests/Consumer.fsx
@@ -30,10 +30,10 @@ let go = async {
//[KafkaUri.parse "localhost:9092" ; KafkaUri.parse "localhost:9093" ; KafkaUri.parse "localhost:9094"],
tcpConfig = chanConfig,
requestRetryPolicy = KafkaConfig.DefaultRequestRetryPolicy,
//version = Versions.V_0_10_1,
//autoApiVersions = true,
version = Versions.V_0_9_0,
autoApiVersions = false,
version = Versions.V_0_10_1,
autoApiVersions = true,
//version = Versions.V_0_9_0,
//autoApiVersions = false,
clientId = "leo")
Kafka.connAsync connConfig
let consumerConfig =
56 changes: 43 additions & 13 deletions tests/kafunk.Tests/Fetch.fsx
@@ -6,18 +6,29 @@ open Kafunk
open System
open Refs

Log.MinLevel <- LogLevel.Trace
//Log.MinLevel <- LogLevel.Trace
let Log = Log.create __SOURCE_FILE__
let host = argiDefault 1 "localhost"
let topic = argiDefault 2 "absurd-topic"

let tcpConfig =
ChanConfig.create (
requestTimeout = TimeSpan.FromSeconds 60.0,
receiveBufferSize = 8192 * 50,
sendBufferSize = 8192 * 50,
connectRetryPolicy = ChanConfig.DefaultConnectRetryPolicy,
requestRetryPolicy = ChanConfig.DefaultRequestRetryPolicy)

let connCfg =
KafkaConfig.create (
[KafkaUri.parse host],
tcpConfig = tcpConfig,
version=Versions.V_0_10_1,
//autoApiVersions=true,
//version=Versions.V_0_9_0,
clientId = "leo"
)

let conn = Kafka.conn connCfg

//let offsetRange =
@@ -45,14 +56,14 @@ let conn = Kafka.conn connCfg

//let ps = [| (0,0L,0L,10000) ; (1,0L,0L,10000); (2,0L,0L,10000) ; (3,0L,0L,10000) |]
//let ps = [| (0,0L,-1L,10000000) ; (3,0L,-1L,10000000) |]
let ps = [| (3,0L,0L,10000) |]
//let ps = [| (3,0L,0L,10000) |]
//let ps = [| (3,0L,0L,100000) ; (0,0L,0L,100000) |]
//let ps = [| (0,25000L,0L,10000) ; (3,0L,0L,10000) |]
let fetchReq = FetchRequest(-1, 100, 0, [| topic, ps |], 10000000, 0y)
//let fetchReq = FetchRequest(-1, 100, 0, [| topic, ps |], 10000000, 0y)

let fetchRes =
Kafka.fetch conn fetchReq
|> Async.RunSynchronously
//let fetchRes =
// Kafka.fetch conn fetchReq
// |> Async.RunSynchronously

//for (tn,pmds) in fetchRes.topics do
// for p in pmds do
@@ -67,24 +78,43 @@ let fetchRes =
// //printfn "p=%i o=%i size=%i timestamp=%O key=%s value=%s" p o ms (DateTime.FromUnixMilliseconds m.timestamp) (m.key |> Binary.toString) (m.value |> Binary.toString)
// ()

let storm = async {
let go = async {

let! offsetRange = Offsets.offsetRange conn topic []

let maxBytesTotal = 10000000
let maxBytesPartition = 10000
let maxBytesPartition = 10000000

let rec fetchProc p o = async {
let rec fetch p o = async {
let ps = [| (p,o,0L,maxBytesPartition) |]
let req = FetchRequest(-1, 100, 1, [| topic, ps |], maxBytesTotal, 0y)
let! res = Kafka.fetch conn req
let (_,ps) = res.topics.[0]
let p = ps.[0]
let count = p.messageSet.messages.Length
if count = 0 then return (None,0) else
let lastOffset = p.messageSet.messages.[count - 1].offset
let nextOffset = lastOffset + 1L
return (Some nextOffset,count) }

use counter = Metrics.counter Log 5000

let fetch =
fetch
|> Metrics.throughputAsync2To counter (fun (_,_,(_,count)) -> count)

let rec fetchProc p o = async {
let! (nextOffset,count) = fetch p o
match nextOffset with
| None -> return ()
| Some next -> return! fetchProc p next }

return ()
}

return!
offsetRange
|> Map.toSeq
|> Seq.map (fun (p,(o,_)) -> fetchProc p o)
|> Async.Parallel
|> Async.Ignore }

try Async.RunSynchronously go
with ex -> Log.error "%O" ex
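
The reworked script measures throughput by wrapping the two-argument fetch function with Metrics.throughputAsync2To and extracting a per-call message count from the (partition, offset, result) triple. A plausible shape for such a combinator, purely as a sketch; this is not kafunk's actual Metrics implementation:

// wrap a two-argument async function and report a count derived from its arguments and result
let throughputAsync2To (report:int -> unit) (count:'a * 'b * 'c -> int) (f:'a -> 'b -> Async<'c>) : 'a -> 'b -> Async<'c> =
  fun a b -> async {
    let! c = f a b
    report (count (a, b, c))   // here: the number of messages returned by this fetch
    return c }

In the script, the selector (fun (_,_,(_,count)) -> count) pulls the message count out of fetch's (nextOffset, count) result.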
