README section for Logical Types Conversion #5
That's a great question. When I implemented logical types support, I first tried to go the standard Avro way of defining the schema, of course. However, it always resulted in errors, at least for the then-current Kafka versions (0.10.0 and 0.10.1). It was pretty hard to find out how to define the schema to get it working, so I decided to explicitly point this out in the README to save others some headaches :) It may well be that in the upcoming version (the commit you're referring to is actually for Confluent Platform 3.2.0-rc1) the standard Avro way of just using the logical type is fine. We'll see... If you find/found a way to get it working without the fully-qualified class name, I'm happy to receive a pull request to change the code and/or README. Cheers and thanks again!
I did manage to get it working. I was using the Kafka Connect Docker image confluentinc/cp-kafka-connect:3.2.0. I have only verified the byte-array-to-connect-record conversion, using the logicalType property for date. This is how I defined the field in my Avro schema, and it worked fine, i.e. I ended up with a Date in Mongo.
Ok, I see. It seems to me that this most likely became possible with version 0.10.2, because it didn't work for me in the past. To be honest, I haven't tried it since upgrading the dependencies. Can you confirm that at the moment both ways are working for you? Or is it now only possible with the logical type name instead of the class name?
My observation is that it is only possible if you have "logicalType" set in the field schema. Setting the logicalType is now mandatory for the AvroConverter to work properly, while setting the "connect.name" property is now optional. I have only worked with the Date logical type so far, but I believe my observation holds true for all logical types. You may refer to the toConnectSchema method here: https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java
I was just investigating matters with Confluent Platform 3.2.0 and think that we are maybe talking about different things here. My local demo scenario is as follows:
All of this works as intended: data is serialized correctly and stored in the Kafka topic as expected. Now, if I change the .avsc Avro schema definition as you suggest, i.e. leave out the connect.name property and rename the nested type property to logicalType, the Avro code generator fails, complaining about "...no such type".
Ok. You should not rename the nested type property to logicalType -- just mention "logicalType" in addition to "type". So converting your example to use logicalType should look like: ... {"name": "date", "type": {"type": "int", "logicalType": "date"}} ... Is that how you tried it?
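To make the corrected field definition concrete, here is a small illustrative Python sketch (not taken from the thread): it spells out the schema fragment above as a dict and shows how Avro's date logical type physically encodes a value, namely as an int counting days since the Unix epoch. The function name `encode_avro_date` is hypothetical, not part of any library.

```python
import json
from datetime import date

# The corrected field definition from the comment above: the logical type
# annotates the underlying "int" type rather than replacing it.
date_field = {
    "name": "date",
    "type": {"type": "int", "logicalType": "date"},
}

def encode_avro_date(d: date) -> int:
    # Avro's "date" logical type stores days since the Unix epoch as an int.
    return (d - date(1970, 1, 1)).days

print(json.dumps(date_field))
print(encode_avro_date(date(1970, 1, 2)))  # -> 1 (one day after the epoch)
```

With a GenericRecord you would populate such a field directly with the int value the encoder produces.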
No, I didn't. I have tried it as you suggested, and at least code generation works with that. Interestingly enough, however, the resulting Java class definition changed and thus broke my producer app code :) Instead of the date field resulting in the Java type int/Integer, it becomes a Joda LocalDate with your definition... very strange to me. Not so for the other three logical types (decimal, time, timestamp), which are still mapped to the same Java types as they were with my schema definition. Many thanks anyway for pointing this out. I'm unsure how to proceed on this because it feels wrong to me to accept a Joda LocalDate for this. Maybe I'm wrong and it's the way to go.
Here is how I was doing it: Now from my kafka-connect app which listens to this topic, I have |
I just had a look at the Avro repository, and I think what you observed might be the intended behavior in Avro. It seems that for a date logicalType, if you are working with a GenericRecord, you just populate the field as an int, as I mentioned in my earlier comment. However, when you work with a custom value class (generated from the Avro schema), the instance variable in the custom value class is a Joda LocalDate. Refer to the DateConversion inner class in https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
I used avro-tools-1.8.0 to generate a class for me from an .avsc file. While converting to byte arrays, Avro has converters that encode date and time-millis as int, and timestamp-millis as long. The io.confluent.connect.avro.AvroConverter will treat these logical types properly as long as the schema has the "logicalType" property in addition to the "type" property. From a documentation perspective for the Mongo connector, we are more concerned about the conversion of the Avro schema to a Connect schema as part of creating a ConnectRecord. The io.confluent.connect.avro.AvroConverter will do this Avro schema --> Connect schema conversion properly only if the Avro schema has the correct "logicalType" configured.
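As a hedged sketch of the physical encodings mentioned here (per the Avro 1.8 specification: date and time-millis map to int, timestamp-millis to long), the following standalone Python helpers reproduce the primitive values a producer would write. The function names are illustrative, not part of any Avro or Confluent API.

```python
from datetime import date, datetime, time, timezone

def date_to_int(d: date) -> int:
    # "date" logical type: days since the Unix epoch, as an int.
    return (d - date(1970, 1, 1)).days

def time_millis_to_int(t: time) -> int:
    # "time-millis" logical type: milliseconds since midnight, as an int.
    return ((t.hour * 60 + t.minute) * 60 + t.second) * 1000 + t.microsecond // 1000

def timestamp_millis_to_long(dt: datetime) -> int:
    # "timestamp-millis" logical type: milliseconds since the Unix epoch, as a long.
    return int(dt.timestamp() * 1000)

print(time_millis_to_int(time(0, 0, 1)))  # -> 1000
print(timestamp_millis_to_long(datetime(1970, 1, 1, tzinfo=timezone.utc)))  # -> 0
```

These are exactly the primitive int/long payloads that sit in the Kafka topic; the "logicalType" annotation in the schema is what lets the AvroConverter map them back to Connect's Date/Time/Timestamp types.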
Thanks for checking this stuff. I'll adapt the README to reflect the main aspects of this discussion. I can confirm the type mapping: code generation with avro-tools-1.8.1 maps the field to the Java type private org.joda.time.LocalDate date;
Thanks @hpgrahsl |
It is working for me. Many thanks, Soumabrata.
Hi,
While working with logical types, I noticed that Confluent has changed their AvroConverter to rely on the property "logicalType" instead of "connect.name".
This is with reference to this commit confluentinc/schema-registry@da4d548
So, for example, for Date logical type, I can now mention "logicalType":"date" in the relevant schema field instead of "connect.name":"org.apache.kafka.connect.data.Date"
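To make the contrast concrete, here is a minimal illustrative sketch of the two styles being compared. The exact field layouts are assumptions for illustration, not copied from any official schema; in particular, the placement of "connect.name" inside the type object mirrors how Connect-generated Avro schemas are commonly annotated.

```python
import json

# Older style: Connect-specific annotation with the fully-qualified class name.
old_style = {
    "name": "date",
    "type": {"type": "int", "connect.name": "org.apache.kafka.connect.data.Date"},
}

# Newer style: the standard Avro logical type attribute.
new_style = {
    "name": "date",
    "type": {"type": "int", "logicalType": "date"},
}

print(json.dumps(new_style))
```

Both describe the same underlying int field; the question in this issue is which annotation the AvroConverter actually honors.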
The README explicitly advises using connect.name instead of logicalType. Is there some aspect I am missing?