-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Register debuggee prior to job submission #15
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,10 @@ | |
import static com.google.common.base.Preconditions.checkState; | ||
|
||
import com.google.api.client.googleapis.json.GoogleJsonResponseException; | ||
import com.google.api.services.clouddebugger.v2.Clouddebugger; | ||
import com.google.api.services.clouddebugger.v2.model.Debuggee; | ||
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest; | ||
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse; | ||
import com.google.api.services.dataflow.Dataflow; | ||
import com.google.api.services.dataflow.model.DataflowPackage; | ||
import com.google.api.services.dataflow.model.Job; | ||
|
@@ -168,6 +172,8 @@ | |
import java.util.SortedSet; | ||
import java.util.TreeSet; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
/** | ||
* A {@link PipelineRunner} that executes the operations in the | ||
* pipeline by first translating them to the Dataflow representation | ||
|
@@ -420,6 +426,37 @@ private <T> PCollection<T> applyWindow( | |
return super.apply(new AssignWindows<>(transform), input); | ||
} | ||
|
||
@Nullable | ||
private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not particularly modular design. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about putting this somewhere else, but we'd end up with a "DebuggerUtility" that was essentially this code. Given the high coupling with when it needs to be called (eg., it needs to be called before translating the job, it needs to have the uniquifier, etc) I don't think that getting these ~45 lines into a separate module helps much (eg., we're unlikely to reuse it, etc.). Open to thoughts if you had a specific suggestion for improvement. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The pattern of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Given the dependency on pipeline options to create the client, etc. I'm not sure how reusable this will be, but I do think it is clearer to separate the options interactions from the debugger interactions. |
||
if (!options.getEnableCloudDebugger() || options.getDebuggee() != null) { | ||
return; | ||
} | ||
|
||
Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build(); | ||
RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest(); | ||
registerReq.setDebuggee(new Debuggee() | ||
.setProject(options.getProject()) | ||
.setUniquifier(uniquifier) | ||
.setDescription(uniquifier) | ||
.setAgentVersion("google.com/cloud-dataflow-java/v1")); | ||
|
||
try { | ||
RegisterDebuggeeResponse registerResponse = | ||
debuggerClient.controller().debuggees().register(registerReq).execute(); | ||
Debuggee debuggee = registerResponse.getDebuggee(); | ||
if (debuggee.getStatus() != null && debuggee.getStatus().getIsError()) { | ||
LOG.error("Error registering with debugger: {}", | ||
debuggee.getStatus().getDescription().getFormat()); | ||
return; | ||
} | ||
|
||
options.setDebuggee(debuggee); | ||
} catch (IOException e) { | ||
System.out.println("Unable to register with debugger: " + e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is is worth failing the job, instead of proceeding? The user has explicitly signed up for this behavior. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm torn. The current behavior of the worker will be to register a debuggee using the Job ID if the flag to enable the debugger is present but this call fails -- so in that regard there is a graceful fallback. On the other hand, I think failing here wouldn't be unreasonable since debugging was requested, and this error may indicate a problem that the worker won't be able to deal with. I'll add this tomorrow. |
||
LOG.error("Unable to register with debugger:", e); | ||
} | ||
} | ||
|
||
@Override | ||
public DataflowPipelineJob run(Pipeline pipeline) { | ||
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); | ||
|
@@ -428,9 +465,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { | |
+ "related to Google Compute Engine usage and other Google Cloud Services."); | ||
|
||
List<DataflowPackage> packages = options.getStager().stageFiles(); | ||
JobSpecification jobSpecification = | ||
translator.translate(pipeline, this, packages); | ||
Job newJob = jobSpecification.getJob(); | ||
|
||
|
||
// Set a unique client_request_id in the CreateJob request. | ||
// This is used to ensure idempotence of job creation across retried | ||
|
@@ -442,6 +477,15 @@ public DataflowPipelineJob run(Pipeline pipeline) { | |
int randomNum = new Random().nextInt(9000) + 1000; | ||
String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC) | ||
.print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum; | ||
|
||
// Try to create a debuggee ID. This must happen before the job is translated since it may | ||
// update the options. | ||
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); | ||
maybeRegisterDebuggee(dataflowOptions, requestId); | ||
|
||
JobSpecification jobSpecification = | ||
translator.translate(pipeline, this, packages); | ||
Job newJob = jobSpecification.getJob(); | ||
newJob.setClientRequestId(requestId); | ||
|
||
String version = DataflowReleaseInfo.getReleaseInfo().getVersion(); | ||
|
@@ -450,7 +494,6 @@ public DataflowPipelineJob run(Pipeline pipeline) { | |
newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo()); | ||
// The Dataflow Service may write to the temporary directory directly, so | ||
// must be verified. | ||
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); | ||
if (!Strings.isNullOrEmpty(options.getTempLocation())) { | ||
newJob.getEnvironment().setTempStoragePrefix( | ||
dataflowOptions.getPathValidator().verifyPath(options.getTempLocation())); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import com.google.api.client.json.JsonFactory; | ||
import com.google.api.client.json.jackson2.JacksonFactory; | ||
import com.google.api.services.bigquery.Bigquery; | ||
import com.google.api.services.clouddebugger.v2.Clouddebugger; | ||
import com.google.api.services.dataflow.Dataflow; | ||
import com.google.api.services.pubsub.Pubsub; | ||
import com.google.api.services.storage.Storage; | ||
|
@@ -148,6 +149,17 @@ public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options | |
.setGoogleClientRequestInitializer(options.getGoogleApiTrace()); | ||
} | ||
|
||
public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) { | ||
return new Clouddebugger.Builder(getTransport(), | ||
getJsonFactory(), | ||
chainHttpRequestInitializer( | ||
options.getGcpCredential(), | ||
// Do not log 404. It clutters the output and is possible even required by the caller. | ||
new RetryHttpRequestInitializer(ImmutableList.of(404)))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You probably don't need this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the retry is probably necessary, so I assume you're suggesting I get rid of the "do not log 404" part? I'll do so tomorrow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
.setApplicationName(options.getAppName()) | ||
.setGoogleClientRequestInitializer(options.getGoogleApiTrace()); | ||
} | ||
|
||
/** | ||
* Returns a Dataflow client that does not automatically retry failed | ||
* requests. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add the getter too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 41: getDebuggee()
Line 42: setDebuggee()