Skip to content

Commit

Permalink
feat(groups): performance, fetch consumer group of topic page in a si…
Browse files Browse the repository at this point in the history
…ngle request (#486)
  • Loading branch information
polarising-java authored Oct 30, 2020
1 parent 4a7053b commit 12cd1c9
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 8 deletions.
20 changes: 12 additions & 8 deletions client/src/containers/Topic/TopicList/TopicList.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Header from '../../Header';
import SearchBar from '../../../components/SearchBar';
import Pagination from '../../../components/Pagination';
import ConfirmModal from '../../../components/Modal/ConfirmModal';
import { uriDeleteTopics, uriTopics, uriTopicsGroups } from '../../../utils/endpoints';
import {uriConsumerGroupByTopics, uriDeleteTopics, uriTopics} from '../../../utils/endpoints';
import constants from '../../../utils/constants';
import { calculateTopicOffsetLag, showBytes } from '../../../utils/converters';
import './styles.scss';
Expand Down Expand Up @@ -181,17 +181,21 @@ class TopicList extends Root {
groupComponent: undefined,
internal: topic.internal
}

collapseConsumerGroups[topic.name] = false;

this.getApi(uriTopicsGroups(selectedCluster, topic.name))
.then(value => {
tableTopics[topic.name].groupComponent = value.data
setState()
})
});
this.setState({collapseConsumerGroups});
setState()

const topicsName = topics.map(topic => topic.name).join(",");
this.getApi(uriConsumerGroupByTopics(selectedCluster, encodeURIComponent(topicsName)))
.then(value => {
topics.forEach(topic => {
tableTopics[topic.name].groupComponent = (value && value.data)? value.data.filter(consumerGroup =>
(consumerGroup.activeTopics && consumerGroup.activeTopics.includes(topic.name))
|| (consumerGroup.topics && consumerGroup.topics.includes(topic.name))): [];
});
setState();
});
}


Expand Down
4 changes: 4 additions & 0 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ export const uriConsumerGroupAcls = (clusterId, groupId) => {
return `${apiUrl}/${clusterId}/group/${groupId}/acls`;
};

export const uriConsumerGroupByTopics = (clusterId, topicList) => {
return `${apiUrl}/${clusterId}/group/topics?topics=${topicList}`;
};

export const uriAclsByPrincipal = (clusterId, principalEncoded, resourceType = 'ANY') => {
return `${apiUrl}/${clusterId}/acls/${principalEncoded}?resourceType=${resourceType}`;
};
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/akhq/controllers/GroupController.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.time.Instant;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -90,6 +91,21 @@ public List<AccessControl> acls(String cluster, String groupName) throws Executi
return aclRepository.findByResourceType(cluster, ResourceType.GROUP, groupName);
}

@Get("topics")
@Operation(tags = {"consumer group"}, summary = "Retrieve consumer group for list of topics")
public List filterByTopics(String cluster, Optional<List<String>> topics) {

return topics.map(
topicsName -> {
try {
return this.consumerGroupRepository.findByTopics(cluster, topicsName);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
).orElse(Collections.EMPTY_LIST);
}

@Secured(Role.ROLE_GROUP_OFFSETS_UPDATE)
@Post(value = "{groupName}/offsets", consumes = MediaType.APPLICATION_JSON)
@Operation(tags = {"consumer group"}, summary = "Update consumer group offsets")
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/org/akhq/repositories/ConsumerGroupRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,23 @@ public List<ConsumerGroup> findByTopic(String clusterId, String topic) throws Ex
.collect(Collectors.toList());
}

public List<ConsumerGroup> findByTopics(String clusterId, List<String> topics) throws ExecutionException, InterruptedException {

List<String> groupName = this.all(clusterId, Optional.empty());
List<ConsumerGroup> list = this.findByName(clusterId, groupName);
return list
.stream()
.filter(consumerGroups ->
consumerGroups.getActiveTopics()
.stream()
.anyMatch(s -> topics.contains(s) ) ||
consumerGroups.getTopics()
.stream()
.anyMatch(s -> topics.contains(s))
)
.collect(Collectors.toList());
}

public void updateOffsets(String clusterId, String name, Map<org.akhq.models.TopicPartition, Long> offset) {
KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId, new Properties() {{
put(ConsumerConfig.GROUP_ID_CONFIG, name);
Expand Down

0 comments on commit 12cd1c9

Please sign in to comment.