Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit

Permalink
properly assign record batch produce offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
eulerfx committed Mar 14, 2018
1 parent 242d3ed commit 5fccaad
Show file tree
Hide file tree
Showing 18 changed files with 398 additions and 219 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ Please also join the [F# Open Source Group](http://fsharp.github.com)

## Version Support

| Version | Status |
| ----------|----------|
| 0.9.0 | Complete |
| 0.10.0 | Complete |
| 0.10.1 | Complete |
| Version | Status |
| -----------|----------|
| 0.9.0 | Complete |
| 0.10.0 | Complete |
| 0.10.1 | Complete |
| 0.11+auto | Protocol |

## Feature Support

Expand All @@ -28,6 +29,7 @@ Please also join the [F# Open Source Group](http://fsharp.github.com)
| TLS | https://github.com/jet/kafunk/issues/66 |
| SASL | https://github.com/jet/kafunk/issues/139 |
| ACL | https://github.com/jet/kafunk/issues/140 |
| TXNS | https://github.com/jet/kafunk/issues/214 |


## Hello World
Expand Down
5 changes: 5 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### 0.1.15-alpha02 - 14.3.2018
* IMPROVE: support for v0.11 of broker protocol, including v5 of Fetch and v3 of Produce APIs
* BREAKING: FetchResponse item now explicit structure rather than tuple. Breaking only if using low-level Fetch API.
* FEATURE: ProducerQueueType to allow configuration of producer message queue networks.

### 0.1.15-alpha01 - 5.3.2018
* BUG: consumer would enter an infinite loop between `offsets_out_of_range` and `resuming_fetch_from_reset_offsets`
* BUG: stalled consumers after TCP connection timeout
Expand Down
4 changes: 2 additions & 2 deletions src/kafunk/Chan.fs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ module internal Chan =
socketAgent
|> Resource.injectWithRecovery
RetryPolicy.none
(fun s buf -> Socket.sendAll s buf |> Async.Catch |> Async.map (Result.mapError (ResourceErrorAction.CloseRetry)))
(fun s buf -> Socket.sendAll s buf |> Async.Catch |> Async.map (Result.mapError (Some >> ResourceErrorAction.CloseRetry)))

let receive =
let receive s buf = async {
Expand Down Expand Up @@ -298,7 +298,7 @@ module internal Chan =
let send session req =
Session.send session req
|> Async.map (function
| None -> Failure (CloseRetry (exn "timeout"))
| None -> Failure (CloseRetry None)
| Some res -> Success (Success res))
sessionAgent
|> Resource.injectWithRecovery config.requestRetryPolicy send
Expand Down
62 changes: 38 additions & 24 deletions src/kafunk/Compression.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ let private createMessage (value:Value) (compression:byte) =
//Message.create value Binary.empty (Some attrs)
Message(0, 0y, attrs, 0L, Binary.empty, value)

let private ofMessage (m:Message) =
MessageSet([| MessageSetItem(0L, Message.Size m, m) |])

[<RequireQualifiedAccess>]
module internal Stream =

Expand Down Expand Up @@ -157,34 +160,45 @@ module Snappy =

#endif

let compress (compression:byte) (ms:MessageSet) =
let compress (magicByte:int8) (compression:byte) (ms:MessageSet) =
match compression with
| CompressionCodec.None -> ms
| CompressionCodec.GZIP -> MessageSet.ofMessage (GZip.compress ms)

#if !NETSTANDARD2_0
| CompressionCodec.Snappy -> MessageSet.ofMessage (Snappy.compress ms)
#endif

| _ -> failwithf "Incorrect compression codec %A" compression
| _ ->
if magicByte >= 2y then
failwithf "compression=%i not supported on message_format=%i" compression magicByte else
// assign offsets
let ms =
ms.messages
|> Array.mapi (fun i msi -> MessageSetItem(int64 i, msi.messageSize, msi.message))
|> MessageSet
match compression with
| CompressionCodec.None -> ms
| CompressionCodec.GZIP -> ofMessage (GZip.compress ms)

#if !NETSTANDARD2_0
| CompressionCodec.Snappy -> ofMessage (Snappy.compress ms)
#endif

| _ -> failwithf "Incorrect compression codec %A" compression

let decompress (magicByte:int8) (ms:MessageSet) =
if ms.messages.Length = 0 then ms
else
ms.messages
|> Array.Parallel.collect (fun msi ->
match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
| CompressionCodec.None -> [|msi|]
| CompressionCodec.GZIP ->
let decompressed = GZip.decompress magicByte msi.message
decompressed.messages

#if !NETSTANDARD2_0
| CompressionCodec.Snappy ->
let decompressed = Snappy.decompress magicByte msi.message
decompressed.messages
#endif

| c -> failwithf "compression_code=%i not supported" c)
|> MessageSet
let msis =
ms.messages
|> Array.Parallel.collect (fun msi ->
match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
| CompressionCodec.None -> [|msi|]
| CompressionCodec.GZIP ->
let decompressed = GZip.decompress magicByte msi.message
decompressed.messages

#if !NETSTANDARD2_0
| CompressionCodec.Snappy ->
let decompressed = Snappy.decompress magicByte msi.message
decompressed.messages
#endif

| c -> failwithf "compression_code=%i not supported" c)
MessageSet (msis, ms.lastOffset) // NB: lastOffset mast be propagated for magicByte>=2y support

43 changes: 25 additions & 18 deletions src/kafunk/Consumer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -620,21 +620,21 @@ module Consumer =
(cfg:ConsumerConfig)
(magicByte:int8)
(topic:TopicName)
(partition,errorCode,highWatermark,_,_,_,messageSetSize,ms) =
match errorCode with
(p:FetchResponsePartitionItem) =
match p.errorCode with
| ErrorCode.NoError ->
if messageSetSize = 0 then Choice2Of4 (partition,highWatermark)
if p.messageSetSize = 0 then Choice2Of4 (p.partition,p.highWatermarkOffset)
else
let ms = Compression.decompress magicByte ms
if cfg.checkCrc then
MessageSet.CheckCrc (magicByte, ms)
Choice1Of4 (ConsumerMessageSet(topic, partition, ms, highWatermark))
let ms = Compression.decompress magicByte p.messageSet
if cfg.checkCrc && magicByte < 2y then
MessageSet.CheckCrc ms
Choice1Of4 (ConsumerMessageSet(topic, p.partition, ms, p.highWatermarkOffset))
| ErrorCode.OffsetOutOfRange ->
Choice3Of4 (partition,highWatermark)
Choice3Of4 (p.partition,p.highWatermarkOffset)
| ErrorCode.NotLeaderForPartition | ErrorCode.UnknownTopicOrPartition | ErrorCode.ReplicaNotAvailable ->
Choice4Of4 (partition)
Choice4Of4 (p.partition)
| _ ->
failwithf "unsupported fetch error_code=%i" errorCode
failwithf "unsupported fetch error_code=%i" p.errorCode

/// Fetches the specified offsets.
/// Returns a set of message sets and an end of topic list.
Expand Down Expand Up @@ -771,7 +771,6 @@ module Consumer =
/// multiplexed stream of all fetch responses for this consumer
let private fetchStream (c:Consumer) state initOffsets =
let cfg = c.config
let topic = cfg.topic
let initRetryQueue = RetryQueue.create cfg.endOfTopicPollPolicy fst
(initOffsets, initRetryQueue)
|> AsyncSeq.unfoldAsync
Expand All @@ -792,15 +791,26 @@ module Consumer =
return None
| Some (mss,ends) ->

let nextOffsets =
mss
|> Array.map (fun ms -> ms.partition, MessageSet.nextOffset ms.messageSet ms.highWatermarkOffset)

//let ends,mss =
// // if the current offsets match the next offsets, assume we are at the end
// if nextOffsets.Length > 0 && nextOffsets = offsets then
// let firstOffsets =
// mss
// |> Array.map (fun ms -> ms.partition, MessageSet.firstOffset ms.messageSet)
// Log.warn "same_offsets|last=%A first=%A ends=%A" nextOffsets firstOffsets ends
// nextOffsets,[||]
// else
// ends,mss

let retryQueue =
RetryQueue.retryRemoveAll
retryQueue
ends
(mss |> Seq.map (fun ms -> ms.partition))

let nextOffsets =
mss
|> Array.map (fun mb -> mb.partition, MessageSet.nextOffset mb.messageSet mb.highWatermarkOffset)

//if ends.Length > 0 then
// let msg =
Expand All @@ -821,9 +831,6 @@ module Consumer =
let assignment = Array.map fst initOffsets
let cfg = c.config
let topic = cfg.topic
//use! _cnc = Async.OnCancel (fun () ->
// Log.info "cancelling_fetch_process|group_id=%s generation_id=%i member_id=%s topic=%s partition_count=%i"
// cfg.groupId state.state.generationId state.state.memberId topic (assignment.Length))
Log.info "starting_fetch_process|group_id=%s generation_id=%i member_id=%s topic=%s partition_count=%i"
cfg.groupId state.state.generationId state.state.memberId topic (assignment.Length)
return!
Expand Down
26 changes: 5 additions & 21 deletions src/kafunk/Helpers.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,8 @@ open Kafunk
open System
open System.Text

//module Message =

// let create value key attrs =
// // NB: the CRC is computed by the Protocol module during encoding
// //Message(0, 0y, (defaultArg attrs 0y), DateTime.UtcNowUnixMilliseconds, key, value)
// Message(0, 0y, (defaultArg attrs 0y), 0L, key, value)

module MessageSet =

let t = DateTime.UtcNowUnixMilliseconds

let ofMessage (m:Message) =
MessageSet([| MessageSetItem(0L, Message.Size m, m) |])

let ofMessages ms =
MessageSet(ms |> Seq.map (fun m -> MessageSetItem (0L, Message.Size m, m)) |> Seq.toArray)

/// Returns the frist offset in the message set.
let firstOffset (ms:MessageSet) =
if ms.messages.Length > 0 then
Expand All @@ -41,7 +26,7 @@ module MessageSet =
/// Ensures the next offset is bellow high watermark offset.
let nextOffset (ms:MessageSet) (hwm:HighwaterMarkOffset) : Offset =
let lastOffset = lastOffset ms
let nextOffset = lastOffset + 1L
let nextOffset = lastOffset + 1L
if nextOffset <= hwm then
nextOffset
else
Expand Down Expand Up @@ -211,13 +196,12 @@ module internal Printers =
|> Seq.map (fun (tn,ps) ->
let ps =
ps
|> Seq.map (fun (partition,errorCode,highWatermark,_,logStartOffset,_,messageSetSize,messageSet) ->
//let offsetInfo = ms.messages |> Seq.tryItem 0 |> Option.map (fun (o,_,_) -> sprintf " o=%i lag=%i" o (hwmo - o)) |> Option.getOr ""
|> Seq.map (fun p ->
let offsetInfo =
messageSet.messages
p.messageSet.messages
|> Seq.tryItem 0
|> Option.map (fun x -> sprintf " o=%i lag=%i" x.offset (highWatermark - x.offset)) |> Option.getOr ""
sprintf "(p=%i ec=%i lso=%i hwo=%i mss=%i%s)" partition errorCode logStartOffset highWatermark messageSetSize offsetInfo)
|> Option.map (fun x -> sprintf " o=%i lag=%i" x.offset (p.highWatermarkOffset - x.offset)) |> Option.getOr ""
sprintf "(p=%i ec=%i lso=%i hwo=%i mss=%i%s)" p.partition p.errorCode p.logStartOffset p.highWatermarkOffset p.messageSetSize offsetInfo)
|> String.concat ";"
sprintf "topic=%s partitions=[%s]" tn ps)
|> String.concat " ; "
Expand Down
Loading

0 comments on commit 5fccaad

Please sign in to comment.