This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit 2e35663: properly assign record batch produce offsets
eulerfx committed Mar 13, 2018
1 parent 242d3ed
Showing 17 changed files with 327 additions and 178 deletions.
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
@@ -1,3 +1,7 @@
+ ### 0.1.15-alpha02 - 12.3.2018
+ * IMPROVE: support for v0.11 of broker protocol, including v5 of Fetch and v3 of Produce APIs
+ * BREAKING: FetchResponse item now explicit structure rather than tuple. Breaking only if using low-level Fetch API.
+
### 0.1.15-alpha01 - 5.3.2018
* BUG: consumer would enter an infinite loop between `offsets_out_of_range` and `resuming_fetch_from_reset_offsets`
* BUG: stalled consumers after TCP connection timeout
4 changes: 2 additions & 2 deletions src/kafunk/Chan.fs
@@ -219,7 +219,7 @@ module internal Chan =
socketAgent
|> Resource.injectWithRecovery
RetryPolicy.none
- (fun s buf -> Socket.sendAll s buf |> Async.Catch |> Async.map (Result.mapError (ResourceErrorAction.CloseRetry)))
+ (fun s buf -> Socket.sendAll s buf |> Async.Catch |> Async.map (Result.mapError (Some >> ResourceErrorAction.CloseRetry)))

let receive =
let receive s buf = async {
@@ -298,7 +298,7 @@ module internal Chan =
let send session req =
Session.send session req
|> Async.map (function
- | None -> Failure (CloseRetry (exn "timeout"))
+ | None -> Failure (CloseRetry None)
| Some res -> Success (Success res))
sessionAgent
|> Resource.injectWithRecovery config.requestRetryPolicy send
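Both hunks follow from one type change: ResourceErrorAction.CloseRetry now carries an exn option instead of an exn, so a recovery can be requested without fabricating a cause (the old code invented exn "timeout" just to satisfy the constructor). A minimal standalone sketch of the shape this diff implies; the type below is a simplified stand-in, not kafunk's full definition:

    type ResourceErrorAction =
      | CloseRetry of exn option
      | Retry of exn option

    // Old style: fabricate a placeholder exception for a timeout.
    let timeoutOld = CloseRetry (Some (exn "timeout"))

    // New style: the absence of a concrete cause is represented directly.
    let timeoutNew = CloseRetry None

    // Wrapping a caught socket exception, as in the send path above.
    let onSendError : exn -> ResourceErrorAction = Some >> CloseRetry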
44 changes: 25 additions & 19 deletions src/kafunk/Compression.fs
@@ -11,6 +11,9 @@ let private createMessage (value:Value) (compression:byte) =
//Message.create value Binary.empty (Some attrs)
Message(0, 0y, attrs, 0L, Binary.empty, value)

+ let private ofMessage (m:Message) =
+   MessageSet([| MessageSetItem(0L, Message.Size m, m) |])

[<RequireQualifiedAccess>]
module internal Stream =

@@ -157,34 +160,37 @@ module Snappy =

#endif

- let compress (compression:byte) (ms:MessageSet) =
+ let compress (magicByte:int8) (compression:byte) (ms:MessageSet) =
+   if magicByte >= 2y && compression <> CompressionCodec.None then
+     failwithf "compression=%i not supported on message_format=%i" compression magicByte else
match compression with
| CompressionCodec.None -> ms
- | CompressionCodec.GZIP -> MessageSet.ofMessage (GZip.compress ms)
+ | CompressionCodec.GZIP -> ofMessage (GZip.compress ms)

#if !NETSTANDARD2_0
- | CompressionCodec.Snappy -> MessageSet.ofMessage (Snappy.compress ms)
+ | CompressionCodec.Snappy -> ofMessage (Snappy.compress ms)
#endif

| _ -> failwithf "Incorrect compression codec %A" compression

let decompress (magicByte:int8) (ms:MessageSet) =
if ms.messages.Length = 0 then ms
else
-   ms.messages
-   |> Array.Parallel.collect (fun msi ->
-     match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
-     | CompressionCodec.None -> [|msi|]
-     | CompressionCodec.GZIP ->
-       let decompressed = GZip.decompress magicByte msi.message
-       decompressed.messages
-
- #if !NETSTANDARD2_0
-     | CompressionCodec.Snappy ->
-       let decompressed = Snappy.decompress magicByte msi.message
-       decompressed.messages
- #endif
-
-     | c -> failwithf "compression_code=%i not supported" c)
-   |> MessageSet
+   let msis =
+     ms.messages
+     |> Array.Parallel.collect (fun msi ->
+       match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
+       | CompressionCodec.None -> [|msi|]
+       | CompressionCodec.GZIP ->
+         let decompressed = GZip.decompress magicByte msi.message
+         decompressed.messages
+
+ #if !NETSTANDARD2_0
+       | CompressionCodec.Snappy ->
+         let decompressed = Snappy.decompress magicByte msi.message
+         decompressed.messages
+ #endif
+
+       | c -> failwithf "compression_code=%i not supported" c)
+   MessageSet (msis, ms.lastOffset) // NB: lastOffset must be propagated for magicByte>=2y support
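Two behavioral points fall out of this hunk. First, compress now takes the message-format version (magicByte) and rejects every codec except CompressionCodec.None for v2 record batches, where compression belongs to the batch encoding rather than to a nested wrapper message. Second, decompress rebuilds the result with ms.lastOffset, which v2 offset accounting depends on. A hypothetical call site, assuming only the signatures visible above and a messageSet value in scope:

    // magicByte 1y: classic message sets; wrapper-message compression is allowed.
    let compressedV1 = Compression.compress 1y CompressionCodec.GZIP messageSet

    // magicByte 2y: record batches; any codec other than None trips the failwithf guard.
    let passthroughV2 = Compression.compress 2y CompressionCodec.None messageSet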

23 changes: 10 additions & 13 deletions src/kafunk/Consumer.fs
@@ -620,21 +620,21 @@ module Consumer =
(cfg:ConsumerConfig)
(magicByte:int8)
(topic:TopicName)
- (partition,errorCode,highWatermark,_,_,_,messageSetSize,ms) =
- match errorCode with
+ (p:FetchResponsePartitionItem) =
+ match p.errorCode with
| ErrorCode.NoError ->
- if messageSetSize = 0 then Choice2Of4 (partition,highWatermark)
+ if p.messageSetSize = 0 then Choice2Of4 (p.partition,p.highWatermarkOffset)
else
- let ms = Compression.decompress magicByte ms
- if cfg.checkCrc then
-   MessageSet.CheckCrc (magicByte, ms)
- Choice1Of4 (ConsumerMessageSet(topic, partition, ms, highWatermark))
+ let ms = Compression.decompress magicByte p.messageSet
+ if cfg.checkCrc && magicByte < 2y then
+   MessageSet.CheckCrc ms
+ Choice1Of4 (ConsumerMessageSet(topic, p.partition, ms, p.highWatermarkOffset))
| ErrorCode.OffsetOutOfRange ->
- Choice3Of4 (partition,highWatermark)
+ Choice3Of4 (p.partition,p.highWatermarkOffset)
| ErrorCode.NotLeaderForPartition | ErrorCode.UnknownTopicOrPartition | ErrorCode.ReplicaNotAvailable ->
- Choice4Of4 (partition)
+ Choice4Of4 (p.partition)
| _ ->
- failwithf "unsupported fetch error_code=%i" errorCode
+ failwithf "unsupported fetch error_code=%i" p.errorCode

/// Fetches the specified offsets.
/// Returns a set of message sets and an end of topic list.
@@ -821,9 +821,6 @@ module Consumer =
let assignment = Array.map fst initOffsets
let cfg = c.config
let topic = cfg.topic
- //use! _cnc = Async.OnCancel (fun () ->
- //  Log.info "cancelling_fetch_process|group_id=%s generation_id=%i member_id=%s topic=%s partition_count=%i"
- //  cfg.groupId state.state.generationId state.state.memberId topic (assignment.Length))
Log.info "starting_fetch_process|group_id=%s generation_id=%i member_id=%s topic=%s partition_count=%i"
cfg.groupId state.state.generationId state.state.memberId topic (assignment.Length)
return!
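This hunk is the consumer-facing side of the BREAKING entry in the release notes: fetch partition results are now the named FetchResponsePartitionItem structure rather than an 8-tuple. A sketch of the migration for callers of the low-level Fetch API, using only the field names that appear in this diff:

    // Before: positional destructuring, fragile as the protocol gains fields.
    //   fun (partition,errorCode,highWatermark,_,_,_,messageSetSize,ms) -> ...

    // After: named field access.
    let summarize (p:FetchResponsePartitionItem) =
      if p.errorCode = ErrorCode.NoError && p.messageSetSize > 0 then
        Some (p.partition, p.highWatermarkOffset, p.messageSet)
      else None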
26 changes: 9 additions & 17 deletions src/kafunk/Helpers.fs
@@ -5,22 +5,15 @@ open Kafunk
open System
open System.Text

- //module Message =
-
- // let create value key attrs =
- //   // NB: the CRC is computed by the Protocol module during encoding
- //   //Message(0, 0y, (defaultArg attrs 0y), DateTime.UtcNowUnixMilliseconds, key, value)
- //   Message(0, 0y, (defaultArg attrs 0y), 0L, key, value)

module MessageSet =

- let t = DateTime.UtcNowUnixMilliseconds
+ //let t = DateTime.UtcNowUnixMilliseconds

- let ofMessage (m:Message) =
-   MessageSet([| MessageSetItem(0L, Message.Size m, m) |])
+ //let ofMessage (m:Message) =
+ //  MessageSet([| MessageSetItem(0L, Message.Size m, m) |])

- let ofMessages ms =
-   MessageSet(ms |> Seq.map (fun m -> MessageSetItem (0L, Message.Size m, m)) |> Seq.toArray)
+ //let ofMessages ms =
+ //  MessageSet(ms |> Seq.map (fun m -> MessageSetItem (0L, Message.Size m, m)) |> Seq.toArray)

/// Returns the first offset in the message set.
let firstOffset (ms:MessageSet) =
@@ -211,13 +204,12 @@ module internal Printers =
|> Seq.map (fun (tn,ps) ->
let ps =
ps
- |> Seq.map (fun (partition,errorCode,highWatermark,_,logStartOffset,_,messageSetSize,messageSet) ->
-   //let offsetInfo = ms.messages |> Seq.tryItem 0 |> Option.map (fun (o,_,_) -> sprintf " o=%i lag=%i" o (hwmo - o)) |> Option.getOr ""
+ |> Seq.map (fun p ->
let offsetInfo =
- messageSet.messages
+ p.messageSet.messages
|> Seq.tryItem 0
- |> Option.map (fun x -> sprintf " o=%i lag=%i" x.offset (highWatermark - x.offset)) |> Option.getOr ""
- sprintf "(p=%i ec=%i lso=%i hwo=%i mss=%i%s)" partition errorCode logStartOffset highWatermark messageSetSize offsetInfo)
+ |> Option.map (fun x -> sprintf " o=%i lag=%i" x.offset (p.highWatermarkOffset - x.offset)) |> Option.getOr ""
+ sprintf "(p=%i ec=%i lso=%i hwo=%i mss=%i%s)" p.partition p.errorCode p.logStartOffset p.highWatermarkOffset p.messageSetSize offsetInfo)
|> String.concat ";"
sprintf "topic=%s partitions=[%s]" tn ps)
|> String.concat " ; "
6 changes: 4 additions & 2 deletions src/kafunk/Kafka.fs
@@ -367,10 +367,12 @@ type private RetryAction =
r.topics
|> Seq.tryPick (fun (topicName,partitionMetadata) ->
partitionMetadata
- |> Seq.tryPick (fun (_,ec,_,_,_,_,_,_) ->
+ |> Seq.tryPick (fun p ->
+   let ec = p.errorCode
match ec with
| ErrorCode.NoError -> None
- | ErrorCode.NotLeaderForPartition -> Some (ec, RetryAction.RefreshMetadataAndRetry [|topicName|])
+ | ErrorCode.NotLeaderForPartition | ErrorCode.UnknownTopicOrPartition ->
+   Some (ec, RetryAction.RefreshMetadataAndRetry [|topicName|])
| ec ->
RetryAction.errorRetryAction ec
|> Option.map (fun action -> ec, action)))
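The metadata-response check now treats UnknownTopicOrPartition like NotLeaderForPartition: both force a metadata refresh before the retry instead of falling through to the generic error mapping, which matters during leadership moves and topic auto-creation. A simplified, self-contained model of the decision; RetryAction here is a stand-in for kafunk's internal type, and the numeric codes are the standard Kafka protocol error codes:

    module ErrorCode =
      let [<Literal>] NoError = 0s
      let [<Literal>] UnknownTopicOrPartition = 3s
      let [<Literal>] NotLeaderForPartition = 6s

    type RetryAction =
      | RefreshMetadataAndRetry of topics:string[]
      | Escalate

    let retryActionFor (ec:int16) (topic:string) =
      match ec with
      | ErrorCode.NoError -> None
      | ErrorCode.NotLeaderForPartition
      | ErrorCode.UnknownTopicOrPartition ->
        // both now force a metadata refresh before the request is retried
        Some (RefreshMetadataAndRetry [|topic|])
      | _ -> Some Escalate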
76 changes: 56 additions & 20 deletions src/kafunk/Producer.fs
@@ -151,6 +151,9 @@ type ProducerConfig = {
/// If this value is greater than 1, message ordering cannot be guaranteed.
maxInFlightRequests : int

+ /// The queue (network) type.
+ queueType : ProducerQueueType

} with

/// The default required acks = RequiredAcks.AllInSync.
@@ -171,9 +174,12 @@ type ProducerConfig = {
/// The default maximum number of in-flight requests per broker connection.
static member DefaultMaxInFlightRequests = 1

+ /// The default queue type = ProducerQueueType.QueuePerBroker.
+ static member DefaultQueueType = ProducerQueueType.QueuePerBroker

/// Creates a producer configuration.
static member create (topic:TopicName, partition:Partitioner, ?requiredAcks:RequiredAcks, ?compression:CompressionCodec,
- ?timeout:Timeout, ?bufferSizeBytes:int, ?batchSizeBytes, ?batchLingerMs, ?maxInFlightRequests) =
+ ?timeout:Timeout, ?bufferSizeBytes:int, ?batchSizeBytes, ?batchLingerMs, ?maxInFlightRequests, ?queueType) =
{
topic = topic
partitioner = partition
@@ -184,8 +190,17 @@ type ProducerConfig = {
batchSizeBytes = defaultArg batchSizeBytes ProducerConfig.DefaultBatchSizeBytes
batchLingerMs = defaultArg batchLingerMs ProducerConfig.DefaultBatchLingerMs
maxInFlightRequests = defaultArg maxInFlightRequests ProducerConfig.DefaultMaxInFlightRequests
+ queueType = defaultArg queueType ProducerConfig.DefaultQueueType
}

+ /// Producer message queue type.
+ and ProducerQueueType =
+
+   /// A message queue per broker.
+   | QueuePerBroker
+
+   /// A message queue per partition.
+   | QueuePerPartition

/// Producer state corresponding to the state of a cluster.
[<NoEquality;NoComparison;AutoSerializable(false)>]
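A hypothetical configuration exercising the new option; Partitioner.roundRobin stands in for whichever partitioner the caller already uses, and every omitted argument falls back to the defaults above:

    let config =
      ProducerConfig.create (
        topic = "example-topic",
        partition = Partitioner.roundRobin,
        queueType = ProducerQueueType.QueuePerPartition)

The trade-off: QueuePerBroker (the default) funnels all partitions led by one broker through a single queue, preserving cross-partition batching, while QueuePerPartition gives up that batching for per-partition parallelism.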
@@ -267,19 +282,22 @@ module Producer =
let mutable xs = Unchecked.defaultof<_>
if not (ps.TryGetValue(b.partition, &xs)) then
xs <- ResizeArray<_>()
ps.Add (b.partition, xs)
for j = 0 to b.messages.Length - 1 do
+ let offset = int64 xs.Count // the offset of the message with respect to the partition batch
let pm = b.messages.[j]
let m = Message(0, magicByte, 0y, 0L, pm.key, pm.value)
let ms =
if magicByte < 2y then Message.Size m
- else Message.SizeRecord 0L 0 m
- xs.Add (MessageSetItem(0L, ms, m))
+ else
+   // the offset delta is simply the offset of this message in the in-memory batch
+   Message.SizeRecord 0L (int offset) m
+ xs.Add (MessageSetItem(offset, ms, m))
let arr = Array.zeroCreate ps.Count
let mutable i = 0
for p in ps do
let ms = MessageSet(p.Value.ToArray())
- let ms = ms |> Compression.compress compression
+ let ms = ms |> Compression.compress magicByte compression
let mss =
if magicByte < 2y then MessageSet.Size ms
else MessageSet.SizeRecordBatch ms
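This is the fix named in the commit title. For message format v2 (magicByte >= 2y), each record in a produced batch must carry an offset delta relative to the batch base offset; the old code stamped every record with 0. The delta also feeds the varint length computation, hence the (int offset) argument to Message.SizeRecord. A standalone sketch of the assignment rule:

    // Assign each record its position within the in-memory partition batch.
    let assignOffsets (values:'a[]) : (int64 * 'a)[] =
      values |> Array.mapi (fun i v -> int64 i, v)

    // assignOffsets [| "a"; "b"; "c" |]
    // -> [| (0L, "a"); (1L, "b"); (2L, "c") |]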
@@ -294,11 +312,19 @@ module Producer =
(magicByte:int8)
(b:Broker)
(batch:ProducerMessageBatch[]) = async {
- //Log.trace "sending_batch|ep=%O batch_size_bytes=%i" (Broker.endpoint b) (batch |> Seq.sumBy (fun b -> b.size))
- //Log.info "sending_batch|ep=%O batch_size_bytes=%i batch_count=%i partition_count=%i"
- //  (Broker.endpoint b) (batch |> Seq.sumBy (fun b -> b.size)) batch.Length (batch |> Seq.distinctBy (fun b -> b.partition) |> Seq.length)

let ps = Dictionary<Partition, ResizeArray<_>>()
let pms = toMessageSet magicByte cfg.compression batch ps

+ //Log.info "sending_batch|ep=%O batch_size_bytes=%i batch_count=%i partition_count=%i msg_count=%i"
+ //  (Broker.endpoint b)
+ //  (pms |> Seq.sumBy (fun pm -> pm.messageSetSize))
+ //  pms.Length
+ //  (pms |> Seq.distinctBy (fun pm -> pm.partition) |> Seq.length)
+ //  (pms |> Seq.sumBy (fun pm -> pm.messageSet.messages.Length))

let req = ProduceRequest(cfg.requiredAcks, cfg.timeout, [| ProduceRequestTopicMessageSet (cfg.topic,pms) |])
let! res =
conn.SendToBroker (b, (RequestMessage.Produce req))
@@ -473,17 +499,26 @@ module Producer =
(add,proc)

let partitionQueues,queueProcs =
- let qs = Dict.ofSeq []
- routes.borkerByPartition
- |> Seq.mapi (fun _ b ->
-   match Dict.tryGet b.nodeId qs with
-   | Some q -> q,None
-   | None ->
-     let q,proc = startBrokerQueue b
-     qs.Add (b.nodeId,q)
-     q,Some proc)
- |> Seq.toArray
- |> Array.unzip
+ match cfg.queueType with
+ | ProducerQueueType.QueuePerPartition ->
+   routes.borkerByPartition
+   |> Seq.mapi (fun _ b ->
+     let q,proc = startBrokerQueue b
+     q,Some proc)
+   |> Seq.toArray
+   |> Array.unzip
+ | ProducerQueueType.QueuePerBroker ->
+   let qs = Dict.ofSeq []
+   routes.borkerByPartition
+   |> Seq.mapi (fun _ b ->
+     match Dict.tryGet b.nodeId qs with
+     | Some q -> q,None
+     | None ->
+       let q,proc = startBrokerQueue b
+       qs.Add (b.nodeId,q)
+       q,Some proc)
+   |> Seq.toArray
+   |> Array.unzip

let queueProcs = queueProcs |> Seq.choose id |> Async.Parallel |> Async.Ignore
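A simplified model of the routing split above: per-broker queues deduplicate the per-partition broker list by node id, so partitions sharing a leader share one queue, while per-partition queues skip the deduplication entirely. Broker and the string-valued queues are stand-ins for kafunk's internals:

    type Broker = { nodeId : int }

    // One input entry per partition; the array index is the partition id.
    let queuesFor perPartition (brokerByPartition:Broker[]) =
      if perPartition then
        // a distinct queue for every partition, even on a shared broker
        brokerByPartition |> Array.mapi (fun p b -> sprintf "queue-p%i-n%i" p b.nodeId)
      else
        // one queue per distinct broker, shared by all partitions it leads
        let byNode = System.Collections.Generic.Dictionary<int,string> ()
        brokerByPartition
        |> Array.map (fun b ->
          match byNode.TryGetValue b.nodeId with
          | true, q -> q
          | _ ->
            let q = sprintf "queue-n%i" b.nodeId
            byNode.[b.nodeId] <- q
            q)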

@@ -562,10 +597,10 @@ module Producer =
|> Seq.partitionChoices
if recovers.Length > 0 then
let ex = Exn.ofSeq (Seq.append recovers retries)
- return Failure (Resource.ResourceErrorAction.CloseRetry ex)
+ return Failure (Resource.ResourceErrorAction.CloseRetry (Some ex))
else
let ex = Exn.ofSeq retries
- return Failure (Resource.ResourceErrorAction.Retry ex)
+ return Failure (Resource.ResourceErrorAction.Retry (Some ex))
else
return Success oks }

@@ -578,12 +613,13 @@ module Producer =
| Success res -> return Success res
| Failure err ->
match producerErrorToRetryAction err with
- | Choice1Of2 ex -> return Failure <| Resource.ResourceErrorAction.Retry ex
- | Choice2Of2 ex -> return Failure <| Resource.ResourceErrorAction.CloseRetry ex }
+ | Choice1Of2 ex -> return Failure <| Resource.ResourceErrorAction.Retry (Some ex)
+ | Choice2Of2 ex -> return Failure <| Resource.ResourceErrorAction.CloseRetry (Some ex) }

/// Creates a producer.
let createAsync (conn:KafkaConn) (config:ProducerConfig) : Async<Producer> = async {
- Log.info "initializing_producer|topic=%s" config.topic
+ Log.info "initializing_producer|topic=%s batch_size_bytes=%i batch_linger_ms=%i acks=%i compression=%i"
+   config.topic config.batchSizeBytes config.batchLingerMs config.requiredAcks config.compression
let batchTimeout =
let tcpReqTimeout = conn.Config.tcpConfig.requestTimeout
let prodReqTimeout = TimeSpan.FromMilliseconds config.timeout