Preserve nullability from Avro to Catalyst Schema #137

Closed
kevinwallimann opened this issue Jun 15, 2020 · 1 comment · Fixed by #140
Labels
bug Something isn't working

kevinwallimann commented Jun 15, 2020

Currently, in a Kafka-to-Kafka workflow (i.e. Avro -> Catalyst -> Avro) using columnselectortransformer, every field ends up nullable in the destination topic, regardless of its nullability in the source schema.
Example:
Source schema

{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    }
  ]
}

is written as

{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": ["long", "null"]
    }
  ]
}

Expected: Fields that are non-nullable in the source Avro schema should remain non-nullable in the destination schema.
Nullable fields should stay nullable.
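
To make the expectation concrete, here is a minimal sketch of how a field's nullability could be read off an Avro schema, assuming the usual convention that a field is nullable exactly when its type is a union containing `null`. The object name `AvroNullability` and its helpers are hypothetical and not part of ABRiS; only the Apache Avro `Schema` API is used.

```scala
import org.apache.avro.Schema
import scala.collection.JavaConverters._

// Hypothetical helper, not part of ABRiS.
object AvroNullability {

  // Assumption: a field counts as nullable when its type is a union with a "null" branch.
  def isNullable(fieldSchema: Schema): Boolean =
    fieldSchema.getType == Schema.Type.UNION &&
      fieldSchema.getTypes.asScala.exists(_.getType == Schema.Type.NULL)

  // Builds the pageviews record from the example with the given type for "viewtime".
  // A fresh parser is used per call because a parser rejects redefining a named type.
  def pageviews(viewtimeType: String): Schema =
    new Schema.Parser().parse(
      s"""{"type":"record","name":"pageviews","namespace":"ksql",
         |"fields":[{"name":"viewtime","type":$viewtimeType}]}""".stripMargin)

  def viewtimeIsNullable(viewtimeType: String): Boolean =
    isNullable(pageviews(viewtimeType).getField("viewtime").schema())

  def main(args: Array[String]): Unit = {
    println(viewtimeIsNullable("\"long\""))             // source schema from the example
    println(viewtimeIsNullable("[\"long\", \"null\"]")) // schema currently written
  }
}
```

Under this convention, the source schema's `viewtime` is non-nullable and the written schema's `viewtime` is nullable, which is the mismatch this issue describes.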

Migration note
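Making an existing nullable field non-nullable is a forward-compatible change: consumers that still read with the old schema (nullable field) can read data written with the new schema (non-nullable field). In that respect it is almost like adding a field.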
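
The compatibility claim can be checked with Avro's own `SchemaCompatibility` utility. The following is a sketch under the assumption that "forward-compatible" means a reader on the old schema can decode data written with the new schema; the object name `NullabilityMigration` and its helpers are hypothetical.

```scala
import org.apache.avro.{Schema, SchemaCompatibility}
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType

// Hypothetical helper for illustrating the migration note.
object NullabilityMigration {

  // Builds the pageviews record from the example with the given type for "viewtime".
  private def pageviews(viewtimeType: String): Schema =
    new Schema.Parser().parse(
      s"""{"type":"record","name":"pageviews","namespace":"ksql",
         |"fields":[{"name":"viewtime","type":$viewtimeType}]}""".stripMargin)

  val oldNullable: Schema = pageviews("[\"long\", \"null\"]")
  val newNonNullable: Schema = pageviews("\"long\"")

  // True when data written with `writer` can be decoded using `reader`.
  def canRead(reader: Schema, writer: Schema): Boolean =
    SchemaCompatibility
      .checkReaderWriterCompatibility(reader, writer)
      .getType == SchemaCompatibilityType.COMPATIBLE

  def main(args: Array[String]): Unit = {
    // Forward direction: old (nullable) reader, new (non-nullable) writer.
    println(s"old reader, new writer: ${canRead(oldNullable, newNonNullable)}")
    // Backward direction: new (non-nullable) reader, old (nullable) writer.
    println(s"new reader, old writer: ${canRead(newNonNullable, oldNullable)}")
  }
}
```

The reverse direction is the one to watch: a reader that expects a plain `long` cannot decode a `null` written under the old schema, so consumers should only move to the non-nullable schema once no old-schema data needs to be read.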

kevinwallimann (Collaborator, Author) commented

Analysis
The expression za.co.absa.abris.avro.sql.AvroDataToCatalyst always reports itself as nullable: https://github.com/AbsaOSS/ABRiS/blob/985fab1894826b4ea97a48d065a048b44a0ed180/src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala#L47
Not sure why it doesn't use child.nullable instead, but that wouldn't change much in our case.

Since the expression wraps the actual Avro record, its nullability propagates to every field of the record when the wrapper is flattened like this:

dataFrame
  .select(from_confluent_avro(col("value"), schemaRegistrySettings) as 'data)
  .select("data.*")

The root cause of the nullability is that the value of a Kafka record can, in general, always be null. One use case for a null value is a tombstone record: https://www.confluent.io/blog/handling-gdpr-log-forget/.
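
The flattening effect can be reproduced without ABRiS or Kafka. In the sketch below, a hand-built struct column named `data` stands in for the output of `from_confluent_avro` (an assumption made for illustration), and the object name `FlattenNullability` is hypothetical. It relies on Spark's rule that a field extracted from a struct is nullable if either the struct or the field itself is nullable.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical reproduction; the "data" column stands in for the decoded Avro record.
object FlattenNullability {

  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("flatten-nullability")
    .getOrCreate()

  // Returns the nullability of "viewtime" after select("data.*"),
  // given the nullability declared for the wrapping struct column.
  def viewtimeNullableAfterFlatten(wrapperNullable: Boolean): Boolean = {
    // The record itself declares viewtime as non-nullable, as in the source schema.
    val record = StructType(Seq(StructField("viewtime", LongType, nullable = false)))
    val schema = StructType(Seq(StructField("data", record, nullable = wrapperNullable)))
    val rows = spark.sparkContext.parallelize(Seq(Row(Row(1L))))
    val df = spark.createDataFrame(rows, schema)
    df.select("data.*").schema("viewtime").nullable
  }

  def main(args: Array[String]): Unit = {
    println(s"nullable wrapper:     ${viewtimeNullableAfterFlatten(wrapperNullable = true)}")
    println(s"non-nullable wrapper: ${viewtimeNullableAfterFlatten(wrapperNullable = false)}")
    spark.stop()
  }
}
```

With a nullable wrapper, `viewtime` comes out nullable even though the record declares it non-nullable. A fix therefore has to either stop the wrapper from being unconditionally nullable or restore the original field nullability after flattening, while still handling null record values such as tombstones.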
