Skip to content

Commit

Permalink
fix(topic): describe only filtered topics (#629)
Browse files Browse the repository at this point in the history
Co-authored-by: alozano3 <[email protected]>
  • Loading branch information
alozano3 and alozano3 authored Mar 9, 2021
1 parent e5f11c6 commit d46f2e1
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/main/java/org/akhq/repositories/TopicRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import javax.inject.Singleton;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

@Singleton
public class TopicRepository extends AbstractRepository {
Expand Down Expand Up @@ -100,14 +101,15 @@ public Topic findByName(String clusterId, String name) throws ExecutionException

public List<Topic> findByName(String clusterId, List<String> topics) throws ExecutionException, InterruptedException {
ArrayList<Topic> list = new ArrayList<>();

Set<Map.Entry<String, TopicDescription>> topicDescriptions = kafkaWrapper.describeTopics(clusterId, topics).entrySet();
Map<String, List<Partition.Offsets>> topicOffsets = kafkaWrapper.describeTopicsOffsets(clusterId, topics);

Optional<List<String>> topicRegex = getTopicFilterRegex();

List<String> filteredTopics = topics.stream()
.filter(t -> isMatchRegex(topicRegex, t))
.collect(Collectors.toList());
Set<Map.Entry<String, TopicDescription>> topicDescriptions = kafkaWrapper.describeTopics(clusterId, filteredTopics).entrySet();
Map<String, List<Partition.Offsets>> topicOffsets = kafkaWrapper.describeTopicsOffsets(clusterId, filteredTopics);

for (Map.Entry<String, TopicDescription> description : topicDescriptions) {
if(isMatchRegex(topicRegex, description.getValue().name())){
list.add(
new Topic(
description.getValue(),
Expand All @@ -117,7 +119,6 @@ public List<Topic> findByName(String clusterId, List<String> topics) throws Exec
isStream(description.getValue().name())
)
);
}
}

list.sort(Comparator.comparing(Topic::getName));
Expand Down

0 comments on commit d46f2e1

Please sign in to comment.