Skip to content

Latest commit

 

History

History
197 lines (154 loc) · 7.5 KB

provider_kafka.md

File metadata and controls

197 lines (154 loc) · 7.5 KB

Apache Kafka Provider for SlimMessageBus

Please read the Introduction before reading this provider documentation.

Underlying client

The SMB Kafka implementation uses confluent-kafka-dotnet .NET wrapper around the native librdkafka library.

When troubleshooting or fine tuning it is worth reading the librdkafka and confluent-kafka-dotnet docs:

Configuration properties

Producer, consumer and global configuration properties are described here. The configuration on the underlying Kafka client can be adjusted like so:

var mbb = MessageBusBuilder.Create()
   // ...
   .WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers)
   {
      ProducerConfig = (config) =>
      {
         // adjust he producer config
      },
      ConsumerConfig = (config) => 
      {
         // adjust he consumer config
      }
   });

Minimizing message latency

There is a good description here on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:

var messageBusBuilder = MessageBusBuilder.Create()
   // ...
   .WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers)
   {
      ProducerConfig = (config) =>
      {
         config.LingerMs = 5; // 5ms
         config.SocketNagleDisable = true;
      },
      ConsumerConfig = (config) => 
      {
         config.FetchErrorBackoffMs = 1;
         config.SocketNagleDisable = true;
      }
   });

There is also a good discussion around latency in this issue.

More documentation here:

SSL and password authentication

Example on how to configure SSL with SASL authentication (for cloudkarafka.com):

var messageBusBuilder = MessageBusBuilder.Create()
   // ...
   .WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers)
   {
      ProducerConfig = (config) =>
      {
         AddSsl(kafkaUsername, kafkaPassword, config);
      },
      ConsumerConfig = (config) => 
      {
         AddSsl(kafkaUsername, kafkaPassword, config);
      }
   });
private static void AddSsl(string username, string password, ClientConfig c)
{
   c.SecurityProtocol = SecurityProtocol.SaslSsl;
   c.SaslUsername = username;
   c.SaslPassword = password;
   c.SaslMechanism = SaslMechanism.ScramSha256;
   c.SslCaLocation = "cloudkarafka_2020-12.ca";
}

The file cloudkarafka_2020-12.ca has to be set to Copy to Output Directory as Copy always.

Selecting message partition for topic producer

Kafka topics are broken into partitions. The question is how does SMB Kafka choose the partition to assign the message? There are two possible options:

Default partitioner with message key

Currently the confluent-kafka-dotnet does not support custom partitioners (see here). The default partitioner is supported, which works in this way:

  • when message key is not provided then partition is assigned using round-robin,
  • when message key is provided then same partition is assigned to same key

SMB Kafka allows to set a provider (selector) that will assign the message key for a given message and topic pair. Here is an example:

IMessageBus messageBus = MessageBusBuilder.Create()
   .Produce<MultiplyRequest>(x => 
   {
      x.DefaultTopic("topic1");
      // Message key could be set for the message
      x.KeyProvider((request, topic) => Encoding.ASCII.GetBytes((request.Left + request.Right).ToString()));
   })
   .WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers))
   .Build();

The key must be a byte[].

Assigning partition explicitly

SMB Kafka allows to set a provider (selector) that will assign the partition number for a given message and topic pair. Here is an example:

IMessageBus messageBus = MessageBusBuilder.Create()
   .Produce<PingMessage>(x =>
   {
      x.DefaultTopic("topic1");
      // Partition #0 for even counters
      // Partition #1 for odd counters
      x.PartitionProvider((message, topic) => message.Counter % 2);
   })
   .WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers))
   .Build();

With this approach your provider needs to know the number of partitions for a topic.

Consumer context

The consumer can implement the IConsumerWithContext interface to access the Kafka native message:

public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
   public IConsumerContext Context { get; set; }

   public Task OnHandle(PingMessage message, string path)
   {
      // SMB Kafka transport specific extension:
      var transportMessage = Context.GetTransportMessage();
      var partition = transportMessage.TopicPartition.Partition;
   }
}

This could be useful to extract the message's offset or partition.

Message Headers

SMB uses headers to pass additional metadata information with the message. This includes the MessageType (of type string) or in the case of request/response messages the RequestId (of type string), ReplyTo (of type string) and Expires (of type long).

The Kafka message header values are natively binary (byte[]) in the underlying .NET client, as a result SMB needs to serialize the header values. By default the same serializer is used to serialize header values as is being used for the message serialization. If you need to specify a different serializer provide a specfic IMessageSerializer implementation (custom or one of the available serialization plugins):

IMessageBus messageBus = MessageBusBuilder.Create()
   .WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers)
   {
      HeaderSerializer = new JsonMessageSerializer() // specify a different header values serializer
   })
   .Buid();

By default for header serialization (if not specified) SMB Kafka uses the same serializer that was set for the bus.

Deployment

The librdkafka distribution for Windows requires Visual C++ Redistributable for 2013 installed on the server. More information can be found here.