Skip to content

Commit

Permalink
Add job name and worker id to traceId of AppendRowsRequest. (#28729)
Browse files Browse the repository at this point in the history
* Add new functions to retrieve job name and work id from GCE metadata
* Concatenate job name, job id and worker id with ":".
  • Loading branch information
Shunping Huang authored Oct 4, 2023
1 parent f30f6c5 commit b221d80
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,60 @@
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** */
public class GceMetadataUtil {
private static final String BASE_METADATA_URL = "http://metadata/computeMetadata/v1/";

private static final Logger LOG = LoggerFactory.getLogger(GceMetadataUtil.class);

static String fetchMetadata(String key) {
String requestUrl = BASE_METADATA_URL + key;
int timeoutMillis = 5000;
final HttpParams httpParams = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(httpParams, timeoutMillis);
HttpClient client = new DefaultHttpClient(httpParams);
HttpGet request = new HttpGet(BASE_METADATA_URL + key);
request.setHeader("Metadata-Flavor", "Google");

String ret = "";
try {
HttpClient client = new DefaultHttpClient(httpParams);

HttpGet request = new HttpGet(requestUrl);
request.setHeader("Metadata-Flavor", "Google");

HttpResponse response = client.execute(request);
if (response.getStatusLine().getStatusCode() != 200) {
// May mean its running on a non DataflowRunner, in which case it's perfectly normal.
return "";
if (response.getStatusLine().getStatusCode() == 200) {
InputStream in = response.getEntity().getContent();
try (final Reader reader = new InputStreamReader(in, StandardCharsets.UTF_8)) {
ret = CharStreams.toString(reader);
}
}
InputStream in = response.getEntity().getContent();
try (final Reader reader = new InputStreamReader(in, StandardCharsets.UTF_8)) {
return CharStreams.toString(reader);
}
} catch (IOException e) {
// May mean its running on a non DataflowRunner, in which case it's perfectly normal.
} catch (IOException ignored) {
}
return "";

// The return value can be an empty string, which may mean it's running on a non DataflowRunner.
LOG.debug("Fetched GCE Metadata at '{}' and got '{}'", requestUrl, ret);

return ret;
}

private static String fetchVmInstanceMetadata(String instanceMetadataKey) {
return GceMetadataUtil.fetchMetadata("instance/" + instanceMetadataKey);
}

private static String fetchCustomGceMetadata(String customMetadataKey) {
return GceMetadataUtil.fetchMetadata("instance/attributes/" + customMetadataKey);
return GceMetadataUtil.fetchVmInstanceMetadata("attributes/" + customMetadataKey);
}

public static String fetchDataflowJobId() {
return GceMetadataUtil.fetchCustomGceMetadata("job_id");
}

public static String fetchDataflowJobName() {
return GceMetadataUtil.fetchCustomGceMetadata("job_name");
}

public static String fetchDataflowWorkerId() {
return GceMetadataUtil.fetchVmInstanceMetadata("id");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ final class BigQueryIOMetadata {

private @Nullable String beamJobId;

private BigQueryIOMetadata(@Nullable String beamJobId) {
private @Nullable String beamJobName;

private @Nullable String beamWorkerId;

private BigQueryIOMetadata(@Nullable String beamJobId, @Nullable String beamJobName,
@Nullable String beamWorkerId) {
this.beamJobId = beamJobId;
this.beamJobName = beamJobName;
this.beamWorkerId = beamWorkerId;
}

private static final Pattern VALID_CLOUD_LABEL_PATTERN =
Expand All @@ -41,17 +48,24 @@ private BigQueryIOMetadata(@Nullable String beamJobId) {
*/
public static BigQueryIOMetadata create() {
String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
String dataflowJobName = GceMetadataUtil.fetchDataflowJobName();
String dataflowWorkerId = GceMetadataUtil.fetchDataflowWorkerId();

// If a Dataflow job id is returned on GCE metadata. Then it means
// this program is running on a Dataflow GCE VM.
boolean isDataflowRunner = dataflowJobId != null && !dataflowJobId.isEmpty();
boolean isDataflowRunner = !dataflowJobId.isEmpty();

String beamJobId = null;
String beamJobName = null;
String beamWorkerId = null;
if (isDataflowRunner) {
if (BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
beamJobId = dataflowJobId;
beamJobName = dataflowJobName;
beamWorkerId = dataflowWorkerId;
}
}
return new BigQueryIOMetadata(beamJobId);
return new BigQueryIOMetadata(beamJobId, beamJobName, beamWorkerId);
}

public Map<String, String> addAdditionalJobLabels(Map<String, String> jobLabels) {
Expand All @@ -68,6 +82,20 @@ public Map<String, String> addAdditionalJobLabels(Map<String, String> jobLabels)
return this.beamJobId;
}

/*
* Returns the beam job name. Can be null if it is not running on Dataflow.
*/
public @Nullable String getBeamJobName() {
return this.beamJobName;
}

/*
* Returns the beam worker id. Can be null if it is not running on Dataflow.
*/
public @Nullable String getBeamWorkerId() {
return this.beamWorkerId;
}

/**
* Returns true if label_value is a valid cloud label string. This function can return false in
* cases where the label value is valid. However, it will not return true in a case where the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,12 @@ public StreamAppendClient getStreamAppendClient(
.setChannelsPerCpu(2)
.build();

String traceId = String.format("Dataflow:%s:%s:%s",
bqIOMetadata.getBeamJobName() == null ? options.getJobName()
: bqIOMetadata.getBeamJobName(),
bqIOMetadata.getBeamJobId() == null ? "" : bqIOMetadata.getBeamJobId(),
bqIOMetadata.getBeamWorkerId() == null ? "" : bqIOMetadata.getBeamWorkerId());

StreamWriter streamWriter =
StreamWriter.newBuilder(streamName, newWriteClient)
.setExecutorProvider(
Expand All @@ -1374,11 +1380,7 @@ public StreamAppendClient getStreamAppendClient(
.setEnableConnectionPool(useConnectionPool)
.setMaxInflightRequests(storageWriteMaxInflightRequests)
.setMaxInflightBytes(storageWriteMaxInflightBytes)
.setTraceId(
"Dataflow:"
+ (bqIOMetadata.getBeamJobId() != null
? bqIOMetadata.getBeamJobId()
: options.getJobName()))
.setTraceId(traceId)
.build();
return new StreamAppendClient() {
private int pins = 0;
Expand Down

0 comments on commit b221d80

Please sign in to comment.