
Adding message metadata in logs in case of errors #128

Open · wants to merge 1 commit into base: master

Conversation

brunodomenici

Hello,

We have difficulty identifying bad messages in our workloads; we need to know the topic/partition/offset of a problematic message.
In this PR we propose to include topic/partition/offset/timestamp information in the log when an error occurs on the BigQuery side, and we also add a log message for the schema cycle error.

Thank you


@C0urante left a comment


Thanks @brunodomenici. I've left some thoughts here; let me know if you have any questions.

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;


Please revert this change; we prefer explicit import lists instead of wildcards.

Author


Done!

@@ -44,6 +40,7 @@
* {@link com.google.cloud.bigquery.Schema BigQuery Schemas}.
*/
public class BigQuerySchemaConverter implements SchemaConverter<com.google.cloud.bigquery.Schema> {
private static final Logger logger = LoggerFactory.getLogger(AdaptiveBigQueryWriter.class);


This should have its own logging namespace instead of copying the one from the adaptive writer:

Suggested change
private static final Logger logger = LoggerFactory.getLogger(AdaptiveBigQueryWriter.class);
private static final Logger logger = LoggerFactory.getLogger(BigQuerySchemaConverter.class);

Author


Done!

import com.wepay.kafka.connect.bigquery.convert.logicaltype.DebeziumLogicalConverters;
import com.wepay.kafka.connect.bigquery.convert.logicaltype.KafkaLogicalConverters;
import com.wepay.kafka.connect.bigquery.convert.logicaltype.LogicalConverterRegistry;
import com.wepay.kafka.connect.bigquery.convert.logicaltype.LogicalTypeConverter;
import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException;

import com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter;


This can be removed once the logging namespace is corrected.

import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;

import com.google.cloud.bigquery.*;


Please revert this change and keep the import list explicit.

Author


Done!

import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.*;


Please revert this change and keep the import list explicit.

Author


Done!

@@ -106,7 +103,9 @@ public BigQuerySchemaConverter(boolean allFieldsNullable) {
ConversionConnectException("Top-level Kafka Connect schema must be of type 'struct'");
}

throwOnCycle(kafkaConnectSchema, new ArrayList<>());
if(throwOnCycle(kafkaConnectSchema, new ArrayList<>())) {
throw new ConversionConnectException("Kafka Connect schema contains cycle. See logs for detail");


Is the "See logs for detail" bit necessary? Why not include information on the failing schema in the exception message itself?

This is an important distinction because the message of the exception that causes the connector to fail is made visible to users via the REST API via things like the /connector/{connector}/status endpoint, whereas logs are harder to access. We should try to include important information directly in exception messages when possible so that people don't have to look at logs to get the info they need. See #150 for an example of where we're trying to improve things on that front.
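The suggestion above can be sketched as follows. This is a minimal illustration, not the connector's actual code: the exception class here is a stand-in for the project's ConversionConnectException, and the factory method and field names are hypothetical.

```java
// Hypothetical sketch: carry record coordinates in the exception message itself,
// so the REST API's /connector/{connector}/status endpoint surfaces them
// without requiring users to dig through logs.
public class ExceptionMessageSketch {

    // Stand-in for the connector's ConversionConnectException
    static class ConversionConnectException extends RuntimeException {
        ConversionConnectException(String message) {
            super(message);
        }
    }

    // Illustrative factory method; name and parameters are assumptions
    static ConversionConnectException cycleError(String schemaName,
                                                 String topic, int partition, long offset) {
        return new ConversionConnectException(String.format(
            "Kafka Connect schema '%s' contains a cycle "
                + "(record topic: %s, partition: %d, offset: %d)",
            schemaName, topic, partition, offset));
    }

    public static void main(String[] args) {
        System.out.println(cycleError("com.example.Order", "orders", 3, 42L).getMessage());
    }
}
```

With a message built this way, "See logs for detail" becomes unnecessary: the failing record's coordinates travel with the exception wherever it is reported.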

Author


Fair point. Done

}

if (seenSoFar.contains(kafkaConnectSchema)) {
throw new ConversionConnectException("Kafka Connect schema contains cycle");
logger.error("Cycle detected : " + kafkaConnectSchema.name());


I mention this above, but I don't think this should be a log message (or at least, not exclusively one) and can be included in the message of the exception that ends up being thrown.

Also, what happens if there's a cycle involving nested sub-schemas? Will kafkaConnectSchema.name() give us the name of the top-level schema (which is probably what we want to give users), or will it give the name of the nested sub-schema that creates the cycle (which may confuse people as it may not refer to the actual schema they've registered in, e.g., Schema Registry)?

Finally, similar to what you've done with the LoggerUtils::logRecord method, should we include metadata about the record that causes this issue in the exception message?

Ultimately, what I think would be useful here is an exception whose message explains that a cycle has been detected in the record schema, contains name of the top-level schema for the record, and has metadata on the topic/partition/offset for the record.
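The nested-cycle concern can be made concrete with a small sketch. The toy Schema type below is an assumption standing in for Kafka Connect's org.apache.kafka.connect.data.Schema; it only shows how a traversal can report both the top-level schema name and the sub-schema that closes the cycle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of cycle detection over nested schemas, using a toy Schema type.
public class CycleDetectionSketch {

    static class Schema {
        final String name;
        final Map<String, Schema> fields = new LinkedHashMap<>();
        Schema(String name) { this.name = name; }
    }

    // Returns the name of the first sub-schema that closes a cycle, or null if none.
    static String findCycle(Schema schema, List<Schema> seenSoFar) {
        if (seenSoFar.contains(schema)) {
            return schema.name;
        }
        seenSoFar.add(schema);
        for (Map.Entry<String, Schema> field : schema.fields.entrySet()) {
            String cycle = findCycle(field.getValue(), seenSoFar);
            if (cycle != null) {
                return cycle;
            }
        }
        seenSoFar.remove(schema);
        return null;
    }

    public static void main(String[] args) {
        Schema top = new Schema("com.example.Order");
        Schema nested = new Schema("com.example.LineItem");
        top.fields.put("items", nested);
        nested.fields.put("self", nested); // a nested sub-schema creates the cycle

        String offending = findCycle(top, new ArrayList<>());
        // Report both names so the message stays meaningful to users who only
        // know the top-level schema they registered
        System.out.println("Cycle in schema '" + top.name + "' via '" + offending + "'");
    }
}
```

Note that a naive traversal that only reports the schema at the point of revisit would surface `com.example.LineItem`, which users may not recognize; keeping a reference to the top-level schema lets the message include both.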

Author


Yes, I agree this could be confusing. I've tried to be more explicit.


We've refactored this to give both the top-level schema name and the name of the attribute causing the cycle.
We also wrapped the ConversionConnectException in another exception that gives details about the specific Connect record causing the issue, so that we didn't need to alter the SchemaConverter interface.

@@ -120,33 +119,37 @@ public BigQuerySchemaConverter(boolean allFieldsNullable) {
return com.google.cloud.bigquery.Schema.of(fields);
}

private void throwOnCycle(Schema kafkaConnectSchema, List<Schema> seenSoFar) {
private boolean throwOnCycle(Schema kafkaConnectSchema, List<Schema> seenSoFar) {


Nit: if we're not throwing an exception from this method anymore and are instead returning a boolean indicating whether the schema has a cycle or not, the method should be renamed to something like schemaContainsCycle.

Author

@brunodomenici Dec 1, 2021


Fair point. Done


Reverted back to throwOnCycle with additional parameters.

@@ -324,6 +324,7 @@ private TableInfo getTableInfo(TableId table, List<SinkRecord> records, Boolean
List<com.google.cloud.bigquery.Schema> bigQuerySchemas = new ArrayList<>();
Optional.ofNullable(readTableSchema(table)).ifPresent(bigQuerySchemas::add);
for (SinkRecord record : records) {
logger.debug("Convert Schema for :"+record.topic()+"[p="+record.kafkaPartition()+"](o="+record.kafkaOffset()+")");


A few comments:

  • We may not actually do any conversion if the retriever returns null. This should probably be moved directly above the call to convertRecordSchema
  • We're only converting the value schema here; key conversion takes place elsewhere. We should note this in the message here ("Converting value schema for...") and also add a logging message for key schema conversion where that takes place
  • The message can be made more human-readable by eliminating shorthand syntax and becoming more verbose; something like "record with topic: , partition: , offset: "
  • Slf4j marker syntax (i.e., "{}") can be used instead of string concatenation
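The last two points can be sketched together. The slf4j call in the comment below is the marker-syntax form the review asks for; the helper method is a self-contained stand-in (names are illustrative, not the connector's API) that builds the same human-readable message.

```java
// Sketch of the more verbose, human-readable debug message.
// With slf4j on the classpath, the call would use marker syntax instead of
// string concatenation:
//   logger.debug("Converting value schema for record with topic: {}, partition: {}, offset: {}",
//       record.topic(), record.kafkaPartition(), record.kafkaOffset());
public class LogMessageSketch {

    // Illustrative helper; builds the message slf4j would render
    static String valueSchemaMessage(String topic, int partition, long offset) {
        return String.format(
            "Converting value schema for record with topic: %s, partition: %d, offset: %d",
            topic, partition, offset);
    }

    public static void main(String[] args) {
        System.out.println(valueSchemaMessage("orders", 3, 42L));
    }
}
```

Beyond readability, marker syntax also avoids the cost of building the string when the debug level is disabled, since slf4j only formats the message if the log statement is actually emitted.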

Author


OK for the marker syntax and a more human-readable message.
For the first point, I don't understand; I think this code was already there.


This log line is replaced with a clearer error message in the ConversionConnectException.
Therefore, there is not really a need for this log line anymore.

@@ -189,6 +186,7 @@ private boolean isPartialFailure(SortedMap<SinkRecord, InsertAllRequest.RowToIns
for (Map.Entry<SinkRecord, InsertAllRequest.RowToInsert> row: rows.entrySet()) {
if (failRowsSet.contains((long)index)) {
failRows.put(row.getKey(), row.getValue());
logger.trace("Failed Record: {}", row);
Author


After digging a little, this is the important point for us. We really need to know more about failed records; we don't need to relate them to the records polled by Kafka Connect.

This simplifies the changes, I think.

@brunodomenici
Author

Hi @C0urante. Thanks for the review. I'm really sorry, I haven't had time to work on this; I'm back now.

I think I'm able to simplify a little bit.

@brunodomenici brunodomenici marked this pull request as draft December 2, 2021 09:41
@brunodomenici
Author

Let me rebase... and resolve conflicts.

@brunodomenici brunodomenici marked this pull request as ready for review December 2, 2021 14:43
@hassankishk hassankishk requested a review from a team as a code owner October 18, 2022 14:33
Co-authored-by: Hassan Kishk <[email protected]>
Co-authored-by: Bruno Domenici <[email protected]>
@NicolasFruyAdeo

@C0urante Hey, sorry we've left this PR alone for so long. We've finally come back to it.
We've rebased our branch on the current master branch.
We've tried to revert a few things and hope this new approach is satisfactory.

@C0urante

@NicolasFruyAdeo I am no longer working at Confluent and am not actively reviewing PRs for this project; please reach out to the current maintainers if you'd still like to pursue this change.

@NicolasFruyAdeo

Hi @ManasjyotiSharma @kapilchhajer @ypmahajan,
Would you mind taking over ownership of this PR and reviewing our proposal to improve issue diagnosis through better error messages?
