Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove warnings when temporal starts #2317

Merged
merged 3 commits into from
Mar 5, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,24 @@

package io.airbyte.scheduler.temporal;

import static java.util.stream.Collectors.toSet;

import io.airbyte.scheduler.temporal.TemporalUtils.TemporalJobType;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
import io.temporal.api.workflowservice.v1.ListNamespacesRequest;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import java.nio.file.Path;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TemporalPool implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(TemporalPool.class);

private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;

Expand All @@ -42,6 +52,8 @@ public TemporalPool(Path workspaceRoot, ProcessBuilderFactory pbf) {

@Override
public void run() {
waitForTemporalServerAndLog();

WorkerFactory factory = WorkerFactory.newInstance(TemporalUtils.TEMPORAL_CLIENT);

final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name());
Expand All @@ -67,4 +79,36 @@ public void run() {
factory.start();
}

private static void waitForTemporalServerAndLog() {
LOGGER.info("Waiting for temporal server...");

while (!getNamespaces().contains("default")) {
LOGGER.warn("Waiting for default namespace to be initialized in temporal...");
wait(5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we do 2 instead of 5? i feel like increments of 5 seconds is a long time given current startup speeds.

}

// sometimes it takes a few additional seconds for workflow queue listening to be available
wait(5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any other api call we can use to check this as opposed to just a hardcoded wait?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tagged Davin for that; I wasn't able to find one. I'm not sure what we could do besides maybe attempt to start a worker and retry while hiding the exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems right. we didn't run into this problem since our temporal cluster was deployed once via helm manually and not touched after (except for manuel upgrades etc)

I spent some time looking at temporal docs + GRPC calls and wait does seem like the simplest solution for now.


LOGGER.info("Found temporal default namespace!");
}

private static void wait(int seconds) {
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private static Set<String> getNamespaces() {
return TemporalUtils.TEMPORAL_SERVICE.blockingStub()
.listNamespaces(ListNamespacesRequest.newBuilder().build())
.getNamespacesList()
.stream()
.map(DescribeNamespaceResponse::getNamespaceInfo)
.map(NamespaceInfo::getName)
.collect(toSet());
}

}