-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Register debuggee prior to job submission #15
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,10 @@ | |
import static com.google.common.base.Preconditions.checkState; | ||
|
||
import com.google.api.client.googleapis.json.GoogleJsonResponseException; | ||
import com.google.api.services.clouddebugger.v2.Clouddebugger; | ||
import com.google.api.services.clouddebugger.v2.model.Debuggee; | ||
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest; | ||
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse; | ||
import com.google.api.services.dataflow.Dataflow; | ||
import com.google.api.services.dataflow.model.DataflowPackage; | ||
import com.google.api.services.dataflow.model.Job; | ||
|
@@ -168,6 +172,8 @@ | |
import java.util.SortedSet; | ||
import java.util.TreeSet; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
/** | ||
* A {@link PipelineRunner} that executes the operations in the | ||
* pipeline by first translating them to the Dataflow representation | ||
|
@@ -420,6 +426,43 @@ private <T> PCollection<T> applyWindow( | |
return super.apply(new AssignWindows<>(transform), input); | ||
} | ||
|
||
/**
 * Registers a Cloud Debugger debuggee for the job when the debugger is enabled in the options.
 *
 * <p>Does nothing when {@code options.getEnableCloudDebugger()} is false. Otherwise builds a
 * {@code Clouddebugger} client from the options, registers a debuggee identified by
 * {@code uniquifier}, and stores the registered debuggee back on the options.
 *
 * <p>The caller in {@code run} invokes this before translating the job, because this method may
 * update the options.
 *
 * @param options pipeline options that are read for the debugger settings and updated with the
 *     registered debuggee
 * @param uniquifier identifier passed through to {@code registerDebuggee}
 * @throws RuntimeException if the debugger is enabled and the options already carry a debuggee
 */
private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not particularly modular design. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about putting this somewhere else, but we'd end up with a "DebuggerUtility" that was essentially this code. Given the high coupling with when it needs to be called (eg., it needs to be called before translating the job, it needs to have the uniquifier, etc) I don't think that getting these ~45 lines into a separate module helps much (eg., we're unlikely to reuse it, etc.). Open to thoughts if you had a specific suggestion for improvement. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The pattern of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Given the dependency on pipeline options to create the client, etc. I'm not sure how reusable this will be, but I do think it is clearer to separate the options interactions from the debugger interactions. |
||
// Debugger support is opt-in; skip registration entirely when it is not enabled.
if (!options.getEnableCloudDebugger()) { | ||
return; | ||
} | ||
|
||
// The debuggee is populated by this method, so a value that is already set is rejected.
if (options.getDebuggee() != null) { | ||
throw new RuntimeException("Should not specify the debuggee"); | ||
} | ||
|
||
// Register with the service and record the result on the options.
Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build(); | ||
Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier); | ||
options.setDebuggee(debuggee); | ||
} | ||
|
||
private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) { | ||
RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest(); | ||
registerReq.setDebuggee(new Debuggee() | ||
.setProject(options.getProject()) | ||
.setUniquifier(uniquifier) | ||
.setDescription(uniquifier) | ||
.setAgentVersion("google.com/cloud-dataflow-java/v1")); | ||
|
||
try { | ||
RegisterDebuggeeResponse registerResponse = | ||
debuggerClient.controller().debuggees().register(registerReq).execute(); | ||
Debuggee debuggee = registerResponse.getDebuggee(); | ||
if (debuggee.getStatus() != null && debuggee.getStatus().getIsError()) { | ||
throw new RuntimeException("Unable to register with the debugger: " + | ||
debuggee.getStatus().getDescription().getFormat()); | ||
} | ||
|
||
return debuggee; | ||
} catch (IOException e) { | ||
throw new RuntimeException("Unable to register with the debugger: ", e); | ||
} | ||
} | ||
|
||
@Override | ||
public DataflowPipelineJob run(Pipeline pipeline) { | ||
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); | ||
|
@@ -428,9 +471,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { | |
+ "related to Google Compute Engine usage and other Google Cloud Services."); | ||
|
||
List<DataflowPackage> packages = options.getStager().stageFiles(); | ||
JobSpecification jobSpecification = | ||
translator.translate(pipeline, this, packages); | ||
Job newJob = jobSpecification.getJob(); | ||
|
||
|
||
// Set a unique client_request_id in the CreateJob request. | ||
// This is used to ensure idempotence of job creation across retried | ||
|
@@ -442,6 +483,15 @@ public DataflowPipelineJob run(Pipeline pipeline) { | |
int randomNum = new Random().nextInt(9000) + 1000; | ||
String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC) | ||
.print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum; | ||
|
||
// Try to create a debuggee ID. This must happen before the job is translated since it may | ||
// update the options. | ||
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); | ||
maybeRegisterDebuggee(dataflowOptions, requestId); | ||
|
||
JobSpecification jobSpecification = | ||
translator.translate(pipeline, this, packages); | ||
Job newJob = jobSpecification.getJob(); | ||
newJob.setClientRequestId(requestId); | ||
|
||
String version = DataflowReleaseInfo.getReleaseInfo().getVersion(); | ||
|
@@ -450,7 +500,6 @@ public DataflowPipelineJob run(Pipeline pipeline) { | |
newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo()); | ||
// The Dataflow Service may write to the temporary directory directly, so | ||
// must be verified. | ||
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); | ||
if (!Strings.isNullOrEmpty(options.getTempLocation())) { | ||
newJob.getEnvironment().setTempStoragePrefix( | ||
dataflowOptions.getPathValidator().verifyPath(options.getTempLocation())); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add the getter too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 41: getDebuggee()
Line 42: setDebuggee()