This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit

wip
eulerfx committed Mar 14, 2018
1 parent 030eb03 commit d756fd8
Showing 3 changed files with 46 additions and 30 deletions.
58 changes: 37 additions & 21 deletions src/kafunk/Kafka.fs
@@ -71,9 +71,14 @@ type internal ClusterState = {
         match Map.tryFind nodeId brokersById with
         | Some b -> Some ((t,p),b)
         | None -> None)
+    let removeTopicPartitions =
+      topicNodes |> Seq.choose (fun (t,p,n) -> if n < 0 then Some (t,n) else None)
     { s with
         brokersByNodeId = brokersById
-        brokersByTopicPartition = s.brokersByTopicPartition |> Map.addMany brokersByPartitions
+        brokersByTopicPartition =
+          s.brokersByTopicPartition
+          |> Map.addMany brokersByPartitions
+          |> Map.removeAll removeTopicPartitions
         version = s.version + 1 }
 
   static member updateGroupCoordinator (broker:Broker, gid:GroupId) (s:ClusterState) =
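Note: the intent of the new removeTopicPartitions pass in this hunk appears to be that any topic-partition whose reported leader node id is negative gets dropped from the routing table rather than left pointing at a stale broker. A minimal self-contained sketch of that idea, using simplified stand-in types and plain Map folds instead of Kafunk's Map.addMany / Map.removeAll helpers:

  // Sketch only: simplified stand-ins, not Kafunk's actual types or helpers.
  type Topic = string
  type Partition = int
  type Broker = { nodeId : int ; host : string }

  let updateRouting
    (brokersById : Map<int, Broker>)
    (topicNodes : (Topic * Partition * int) seq)          // (topic, partition, leader node id)
    (routing : Map<Topic * Partition, Broker>) : Map<Topic * Partition, Broker> =
    // (re)route partitions whose leader is a known broker
    let assigned =
      topicNodes
      |> Seq.choose (fun (t, p, n) ->
        brokersById |> Map.tryFind n |> Option.map (fun b -> ((t, p), b)))
    // drop partitions reported with a negative (unknown) leader instead of leaving a stale entry
    let leaderless =
      topicNodes |> Seq.choose (fun (t, p, n) -> if n < 0 then Some (t, p) else None)
    let routing' = assigned |> Seq.fold (fun m (k, v) -> Map.add k v m) routing
    leaderless |> Seq.fold (fun m k -> Map.remove k m) routing'
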
@@ -339,6 +344,8 @@ type private RetryAction =
 
     | ErrorCode.UnknownTopicOrPartition ->
       Some (RetryAction.Escalate)
+      //Some (RetryAction.RefreshMetadataAndRetry)
+
     | ErrorCode.InvalidMessage ->
       Some (RetryAction.Escalate)
     | _ ->
@@ -348,9 +355,13 @@ type private RetryAction =
     match res with
     | ResponseMessage.MetadataResponse r ->
       r.topicMetadata
-      |> Seq.tryPick (fun x ->
-        RetryAction.errorRetryAction x.topicErrorCode
-        |> Option.map (fun action -> x.topicErrorCode,action))
+      |> Seq.tryPick (fun x ->
+        match x.topicErrorCode with
+        | ErrorCode.UnknownTopicOrPartition ->
+          Some (x.topicErrorCode,RetryAction.RefreshMetadataAndRetry [|x.topicName|])
+        | _ ->
+          RetryAction.errorRetryAction x.topicErrorCode
+          |> Option.map (fun action -> x.topicErrorCode,action))
 
     | ResponseMessage.OffsetResponse r ->
       r.topics
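Note: read together with the RetryAction hunk above, the effect here seems to be that an UnknownTopicOrPartition error reported inside a metadata response no longer escalates; instead it requests a metadata refresh scoped to the offending topic, followed by a retry. A rough sketch of that decision shape, with simplified stand-in types rather than the library's actual definitions:

  // Sketch only: simplified stand-ins for illustration.
  type ErrorCode =
    | NoError
    | UnknownTopicOrPartition
    | InvalidMessage
    | Other of int

  type RetryAction =
    | PassThru
    | Escalate
    | RefreshMetadataAndRetry of topics : string []

  /// Retry decision for a per-topic error code carried in a metadata response.
  let metadataTopicRetryAction (topic : string) (ec : ErrorCode) : RetryAction option =
    match ec with
    | NoError -> None                                                         // nothing to do
    | UnknownTopicOrPartition -> Some (RefreshMetadataAndRetry [| topic |])   // refresh only this topic, then retry
    | InvalidMessage -> Some Escalate
    | Other _ -> Some Escalate
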
@@ -512,8 +523,8 @@ type KafkaConfig = {
   /// The default Kafka server version = 0.10.1.
   static member DefaultVersion = Version.Parse "0.10.1"
 
-  /// The default setting for supporting auto API versions = false.
-  static member DefaultAutoApiVersions = false
+  /// The default setting for supporting auto API versions = true.
+  static member DefaultAutoApiVersions = true
 
   /// The default broker channel configuration.
   static member DefaultChanConfig = ChanConfig.create ()
@@ -558,6 +569,8 @@ type KafkaConn internal (cfg:KafkaConfig) =
   do Log.info "created_conn|api_version=%O auto_api_versions=%b client_version=%O client_id=%s conn_id=%s"
       cfg.version cfg.autoApiVersions (Assembly.executingAssemblyVersion ()) cfg.clientId cfg.connId
 
+  do if String.IsNullOrEmpty cfg.clientId then Log.warn "client_id_unspecified"
+
   let apiVersion = ref (Versions.byVersion cfg.version)
   let stateCell : MVar<ClusterState> = MVar.createFull (ClusterState.Zero)
   let cts = new CancellationTokenSource()
@@ -674,10 +687,10 @@ type KafkaConn internal (cfg:KafkaConfig) =
       else getAndApplyBootstrap
 
   /// Fetches metadata and returns an updated connection state.
-  and metadata (state:ClusterState) (topics:TopicName[]) = async {
+  and metadata (state:ClusterState) (rs:RetryState) (topics:TopicName[]) = async {
 
     let send =
-      routeToBrokerWithRecovery true RetryState.init state
+      routeToBrokerWithRecovery true rs state
       |> AsyncFunc.dimap RequestMessage.Metadata ResponseMessage.toMetadata
 
     let! metadata = send (MetadataRequest(topics))
@@ -687,7 +700,7 @@ type KafkaConn internal (cfg:KafkaConfig) =
     for tmd in metadata.topicMetadata do
       for pmd in tmd.partitionMetadata do
         if pmd.leader = -1 then
-          Log.error "leaderless_partition_detected|partition=%i error_code=%i" pmd.partitionId pmd.partitionErrorCode
+          Log.error "leaderless_partition_detected|topic=%s partition=%i error_code=%i" tmd.topicName pmd.partitionId pmd.partitionErrorCode
 
     let topicNodes =
       metadata.topicMetadata
@@ -699,7 +712,7 @@ type KafkaConn internal (cfg:KafkaConfig) =
     return state |> ClusterState.updateMetadata (metadata.brokers, topicNodes) }
 
   /// Fetches and applies metadata to the current connection.
-  and getAndApplyMetadata (requireMatchingCaller:bool) (callerState:ClusterState) (topics:TopicName[]) =
+  and getAndApplyMetadata (requireMatchingCaller:bool) (callerState:ClusterState) (rs:RetryState) (topics:TopicName[]) =
     stateCell
     |> MVar.updateAsync (fun (currentState:ClusterState) -> async {
       if requireMatchingCaller
@@ -708,7 +721,7 @@ type KafkaConn internal (cfg:KafkaConfig) =
         Log.trace "skipping_metadata_update|current_version=%i caller_version=%i" currentState.version callerState.version
         return currentState
       else
-        return! metadata currentState topics })
+        return! metadata currentState rs topics })
 
   /// Refreshes metadata for existing topics.
   and refreshMetadata (critical:bool) (callerState:ClusterState) =
@@ -721,8 +734,8 @@ type KafkaConn internal (cfg:KafkaConfig) =
   and refreshMetadataFor (critical:bool) (callerState:ClusterState) topics =
     Log.info "refreshing_metadata|topics=%A version=%i bootstrap_broker=%A conn_id=%s"
       topics callerState.version (callerState.bootstrapBroker |> Option.map (Broker.endpoint)) cfg.connId
-    if critical then metadata callerState topics
-    else getAndApplyMetadata true callerState topics
+    if critical then metadata callerState RetryState.init topics
+    else getAndApplyMetadata true callerState RetryState.init topics
 
   /// Fetches group coordinator metadata.
   and groupCoordinator (state:ClusterState) (groupId:GroupId) = async {
@@ -799,6 +812,9 @@ type KafkaConn internal (cfg:KafkaConfig) =
           else removeBrokerAndApply b state
         return Failure [ChanError.ChanFailure ex]
       | Failure errs ->
+        let! _state =
+          if critical then ClusterState.removeBroker b state
+          else removeBrokerAndApply b state
         return Failure errs }
 
   /// Sends a request to a specific broker and handles failures.
@@ -812,21 +828,21 @@ type KafkaConn internal (cfg:KafkaConfig) =
       | None ->
         return res
       | Some (errorCode,action) ->
-        Log.warn "channel_response_errored|endpoint=%O error_code=%i retry_action=%A req=%s res=%s conn_id=%s"
-          (Broker.endpoint b) errorCode action (RequestMessage.Print req) (ResponseMessage.Print res) cfg.connId
+        Log.warn "channel_response_errored|endpoint=%O error_code=%i retry_action=%A attempt=%i req=%s res=%s conn_id=%s"
+          (Broker.endpoint b) errorCode action rs.attempt (RequestMessage.Print req) (ResponseMessage.Print res) cfg.connId
         match action with
         | RetryAction.PassThru ->
           return res
         | RetryAction.Escalate ->
           return raise (EscalationException (errorCode,req,res,(sprintf "endpoint=%O" (Broker.endpoint b))))
-        | RetryAction.RefreshMetadataAndRetry topics ->
+        | RetryAction.RefreshMetadataAndRetry topics ->
           let! rs' = RetryPolicy.awaitNextState cfg.requestRetryPolicy rs
           match rs' with
-          | Some rs ->
+          | Some rs' ->
            let! state' =
-              if critical then metadata state topics
-              else getAndApplyMetadata true state topics
-            return! routeToBrokerWithRecovery critical rs state' req
+              if critical then metadata state rs' topics
+              else getAndApplyMetadata true state rs' topics
+            return! routeToBrokerWithRecovery critical rs' state' req
           | None ->
             return failwithf "request_failure|attempt=%i request=%s response=%s"
               rs.attempt (RequestMessage.Print req) (ResponseMessage.Print res)
@@ -951,7 +967,7 @@ type KafkaConn internal (cfg:KafkaConfig) =
 
   member internal __.GetMetadataState (topics:TopicName[]) = async {
     let! state = MVar.get stateCell
-    return! getAndApplyMetadata false state topics }
+    return! getAndApplyMetadata false state RetryState.init topics }
 
   member internal __.GetMetadata (topics:TopicName[]) = async {
     let! state' = __.GetMetadataState topics
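Note: taken together, the Kafka.fs changes thread a single RetryState through the metadata path, so a RefreshMetadataAndRetry hit while routing a request and the metadata fetch it triggers draw on the same retry budget (previously metadata always started from RetryState.init). A minimal sketch of that loop shape, with hypothetical stand-ins (awaitNext, refreshMetadata, send) rather than Kafunk's API:

  // Sketch only: one retry budget shared by the request and the metadata refreshes it triggers.
  type RetryState = { attempt : int }

  let rec sendWithRetry
    (awaitNext : RetryState -> Async<RetryState option>)        // None once the retry policy is exhausted
    (refreshMetadata : RetryState -> string[] -> Async<unit>)   // refresh metadata for the given topics
    (send : string -> Async<Result<string, string[]>>)          // Error carries topics whose metadata is stale
    (rs : RetryState)
    (req : string) : Async<string> = async {
    let! res = send req
    match res with
    | Ok r -> return r
    | Error staleTopics ->
      let! next = awaitNext rs
      match next with
      | None ->
        return failwithf "request_failure|attempt=%i" rs.attempt
      | Some rs' ->
        // the refresh and the retried request consume the same retry state
        do! refreshMetadata rs' staleTopics
        return! sendWithRetry awaitNext refreshMetadata send rs' req }

The design point is that metadata refreshes provoked by retries no longer get a fresh, unbounded budget of their own.
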
8 changes: 4 additions & 4 deletions tests/kafunk.Tests/Consumer.fsx
@@ -30,10 +30,10 @@ let go = async {
       //[KafkaUri.parse "localhost:9092" ; KafkaUri.parse "localhost:9093" ; KafkaUri.parse "localhost:9094"],
       tcpConfig = chanConfig,
       requestRetryPolicy = KafkaConfig.DefaultRequestRetryPolicy,
-      //version = Versions.V_0_10_1,
-      //autoApiVersions = true,
-      version = Versions.V_0_9_0,
-      autoApiVersions = false,
+      version = Versions.V_0_10_1,
+      autoApiVersions = true,
+      //version = Versions.V_0_9_0,
+      //autoApiVersions = false,
       clientId = "leo")
   Kafka.connAsync connConfig
   let consumerConfig =
10 changes: 5 additions & 5 deletions tests/kafunk.Tests/Producer.fsx
@@ -55,10 +55,10 @@ let connCfg =
     //requestRetryPolicy = RetryPolicy.constantBoundedMs 1000 10,
     bootstrapConnectRetryPolicy = KafkaConfig.DefaultBootstrapConnectRetryPolicy,
     //bootstrapConnectRetryPolicy = RetryPolicy.constantBoundedMs 1000 3,
-    version = Versions.V_0_9_0,
-    autoApiVersions = false
-    //version = Versions.V_0_10_1,
-    //autoApiVersions = true
+    //version = Versions.V_0_9_0,
+    //autoApiVersions = false
+    version = Versions.V_0_10_1,
+    autoApiVersions = true
   )
 
 let conn = Kafka.conn connCfg
@@ -72,7 +72,7 @@ let producerCfg =
     bufferSizeBytes = ProducerConfig.DefaultBufferSizeBytes,
     batchSizeBytes = 100000, //ProducerConfig.DefaultBatchSizeBytes,
     batchLingerMs = 100,
-    compression = CompressionCodec.Snappy,
+    compression = CompressionCodec.None,
     maxInFlightRequests = 1
   )

Expand Down

