From 3bbbe2ff8fdc1b42fab6b15f78f90752470529fc Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 11 Dec 2023 21:54:33 +0200 Subject: [PATCH 01/11] First version WIP --- .../beam/sdk/io/clickhouse/ClickHouseIO.java | 18 ++++++- .../sdk/io/clickhouse/ClickHouseWriter.java | 13 +++++ .../beam/sdk/io/clickhouse/TableSchema.java | 26 +++++++++- .../src/main/javacc/ColumnTypeParser.jj | 49 +++++++++++++++++ .../sdk/io/clickhouse/ClickHouseIOTest.java | 49 +++++++++++++++++ .../sdk/io/clickhouse/TableSchemaTest.java | 52 +++++++++++++++++++ 6 files changed, 205 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 52d520a0a191..2ac36979f720 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -27,6 +27,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.stream.Collectors; @@ -498,7 +499,22 @@ public static TableSchema getTableSchema(String jdbcUrl, String table) { String defaultTypeStr = rs.getString("default_type"); String defaultExpression = rs.getString("default_expression"); - ColumnType columnType = ColumnType.parse(type); + ColumnType columnType = null; + if (type.toLowerCase().trim().startsWith("tuple(")) { + System.out.println(type); + List l = + Arrays.stream(type.trim().split(",")) + .map(s -> s.trim().replaceAll(" +", "':;")) + .collect(Collectors.toList()); + String content = + String.join(",", l).trim().replaceAll("\\(", "('").replaceAll(",", ",'"); + System.out.println(content); + // columnType = ColumnType.parse(content); + columnType = ColumnType.parse(content); + + } else { + columnType = ColumnType.parse(type); + } DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null); Object defaultValue; diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java index 8ed62eee3b59..c3e0758fa67a 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java @@ -21,9 +21,11 @@ import com.clickhouse.client.ClickHousePipedOutputStream; import com.clickhouse.client.data.BinaryStreamUtils; import java.io.IOException; +import java.util.Collection; import java.util.List; import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.RowWithStorage; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.joda.time.Days; @@ -146,6 +148,17 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj case BOOL: BinaryStreamUtils.writeBoolean(stream, (Boolean) value); break; + case TUPLE: + RowWithStorage rowValues = (RowWithStorage) value; + List tupleValues = rowValues.getValues(); + Collection columnTypesList = columnType.tupleTypes().values(); + BinaryStreamUtils.writeVarInt(stream, tupleValues.size()); + int index = 0; + for (ColumnType ct : columnTypesList) { + writeValue(stream, ct, tupleValues.get(index)); + index++; + } + break; } } diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java index 06ba2399a3cd..b89a88b3fae8 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.checkerframework.checker.nullness.qual.Nullable; @@ -111,6 +112,14 @@ public static Schema.FieldType getEquivalentFieldType(ColumnType columnType) { return Schema.FieldType.STRING; case BOOL: return Schema.FieldType.BOOLEAN; + case TUPLE: + List fields = + columnType.tupleTypes().entrySet().stream() + .map(x -> Schema.Field.of(x.getKey(), Schema.FieldType.DATETIME)) + .collect(Collectors.toList()); + Schema.Field[] array = fields.toArray(new Schema.Field[fields.size()]); + Schema schema = Schema.of(array); + return Schema.FieldType.row(schema); } // not possible, errorprone checks for exhaustive switch @@ -168,7 +177,9 @@ public enum TypeName { // Composite type ARRAY, // Primitive type - BOOL + BOOL, + // Composite type + TUPLE } /** @@ -208,6 +219,7 @@ public abstract static class ColumnType implements Serializable { public static final ColumnType UINT32 = ColumnType.of(TypeName.UINT32); public static final ColumnType UINT64 = ColumnType.of(TypeName.UINT64); public static final ColumnType BOOL = ColumnType.of(TypeName.BOOL); + public static final ColumnType TUPLE = ColumnType.of(TypeName.TUPLE); // ClickHouse doesn't allow nested nullables, so boolean flag is enough public abstract boolean nullable(); @@ -220,6 +232,8 @@ public abstract static class ColumnType implements Serializable { public abstract @Nullable ColumnType arrayElementType(); + public abstract @Nullable Map tupleTypes(); + public ColumnType withNullable(boolean nullable) { return toBuilder().nullable(nullable).build(); } @@ -265,6 +279,14 @@ public static ColumnType array(ColumnType arrayElementType) { .build(); } + public static ColumnType tuple(Map elements) { + return ColumnType.builder() + .typeName(TypeName.TUPLE) + .nullable(false) + .tupleTypes(elements) + .build(); + } + /** * Parse string with ClickHouse type to {@link ColumnType}. * @@ -339,6 +361,8 @@ abstract static class Builder { public abstract Builder fixedStringSize(Integer size); + public abstract Builder tupleTypes(Map tupleElements); + public abstract ColumnType build(); } } diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj index abe29aff3f8d..fd22b99fcb35 100644 --- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj +++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj @@ -17,6 +17,10 @@ */ options { IGNORE_CASE=true; + DEBUG_PARSER = true; + DEBUG_LOOKAHEAD = true; + DEBUG_TOKEN_MANAGER = true; + STATIC = false; } PARSER_BEGIN(ColumnTypeParser) @@ -99,6 +103,9 @@ TOKEN : | < EQ : "=" > | < BOOL : "BOOL" > | < LOWCARDINALITY : "LOWCARDINALITY" > + | < TUPLE : "TUPLE" > + | < COLON : ":" > + | < SEMI_COLON : ";" > } public ColumnType columnType() : @@ -113,6 +120,7 @@ public ColumnType columnType() : | ct = array() | ct = nullable() | ct = lowcardenality() + | ct = tuple() ) { return ct; @@ -263,6 +271,33 @@ private Map enumElements() : } } +private Map.Entry tupleElement() : +{ + String key; + ColumnType value; + Token token; +} +{ + ( (key = string() ) ( value = columnType() ) ) { + return Maps.immutableEntry(key, value); + } +} + +private Map tupleElements() : +{ + Map.Entry el; + List> entries = Lists.newArrayList(); +} +{ + ( + ( el = tupleElement() { entries.add(el); } ) + ( ( el = tupleElement() { entries.add(el); } ) )* + ) + { + return ImmutableMap.copyOf(entries); + } +} + private ColumnType enum_() : { Map elements; @@ -289,4 +324,18 @@ private ColumnType lowcardenality() : ( ( (ct = primitive()) ) { return ct; } ) +} + +private ColumnType tuple() : +{ + Map elements; +} +{ + ( + ( ( elements = tupleElements() ) ) + { + return ColumnType.tuple(elements); + } + ) + } \ No newline at end of file diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java index 33fe9467d45b..35d3584353b5 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java @@ -138,6 +138,55 @@ public void testArrayOfArrayOfInt64() throws Exception { assertEquals(15L, sum0); } + @Test + public void testTupleType() throws Exception { + /* Tuple('s':;String) + Tuple( + browser Tuple(name Nullable(String), + size Tuple(width Nullable(Int64), height Nullable(Int64)), + version Nullable(String) + ), + deviceCategory Nullable(String), + mobileDeviceInfo Nullable(String), + mobileDeviceMarketingName Nullable(String), + mobileDeviceModel Nullable(String), + mobileInputSelector Nullable(String), + operatingSystem Nullable(String), + operatingSystemVersion Nullable(String), + mobileDeviceBranding Nullable(String), + flashVersion Nullable(String), + javaEnabled Nullable(Bool), + language Nullable(String), + screen Tuple(colors Nullable(String), + resolution Tuple(width Nullable(Int64), + height Nullable(Int64) + )), + isBot Nullable(Bool), + isApp Nullable(Bool), + isInAppWebview Nullable(Bool) + ) + */ + + Schema tupleSchema = + Schema.of( + Schema.Field.of("f0", FieldType.STRING), Schema.Field.of("f1", FieldType.BOOLEAN)); + Schema schema = Schema.of(Schema.Field.of("t0", FieldType.row(tupleSchema))); + Row row1Tuple = Row.withSchema(tupleSchema).addValue("tuple").addValue(true).build(); + + Row row1 = Row.withSchema(schema).addValue(row1Tuple).build(); + + executeSql( + "CREATE TABLE test_named_tuples (" + "t0 Tuple(`f0` String, `f1` Bool)" + ") ENGINE=Log"); + + pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_tuples")); + + pipeline.run().waitUntilFinish(); + + try (ResultSet rs = executeQuery("SELECT * FROM test_named_tuples")) { + rs.next(); + } + } + @Test public void testPrimitiveTypes() throws Exception { Schema schema = diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java index 174761403471..51ebf1e8f2b0 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; +import java.util.HashMap; import java.util.Map; import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType; import org.apache.beam.sdk.schemas.Schema; @@ -196,4 +197,55 @@ public void testEquivalentSchema() { assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema)); } + + @Test + public void testParseTupleSingle() { + Map m1 = new HashMap<>(); + m1.put("s", ColumnType.STRING); + ColumnType columnType01 = ColumnType.parse("Tuple('s':;String)"); + assertEquals(ColumnType.tuple(m1), columnType01); + } + + @Test + public void testParseTupleDouble() { + Map m2 = new HashMap<>(); + m2.put("a1", ColumnType.STRING); + m2.put("b", ColumnType.BOOL); + ColumnType columnType02 = ColumnType.parse("Tuple('a1':;String,'b':;Bool)"); + assertEquals(ColumnType.tuple(m2), columnType02); + } + + @Test + public void testTupleNested() { + Map m1 = new HashMap<>(); + m1.put("a", ColumnType.STRING); + Map m3 = new HashMap<>(); + m3.put("a", ColumnType.STRING); + m3.put("b", ColumnType.BOOL); + m3.put("c", ColumnType.tuple(m1)); + ColumnType columnType03 = + ColumnType.parse("Tuple('a':;String,'b':;Bool,'c':;Tuple('a':;String))"); + assertEquals(ColumnType.tuple(m3), columnType03); + } + + @Test + public void testTupleComplex() { + Map m1 = new HashMap<>(); + m1.put("width", ColumnType.INT64.withNullable(true)); + m1.put("height", ColumnType.INT64.withNullable(true)); + + Map m2 = new HashMap<>(); + m2.put("name", ColumnType.STRING.withNullable(true)); + m2.put("size", ColumnType.tuple(m1)); + m2.put("version", ColumnType.STRING.withNullable(true)); + + Map m3 = new HashMap<>(); + m3.put("browser", ColumnType.tuple(m2)); + m3.put("deviceCategory", ColumnType.STRING.withNullable(true)); + + ColumnType columnType03 = + ColumnType.parse( + "Tuple('browser':;Tuple('name':;Nullable(String),'size':;Tuple('width':;Nullable(Int64),'height':;Nullable(Int64)),'version':;Nullable(String)),'deviceCategory':; Nullable(String))"); + assertEquals(ColumnType.tuple(m3), columnType03); + } } From a3a1850bea86506a9e3cf43f2333d84bd7cda59a Mon Sep 17 00:00:00 2001 From: mzitnik Date: Tue, 12 Dec 2023 08:32:04 +0200 Subject: [PATCH 02/11] Implement write tuple in RowBinary format --- .../apache/beam/sdk/io/clickhouse/ClickHouseWriter.java | 2 +- .../apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java index c3e0758fa67a..4cd0c295ca89 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java @@ -152,7 +152,7 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj RowWithStorage rowValues = (RowWithStorage) value; List tupleValues = rowValues.getValues(); Collection columnTypesList = columnType.tupleTypes().values(); - BinaryStreamUtils.writeVarInt(stream, tupleValues.size()); +// BinaryStreamUtils.writeVarInt(stream, tupleValues.size()); int index = 0; for (ColumnType ct : columnTypesList) { writeValue(stream, ct, tupleValues.get(index)); diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java index 35d3584353b5..26fb957c0f0e 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java @@ -184,6 +184,13 @@ isInAppWebview Nullable(Bool) try (ResultSet rs = executeQuery("SELECT * FROM test_named_tuples")) { rs.next(); + assertEquals("('tuple',true)", rs.getString("t0")); + } + + try (ResultSet rs = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM test_named_tuples")) { + rs.next(); + assertEquals("tuple", rs.getString("f0")); + assertEquals("true", rs.getString("f1")); } } From 53e4e819d8ec396ebca1afe65bdad75db999ceb6 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Tue, 12 Dec 2023 09:30:47 +0200 Subject: [PATCH 03/11] Added complex tuple test --- .../beam/sdk/io/clickhouse/ClickHouseIO.java | 2 +- .../sdk/io/clickhouse/ClickHouseWriter.java | 7 +- .../sdk/io/clickhouse/ClickHouseIOTest.java | 76 ++++++++++++------- 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 2ac36979f720..aa95b0b0c3a6 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -507,7 +507,7 @@ public static TableSchema getTableSchema(String jdbcUrl, String table) { .map(s -> s.trim().replaceAll(" +", "':;")) .collect(Collectors.toList()); String content = - String.join(",", l).trim().replaceAll("\\(", "('").replaceAll(",", ",'"); + String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'"); System.out.println(content); // columnType = ColumnType.parse(content); columnType = ColumnType.parse(content); diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java index 4cd0c295ca89..c8c49a656e3b 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java @@ -152,10 +152,13 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj RowWithStorage rowValues = (RowWithStorage) value; List tupleValues = rowValues.getValues(); Collection columnTypesList = columnType.tupleTypes().values(); -// BinaryStreamUtils.writeVarInt(stream, tupleValues.size()); int index = 0; for (ColumnType ct : columnTypesList) { - writeValue(stream, ct, tupleValues.get(index)); + if (ct.nullable()) { + writeNullableValue(stream, ct, tupleValues.get(index)); + } else { + writeValue(stream, ct, tupleValues.get(index)); + } index++; } break; diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java index 26fb957c0f0e..b31a19236cb0 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java @@ -140,33 +140,6 @@ public void testArrayOfArrayOfInt64() throws Exception { @Test public void testTupleType() throws Exception { - /* Tuple('s':;String) - Tuple( - browser Tuple(name Nullable(String), - size Tuple(width Nullable(Int64), height Nullable(Int64)), - version Nullable(String) - ), - deviceCategory Nullable(String), - mobileDeviceInfo Nullable(String), - mobileDeviceMarketingName Nullable(String), - mobileDeviceModel Nullable(String), - mobileInputSelector Nullable(String), - operatingSystem Nullable(String), - operatingSystemVersion Nullable(String), - mobileDeviceBranding Nullable(String), - flashVersion Nullable(String), - javaEnabled Nullable(Bool), - language Nullable(String), - screen Tuple(colors Nullable(String), - resolution Tuple(width Nullable(Int64), - height Nullable(Int64) - )), - isBot Nullable(Bool), - isApp Nullable(Bool), - isInAppWebview Nullable(Bool) - ) - */ - Schema tupleSchema = Schema.of( Schema.Field.of("f0", FieldType.STRING), Schema.Field.of("f1", FieldType.BOOLEAN)); @@ -194,6 +167,55 @@ isInAppWebview Nullable(Bool) } } + @Test + public void testComplexTupleType() throws Exception { + Schema sizeSchema = + Schema.of( + Schema.Field.of("width", FieldType.INT64.withNullable(true)), + Schema.Field.of("height", FieldType.INT64.withNullable(true))); + + Schema browserSchema = + Schema.of( + Schema.Field.of("name", FieldType.STRING.withNullable(true)), + Schema.Field.of("size", FieldType.row(sizeSchema)), + Schema.Field.of("version", FieldType.STRING.withNullable(true))); + + Schema propSchema = + Schema.of( + Schema.Field.of("browser", FieldType.row(browserSchema)), + Schema.Field.of("deviceCategory", FieldType.STRING.withNullable(true))); + + Schema schema = Schema.of(Schema.Field.of("prop", FieldType.row(propSchema))); + + Row sizeRow = Row.withSchema(sizeSchema).addValue(10L).addValue(20L).build(); + Row browserRow = + Row.withSchema(browserSchema).addValue("test").addValue(sizeRow).addValue("1.0.0").build(); + Row propRow = Row.withSchema(propSchema).addValue(browserRow).addValue("mobile").build(); + Row row1 = Row.withSchema(schema).addValue(propRow).build(); + + executeSql( + "CREATE TABLE test_named_complex_tuples (" + + "`prop` Tuple(`browser` Tuple(`name` Nullable(String),`size` Tuple(`width` Nullable(Int64), `height` Nullable(Int64)),`version` Nullable(String)),`deviceCategory` Nullable(String))" + + ") ENGINE=Log"); + + pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_complex_tuples")); + + pipeline.run().waitUntilFinish(); + + try (ResultSet rs = executeQuery("SELECT * FROM test_named_complex_tuples")) { + rs.next(); + assertEquals("(('test',(10,20),'1.0.0'),'mobile')", rs.getString("prop")); + } + + try (ResultSet rs = + executeQuery( + "SELECT prop.browser.name as name, prop.browser.size as size FROM test_named_complex_tuples")) { + rs.next(); + assertEquals("test", rs.getString("name")); + assertEquals("(10,20)", rs.getString("size")); + } + } + @Test public void testPrimitiveTypes() throws Exception { Schema schema = From a1d51820c6648d4ee97d0e0593a26e5b3fec39e0 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Tue, 12 Dec 2023 09:32:25 +0200 Subject: [PATCH 04/11] Disable debug of javacc --- sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj index fd22b99fcb35..cdf2b4f02fa5 100644 --- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj +++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj @@ -17,9 +17,9 @@ */ options { IGNORE_CASE=true; - DEBUG_PARSER = true; - DEBUG_LOOKAHEAD = true; - DEBUG_TOKEN_MANAGER = true; + DEBUG_PARSER = false; + DEBUG_LOOKAHEAD = false; + DEBUG_TOKEN_MANAGER = false; STATIC = false; } From a3cafa6ee2fa4c3af27dd88f75938e729ca16604 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Tue, 12 Dec 2023 10:01:45 +0200 Subject: [PATCH 05/11] Move tuple preprocessing logic --- .../beam/sdk/io/clickhouse/ClickHouseIO.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index aa95b0b0c3a6..61177a34c24e 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -476,6 +476,15 @@ abstract static class Builder { } } + private static String tuplePreprocessing(String payload) { + List l = + Arrays.stream(payload.trim().split(",")) + .map(s -> s.trim().replaceAll(" +", "':;")) + .collect(Collectors.toList()); + String content = + String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'"); + return content; + } /** * Returns {@link TableSchema} for a given table. * @@ -501,17 +510,8 @@ public static TableSchema getTableSchema(String jdbcUrl, String table) { ColumnType columnType = null; if (type.toLowerCase().trim().startsWith("tuple(")) { - System.out.println(type); - List l = - Arrays.stream(type.trim().split(",")) - .map(s -> s.trim().replaceAll(" +", "':;")) - .collect(Collectors.toList()); - String content = - String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'"); - System.out.println(content); - // columnType = ColumnType.parse(content); + String content = tuplePreprocessing(type); columnType = ColumnType.parse(content); - } else { columnType = ColumnType.parse(type); } From 120bb2b6c5ed2d0f4be844e87b32d40613a16ddf Mon Sep 17 00:00:00 2001 From: mzitnik Date: Tue, 12 Dec 2023 10:47:37 +0200 Subject: [PATCH 06/11] Adding to CHANGES.md & auto generated docs. --- CHANGES.md | 1 + .../java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index f9ee5d289117..fef85f168f7a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,6 +66,7 @@ * TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)). * Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676)) * Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)). +* Adding support for Tuples DataType in ClickHouse (Java) ([Tuple Support](https://github.com/apache/beam/pull/29715)). ## New Features / Improvements diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 61177a34c24e..061b1d87d423 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -113,7 +113,8 @@ * * * Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is - * supported through LowCardinality DataType in ClickHouse. + * supported through LowCardinality DataType in ClickHouse supported through Tuple DataType in + * ClickHouse see: Example in ClickHouseIOTest.testComplexTupleType. * *

Nested rows should be unnested using {@link Select#flattenedSchema()}. Type casting should be * done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before {@link ClickHouseIO}. From a477cbc2c217dd0604ad19627fb649f33e28c447 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Wed, 13 Dec 2023 20:08:18 +0200 Subject: [PATCH 07/11] Fix CHANGES.md & fix docs --- CHANGES.md | 3 +-- .../java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 16cdbb19a960..082f491e2e79 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,11 +65,10 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)). * Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676)) -* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)). * Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546)) * Add support for generating text embeddings in MLTransform for Vertex AI and Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564)) -* Adding support for Tuples DataType in ClickHouse (Java) ([Tuple Support](https://github.com/apache/beam/pull/29715)). * NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)). +* Adding support for LowCardinality and Tuples DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533), [#29715](https://github.com/apache/beam/pull/29715)). ## New Features / Improvements diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 061b1d87d423..0c73addd7067 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -113,8 +113,8 @@ * * * Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is - * supported through LowCardinality DataType in ClickHouse supported through Tuple DataType in - * ClickHouse see: Example in ClickHouseIOTest.testComplexTupleType. + * supported through LowCardinality DataType in ClickHouse + * supported through Tuple DataType in ClickHouse see: Example in ClickHouseIOTest.testComplexTupleType. * *

Nested rows should be unnested using {@link Select#flattenedSchema()}. Type casting should be * done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before {@link ClickHouseIO}. From ccfdc3759aaa1dc0f8dd915bf07585e71f70ffa5 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Wed, 13 Dec 2023 20:17:50 +0200 Subject: [PATCH 08/11] Fix spotless syntax --- .../java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 0c73addd7067..061b1d87d423 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -113,8 +113,8 @@ * * * Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is - * supported through LowCardinality DataType in ClickHouse - * supported through Tuple DataType in ClickHouse see: Example in ClickHouseIOTest.testComplexTupleType. + * supported through LowCardinality DataType in ClickHouse supported through Tuple DataType in + * ClickHouse see: Example in ClickHouseIOTest.testComplexTupleType. * *

Nested rows should be unnested using {@link Select#flattenedSchema()}. Type casting should be * done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before {@link ClickHouseIO}. From 67434ef0a3d18371f68b2ef5b768545dadc05444 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Wed, 13 Dec 2023 21:16:56 +0200 Subject: [PATCH 09/11] Remove :; from parsing. Only adding ' to field name. --- .../org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java | 2 +- .../io/clickhouse/src/main/javacc/ColumnTypeParser.jj | 4 +--- .../apache/beam/sdk/io/clickhouse/TableSchemaTest.java | 9 ++++----- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 061b1d87d423..531681bfbfac 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -480,7 +480,7 @@ abstract static class Builder { private static String tuplePreprocessing(String payload) { List l = Arrays.stream(payload.trim().split(",")) - .map(s -> s.trim().replaceAll(" +", "':;")) + .map(s -> s.trim().replaceAll(" +", "' ")) .collect(Collectors.toList()); String content = String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'"); diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj index cdf2b4f02fa5..5bb9ba4171a6 100644 --- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj +++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj @@ -104,8 +104,6 @@ TOKEN : | < BOOL : "BOOL" > | < LOWCARDINALITY : "LOWCARDINALITY" > | < TUPLE : "TUPLE" > - | < COLON : ":" > - | < SEMI_COLON : ";" > } public ColumnType columnType() : @@ -278,7 +276,7 @@ private Map.Entry tupleElement() : Token token; } { - ( (key = string() ) ( value = columnType() ) ) { + ( (key = string() ) ( value = columnType() ) ) { return Maps.immutableEntry(key, value); } } diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java index 51ebf1e8f2b0..f560d6268afb 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java @@ -202,7 +202,7 @@ public void testEquivalentSchema() { public void testParseTupleSingle() { Map m1 = new HashMap<>(); m1.put("s", ColumnType.STRING); - ColumnType columnType01 = ColumnType.parse("Tuple('s':;String)"); + ColumnType columnType01 = ColumnType.parse("Tuple('s' String)"); assertEquals(ColumnType.tuple(m1), columnType01); } @@ -211,7 +211,7 @@ public void testParseTupleDouble() { Map m2 = new HashMap<>(); m2.put("a1", ColumnType.STRING); m2.put("b", ColumnType.BOOL); - ColumnType columnType02 = ColumnType.parse("Tuple('a1':;String,'b':;Bool)"); + ColumnType columnType02 = ColumnType.parse("Tuple('a1' String,'b' Bool)"); assertEquals(ColumnType.tuple(m2), columnType02); } @@ -223,8 +223,7 @@ public void testTupleNested() { m3.put("a", ColumnType.STRING); m3.put("b", ColumnType.BOOL); m3.put("c", ColumnType.tuple(m1)); - ColumnType columnType03 = - ColumnType.parse("Tuple('a':;String,'b':;Bool,'c':;Tuple('a':;String))"); + ColumnType columnType03 = ColumnType.parse("Tuple('a' String,'b' Bool,'c' Tuple('a' String))"); assertEquals(ColumnType.tuple(m3), columnType03); } @@ -245,7 +244,7 @@ public void testTupleComplex() { ColumnType columnType03 = ColumnType.parse( - "Tuple('browser':;Tuple('name':;Nullable(String),'size':;Tuple('width':;Nullable(Int64),'height':;Nullable(Int64)),'version':;Nullable(String)),'deviceCategory':; Nullable(String))"); + "Tuple('browser' Tuple('name' Nullable(String),'size' Tuple('width' Nullable(Int64),'height' Nullable(Int64)),'version' Nullable(String)),'deviceCategory' Nullable(String))"); assertEquals(ColumnType.tuple(m3), columnType03); } } From 748dca6c9e36b7d90ab3c269b3d6d9576f5c4f7c Mon Sep 17 00:00:00 2001 From: mzitnik Date: Tue, 2 Jan 2024 09:04:35 +0200 Subject: [PATCH 10/11] Fix CHANGES.md to the correct version --- CHANGES.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 26e1bacd794b..3f48ebbf8795 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,8 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Adding support for Tuples DataType in ClickHouse (Java) ([#29715](https://github.com/apache/beam/pull/29715)). + ## New Features / Improvements @@ -101,7 +103,7 @@ * Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546)) * Add support for generating text embeddings in MLTransform for Vertex AI and Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564)) * NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)). -* Adding support for LowCardinality and Tuples DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533), [#29715](https://github.com/apache/beam/pull/29715)). +* Adding support for LowCardinality (Java) ([#29533](https://github.com/apache/beam/pull/29533)). ## New Features / Improvements From 57cb6a5b22e0378bf8e66faa712632499322be9e Mon Sep 17 00:00:00 2001 From: mzitnik Date: Tue, 2 Jan 2024 09:15:32 +0200 Subject: [PATCH 11/11] Change new types javadoc --- .../java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 531681bfbfac..7ef643488a23 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -110,11 +110,11 @@ * {@link TableSchema.TypeName#ENUM8} {@link Schema.TypeName#STRING} * {@link TableSchema.TypeName#ENUM16} {@link Schema.TypeName#STRING} * {@link TableSchema.TypeName#BOOL} {@link Schema.TypeName#BOOLEAN} + * {@link TableSchema.TypeName#TUPLE} {@link Schema.TypeName#ROW} * * * Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is - * supported through LowCardinality DataType in ClickHouse supported through Tuple DataType in - * ClickHouse see: Example in ClickHouseIOTest.testComplexTupleType. + * supported through LowCardinality DataType in ClickHouse. * *

Nested rows should be unnested using {@link Select#flattenedSchema()}. Type casting should be * done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before {@link ClickHouseIO}.