diff --git a/CHANGES.md b/CHANGES.md
index cbfb005d5ab1..3f48ebbf8795 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,8 @@
## I/Os
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Adding support for Tuples DataType in ClickHouse (Java) ([#29715](https://github.com/apache/beam/pull/29715)).
+
## New Features / Improvements
@@ -98,10 +100,10 @@
* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)).
* Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676))
-* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)).
* Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546))
* Add support for generating text embeddings in MLTransform for Vertex AI and Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564))
* NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)).
+* Adding support for LowCardinality (Java) ([#29533](https://github.com/apache/beam/pull/29533)).
## New Features / Improvements
diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
index 52d520a0a191..7ef643488a23 100644
--- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
+++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
@@ -27,6 +27,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -109,6 +110,7 @@
*
{@link TableSchema.TypeName#ENUM8} {@link Schema.TypeName#STRING}
* {@link TableSchema.TypeName#ENUM16} {@link Schema.TypeName#STRING}
* {@link TableSchema.TypeName#BOOL} {@link Schema.TypeName#BOOLEAN}
+ * {@link TableSchema.TypeName#TUPLE} {@link Schema.TypeName#ROW}
*
*
* Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is
@@ -475,6 +477,15 @@ abstract static class Builder {
}
}
+ private static String tuplePreprocessing(String payload) {
+ List l =
+ Arrays.stream(payload.trim().split(","))
+ .map(s -> s.trim().replaceAll(" +", "' "))
+ .collect(Collectors.toList());
+ String content =
+ String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'");
+ return content;
+ }
/**
* Returns {@link TableSchema} for a given table.
*
@@ -498,7 +509,13 @@ public static TableSchema getTableSchema(String jdbcUrl, String table) {
String defaultTypeStr = rs.getString("default_type");
String defaultExpression = rs.getString("default_expression");
- ColumnType columnType = ColumnType.parse(type);
+ ColumnType columnType = null;
+ if (type.toLowerCase().trim().startsWith("tuple(")) {
+ String content = tuplePreprocessing(type);
+ columnType = ColumnType.parse(content);
+ } else {
+ columnType = ColumnType.parse(type);
+ }
DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null);
Object defaultValue;
diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
index 8ed62eee3b59..c8c49a656e3b 100644
--- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
+++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
@@ -21,9 +21,11 @@
import com.clickhouse.client.ClickHousePipedOutputStream;
import com.clickhouse.client.data.BinaryStreamUtils;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowWithStorage;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Days;
@@ -146,6 +148,20 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj
case BOOL:
BinaryStreamUtils.writeBoolean(stream, (Boolean) value);
break;
+ case TUPLE:
+ RowWithStorage rowValues = (RowWithStorage) value;
+ List tupleValues = rowValues.getValues();
+ Collection columnTypesList = columnType.tupleTypes().values();
+ int index = 0;
+ for (ColumnType ct : columnTypesList) {
+ if (ct.nullable()) {
+ writeNullableValue(stream, ct, tupleValues.get(index));
+ } else {
+ writeValue(stream, ct, tupleValues.get(index));
+ }
+ index++;
+ }
+ break;
}
}
diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
index 06ba2399a3cd..b89a88b3fae8 100644
--- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
+++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -111,6 +112,14 @@ public static Schema.FieldType getEquivalentFieldType(ColumnType columnType) {
return Schema.FieldType.STRING;
case BOOL:
return Schema.FieldType.BOOLEAN;
+ case TUPLE:
+ List fields =
+ columnType.tupleTypes().entrySet().stream()
+ .map(x -> Schema.Field.of(x.getKey(), Schema.FieldType.DATETIME))
+ .collect(Collectors.toList());
+ Schema.Field[] array = fields.toArray(new Schema.Field[fields.size()]);
+ Schema schema = Schema.of(array);
+ return Schema.FieldType.row(schema);
}
// not possible, errorprone checks for exhaustive switch
@@ -168,7 +177,9 @@ public enum TypeName {
// Composite type
ARRAY,
// Primitive type
- BOOL
+ BOOL,
+ // Composite type
+ TUPLE
}
/**
@@ -208,6 +219,7 @@ public abstract static class ColumnType implements Serializable {
public static final ColumnType UINT32 = ColumnType.of(TypeName.UINT32);
public static final ColumnType UINT64 = ColumnType.of(TypeName.UINT64);
public static final ColumnType BOOL = ColumnType.of(TypeName.BOOL);
+ public static final ColumnType TUPLE = ColumnType.of(TypeName.TUPLE);
// ClickHouse doesn't allow nested nullables, so boolean flag is enough
public abstract boolean nullable();
@@ -220,6 +232,8 @@ public abstract static class ColumnType implements Serializable {
public abstract @Nullable ColumnType arrayElementType();
+ public abstract @Nullable Map tupleTypes();
+
public ColumnType withNullable(boolean nullable) {
return toBuilder().nullable(nullable).build();
}
@@ -265,6 +279,14 @@ public static ColumnType array(ColumnType arrayElementType) {
.build();
}
+ public static ColumnType tuple(Map elements) {
+ return ColumnType.builder()
+ .typeName(TypeName.TUPLE)
+ .nullable(false)
+ .tupleTypes(elements)
+ .build();
+ }
+
/**
* Parse string with ClickHouse type to {@link ColumnType}.
*
@@ -339,6 +361,8 @@ abstract static class Builder {
public abstract Builder fixedStringSize(Integer size);
+ public abstract Builder tupleTypes(Map tupleElements);
+
public abstract ColumnType build();
}
}
diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
index abe29aff3f8d..5bb9ba4171a6 100644
--- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
+++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
@@ -17,6 +17,10 @@
*/
options {
IGNORE_CASE=true;
+ DEBUG_PARSER = false;
+ DEBUG_LOOKAHEAD = false;
+ DEBUG_TOKEN_MANAGER = false;
+ STATIC = false;
}
PARSER_BEGIN(ColumnTypeParser)
@@ -99,6 +103,7 @@ TOKEN :
| < EQ : "=" >
| < BOOL : "BOOL" >
| < LOWCARDINALITY : "LOWCARDINALITY" >
+ | < TUPLE : "TUPLE" >
}
public ColumnType columnType() :
@@ -113,6 +118,7 @@ public ColumnType columnType() :
| ct = array()
| ct = nullable()
| ct = lowcardenality()
+ | ct = tuple()
)
{
return ct;
@@ -263,6 +269,33 @@ private Map enumElements() :
}
}
+private Map.Entry tupleElement() :
+{
+ String key;
+ ColumnType value;
+ Token token;
+}
+{
+ ( (key = string() ) ( value = columnType() ) ) {
+ return Maps.immutableEntry(key, value);
+ }
+}
+
+private Map tupleElements() :
+{
+ Map.Entry el;
+ List> entries = Lists.newArrayList();
+}
+{
+ (
+ ( el = tupleElement() { entries.add(el); } )
+ ( ( el = tupleElement() { entries.add(el); } ) )*
+ )
+ {
+ return ImmutableMap.copyOf(entries);
+ }
+}
+
private ColumnType enum_() :
{
Map elements;
@@ -289,4 +322,18 @@ private ColumnType lowcardenality() :
(
( (ct = primitive()) ) { return ct; }
)
+}
+
+private ColumnType tuple() :
+{
+ Map elements;
+}
+{
+ (
+ ( ( elements = tupleElements() ) )
+ {
+ return ColumnType.tuple(elements);
+ }
+ )
+
}
\ No newline at end of file
diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
index 33fe9467d45b..b31a19236cb0 100644
--- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
+++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
@@ -138,6 +138,84 @@ public void testArrayOfArrayOfInt64() throws Exception {
assertEquals(15L, sum0);
}
+ @Test
+ public void testTupleType() throws Exception {
+ Schema tupleSchema =
+ Schema.of(
+ Schema.Field.of("f0", FieldType.STRING), Schema.Field.of("f1", FieldType.BOOLEAN));
+ Schema schema = Schema.of(Schema.Field.of("t0", FieldType.row(tupleSchema)));
+ Row row1Tuple = Row.withSchema(tupleSchema).addValue("tuple").addValue(true).build();
+
+ Row row1 = Row.withSchema(schema).addValue(row1Tuple).build();
+
+ executeSql(
+ "CREATE TABLE test_named_tuples (" + "t0 Tuple(`f0` String, `f1` Bool)" + ") ENGINE=Log");
+
+ pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_tuples"));
+
+ pipeline.run().waitUntilFinish();
+
+ try (ResultSet rs = executeQuery("SELECT * FROM test_named_tuples")) {
+ rs.next();
+ assertEquals("('tuple',true)", rs.getString("t0"));
+ }
+
+ try (ResultSet rs = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM test_named_tuples")) {
+ rs.next();
+ assertEquals("tuple", rs.getString("f0"));
+ assertEquals("true", rs.getString("f1"));
+ }
+ }
+
+ @Test
+ public void testComplexTupleType() throws Exception {
+ Schema sizeSchema =
+ Schema.of(
+ Schema.Field.of("width", FieldType.INT64.withNullable(true)),
+ Schema.Field.of("height", FieldType.INT64.withNullable(true)));
+
+ Schema browserSchema =
+ Schema.of(
+ Schema.Field.of("name", FieldType.STRING.withNullable(true)),
+ Schema.Field.of("size", FieldType.row(sizeSchema)),
+ Schema.Field.of("version", FieldType.STRING.withNullable(true)));
+
+ Schema propSchema =
+ Schema.of(
+ Schema.Field.of("browser", FieldType.row(browserSchema)),
+ Schema.Field.of("deviceCategory", FieldType.STRING.withNullable(true)));
+
+ Schema schema = Schema.of(Schema.Field.of("prop", FieldType.row(propSchema)));
+
+ Row sizeRow = Row.withSchema(sizeSchema).addValue(10L).addValue(20L).build();
+ Row browserRow =
+ Row.withSchema(browserSchema).addValue("test").addValue(sizeRow).addValue("1.0.0").build();
+ Row propRow = Row.withSchema(propSchema).addValue(browserRow).addValue("mobile").build();
+ Row row1 = Row.withSchema(schema).addValue(propRow).build();
+
+ executeSql(
+ "CREATE TABLE test_named_complex_tuples ("
+ + "`prop` Tuple(`browser` Tuple(`name` Nullable(String),`size` Tuple(`width` Nullable(Int64), `height` Nullable(Int64)),`version` Nullable(String)),`deviceCategory` Nullable(String))"
+ + ") ENGINE=Log");
+
+ pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_complex_tuples"));
+
+ pipeline.run().waitUntilFinish();
+
+ try (ResultSet rs = executeQuery("SELECT * FROM test_named_complex_tuples")) {
+ rs.next();
+ assertEquals("(('test',(10,20),'1.0.0'),'mobile')", rs.getString("prop"));
+ }
+
+ try (ResultSet rs =
+ executeQuery(
+ "SELECT prop.browser.name as name, prop.browser.size as size FROM test_named_complex_tuples")) {
+ rs.next();
+ assertEquals("test", rs.getString("name"));
+ assertEquals("(10,20)", rs.getString("size"));
+ }
+ }
+
@Test
public void testPrimitiveTypes() throws Exception {
Schema schema =
diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
index 174761403471..f560d6268afb 100644
--- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
+++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
@@ -19,6 +19,7 @@
import static org.junit.Assert.assertEquals;
+import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
import org.apache.beam.sdk.schemas.Schema;
@@ -196,4 +197,54 @@ public void testEquivalentSchema() {
assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
}
+
+ @Test
+ public void testParseTupleSingle() {
+ Map m1 = new HashMap<>();
+ m1.put("s", ColumnType.STRING);
+ ColumnType columnType01 = ColumnType.parse("Tuple('s' String)");
+ assertEquals(ColumnType.tuple(m1), columnType01);
+ }
+
+ @Test
+ public void testParseTupleDouble() {
+ Map m2 = new HashMap<>();
+ m2.put("a1", ColumnType.STRING);
+ m2.put("b", ColumnType.BOOL);
+ ColumnType columnType02 = ColumnType.parse("Tuple('a1' String,'b' Bool)");
+ assertEquals(ColumnType.tuple(m2), columnType02);
+ }
+
+ @Test
+ public void testTupleNested() {
+ Map m1 = new HashMap<>();
+ m1.put("a", ColumnType.STRING);
+ Map m3 = new HashMap<>();
+ m3.put("a", ColumnType.STRING);
+ m3.put("b", ColumnType.BOOL);
+ m3.put("c", ColumnType.tuple(m1));
+ ColumnType columnType03 = ColumnType.parse("Tuple('a' String,'b' Bool,'c' Tuple('a' String))");
+ assertEquals(ColumnType.tuple(m3), columnType03);
+ }
+
+ @Test
+ public void testTupleComplex() {
+ Map m1 = new HashMap<>();
+ m1.put("width", ColumnType.INT64.withNullable(true));
+ m1.put("height", ColumnType.INT64.withNullable(true));
+
+ Map m2 = new HashMap<>();
+ m2.put("name", ColumnType.STRING.withNullable(true));
+ m2.put("size", ColumnType.tuple(m1));
+ m2.put("version", ColumnType.STRING.withNullable(true));
+
+ Map m3 = new HashMap<>();
+ m3.put("browser", ColumnType.tuple(m2));
+ m3.put("deviceCategory", ColumnType.STRING.withNullable(true));
+
+ ColumnType columnType03 =
+ ColumnType.parse(
+ "Tuple('browser' Tuple('name' Nullable(String),'size' Tuple('width' Nullable(Int64),'height' Nullable(Int64)),'version' Nullable(String)),'deviceCategory' Nullable(String))");
+ assertEquals(ColumnType.tuple(m3), columnType03);
+ }
}