Skip to content

Commit

Permalink
feat(auth): support for acls-filter-regexp (#1281)
Browse files Browse the repository at this point in the history
  • Loading branch information
meeraj257 authored Jan 2, 2023
1 parent 82b1daf commit 823cff6
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 1 deletion.
3 changes: 2 additions & 1 deletion docs/docs/configuration/authentifications/groups.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ Define groups with specific roles for your users
* `attributes.topics-filter-regexp`: Regexp list to filter topics available for current group
* `attributes.connects-filter-regexp`: Regexp list to filter Connect tasks available for current group
* `attributes.consumer-groups-filter-regexp`: Regexp list to filter Consumer Groups available for current group
* `attributes.acls-filter-regexp`: Regexp list to filter acls available for current group

::: warning
`topics-filter-regexp`, `connects-filter-regexp` and `consumer-groups-filter-regexp` are only used when listing resources.
`topics-filter-regexp`, `connects-filter-regexp`, `consumer-groups-filter-regexp` and `acls-filter-regexp` are only used when listing resources.
If you have `topics/create` or `connect/create` roles and you try to create a resource that doesn't follow the regexp, that resource **WILL** be created.
:::

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package org.akhq.repositories;

import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.utils.SecurityService;
import io.micronaut.context.ApplicationContext;
import org.akhq.models.AccessControl;
import org.akhq.modules.AbstractKafkaWrapper;
import org.akhq.utils.DefaultGroupUtils;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand All @@ -20,11 +26,18 @@ public class AccessControlListRepository extends AbstractRepository {
@Inject
private AbstractKafkaWrapper kafkaWrapper;

@Inject
private ApplicationContext applicationContext;

@Inject
private DefaultGroupUtils defaultGroupUtils;

public List<AccessControl> findAll(String clusterId, Optional<String> search) throws ExecutionException, InterruptedException {
return toGroupedAcl(kafkaWrapper
.describeAcls(clusterId, AclBindingFilter.ANY)
.stream()
.filter(aclBinding -> isSearchMatch(search, aclBinding.entry().principal()))
.filter(aclBinding -> isMatchRegex(getAclFilterRegex(),aclBinding.entry().principal()))
.collect(Collectors.toList())
);
}
Expand Down Expand Up @@ -72,4 +85,32 @@ private static List<AccessControl> toGroupedAcl(Collection<AclBinding> aclBindin
))
.collect(Collectors.toList());
}

private Optional<List<String>> getAclFilterRegex() {

List<String> aclFilterRegex = new ArrayList<>();

if (applicationContext.containsBean(SecurityService.class)) {
SecurityService securityService = applicationContext.getBean(SecurityService.class);
Optional<Authentication> authentication = securityService.getAuthentication();
if (authentication.isPresent()) {
Authentication auth = authentication.get();
aclFilterRegex.addAll(getAclFilterRegexFromAttributes(auth.getAttributes()));
}
}
// get topic filter regex for default groups
aclFilterRegex.addAll(getAclFilterRegexFromAttributes(
defaultGroupUtils.getDefaultAttributes()
));

return Optional.of(aclFilterRegex);
}

@SuppressWarnings("unchecked")
private List<String> getAclFilterRegexFromAttributes(Map<String, Object> attributes) {
if ((attributes.get("aclsFilterRegexp") != null) && (attributes.get("aclsFilterRegexp") instanceof List)) {
return (List<String>)attributes.get("aclsFilterRegexp");
}
return new ArrayList<>();
}
}
8 changes: 8 additions & 0 deletions src/test/java/org/akhq/KafkaTestCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ private void injectTestData() throws InterruptedException, ExecutionException {
new ResourcePattern(ResourceType.TOPIC, "anotherAclTestTopic", PatternType.LITERAL),
new AccessControlEntry("user:toto", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
);
bindings.add(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "anotherAclTestTopic", PatternType.LITERAL),
new AccessControlEntry("test:toto", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
);
bindings.add(new AclBinding(
new ResourcePattern(ResourceType.GROUP, "groupConsumer", PatternType.LITERAL),
new AccessControlEntry("user:toto", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
Expand All @@ -337,6 +341,10 @@ private void injectTestData() throws InterruptedException, ExecutionException {
new ResourcePattern(ResourceType.GROUP, "groupConsumer2", PatternType.LITERAL),
new AccessControlEntry("user:toto", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
);
bindings.add(new AclBinding(
new ResourcePattern(ResourceType.GROUP, "groupConsumer2", PatternType.LITERAL),
new AccessControlEntry("test:toto", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
);
testUtils.getAdminClient().createAcls(bindings).all().get();
log.debug("bindings acls added");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ void findAllByUser() throws ExecutionException, InterruptedException {
);
}

@Test
void findAllBySecondUser() throws ExecutionException, InterruptedException {
var searchResult = aclRepository.findByPrincipal(KafkaTestCluster.CLUSTER_ID, AccessControl.encodePrincipal("test:toto"), Optional.empty());
assertEquals("test:toto", searchResult.getPrincipal());
assertEquals(2, searchResult.getAcls().size());
assertEquals(2, searchResult
.getAcls()
.stream()
.filter(acl -> acl.getOperation().getPermissionType() == AclPermissionType.ALLOW)
.count()
);
}

@Test
void findHostByUser() throws ExecutionException, InterruptedException {
var searchResult = aclRepository.findByPrincipal(KafkaTestCluster.CLUSTER_ID, AccessControl.encodePrincipal("user:tata"), Optional.empty());
Expand Down
8 changes: 8 additions & 0 deletions src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ akhq:
- connect/update
- connect/delete
- connect/state/update
attributes:
acls-filter-regexp:
- "user.*"
limited:
name: limited
roles:
Expand All @@ -114,6 +117,8 @@ akhq:
attributes:
topics-filter-regexp:
- "test.*"
acls-filter-regexp:
- "user.*"
operator:
name: operator
roles:
Expand All @@ -131,6 +136,9 @@ akhq:
- topic/insert
- topic/delete
- registry/version/delete
attributes:
acls-filter-regexp:
- "user.*"
basic-auth:
- username: user
password: d74ff0ee8da3b9806b18c877dbf29bbde50b5bd8e4dad7a3a725000feb82e8f1
Expand Down

0 comments on commit 823cff6

Please sign in to comment.