Skip to content

Commit

Permalink
feat(group): regexp filter on consumer groups (#628)
Browse files Browse the repository at this point in the history
Co-authored-by: alozano3 <[email protected]>
  • Loading branch information
alozano3 and alozano3 authored Mar 9, 2021
1 parent de23bf9 commit 371186c
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 11 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@
- User groups configuration
- Filter topics with regexp for current groups
- Ldap configuration to match AKHQ groups/roles

- Filter consumer groups with regexp for current groups

## New React UI

Since this is a major rework, the new UI can have some issues, so please [report any issue](https://github.com/tchiotludo/akhq/issues), thanks!
Expand Down Expand Up @@ -409,6 +410,7 @@ Define groups with specific roles for your users
* `roles`: Roles list for the group
* `attributes.topics-filter-regexp`: Regexp to filter topics available for current group
* `attributes.connects-filter-regexp`: Regexp to filter Connect tasks available for current group
* `attributes.consumer-groups-filter-regexp`: Regexp to filter Consumer Groups available for current group


3 defaults group are available :
Expand Down Expand Up @@ -512,6 +514,7 @@ akhq:
# Regexp to filter topic available for group
topics-filter-regexp: "test\\.reader.*"
connects-filter-regexp: "^test.*$"
consumer-groups-filter-regexp: "consumer.*"
topic-writer:
name: topic-writer # Group name
roles:
Expand All @@ -522,6 +525,7 @@ akhq:
attributes:
topics-filter-regexp: "test.*"
connects-filter-regexp: "^test.*$"
consumer-groups-filter-regexp: "consumer.*"
ldap:
groups:
- name: mathematicians
Expand Down
2 changes: 2 additions & 0 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ akhq:
topics-filter-regexp: "test.*"
# Regexp to filter connect configs visible for group
connects-filter-regexp: "^test.*$"
# Regexp to filter consumer groups visible for group
consumer-groups-filter-regexp: "consumer.*"
topic-reader: # unique key
name: topic-reader # Other group
roles:
Expand Down
71 changes: 61 additions & 10 deletions src/main/java/org/akhq/repositories/ConsumerGroupRepository.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package org.akhq.repositories;

import io.micronaut.context.ApplicationContext;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.utils.SecurityService;
import org.akhq.configs.SecurityProperties;
import org.akhq.models.ConsumerGroup;
import org.akhq.models.Partition;
import org.akhq.modules.AbstractKafkaWrapper;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.PagedList;
import org.akhq.utils.Pagination;
import org.akhq.utils.UserGroupUtils;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.akhq.models.ConsumerGroup;
import org.akhq.models.Partition;
import org.akhq.modules.KafkaModule;
import org.akhq.modules.AbstractKafkaWrapper;
import org.akhq.utils.PagedList;
import org.akhq.utils.Pagination;

import javax.inject.Inject;
import javax.inject.Singleton;
Expand All @@ -28,6 +33,15 @@ public class ConsumerGroupRepository extends AbstractRepository {
@Inject
private KafkaModule kafkaModule;

@Inject
private ApplicationContext applicationContext;

@Inject
private UserGroupUtils userGroupUtils;

@Inject
private SecurityProperties securityProperties;

public PagedList<ConsumerGroup> list(String clusterId, Pagination pagination, Optional<String> search) throws ExecutionException, InterruptedException {
return PagedList.of(all(clusterId, search), pagination, groupsList -> this.findByName(clusterId, groupsList));
}
Expand All @@ -36,7 +50,8 @@ public List<String> all(String clusterId, Optional<String> search) throws Execut
ArrayList<String> list = new ArrayList<>();

for (ConsumerGroupListing item : kafkaWrapper.listConsumerGroups(clusterId)) {
if (isSearchMatch(search, item.groupId())) {
if (isSearchMatch(search, item.groupId()) && isMatchRegex(
getConsumerGroupFilterRegex(), item.groupId())) {
list.add(item.groupId());
}
}
Expand All @@ -47,13 +62,19 @@ public List<String> all(String clusterId, Optional<String> search) throws Execut
}

public ConsumerGroup findByName(String clusterId, String name) throws ExecutionException, InterruptedException {
Optional<ConsumerGroup> consumerGroup = this.findByName(clusterId, Collections.singletonList(name)).stream().findFirst();

Optional<ConsumerGroup> consumerGroup = Optional.empty();
if(isMatchRegex(getConsumerGroupFilterRegex(),name)) {
consumerGroup = this.findByName(clusterId, Collections.singletonList(name)).stream().findFirst();
}
return consumerGroup.orElseThrow(() -> new NoSuchElementException("Consumer Group '" + name + "' doesn't exist"));
}

public List<ConsumerGroup> findByName(String clusterId, List<String> groups) throws ExecutionException, InterruptedException {
Map<String, ConsumerGroupDescription> consumerDescriptions = kafkaWrapper.describeConsumerGroups(clusterId, groups);
Optional<List<String>> consumerGroupRegex = getConsumerGroupFilterRegex();
List<String> filteredConsumerGroups = groups.stream()
.filter(t -> isMatchRegex(consumerGroupRegex, t))
.collect(Collectors.toList());
Map<String, ConsumerGroupDescription> consumerDescriptions = kafkaWrapper.describeConsumerGroups(clusterId, filteredConsumerGroups);

Map<String, Map<TopicPartition, OffsetAndMetadata>> groupGroupsOffsets = consumerDescriptions.keySet().stream()
.map(group -> {
Expand Down Expand Up @@ -139,4 +160,34 @@ public void updateOffsets(String clusterId, String name, Map<org.akhq.models.Top

kafkaWrapper.clearConsumerGroupsOffsets();
}

private Optional<List<String>> getConsumerGroupFilterRegex() {

List<String> consumerGroupFilterRegex = new ArrayList<>();

if (applicationContext.containsBean(SecurityService.class)) {
SecurityService securityService = applicationContext.getBean(SecurityService.class);
Optional<Authentication> authentication = securityService.getAuthentication();
if (authentication.isPresent()) {
Authentication auth = authentication.get();
consumerGroupFilterRegex.addAll(getConsumerGroupFilterRegexFromAttributes(auth.getAttributes()));
}
}
// get consumer group filter regex for default groups
consumerGroupFilterRegex.addAll(getConsumerGroupFilterRegexFromAttributes(
userGroupUtils.getUserAttributes(Collections.singletonList(securityProperties.getDefaultGroup()))
));

return Optional.of(consumerGroupFilterRegex);
}

@SuppressWarnings("unchecked")
private List<String> getConsumerGroupFilterRegexFromAttributes(Map<String, Object> attributes) {
if (attributes.get("consumerGroupsFilterRegexp") != null) {
if (attributes.get("consumerGroupsFilterRegexp") instanceof List) {
return (List<String>)attributes.get("consumerGroupsFilterRegexp");
}
}
return new ArrayList<>();
}
}
1 change: 1 addition & 0 deletions src/test/java/org/akhq/KafkaTestCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class KafkaTestCluster implements Runnable, Stoppable {
public static final int TOPIC_HIDE_INTERNAL_COUNT = 11;
public static final int TOPIC_HIDE_INTERNAL_STREAM_COUNT = 9;
public static final int TOPIC_HIDE_STREAM_COUNT = 17;
public static final int CONSUMER_GROUP_COUNT = 6;

public static final String CONSUMER_STREAM_TEST = "stream-test-example";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.akhq.repositories;

import io.micronaut.context.ApplicationContext;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.authentication.DefaultAuthentication;
import io.micronaut.security.utils.DefaultSecurityService;
import io.micronaut.security.utils.SecurityService;
import lombok.extern.slf4j.Slf4j;
import org.akhq.AbstractTest;
import org.akhq.KafkaTestCluster;
import org.akhq.utils.Pagination;
import org.codehaus.httpcache4j.uri.URIBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import javax.inject.Inject;
import java.util.*;
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

@Slf4j
public class ConsumerGroupRepositoryTest extends AbstractTest {

@Inject
@InjectMocks
protected ConsumerGroupRepository consumerGroupRepository;

@Mock
ApplicationContext applicationContext;

@BeforeEach
public void before(){
MockitoAnnotations.initMocks(this);
}

@Test
public void list() throws ExecutionException, InterruptedException {
assertEquals(KafkaTestCluster.CONSUMER_GROUP_COUNT, consumerGroupRepository.list(
KafkaTestCluster.CLUSTER_ID,
new Pagination(100, URIBuilder.empty(), 1),
Optional.empty()
).size());
}

@Test
public void listWithConsumerGroupRegex() throws ExecutionException, InterruptedException {
mockApplicationContext();
assertEquals(5, consumerGroupRepository.list(
KafkaTestCluster.CLUSTER_ID,
new Pagination(100, URIBuilder.empty(), 1),
Optional.empty()
).size());
}

@Test
public void search() throws ExecutionException, InterruptedException {
assertEquals(1, consumerGroupRepository.list(
KafkaTestCluster.CLUSTER_ID,
new Pagination(100, URIBuilder.empty(), 1),
Optional.of("consu 2")
).size());
}

@Test
public void searchWithTopicRegex() throws ExecutionException, InterruptedException {
mockApplicationContext();
assertEquals(0, consumerGroupRepository.list(
KafkaTestCluster.CLUSTER_ID,
new Pagination(100, URIBuilder.empty(), 1),
Optional.of("stream")
).size());
}

@Test
public void findByNameWithTopicRegex() throws ExecutionException, InterruptedException {
mockApplicationContext();
Assertions.assertThrows(NoSuchElementException.class, () -> {
consumerGroupRepository.findByName(KafkaTestCluster.CLUSTER_ID,"cgroup-1");
});

assertEquals(1, consumerGroupRepository.findByName(KafkaTestCluster.CLUSTER_ID, List.of("consumer-6", "cgroup-1")).size());
}

private void mockApplicationContext() {
Authentication auth = new DefaultAuthentication("test", Collections.singletonMap("consumerGroupsFilterRegexp", new ArrayList<>(Arrays.asList("consumer-.*"))));
DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
when(securityService.getAuthentication()).thenReturn(Optional.of(auth));
when(applicationContext.containsBean(SecurityService.class)).thenReturn(true);
when(applicationContext.getBean(SecurityService.class)).thenReturn(securityService);
}
}

0 comments on commit 371186c

Please sign in to comment.