Skip to content

Commit

Permalink
jj
Browse files Browse the repository at this point in the history
  • Loading branch information
shaeqahmed committed Dec 9, 2024
1 parent acd52ee commit f3db496
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 7 deletions.
5 changes: 3 additions & 2 deletions crates/iceberg/src/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
// under the License.

//! Avro-related code.
mod schema;
pub(crate) use schema::*;
#[allow(dead_code)]
pub mod schema;
pub use schema::*;
19 changes: 15 additions & 4 deletions crates/iceberg/src/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use apache_avro::schema::{
ArraySchema, DecimalSchema, FixedSchema, MapSchema, Name, RecordField as AvroRecordField,
RecordFieldOrder, RecordSchema, UnionSchema,
};
use apache_avro::Schema as AvroSchema;
pub use apache_avro::Schema as AvroSchema;
use itertools::{Either, Itertools};
use serde_json::{Number, Value};

Expand All @@ -45,6 +45,14 @@ const LOGICAL_TYPE: &str = "logicalType";

/// Visitor state used while converting an Iceberg schema into an Avro schema.
struct SchemaToAvroSchema {
    /// Name handed to the converter by its caller (stored via `to_string()`).
    schema: String,
    /// Last synthetic record id handed out by [`Self::next_record_id`].
    ///
    /// The converter is created with this set to `0`, so the first generated
    /// id is `1`.
    next_rec_id: i32,
}

impl SchemaToAvroSchema {
    /// Advances the synthetic record-id counter by one and returns the new value.
    ///
    /// The visitor uses the result to build an `r{id}` record name when a
    /// field's own id is not positive.
    ///
    /// NOTE(review): generated names share the `r` prefix with names derived
    /// from real positive field ids, so the two could collide — confirm
    /// whether that is acceptable.
    fn next_record_id(&mut self) -> i32 {
        let fresh = self.next_rec_id + 1;
        self.next_rec_id = fresh;
        fresh
    }
}

type AvroSchemaOrField = Either<AvroSchema, AvroRecordField>;
Expand Down Expand Up @@ -74,7 +82,7 @@ impl SchemaVisitor for SchemaToAvroSchema {
) -> Result<AvroSchemaOrField> {
let mut field_schema = avro_schema.unwrap_left();
if let AvroSchema::Record(record) = &mut field_schema {
record.name = Name::from(format!("r{}", field.id).as_str());
record.name = Name::from(format!("r{}", if field.id > 0 { field.id.to_string() } else { self.next_record_id().to_string() }).as_str());
}

if !field.required {
Expand Down Expand Up @@ -121,7 +129,9 @@ impl SchemaVisitor for SchemaToAvroSchema {
let mut field_schema = value.unwrap_left();

if let AvroSchema::Record(record) = &mut field_schema {
record.name = Name::from(format!("r{}", list.element_field.id).as_str());
record.name = Name::from(format!("r{}",
if list.element_field.id > 0 { list.element_field.id.to_string() } else { self.next_record_id().to_string() }
).as_str());
}

if !list.element_field.required {
Expand Down Expand Up @@ -244,9 +254,10 @@ impl SchemaVisitor for SchemaToAvroSchema {
}

/// Converts an Iceberg schema to an Avro schema.
pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result<AvroSchema> {
pub fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result<AvroSchema> {
let mut converter = SchemaToAvroSchema {
schema: name.to_string(),
next_rec_id: 0,
};

visit_schema(schema, &mut converter).map(Either::unwrap_left)
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub use catalog::{

pub mod table;

mod avro;
pub mod avro;
pub mod io;
pub mod spec;

Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,12 @@ impl fmt::Display for StructType {
/// Fields may have an optional comment or doc string. Fields can have default values.
pub struct NestedField {
/// Id unique in table schema
#[serde(default)]
pub id: i32,
/// Field Name
pub name: String,
/// Optional or required
#[serde(default)]
pub required: bool,
/// Datatype
pub field_type: Box<Type>,
Expand All @@ -534,8 +536,10 @@ pub struct NestedField {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
struct SerdeNestedField {
#[serde(default)]
pub id: i32,
pub name: String,
#[serde(default)]
pub required: bool,
#[serde(rename = "type")]
pub field_type: Box<Type>,
Expand Down Expand Up @@ -726,7 +730,9 @@ pub(super) mod _serde {
#[serde(rename_all = "kebab-case")]
List {
r#type: String,
#[serde(default)]
element_id: i32,
#[serde(default)]
element_required: bool,
element: Cow<'a, Type>,
},
Expand Down

0 comments on commit f3db496

Please sign in to comment.