Provide Serializable Interface for Creating WorkerRuns #2232

Merged
cgardens merged 11 commits into master from cgardens/worker_run_assembly
Mar 3, 2021

Contributor

@cgardens cgardens commented Feb 27, 2021

closes #2177

What

  • Adjust the job-submission interface so that it takes two inputs: environment-specific information for worker runs (in the future this will be provided by Temporal instead of the scheduler app) and a serializable configuration for setting up a WorkerRun of a specific type. In other words, to use our existing naming, Temporal will run WorkerRuns (instead of just Workers).

How

  • Create a WorkerRunFactory for each job type that takes in the two kinds of configuration mentioned above. It looks silly right now, but should start to make more sense once the creation of the worker run happens in Temporal.
  • What used to be called WorkerRunFactory is now called SchedulerWorkerRunAssembly. It holds the environment-specific information required for the Scheduler implementation of the executor to work, and it also takes in the job-type-specific WorkerRunFactories. WorkerRunFactory is now an interface for how to create WorkerRuns.
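
To make the split described above concrete, here is a minimal sketch of a per-job-type factory that receives environment-specific inputs (job root, job id, attempt) separately from a serializable config. Every type name ending in `Sketch`, the fields on the config, and the method signature are assumptions for illustration; they are not the classes in this PR.

```java
import java.io.Serializable;
import java.nio.file.Path;

// Hypothetical stand-in for WorkerRun: just enough state to show the shape.
final class WorkerRunSketch {
  final Path jobRoot;
  final String description;

  WorkerRunSketch(Path jobRoot, String description) {
    this.jobRoot = jobRoot;
    this.description = description;
  }
}

// Hypothetical job-type-specific config. The only requirement is that it can be serialized.
final class SyncConfigSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  final String sourceImage;
  final String destinationImage;

  SyncConfigSketch(String sourceImage, String destinationImage) {
    this.sourceImage = sourceImage;
    this.destinationImage = destinationImage;
  }
}

// Environment-specific inputs come first; the serializable config comes last.
interface WorkerRunFactorySketch<T> {
  WorkerRunSketch create(Path jobRoot, long jobId, int attempt, T config);
}

// One factory per job type; this one handles the hypothetical sync config.
final class SyncWorkerRunFactorySketch implements WorkerRunFactorySketch<SyncConfigSketch> {
  @Override
  public WorkerRunSketch create(Path jobRoot, long jobId, int attempt, SyncConfigSketch config) {
    String description = "sync " + config.sourceImage + " -> " + config.destinationImage
        + " (job " + jobId + ", attempt " + attempt + ")";
    return new WorkerRunSketch(jobRoot, description);
  }
}
```

Under these assumptions, whoever owns the environment (the scheduler today, Temporal later) supplies the first three arguments, and the config can be stored or sent over the wire unchanged.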

Pre-merge Checklist

  • lots of tests to fix

Recommended reading order

  1. SchedulerWorkerRunAssembly
  2. WorkerRunFactory (and classes that implement it)
  3. the rest

@cgardens cgardens requested a review from jrhizor February 27, 2021 01:52
Contributor

@michel-tricot michel-tricot left a comment

There are a lot of class indirections and classes with similar names or that are using Assembly because Factory was taken already.

I think the PR in its current state is going to make the code harder to follow.

* This class is a runnable that, given a job id and db connection, figures out how to run the
* appropriate worker for a given job.
*/
public class SchedulerWorkerRunAssembly {
Contributor

Why do you call it an assembly? It seems to be a factory.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyncWorkerRunFactories {
Contributor

Why did you decide to group them together instead of splitting them up in separate classes?

Contributor Author

@cgardens cgardens Mar 2, 2021

because they are essentially just the same logic with very slight changes in configuration. i wanted the code close together.

@cgardens cgardens requested a review from jrhizor March 2, 2021 22:18
@@ -26,6 +26,7 @@

import pendulum as pendulum
from base_python import BaseClient

Contributor

uh oh


public class WorkerRunFactoryUtils {

public static IntegrationLauncher createLauncher(long jobId, int attempt, final String image, ProcessBuilderFactory pbf) {
Contributor

Could put this as a static method AirbyteIntegrationLauncher.create(...) instead of having a new utils file.
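
A rough sketch of what that suggestion could look like: the factory logic lives as a static `create(...)` method on the launcher class, so no separate utils file is needed. `LauncherSketch` and `ProcessBuilderFactorySketch` are hypothetical stand-ins for `AirbyteIntegrationLauncher` and `ProcessBuilderFactory`; their fields and the interface method are assumed, and only the parameter list mirrors the `createLauncher` signature quoted above.

```java
// Hypothetical stand-in for ProcessBuilderFactory; the real interface may differ.
interface ProcessBuilderFactorySketch {
  ProcessBuilder create(String... command);
}

// Hypothetical stand-in for AirbyteIntegrationLauncher; the fields are assumed.
final class LauncherSketch {
  final long jobId;
  final int attempt;
  final String image;
  final ProcessBuilderFactorySketch pbf;

  private LauncherSketch(long jobId, int attempt, String image, ProcessBuilderFactorySketch pbf) {
    this.jobId = jobId;
    this.attempt = attempt;
    this.image = image;
    this.pbf = pbf;
  }

  // Static factory on the class itself, taking the same parameters as createLauncher(...).
  static LauncherSketch create(long jobId, int attempt, String image, ProcessBuilderFactorySketch pbf) {
    return new LauncherSketch(jobId, attempt, image, pbf);
  }
}
```

Call sites would then read `LauncherSketch.create(jobId, attempt, image, pbf)`, which keeps construction next to the type being constructed.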

Contributor

Good point!

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchedulerWorkerRunWithEnvironmentFactory {
Contributor

Is testing the only value of having this class, rather than just including it inside JobSubmitter?

Contributor

I wish the name just didn't resemble WorkerRunWithEnvironmentFactory. JobToWorkerRunConverter or anything really feels better so they have clearer roles.

Contributor

Agreed, the naming really has a bad code smell right now, but if I understand your PR description correctly, only one will remain, right?

Contributor Author

exactly. this is to get us over the hump from scheduler doing stuff to temporal doing it. will go away.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyncWorkerRunFactories {
Contributor

We don't have that pattern anywhere in the code and it doesn't match the pattern of the other factories. You're basically doing "modules" with static classes. If the logic is the same, you can use inheritance or you can use helpers.
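
As an illustration of the inheritance option, here is a minimal sketch in which the shared logic sits in an abstract base class and each variant overrides only the part that differs. All class names and the two job types are placeholders, not the factories actually grouped inside `SyncWorkerRunFactories`.

```java
// Shared logic lives in the base class; subclasses supply only what differs.
abstract class BaseWorkerRunFactorySketch {

  // Common creation logic, identical for every variant.
  final String create(long jobId, int attempt) {
    return jobType() + ":" + jobId + ":" + attempt;
  }

  // The "very slight change in configuration" each variant provides.
  protected abstract String jobType();
}

// Placeholder variant A.
final class SyncVariantSketch extends BaseWorkerRunFactorySketch {
  @Override
  protected String jobType() {
    return "sync";
  }
}

// Placeholder variant B.
final class ResetVariantSketch extends BaseWorkerRunFactorySketch {
  @Override
  protected String jobType() {
    return "reset";
  }
}
```

Each variant is a top-level class like the other factories, while the near-identical code still lives in one place.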

// T must be serializable. The generated json pojos do not implement serializable (but they are
// serializable). It means we can't force Serializable as a constraint in this interface
// unfortunately.
public interface WorkerRunFactory<T> {
Contributor

you can enforce serializable on T and you can make sure the generated classes implement Serializable. jsonschema2pojo supports it.

https://github.com/joelittlejohn/jsonschema2pojo/tree/master/jsonschema2pojo-gradle-plugin
serializable = true
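
On the Java side, enforcing the constraint could look roughly like the sketch below: the type parameter is bounded by `Serializable`, and a config class that implements it survives a round trip through Java serialization. `ConfigPojoSketch` stands in for a generated POJO, and the interface and helper names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a generated POJO once the generator emits "implements Serializable".
final class ConfigPojoSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  final String image;

  ConfigPojoSketch(String image) {
    this.image = image;
  }
}

// The bound makes the compiler reject any config type that is not Serializable.
interface BoundedWorkerRunFactorySketch<T extends Serializable> {
  String create(long jobId, int attempt, T config);
}

final class SerializationSketch {
  // Serializes the value to bytes and reads it back, returning the copy.
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }
}
```

With the bound in place, the comment on the interface about not being able to force `Serializable` would no longer be needed.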

Contributor Author

🙌 thanks!


@@ -93,11 +94,12 @@ public SchedulerApp(Path workspaceRoot,
public void start() throws IOException {
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final WorkerRunFactory workerRunFactory = new WorkerRunFactory(workspaceRoot, pbf);
final SchedulerWorkerRunWithEnvironmentFactory schedulerWorkerRunWithEnvironmentFactory =
Contributor

NIT: You can probably shorten the var name to avoid the multiline statement. I think it is still ok to call it workerRunFactory

this.workerRunFactory = workerRunFactory;
}

public WorkerRun create(final long jobId, final int attempt, final T config) {
Contributor

I assume this path-creation logic can at some point belong to the WorkerRun (once we are 100% on Temporal), right? It is a bit overkill to have it at the moment.

Contributor Author

definitely.

Contributor Author

i think we can redefine the worker run concept a bit going forward to simplify this, but i think figuring that out will be easier once we're on temporal because we'll know exactly what the constraints are.
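
For readers following this thread, the wrapper under discussion can be pictured roughly as below: it derives a job root path from the environment and then delegates to the job-type-specific factory. The directory layout `<workspaceRoot>/<jobId>/<attempt>`, the `String` return type, and all `Sketch` names are assumptions for illustration only.

```java
import java.nio.file.Path;

// Hypothetical job-type-specific factory that already expects a resolved job root.
interface InnerFactorySketch<T> {
  String create(Path jobRoot, T config);
}

// Hypothetical wrapper: owns the environment-specific part (the workspace root).
final class WithEnvironmentFactorySketch<T> {
  private final Path workspaceRoot;
  private final InnerFactorySketch<T> workerRunFactory;

  WithEnvironmentFactorySketch(Path workspaceRoot, InnerFactorySketch<T> workerRunFactory) {
    this.workspaceRoot = workspaceRoot;
    this.workerRunFactory = workerRunFactory;
  }

  String create(long jobId, int attempt, T config) {
    // Assumed layout: <workspaceRoot>/<jobId>/<attempt>
    Path jobRoot = workspaceRoot.resolve(String.valueOf(jobId)).resolve(String.valueOf(attempt));
    return workerRunFactory.create(jobRoot, config);
  }
}
```

If the path logic later moves into the worker run itself, this wrapper reduces to a plain delegation and can be dropped.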

@cgardens cgardens force-pushed the cgardens/worker_run_assembly branch 2 times, most recently from 4115f46 to 7749367 Compare March 3, 2021 01:58
@cgardens cgardens force-pushed the cgardens/worker_run_assembly branch from 418e9ce to d83b6e0 Compare March 3, 2021 18:13
@cgardens cgardens merged commit 8503b7e into master Mar 3, 2021
@cgardens cgardens deleted the cgardens/worker_run_assembly branch March 3, 2021 18:37
Development

Successfully merging this pull request may close these issues.

improve worker construction interface
3 participants