Skip to content

Commit

Permalink
refactor: add di and some cleanup (#19)
Browse files Browse the repository at this point in the history
* refactor: add di and some cleanup

* test: add client config registry test
  • Loading branch information
aaron-steinfeld authored Sep 20, 2020
1 parent 2fcf57f commit 2bafdd4
Show file tree
Hide file tree
Showing 22 changed files with 551 additions and 403 deletions.
5 changes: 4 additions & 1 deletion query-service-impl/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@ dependencies {
}
}
api(project(":query-service-api"))
api("com.typesafe:config:1.4.0")
implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.1.4")
implementation("com.google.inject:guice:4.2.3")
implementation("org.apache.pinot:pinot-java-client:0.5.0") {
// We want to use log4j2 impl so exclude the log4j binding of slf4j
exclude("org.slf4j", "slf4j-log4j12")
}
implementation("org.slf4j:slf4j-api:1.7.30")
implementation("com.typesafe:config:1.4.0")
implementation("commons-codec:commons-codec:1.13")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.8")
implementation("com.google.protobuf:protobuf-java-util:3.12.2")
implementation("com.google.guava:guava:29.0-jre")

testImplementation(project(":query-service-api"))
testImplementation("org.junit.jupiter:junit-jupiter:5.6.2")
testImplementation("org.mockito:mockito-core:3.3.3")
testImplementation("org.mockito:mockito-junit-jupiter:3.3.3")
testImplementation("org.apache.logging.log4j:log4j-slf4j-impl:2.13.3")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.hypertrace.core.query.service;

import com.google.inject.Guice;
import com.typesafe.config.Config;
import org.hypertrace.core.query.service.api.QueryServiceGrpc.QueryServiceImplBase;

public class QueryServiceFactory {

public static QueryServiceImplBase build(Config config) {
return Guice.createInjector(new QueryServiceModule(config))
.getInstance(QueryServiceImplBase.class);
}
}
Original file line number Diff line number Diff line change
@@ -1,62 +1,26 @@
package org.hypertrace.core.query.service;

import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.hypertrace.core.grpcutils.context.RequestContext;
import org.hypertrace.core.query.service.QueryServiceImplConfig.ClientConfig;
import org.hypertrace.core.query.service.QueryServiceImplConfig.RequestHandlerConfig;
import org.hypertrace.core.query.service.api.QueryRequest;
import org.hypertrace.core.query.service.api.QueryServiceGrpc;
import org.hypertrace.core.query.service.api.ResultSetChunk;
import org.hypertrace.core.query.service.pinot.PinotBasedRequestHandler;
import org.hypertrace.core.query.service.pinot.PinotClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryServiceImpl extends QueryServiceGrpc.QueryServiceImplBase {

private static final org.slf4j.Logger LOG =
org.slf4j.LoggerFactory.getLogger(QueryServiceImpl.class);
@Singleton
class QueryServiceImpl extends QueryServiceGrpc.QueryServiceImplBase {

private static final Logger LOG = LoggerFactory.getLogger(QueryServiceImpl.class);
private final RequestHandlerSelector selector;

public QueryServiceImpl(QueryServiceImplConfig config) {
Map<String, ClientConfig> clientConfigMap =
config.getClients().stream()
.map(ClientConfig::parse)
.collect(Collectors.toMap(ClientConfig::getType, clientConfig -> clientConfig));

for (Config requestHandlerConfig : config.getQueryRequestHandlersConfig()) {
initRequestHandler(
RequestHandlerConfig.parse(requestHandlerConfig), clientConfigMap);
}
selector = new RequestHandlerSelector(RequestHandlerRegistry.get());
}

private void initRequestHandler(RequestHandlerConfig config,
Map<String, ClientConfig> clientConfigMap) {

// Register Pinot RequestHandler
if ("pinot".equals(config.getType())) {
boolean registered = RequestHandlerRegistry.get().register(config.getName(),
new RequestHandlerInfo(
config.getName(), PinotBasedRequestHandler.class, config.getRequestHandlerInfo()));
if (!registered) {
throw new RuntimeException("Could not initialize the request handler: " + config.getName());
}
} else {
throw new UnsupportedOperationException(
"Unsupported RequestHandler type - " + config.getType());
}

// Register Pinot Client
ClientConfig clientConfig = clientConfigMap.get(config.getClientConfig());
Preconditions.checkNotNull(clientConfig);
PinotClientFactory.createPinotClient(
config.getName(), clientConfig.getType(), clientConfig.getConnectionString());
@Inject
public QueryServiceImpl(RequestHandlerSelector selector) {
this.selector = selector;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class QueryServiceImplConfig {
private List<Config> clients;
private List<Config> queryRequestHandlersConfig;

public static QueryServiceImplConfig parse(Config config) {
static QueryServiceImplConfig parse(Config config) {
return ConfigBeanFactory.create(config, QueryServiceImplConfig.class);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.hypertrace.core.query.service;

import com.google.inject.AbstractModule;
import com.typesafe.config.Config;
import org.hypertrace.core.query.service.api.QueryServiceGrpc.QueryServiceImplBase;
import org.hypertrace.core.query.service.pinot.PinotModule;

class QueryServiceModule extends AbstractModule {

private final QueryServiceImplConfig config;

QueryServiceModule(Config config) {
this.config = QueryServiceImplConfig.parse(config);
}

@Override
protected void configure() {
bind(QueryServiceImplConfig.class).toInstance(this.config);
bind(QueryServiceImplBase.class).to(QueryServiceImpl.class);
install(new PinotModule());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.hypertrace.core.query.service;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.hypertrace.core.query.service.QueryServiceImplConfig.ClientConfig;

public class RequestClientConfigRegistry {
private final Map<String, ClientConfig> clientConfigMap;

@Inject
RequestClientConfigRegistry(QueryServiceImplConfig queryServiceImplConfig) {
this.clientConfigMap =
queryServiceImplConfig.getClients().stream()
.map(ClientConfig::parse)
.collect(Collectors.toUnmodifiableMap(ClientConfig::getType, Function.identity()));
}

public Optional<ClientConfig> get(String key) {
return Optional.ofNullable(this.clientConfigMap.get(key));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ public interface RequestHandler<T, R> {

String getName();

QueryCost canHandle(T request, Set<String> referencedSources, ExecutionContext analyzer);
QueryCost canHandle(T request, ExecutionContext context);

/**
* Handle the request and add rows to the collector.
*/
void handleRequest(ExecutionContext executionContext, QueryRequest request,
QueryResultCollector<R> collector);

void init(String name, Config config);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.hypertrace.core.query.service;

import org.hypertrace.core.query.service.QueryServiceImplConfig.RequestHandlerConfig;

public interface RequestHandlerBuilder {

boolean canBuild(RequestHandlerConfig config);

RequestHandler<?, ?> build(RequestHandlerConfig config);
}
Original file line number Diff line number Diff line change
@@ -1,34 +1,42 @@
package org.hypertrace.core.query.service;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.hypertrace.core.query.service.QueryServiceImplConfig.RequestHandlerConfig;

@Singleton
public class RequestHandlerRegistry {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(RequestHandlerRegistry.class);

Map<String, RequestHandlerInfo> requestHandlerInfoMap = new HashMap<>();

private static final RequestHandlerRegistry INSTANCE = new RequestHandlerRegistry();

private RequestHandlerRegistry() {}

public boolean register(String handlerName, RequestHandlerInfo requestHandlerInfo) {
if (requestHandlerInfoMap.containsKey(handlerName)) {
LOG.error("RequestHandlerInfo registration failed. Duplicate Handler:{} ", handlerName);
return false;
}
requestHandlerInfoMap.put(handlerName, requestHandlerInfo);
return true;
private final Set<RequestHandler<?, ?>> requestHandlers;

@Inject
RequestHandlerRegistry(
QueryServiceImplConfig config, Set<RequestHandlerBuilder> requestHandlerInfoSet) {
this.requestHandlers =
config.getQueryRequestHandlersConfig().stream()
.map(RequestHandlerConfig::parse)
.map(handlerConfig -> buildFromMatchingHandler(requestHandlerInfoSet, handlerConfig))
.collect(
Collectors.collectingAndThen(
Collectors.toCollection(LinkedHashSet::new), Collections::unmodifiableSet));
}

public Collection<RequestHandlerInfo> getAll() {
return requestHandlerInfoMap.values();
public Set<RequestHandler<?, ?>> getAll() {
return requestHandlers;
}

public static RequestHandlerRegistry get() {
return INSTANCE;
private RequestHandler<?, ?> buildFromMatchingHandler(
Set<RequestHandlerBuilder> handlerInfoBuilders, RequestHandlerConfig config) {
return handlerInfoBuilders.stream()
.filter(builder -> builder.canBuild(config))
.findFirst()
.map(builder -> builder.build(config))
.orElseThrow(
() ->
new UnsupportedOperationException(
"No builder registered matching provided config: " + config.toString()));
}
}
Original file line number Diff line number Diff line change
@@ -1,41 +1,19 @@
package org.hypertrace.core.query.service;

import com.google.common.collect.ImmutableList;
import javax.inject.Inject;
import org.hypertrace.core.query.service.api.QueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RequestHandlerSelector {

private static final Logger LOG = LoggerFactory.getLogger(RequestHandlerSelector.class);

private final List<RequestHandler> requestHandlers;

public RequestHandlerSelector(List<RequestHandler> requestHandlers) {
this.requestHandlers = ImmutableList.copyOf(requestHandlers);
}
private final RequestHandlerRegistry registry;

@Inject
public RequestHandlerSelector(RequestHandlerRegistry registry) {
Collection<RequestHandlerInfo> requestHandlerInfoList = registry.getAll();
requestHandlers = new ArrayList<>();
for (RequestHandlerInfo requestHandlerInfo : requestHandlerInfoList) {
try {
Constructor<? extends RequestHandler> constructor =
requestHandlerInfo.getRequestHandlerClazz().getConstructor(new Class[] {});
RequestHandler requestHandler = constructor.newInstance();
requestHandler.init(requestHandlerInfo.getName(), requestHandlerInfo.getConfig());
requestHandlers.add(requestHandler);
} catch (Exception e) {
LOG.error("Error initializing request Handler:{}", requestHandlerInfo, e);
}
}
this.registry = registry;
}

public RequestHandler select(QueryRequest request, ExecutionContext executionContext) {
Expand All @@ -44,10 +22,8 @@ public RequestHandler select(QueryRequest request, ExecutionContext executionCon
// that query
double minCost = Double.MAX_VALUE;
RequestHandler selectedHandler = null;
Set<String> referencedColumns = executionContext.getReferencedColumns();
Set<String> referencedSources = new HashSet<>(request.getSourceList());
for (RequestHandler requestHandler : requestHandlers) {
QueryCost queryCost = requestHandler.canHandle(request, referencedSources, executionContext);
for (RequestHandler requestHandler : registry.getAll()) {
QueryCost queryCost = requestHandler.canHandle(request, executionContext);
double cost = queryCost.getCost();
if (LOG.isDebugEnabled()) {
LOG.debug("Request handler: {}, query cost: {}", requestHandler.getName(), cost);
Expand All @@ -64,14 +40,14 @@ public RequestHandler select(QueryRequest request, ExecutionContext executionCon
"Selected requestHandler: {} for the query: {}; referencedColumns: {}, cost: {}",
selectedHandler.getName(),
request,
referencedColumns,
executionContext.getReferencedColumns(),
minCost);
}
} else {
LOG.error(
"No requestHandler for the query: {}; referencedColumns: {}, cost: {}",
request,
referencedColumns,
executionContext.getReferencedColumns(),
minCost);
}
return selectedHandler;
Expand Down
Loading

0 comments on commit 2bafdd4

Please sign in to comment.