Skip to content

Commit

Permalink
feat(node): add page partition repartition (#1033)
Browse files Browse the repository at this point in the history
close #983
  • Loading branch information
jgrammen-agilitypr authored Mar 17, 2022
1 parent 6fe665b commit b78b35e
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 16 deletions.
40 changes: 34 additions & 6 deletions client/src/containers/Node/NodeList/NodesList.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Header from '../../Header';
import Table from '../../../components/Table';
import * as constants from '../../../utils/constants';
import { uriNodes } from '../../../utils/endpoints';
import { uriNodePartitions } from '../../../utils/endpoints';
import Root from '../../../components/Root';

class NodesList extends Root {
Expand All @@ -19,23 +20,43 @@ class NodesList extends Root {
async getNodes() {
let nodes = [];
const { clusterId } = this.props.match.params;

nodes = await this.getApi(uriNodes(clusterId));
this.handleData(nodes.data);
this.setState({ selectedCluster: clusterId });
}

handleData(nodes) {
let tableNodes = nodes.nodes.map(node => {
return {
const { clusterId } = this.props.match.params;
let tableNodes = {}
const setState = () => {
this.setState({ data: Object.values(tableNodes), loading: false});
}

nodes.nodes.forEach(node => {
tableNodes[node.id] = {
id: JSON.stringify(node.id) || '',
host: `${node.host}:${node.port}` || '',
rack: node.rack || '',
controller: nodes.controller.id === node.id ? 'True': 'False' || '',
controller: nodes.controller.id === node.id ? 'True' : 'False' || '',
partition: undefined
};
});
this.setState({ data: tableNodes, loading: false });
return tableNodes;

setState();

this.getApi(uriNodePartitions(clusterId))
.then(value => {
for (let node of value.data) {
const topicNode = tableNodes[node.id];
tableNodes[node.id].partition = topicNode ?
(node.countLeader) + ' (' + (((node.countLeader) / node.totalPartitions) * 100).toFixed(2) + '%)' :
'';
}

setState();
})

return Object.values(tableNodes);
}

render() {
Expand Down Expand Up @@ -72,6 +93,13 @@ class NodesList extends Root {
type: 'text',
sortable: true
},
{
id: 'partition',
accessor: 'partition',
colName: 'Partitions (% of total)',
type: 'text',
sortable: false
},
{
id: 'rack',
accessor: 'rack',
Expand Down
5 changes: 5 additions & 0 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ export const uriNodes = id => {
return `${apiUrl}/${id}/node`;
};

export const uriNodePartitions = clusterId => {
return `${apiUrl}/${clusterId}/node/partitions`;
};

export const uriNodesConfigs = (clusterId, nodeId) => {
return `${apiUrl}/${clusterId}/node/${nodeId}/configs`;
};
Expand Down Expand Up @@ -321,6 +325,7 @@ export default {
uriNodesConfigs,
uriTopicsLogs,
uriTopicsGroups,
uriNodePartitions,
uriTopicsPartitions,
uriTopicData,
uriTopicsProduce,
Expand Down
35 changes: 34 additions & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ volumes:
driver: local
kafka-data:
driver: local
kafka-data1:
driver: local
ui-modules:
driver: local
ui-build:
Expand Down Expand Up @@ -63,23 +65,54 @@ services:
environment:
KAFKA_BROKER_ID: '0'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://kafka:9092
KAFKA_NUM_PARTITIONS: '12'
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_COMPRESSION_TYPE: 'gzip'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092'
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
KAFKA_JMX_PORT: '9091'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_ALLOW_PLAINTEXT_LISTENER: 'true'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
links:
- zookeeper
ports:
- 9092:9092
- 9091:9091

# kafka-1:
# image: confluentinc/cp-kafka
# volumes:
# - kafka-data1:/var/lib/kafka/data:Z
# depends_on:
# - kafka
# environment:
# KAFKA_BROKER_ID: '1'
# KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
# KAFKA_NUM_PARTITIONS: '12'
# KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29093,EXTERNAL://kafka-1:9093
# KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
# KAFKA_COMPRESSION_TYPE: 'gzip'
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
# KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
# KAFKA_JMX_PORT: '9091'
# KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
# KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
# KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
# KAFKA_ALLOW_PLAINTEXT_LISTENER: 'true'
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
# links:
# - zookeeper
# ports:
# - 9093:9093

schema-registry:
image: confluentinc/cp-schema-registry
depends_on:
Expand Down
63 changes: 54 additions & 9 deletions src/main/java/org/akhq/controllers/NodeController.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,33 @@
package org.akhq.controllers;

import io.micronaut.core.annotation.Introspected;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.micronaut.security.annotation.Secured;
import io.swagger.v3.oas.annotations.Operation;
import jakarta.inject.Inject;
import lombok.Builder;
import lombok.Getter;
import org.akhq.configs.Role;
import org.akhq.models.Cluster;
import org.akhq.models.Config;
import org.akhq.models.LogDir;
import org.akhq.models.Node;
import org.akhq.models.*;
import org.akhq.repositories.ClusterRepository;
import org.akhq.repositories.ConfigRepository;
import org.akhq.repositories.LogDirRepository;
import org.akhq.repositories.TopicRepository;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import jakarta.inject.Inject;
import java.util.stream.Collectors;

@Secured(Role.ROLE_NODE_READ)
@Controller
public class NodeController extends AbstractController {
private final ClusterRepository clusterRepository;
private final ConfigRepository configRepository;
private final LogDirRepository logDirRepository;
@Inject
private TopicRepository topicRepository;

@Inject
public NodeController(ClusterRepository clusterRepository, ConfigRepository configRepository, LogDirRepository logDirRepository) {
Expand All @@ -35,6 +36,50 @@ public NodeController(ClusterRepository clusterRepository, ConfigRepository conf
this.logDirRepository = logDirRepository;
}

@Introspected
@Builder(toBuilder = true)
@Getter
public static class NodePartition {
int id;
int countLeader;
int countInSyncReplicas;
long totalPartitions;
}

@Get("api/{cluster}/node/partitions")
@Operation(tags = {"topic"}, summary = "partition counts")
public List<NodePartition> nodePartitions( String cluster ) throws ExecutionException, InterruptedException {
List<String> topicNames = this.topicRepository.all(cluster, TopicRepository.TopicListView.HIDE_INTERNAL, Optional.empty());
List<Topic> topics = this.topicRepository.findByName(cluster, topicNames);
long totalPartitions = topics
.stream()
.mapToInt(t -> t.getPartitions().size())
.sum();
return topics
.stream()
.flatMap(topic -> topic.getPartitions().stream())
.flatMap(partition -> partition.getNodes()
.stream()
.map(n -> NodePartition.builder()
.id(n.getId())
.countLeader(n.isLeader() ? 1 : 0)
.countInSyncReplicas(n.isInSyncReplicas() ? 1 : 0)
.build()
)
)
.collect(Collectors.groupingBy(NodePartition::getId))
.entrySet()
.stream()
.map(n -> NodePartition.builder()
.id(n.getKey())
.countLeader(n.getValue().stream().mapToInt(NodePartition::getCountLeader).sum())
.countInSyncReplicas(n.getValue().stream().mapToInt(NodePartition::getCountInSyncReplicas).sum())
.totalPartitions(totalPartitions)
.build()
)
.collect(Collectors.toList());
}

@Get("api/{cluster}/node")
@Operation(tags = {"node"}, summary = "List all nodes")
public Cluster list(String cluster) throws ExecutionException, InterruptedException {
Expand Down

0 comments on commit b78b35e

Please sign in to comment.