Skip to content

Commit

Permalink
Improved Kafka dissector. (#2456)
Browse files Browse the repository at this point in the history
 * detect more Kafka request packet's
 * requires less flow memory
 * same detection behavior as before e.g. no asym detection implemented
   (can be done by dissecting responses, requires more effort)

Signed-off-by: Toni Uhlig <[email protected]>
Co-authored-by: Nardi Ivan <[email protected]>
  • Loading branch information
utoni and IvanNardi authored May 27, 2024
1 parent d1a59c0 commit abce6d4
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 35 deletions.
6 changes: 0 additions & 6 deletions src/include/ndpi_typedefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -835,9 +835,6 @@ struct ndpi_flow_tcp_struct {
/* NDPI_PROTOCOL_SSH */
u_int32_t ssh_stage:3;

/* NDPI_PROTOCOL_KAFKA */
u_int32_t kafka_stage:1;

/* NDPI_PROTOCOL_VNC */
u_int32_t vnc_stage:2; // 0 - 3

Expand Down Expand Up @@ -891,9 +888,6 @@ struct ndpi_flow_tcp_struct {

/* NDPI_PROTOCOL_RADMIN */
u_int32_t radmin_stage:1;

/* NDPI_PROTOCOL_KAFKA */
u_int32_t kafka_correlation_id;
};

/* ************************************************** */
Expand Down
51 changes: 32 additions & 19 deletions src/lib/protocols/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
#include "ndpi_api.h"
#include "ndpi_private.h"

static void ndpi_int_kafka_add_connection(struct ndpi_detection_module_struct *ndpi_struct,
struct ndpi_flow_struct *flow)
{
NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
}

static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
struct ndpi_flow_struct *flow)
{
Expand All @@ -41,32 +49,37 @@ static void ndpi_search_kafka(struct ndpi_detection_module_struct *ndpi_struct,
* API keys: https://kafka.apache.org/protocol.html#protocol_api_keys
* API versions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+APIs
*/
if (packet->payload_packet_len > 40 &&
ntohl(get_u_int32_t(packet->payload, 0)) == (u_int32_t)(packet->payload_packet_len-4))
if (packet->payload_packet_len < 8 /* min. required packet length */ ||
ntohl(get_u_int32_t(packet->payload, 0)) != (uint32_t)(packet->payload_packet_len - 4))
{
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}

/* Request */
if (ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
{
/* Request */
if (!flow->l4.tcp.kafka_stage &&
current_pkt_from_client_to_server(ndpi_struct, flow) &&
ntohs(get_u_int16_t(packet->payload, 4)) < 75 && /* API key */
ntohs(get_u_int16_t(packet->payload, 6)) < 16 /* API version */)
if (packet->payload_packet_len < 14)
{
flow->l4.tcp.kafka_correlation_id = ntohl(get_u_int16_t(packet->payload, 8));
flow->l4.tcp.kafka_stage = 1;
return;
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}

/* Response */
if (flow->l4.tcp.kafka_stage == 1 &&
current_pkt_from_server_to_client(ndpi_struct, flow))
const uint16_t client_id_len = ntohs(get_u_int16_t(packet->payload, 12));
if (client_id_len + 12 + 2 > packet->payload_packet_len)
{
if (ntohl(get_u_int16_t(packet->payload, 4)) == flow->l4.tcp.kafka_correlation_id)
{
NDPI_LOG_INFO(ndpi_struct, "found Apache Kafka\n");
ndpi_set_detected_protocol(ndpi_struct, flow, NDPI_PROTOCOL_APACHE_KAFKA,
NDPI_PROTOCOL_UNKNOWN, NDPI_CONFIDENCE_DPI);
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}
}
if (ndpi_is_printable_buffer(&packet->payload[14], client_id_len) == 0)
{
NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
return;
}

ndpi_int_kafka_add_connection(ndpi_struct, flow);
return;
}

NDPI_EXCLUDE_PROTO(ndpi_struct, flow);
Expand Down
Binary file modified tests/cfgs/default/pcap/kafka.pcapng
Binary file not shown.
31 changes: 21 additions & 10 deletions tests/cfgs/default/result/kafka.pcapng.out
Original file line number Diff line number Diff line change
@@ -1,27 +1,38 @@
DPI Packets (TCP): 6 (6.00 pkts/flow)
Confidence DPI : 1 (flows)
Num dissector calls: 150 (150.00 diss/flow)
Guessed flow protos: 1

DPI Packets (TCP): 16 (1.78 pkts/flow)
Confidence Match by port : 1 (flows)
Confidence DPI : 8 (flows)
Num dissector calls: 222 (24.67 diss/flow)
LRU cache ookla: 0/0/0 (insert/search/found)
LRU cache bittorrent: 0/0/0 (insert/search/found)
LRU cache bittorrent: 0/3/0 (insert/search/found)
LRU cache stun: 0/0/0 (insert/search/found)
LRU cache tls_cert: 0/0/0 (insert/search/found)
LRU cache mining: 0/0/0 (insert/search/found)
LRU cache mining: 0/1/0 (insert/search/found)
LRU cache msteams: 0/0/0 (insert/search/found)
LRU cache stun_zoom: 0/0/0 (insert/search/found)
Automa host: 0/0 (search/found)
Automa domain: 0/0 (search/found)
Automa tls cert: 0/0 (search/found)
Automa risk mask: 0/0 (search/found)
Automa common alpns: 0/0 (search/found)
Patricia risk mask: 0/0 (search/found)
Patricia risk mask: 14/0 (search/found)
Patricia risk mask IPv6: 0/0 (search/found)
Patricia risk: 0/0 (search/found)
Patricia risk IPv6: 0/0 (search/found)
Patricia protocols: 2/0 (search/found)
Patricia protocols: 18/0 (search/found)
Patricia protocols IPv6: 0/0 (search/found)

Kafka 19 2237 1
Kafka 41 7067 9

Acceptable 19 2237 1
Acceptable 41 7067 9

1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 6][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
1 TCP 127.0.0.1:46136 <-> 127.0.0.1:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 4][cat: RPC/16][12 pkts/1107 bytes <-> 7 pkts/1130 bytes][Goodput ratio: 28/58][13.63 sec][bytes ratio: -0.010 (Mixed)][IAT c2s/s2c min/avg/max/stddev: 0/0 800/288 6849/1049 2039/441][Pkt Len c2s/s2c min/avg/max/stddev: 66/66 92/161 206/512 42/149][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: TCP connection with unidirectional traffic][PLAIN TEXT (console)][Plen Bins: 12,38,12,12,12,0,0,0,0,0,0,0,0,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
2 TCP 172.16.17.101:38176 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][3 pkts/1408 bytes <-> 2 pkts/254 bytes][Goodput ratio: 86/48][0.58 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (timestamp)][Plen Bins: 0,40,0,0,0,0,0,0,0,0,0,40,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
3 TCP 172.30.0.237:9092 <-> 172.16.17.101:58052 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: Match by port][DPI packets: 5][cat: RPC/16][4 pkts/974 bytes <-> 1 pkts/110 bytes][Goodput ratio: 73/40][599.70 sec][PLAIN TEXT (172.30.0.237)][Plen Bins: 0,20,0,60,0,0,0,0,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
4 TCP 172.16.17.101:49280 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][2 pkts/201 bytes <-> 3 pkts/788 bytes][Goodput ratio: 34/75][899.84 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (172.30.0.237)][Plen Bins: 20,20,0,40,0,0,0,0,0,0,20,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
5 TCP 172.16.17.101:56556 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes <-> 1 pkts/416 bytes][Goodput ratio: 27/84][0.03 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 50,0,0,0,0,0,0,0,0,0,50,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
6 TCP 172.16.17.101:40042 <-> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/110 bytes <-> 1 pkts/186 bytes][Goodput ratio: 40/64][0.03 sec][Risk: ** Probing attempt **][Risk Score: 50][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][PLAIN TEXT (172.30.0.237)][Plen Bins: 0,50,0,50,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
7 TCP 172.16.17.101:53768 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/110 bytes -> 0 pkts/0 bytes][Goodput ratio: 40/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 0,100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
8 TCP 172.16.17.101:53052 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes -> 0 pkts/0 bytes][Goodput ratio: 27/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
9 TCP 172.16.17.101:58300 -> 172.30.0.237:9092 [proto: 377/Kafka][IP: 0/Unknown][ClearText][Confidence: DPI][DPI packets: 1][cat: RPC/16][1 pkts/91 bytes -> 0 pkts/0 bytes][Goodput ratio: 27/0][< 1 sec][Risk: ** Unidirectional Traffic **** Probing attempt **][Risk Score: 60][Risk Info: No server to client traffic / TCP connection with unidirectional traffic][Plen Bins: 100,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]

0 comments on commit abce6d4

Please sign in to comment.