Skip to content

Commit

Permalink
Support Samza on Kubernetes. Contributed by Weiqing Yang &Jian He
Browse files Browse the repository at this point in the history
  • Loading branch information
jian.h authored and jian-he committed Oct 20, 2019
1 parent e7361bb commit 381a683
Show file tree
Hide file tree
Showing 17 changed files with 1,147 additions and 29 deletions.
21 changes: 21 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,27 @@ project(":samza-yarn_$scalaSuffix") {
jar.dependsOn("lesscss")
}

project(":samza-kubernetes_$scalaSuffix") {
apply plugin: 'java'

dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaSuffix")
compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
compile group: 'io.fabric8', name: 'kubernetes-client', version: kubernetesJavaClientVersion
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
}

tasks.create(name: "releaseKubeTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
into "samza-kubernetes-${version}"
compression = Compression.GZIP
from(configurations.runtime) { into("lib/") }
from(configurations.archives.artifacts.files) { into("lib/") }
duplicatesStrategy 'exclude'
}
}

project(":samza-shell") {
apply plugin: 'java'

Expand Down
1 change: 1 addition & 0 deletions gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@
jnaVersion = "4.5.1"
couchbaseClientVersion = "2.7.2"
couchbaseMockVersion = "1.5.22"
kubernetesJavaClientVersion = "4.1.3"
}
84 changes: 84 additions & 0 deletions gradlew.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################

@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal

set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=

@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome

set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init

echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.

goto fail

:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe

if exist "%JAVA_EXE%" goto init

echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.

goto fail

:init
@rem Get command-line arguments, handling Windows variants

if not "%OS%" == "Windows_NT" goto win9xME_args

:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2

:win9xME_args_slurp
if "x%~1" == "x" goto execute

set CMD_LINE_ARGS=%*

:execute
@rem Setup the command line

set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar

@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%

:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd

:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1

:mainEnd
if "%OS%"=="Windows_NT" endlocal

:omega
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
* The Allocator matches requests to resources and executes processes.
*/
private final ContainerAllocator containerAllocator;
private final Thread allocatorThread;
private Thread allocatorThread = null;

// The StandbyContainerManager manages standby-aware allocation and failover of containers
private final Optional<StandbyContainerManager> standbyContainerManager;
Expand Down Expand Up @@ -166,8 +166,10 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
}

this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager);
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
LOG.info("Finished container process manager initialization.");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
}
LOG.info("finished initialization of samza task manager");
}

@VisibleForTesting
Expand All @@ -184,14 +186,23 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri

this.clusterResourceManager = resourceManager;
this.standbyContainerManager = Optional.empty();

this.diagnosticsManager = Option.empty();
this.containerAllocator = allocator.orElseGet(
() -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state,
hostAffinityEnabled, this.standbyContainerManager));
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
}
LOG.info("Finished container process manager initialization");
}

// In Kubernetes, the pod will be started by kubelet automatically once it is allocated, it does not need a
// separate thread to keep polling the allocated resources to start the container.
public boolean shouldStartAllocateThread() {
return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager");
}

public boolean shouldShutdown() {
LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}",
state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
Expand Down Expand Up @@ -237,7 +248,9 @@ public void start() {

// Start container allocator thread
LOG.info("Starting the container allocator thread");
allocatorThread.start();
if (allocatorThread != null) {
allocatorThread.start();
}
LOG.info("Starting the container process manager");
}

Expand All @@ -246,12 +259,15 @@ public void stop() {

// Shutdown allocator thread
containerAllocator.stop();
try {
if (allocatorThread != null) {

try {
allocatorThread.join();
LOG.info("Stopped container allocator");
} catch (InterruptedException ie) {
LOG.error("Allocator thread join threw an interrupted exception", ie);
Thread.currentThread().interrupt();
LOG.error("Allocator thread join threw an interrupted exception", ie);
Thread.currentThread().interrupt();
}
}

if (diagnosticsManager.isDefined()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,22 @@ import java.util
import java.util.concurrent.atomic.AtomicReference

import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.config._
import org.apache.samza.config.Config
import org.apache.samza.container.grouper.stream.SSPGrouperProxy
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import org.apache.samza.config.{Config, _}
import org.apache.samza.container.grouper.stream.{SSPGrouperProxy, SystemStreamPartitionGrouperFactory}
import org.apache.samza.container.grouper.task._
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
import org.apache.samza.container.LocalityManager
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.coordinator.server.JobServlet
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.TaskMode
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.container.{LocalityManager, TaskName}
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
import org.apache.samza.coordinator.stream.messages.{SetContainerHostMapping, SetTaskContainerMapping, SetTaskModeMapping, SetTaskPartitionMapping}
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel}
import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
import org.apache.samza.runtime.LocationId
import org.apache.samza.system._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{Logging, ReflectionUtil, Util}

import scala.collection.JavaConverters
import scala.collection.JavaConversions._
import scala.collection.JavaConverters
import scala.collection.JavaConverters._

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,14 @@ class HttpServer(
throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
}
}

def getIpUrl = {
if (running) {
val runningPort = server.getConnectors()(0).asInstanceOf[NetworkConnector].getLocalPort()

new URL("http://" + Util.getLocalHost.getHostAddress + ":" + runningPort + rootPath)
} else {
throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ object HttpUtil {
(exception, loop) => {
exception match {
case ioe: IOException => {
error(ioe)
warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
httpConn = getHttpConnection(url, timeout)
}
Expand Down
34 changes: 34 additions & 0 deletions samza-kubernetes/src/docker/dockerfiles/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# samzaJarsFolder includes all the Samza jars (you needs to make sure all the samza jars are there.)
# You can build Samza image by:
# docker build -t dockerHubAccount/samza:versionNumber .
# Then Samza user can use the Samza image as base image to build their application image.
#

FROM ubuntu:latest

RUN apt-get update -y && apt-get upgrade -y && apt-get install -y openjdk-8-jdk

ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64
ENV PATH $PATH:$JAVA_HOME/bin

RUN mkdir -p /opt/samza
WORKDIR /opt/samza/
COPY samzaJarsFolder/ /opt/samza/
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.config;

public class KubeConfig {

// the image name of samza
public static final String APP_IMAGE = "kube.app.image";
public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0";

// The directory path inside which the log will be stored.
public static final String SAMZA_MOUNT_DIR = "kube.app.pod.mnt.path";
public static final String K8S_API_NAMESPACE = "kube.app.namespace";
public static final String STREAM_PROCESSOR_CONTAINER_NAME_PREFIX = "sp";
public static final String JC_CONTAINER_NAME_PREFIX = "jc";
public static final String POD_RESTART_POLICY = "OnFailure";
public static final String JC_POD_NAME_FORMAT = "%s-%s-%s"; // jc-appName-appId
public static final String TASK_POD_NAME_FORMAT = "%s-%s-%s-%s"; // sp-appName-appId-containerId

// Environment variable
public static final String COORDINATOR_POD_NAME = "COORDINATOR_POD_NAME";
public static final String AZURE_REMOTE_VOLUME_ENABLED = "kube.app.volume.azure.file-share.enabled";
public static final String AZURE_SECRET = "kube.app.volume.azure-secret";
public static final String AZURE_FILESHARE = "kube.app.volume.azure.file-share";

private Config config;
public KubeConfig(Config config) {
this.config = config;
}

public static KubeConfig validate(Config config) throws ConfigException {
KubeConfig kc = new KubeConfig(config);
kc.validate();
return kc;
}

private void validate() throws ConfigException {
// TODO
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.job.kubernetes;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KubeClientFactory {
private static final Logger LOG = LoggerFactory.getLogger(KubeClientFactory.class);

public static KubernetesClient create() {
ConfigBuilder builder = new ConfigBuilder();
Config config = builder.build();
KubernetesClient client = new DefaultKubernetesClient(config);
LOG.info("Kubernetes client created. ");
return client;
}
}
Loading

0 comments on commit 381a683

Please sign in to comment.