Skip to content

Commit

Permalink
refactor: wrap with grpc factory, add health check (#148)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-steinfeld authored Jul 5, 2022
1 parent 19cf407 commit 60ab34b
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 124 deletions.
17 changes: 10 additions & 7 deletions helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ spec:
value: "file:///app/resources/configs"
- name: LOG4J_CONFIGURATION_FILE
value: "/var/{{ .Chart.Name }}/log/log4j2.properties"
- name: JAVA_TOOL_OPTIONS
- name: JAVA_OPTS
value: {{ .Values.javaOpts | quote }}
volumeMounts:
- name: service-config
Expand All @@ -85,13 +85,16 @@ spec:
- name: log4j-config
mountPath: /var/{{ .Chart.Name }}/log
livenessProbe:
initialDelaySeconds: {{ int .Values.livenessProbe.initialDelaySeconds }}
periodSeconds: {{ int .Values.livenessProbe.periodSeconds }}
tcpSocket:
port: grpc-port
readinessProbe:
initialDelaySeconds: {{ int .Values.readinessProbe.initialDelaySeconds }}
periodSeconds: {{ int .Values.readinessProbe.periodSeconds }}
failureThreshold: {{ int .Values.livenessProbe.failureThreshold }}
timeoutSeconds: {{ int .Values.livenessProbe.timeoutSeconds }}
httpGet:
path: /health
port: {{ .Values.containerHealthProbePort }}
startupProbe:
periodSeconds: {{ int .Values.startupProbe.periodSeconds }}
failureThreshold: {{ int .Values.startupProbe.failureThreshold }}
timeoutSeconds: {{ int .Values.startupProbe.timeoutSeconds }}
httpGet:
path: /health
port: {{ .Values.containerHealthProbePort }}
Expand Down
9 changes: 5 additions & 4 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ topologySpreadConstraints: []
javaOpts: "-XX:InitialRAMPercentage=50.0 -XX:MaxRAMPercentage=75.0"

livenessProbe:
initialDelaySeconds: 10
periodSeconds: 5

readinessProbe:
initialDelaySeconds: 2
failureThreshold: 3
timeoutSeconds: 3
startupProbe:
periodSeconds: 5
failureThreshold: 24
timeoutSeconds: 3

resources:
limits:
Expand Down
10 changes: 10 additions & 0 deletions query-service-factory/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
plugins {
`java-library`
}

dependencies {
api("org.hypertrace.core.serviceframework:platform-grpc-service-framework:0.1.35")

implementation(project(":query-service-impl"))
implementation("com.google.inject:guice:5.0.1")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.hypertrace.core.query.service;

import com.google.inject.Guice;
import java.util.List;
import org.hypertrace.core.query.service.api.QueryServiceGrpc.QueryServiceImplBase;
import org.hypertrace.core.serviceframework.grpc.GrpcPlatformService;
import org.hypertrace.core.serviceframework.grpc.GrpcPlatformServiceFactory;
import org.hypertrace.core.serviceframework.grpc.GrpcServiceContainerEnvironment;

public class QueryServiceFactory implements GrpcPlatformServiceFactory {
private static final String SERVICE_NAME = "query-service";
private static final String QUERY_SERVICE_CONFIG = "service.config";

@Override
public List<GrpcPlatformService> buildServices(GrpcServiceContainerEnvironment environment) {
return List.of(
new GrpcPlatformService(
Guice.createInjector(
new QueryServiceModule(
environment.getConfig(SERVICE_NAME).getConfig(QUERY_SERVICE_CONFIG),
environment.getChannelRegistry()))
.getInstance(QueryServiceImplBase.class)));
}
}
3 changes: 1 addition & 2 deletions query-service-impl/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ dependencies {
implementation("org.hypertrace.core.attribute.service:attribute-service-api:0.12.3")
implementation("org.hypertrace.core.attribute.service:attribute-projection-registry:0.12.3")
implementation("org.hypertrace.core.attribute.service:caching-attribute-service-client:0.12.3")
implementation("org.hypertrace.core.serviceframework:service-framework-spi:0.1.33")
implementation("com.google.inject:guice:5.0.1")
implementation("org.apache.pinot:pinot-java-client:0.6.0") {
// We want to use log4j2 impl so exclude the log4j binding of slf4j
exclude("org.slf4j", "slf4j-log4j12")
}
implementation("org.slf4j:slf4j-api:1.7.32")
implementation("commons-codec:commons-codec:1.15")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.35")
implementation("com.google.protobuf:protobuf-java-util:3.20.1")
implementation("com.google.guava:guava:31.1-jre")
implementation("io.reactivex.rxjava3:rxjava:3.0.11")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,31 @@

import static org.hypertrace.core.grpcutils.client.RequestContextClientCallCredsProviderFactory.getClientCallCredsProvider;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Channel;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import javax.inject.Inject;
import javax.inject.Provider;
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
import org.hypertrace.core.serviceframework.spi.PlatformServiceLifecycle;
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;

final class AttributeClientProvider implements Provider<CachingAttributeClient> {

private final QueryServiceConfig config;
private final PlatformServiceLifecycle serviceLifecycle;
private final GrpcChannelRegistry grpcChannelRegistry;

@Inject
AttributeClientProvider(QueryServiceConfig config, PlatformServiceLifecycle serviceLifecycle) {
AttributeClientProvider(QueryServiceConfig config, GrpcChannelRegistry grpcChannelRegistry) {
this.config = config;
this.serviceLifecycle = serviceLifecycle;
this.grpcChannelRegistry = grpcChannelRegistry;
}

@Override
public CachingAttributeClient get() {
ManagedChannel channel =
ManagedChannelBuilder.forAddress(
config.getAttributeClientConfig().getHost(),
config.getAttributeClientConfig().getPort())
.usePlaintext()
.build();

this.serviceLifecycle.shutdownComplete().thenRun(channel::shutdown);
Channel channel =
grpcChannelRegistry.forPlaintextAddress(
config.getAttributeClientConfig().getHost(),
config.getAttributeClientConfig().getPort());

return CachingAttributeClient.builder(channel)
.withCallCredentials(getClientCallCredsProvider().get())
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@
import com.typesafe.config.Config;
import javax.inject.Singleton;
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;
import org.hypertrace.core.query.service.api.QueryServiceGrpc.QueryServiceImplBase;
import org.hypertrace.core.query.service.attribubteexpression.AttributeExpressionModule;
import org.hypertrace.core.query.service.multivalue.MutliValueModule;
import org.hypertrace.core.query.service.pinot.PinotModule;
import org.hypertrace.core.query.service.projection.ProjectionModule;
import org.hypertrace.core.query.service.prometheus.PrometheusModule;
import org.hypertrace.core.query.service.validation.QueryValidationModule;
import org.hypertrace.core.serviceframework.spi.PlatformServiceLifecycle;

class QueryServiceModule extends AbstractModule {

private final QueryServiceConfig config;
private final PlatformServiceLifecycle lifecycle;
private final GrpcChannelRegistry grpcChannelRegistry;

QueryServiceModule(Config config, PlatformServiceLifecycle lifecycle) {
QueryServiceModule(Config config, GrpcChannelRegistry grpcChannelRegistry) {
this.config = new QueryServiceConfig(config);
this.lifecycle = lifecycle;
this.grpcChannelRegistry = grpcChannelRegistry;
}

@Override
protected void configure() {
bind(QueryServiceConfig.class).toInstance(this.config);
bind(PlatformServiceLifecycle.class).toInstance(this.lifecycle);
bind(GrpcChannelRegistry.class).toInstance(this.grpcChannelRegistry);
bind(QueryServiceImplBase.class).to(QueryServiceImpl.class);
Multibinder.newSetBinder(binder(), QueryTransformation.class);
bind(CachingAttributeClient.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import org.hypertrace.core.serviceframework.spi.PlatformServiceLifecycle;
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;
import org.junit.jupiter.api.Test;

class QueryServiceModuleTest {
Expand All @@ -25,8 +25,7 @@ public void testResolveBindings() {
.getConfig("service.config");
assertDoesNotThrow(
() ->
Guice.createInjector(
new QueryServiceModule(config, mock(PlatformServiceLifecycle.class)))
Guice.createInjector(new QueryServiceModule(config, mock(GrpcChannelRegistry.class)))
.getAllBindings());
}
}
6 changes: 3 additions & 3 deletions query-service/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ plugins {
}

dependencies {
implementation(project(":query-service-impl"))
implementation(project(":query-service-factory"))
implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.7.2")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
implementation("org.hypertrace.core.serviceframework:platform-grpc-service-framework:0.1.35")
implementation("org.slf4j:slf4j-api:1.7.32")
implementation("com.typesafe:config:1.4.1")

Expand All @@ -22,7 +22,7 @@ dependencies {
integrationTestImplementation("org.testcontainers:testcontainers:1.16.2")
integrationTestImplementation("org.testcontainers:junit-jupiter:1.16.2")
integrationTestImplementation("org.testcontainers:kafka:1.16.2")
integrationTestImplementation("org.hypertrace.core.serviceframework:integrationtest-service-framework:0.1.33")
integrationTestImplementation("org.hypertrace.core.serviceframework:integrationtest-service-framework:0.1.35")
integrationTestImplementation("com.github.stefanbirkner:system-lambda:1.2.0")

integrationTestImplementation("org.apache.kafka:kafka-clients:5.5.1-ccs")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,82 +1,16 @@
package org.hypertrace.core.query.service;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
import org.hypertrace.core.grpcutils.server.InterceptorUtil;
import org.hypertrace.core.serviceframework.PlatformService;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryServiceStarter extends PlatformService {
private static final String SERVICE_NAME_CONFIG = "service.name";
private static final String SERVICE_PORT_CONFIG = "service.port";
private static final String QUERY_SERVICE_CONFIG = "service.config";
private static final Logger LOG = LoggerFactory.getLogger(QueryServiceStarter.class);
private String serviceName;
private int serverPort;
private Server queryServiceServer;
import org.hypertrace.core.serviceframework.grpc.GrpcPlatformServiceFactory;
import org.hypertrace.core.serviceframework.grpc.StandAloneGrpcPlatformServiceContainer;

public class QueryServiceStarter extends StandAloneGrpcPlatformServiceContainer {
public QueryServiceStarter(ConfigClient configClient) {
super(configClient);
}

@Override
protected void doInit() {
this.serviceName = getAppConfig().getString(SERVICE_NAME_CONFIG);
this.serverPort = getAppConfig().getInt(SERVICE_PORT_CONFIG);

LOG.info("Creating the Query Service Server on port {}", serverPort);

queryServiceServer =
ServerBuilder.forPort(serverPort)
.addService(
InterceptorUtil.wrapInterceptors(
QueryServiceFactory.build(
getAppConfig().getConfig(QUERY_SERVICE_CONFIG), this.getLifecycle())))
.build();
}

@Override
protected void doStart() {
LOG.info("Attempting to start Query Service on port {}", serverPort);

try {
queryServiceServer.start();
LOG.info("Started Query Service on port {}", serverPort);
} catch (IOException e) {
LOG.error("Unable to start the Query Service");
throw new RuntimeException(e);
}

try {
queryServiceServer.awaitTermination();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

@Override
protected void doStop() {
LOG.info("Shutting down service: {}", serviceName);
while (!queryServiceServer.isShutdown()) {
queryServiceServer.shutdown();
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
}
}
}

@Override
public boolean healthCheck() {
return true;
}

@Override
public String getServiceName() {
return serviceName;
protected GrpcPlatformServiceFactory getServiceFactory() {
return new QueryServiceFactory();
}
}
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ include(":query-service-api")
include(":query-service-client")
include(":query-service-impl")
include(":query-service")
include(":query-service-factory")

0 comments on commit 60ab34b

Please sign in to comment.