Commit 6561769: remix nested columns
changes:
* introduce ColumnFormat to separate the physical storage format from the logical type; ColumnFormat is now used instead of ColumnCapabilities to get column handlers for segment creation
* introduce a new 'standard type' indexer, merger, and family of serializers, the next logical iteration of the nested column work. Essentially, this is an automatic-type column indexer that produces the most appropriate column for the given inputs: STRING, ARRAY<STRING>, LONG, ARRAY<LONG>, DOUBLE, ARRAY<DOUBLE>, or COMPLEX<json>.
* revert NestedDataColumnIndexer, NestedDataColumnMerger, and NestedDataColumnSerializer to their pre-#13803 (v4) behavior for backwards compatibility
* fix a bug in RoaringBitmapSerdeFactory that surfaced if anything ever wrote out an empty bitmap using toBytes and then later tried to read it (the nerve!)
clintropolis committed Apr 2, 2023
1 parent 518698a commit 6561769
Showing 123 changed files with 8,890 additions and 1,383 deletions.
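For orientation before the per-file diffs, here is a minimal sketch of the new schema from the caller's side. Only the StandardTypeColumnSchema(name) constructor shape is taken from the test changes below; the column names and the single-argument DimensionsSpec constructor are illustrative assumptions.

// hedged sketch, not code from this commit
import java.util.Arrays;

import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.segment.StandardTypeColumnSchema;

class StandardTypeSchemaSketch
{
  static DimensionsSpec dimensions()
  {
    return new DimensionsSpec(
        Arrays.asList(
            // explicitly typed STRING column, as before
            new StringDimensionSchema("eventType"),
            // automatic type: the indexer produces STRING, ARRAY<STRING>, LONG,
            // ARRAY<LONG>, DOUBLE, ARRAY<DOUBLE>, or COMPLEX<json> depending on
            // the values it sees
            new StandardTypeColumnSchema("foo")
        )
    );
  }
}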
===== changed file =====
@@ -42,7 +42,7 @@
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.expression.TestExprMacroTable;
-import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.StandardTypeColumnSchema;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.TransformingInputEntityReader;
@@ -269,14 +269,14 @@ public void testParseNestedData() throws Exception
timestampSpec,
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid"),
new StringDimensionSchema("eventType"),
new NestedDataDimensionSchema("foo"),
new NestedDataDimensionSchema("bar"),
new StringDimensionSchema("someBytesColumn")
new StandardTypeColumnSchema("event"),
new StandardTypeColumnSchema("id"),
new StandardTypeColumnSchema("someOtherId"),
new StandardTypeColumnSchema("isValid"),
new StandardTypeColumnSchema("eventType"),
new StandardTypeColumnSchema("foo"),
new StandardTypeColumnSchema("bar"),
new StandardTypeColumnSchema("someBytesColumn")
)
),
null
===== changed file =====
@@ -47,7 +47,7 @@
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.timeline.DataSegment;
@@ -292,7 +292,7 @@ private static IncrementalIndex makeIncrementalIndex(
AggregatorFactory[] aggs,
HadoopDruidIndexerConfig config,
Iterable<String> oldDimOrder,
Map<String, ColumnCapabilities> oldCapabilities
Map<String, ColumnFormat> oldCapabilities
)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
@@ -456,7 +456,7 @@ protected void reduce(final BytesWritable key, Iterable<BytesWritable> values, final Context context)
dimOrder.addAll(index.getDimensionOrder());
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
-index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnHandlerCapabilities());
+index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnFormats());
}

index.add(value);
@@ -752,7 +752,7 @@ public void doRun()
combiningAggs,
config,
allDimensionNames,
-persistIndex.getColumnHandlerCapabilities()
+persistIndex.getColumnFormats()
);
startTime = System.currentTimeMillis();
++indexCount;
===== changed file =====
@@ -1109,7 +1109,7 @@ private void processDimensionsSpec(final QueryableIndex index)
);

if (!uniqueDims.containsKey(dimension)) {
-final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
+Preconditions.checkNotNull(
dimensionHandlerMap.get(dimension),
"Cannot find dimensionHandler for dimension[%s]",
dimension
@@ -1118,11 +1118,7 @@
uniqueDims.put(dimension, uniqueDims.size());
dimensionSchemaMap.put(
dimension,
-createDimensionSchema(
-dimension,
-columnHolder.getHandlerCapabilities(),
-dimensionHandler.getMultivalueHandling()
-)
+columnHolder.getColumnFormat().getColumnSchema(dimension)
);
}
}
===== changed file =====
@@ -247,11 +247,12 @@ public SamplerResponse sample(
if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) {
final ColumnType columnType = dimensionDesc.getCapabilities().toColumnType();
signatureBuilder.add(dimensionDesc.getName(), columnType);
+// for now, use legacy types instead of standard type
logicalDimensionSchemas.add(
DimensionSchema.getDefaultSchemaForBuiltInType(dimensionDesc.getName(), dimensionDesc.getCapabilities())
);
physicalDimensionSchemas.add(
-dimensionDesc.getHandler().getDimensionSchema(dimensionDesc.getCapabilities())
+dimensionDesc.getIndexer().getFormat().getColumnSchema(dimensionDesc.getName())
);
}
}
===== changed file =====
@@ -29,10 +29,14 @@
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.DimensionHandler;
+import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.StandardTypeColumnSchema;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;

import java.util.Objects;
@@ -47,7 +51,9 @@
@JsonSubTypes.Type(name = DimensionSchema.FLOAT_TYPE_NAME, value = FloatDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.DOUBLE_TYPE_NAME, value = DoubleDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.SPATIAL_TYPE_NAME, value = NewSpatialDimensionSchema.class),
-@JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = NestedDataDimensionSchema.class)
+@JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = NestedDataDimensionSchema.class),
+@JsonSubTypes.Type(name = StandardTypeColumnSchema.TYPE, value = StandardTypeColumnSchema.class),
+@JsonSubTypes.Type(name = "auto", value = StandardTypeColumnSchema.class)
})
public abstract class DimensionSchema
{
@@ -150,6 +156,17 @@ public boolean hasBitmapIndex()
@JsonIgnore
public abstract ColumnType getColumnType();

+@JsonIgnore
+public DimensionHandler getDimensionHandler()
+{
+// default implementation for backwards compatibility
+return DimensionHandlerUtils.getHandlerFromCapabilities(
+name,
+IncrementalIndex.makeDefaultCapabilitiesFromValueType(getColumnType()),
+multiValueHandling
+);
+}
+
@Override
public boolean equals(final Object o)
{
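The two new @JsonSubTypes entries register the schema under both its canonical type name (StandardTypeColumnSchema.TYPE) and an "auto" alias, so either string in a JSON spec maps to the same class. A hedged sketch of the round trip, assuming DimensionSchema resolves subtypes through its usual "type" property and that a plainly constructed mapper picks up the class annotations (Druid would use its injected, fully configured mapper instead):

// hedged sketch, not code from this commit
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.segment.StandardTypeColumnSchema;

class AutoAliasSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    DimensionSchema schema = mapper.readValue(
        "{\"type\": \"auto\", \"name\": \"foo\"}",
        DimensionSchema.class
    );
    // both "auto" and the canonical type name deserialize to the new schema
    System.out.println(schema instanceof StandardTypeColumnSchema); // true
  }
}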
===== changed file =====
@@ -21,6 +21,8 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.DimensionHandler;
+import org.apache.druid.segment.DoubleDimensionHandler;
import org.apache.druid.segment.column.ColumnType;

public class DoubleDimensionSchema extends DimensionSchema
@@ -42,4 +44,10 @@ public ColumnType getColumnType()
{
return ColumnType.DOUBLE;
}
+
+@Override
+public DimensionHandler getDimensionHandler()
+{
+return new DoubleDimensionHandler(getName());
+}
}
===== changed file =====
@@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.DimensionHandler;
+import org.apache.druid.segment.FloatDimensionHandler;
import org.apache.druid.segment.column.ColumnType;

public class FloatDimensionSchema extends DimensionSchema
@@ -46,4 +48,10 @@ public ColumnType getColumnType()
{
return ColumnType.FLOAT;
}
+
+@Override
+public DimensionHandler getDimensionHandler()
+{
+return new FloatDimensionHandler(getName());
+}
}
===== changed file =====
@@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.DimensionHandler;
+import org.apache.druid.segment.LongDimensionHandler;
import org.apache.druid.segment.column.ColumnType;


@@ -47,4 +49,10 @@ public ColumnType getColumnType()
{
return ColumnType.LONG;
}
+
+@Override
+public DimensionHandler getDimensionHandler()
+{
+return new LongDimensionHandler(getName());
+}
}
===== changed file =====
@@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.DimensionHandler;
+import org.apache.druid.segment.StringDimensionHandler;
import org.apache.druid.segment.column.ColumnType;

import java.util.List;
@@ -67,6 +69,12 @@ public ColumnType getColumnType()
return ColumnType.STRING;
}

+@Override
+public DimensionHandler getDimensionHandler()
+{
+return new StringDimensionHandler(getName(), getMultiValueHandling(), hasBitmapIndex(), true);
+}
+
@Override
public boolean equals(Object o)
{
===== changed file =====
@@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.DimensionHandler;
+import org.apache.druid.segment.StringDimensionHandler;
import org.apache.druid.segment.column.ColumnType;

public class StringDimensionSchema extends DimensionSchema
@@ -61,4 +63,10 @@ public ColumnType getColumnType()
{
return ColumnType.STRING;
}
+
+@Override
+public DimensionHandler getDimensionHandler()
+{
+return new StringDimensionHandler(getName(), getMultiValueHandling(), hasBitmapIndex(), false);
+}
}
===== changed file =====
@@ -30,7 +30,7 @@
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.math.expr.NamedFunction;
-import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.nested.NestedPathFinder;
import org.apache.druid.segment.nested.NestedPathPart;
import org.apache.druid.segment.nested.StructuredData;
@@ -47,7 +47,7 @@
public class NestedDataExpressions
{
public static final ExpressionType TYPE = Preconditions.checkNotNull(
-ExpressionType.fromColumnType(NestedDataComplexTypeSerde.TYPE)
+ExpressionType.fromColumnType(ColumnType.NESTED_DATA)
);

public static class JsonObjectExprMacro implements ExprMacroTable.ExprMacro
===== changed file =====
@@ -22,12 +22,19 @@
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.column.CapabilitiesBasedFormat;
import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
+import org.apache.druid.segment.nested.FieldTypeInfo;
+import org.apache.druid.segment.nested.NestedPathFinder;
+import org.apache.druid.segment.nested.SortedValueDictionary;

import javax.annotation.Nullable;
+import java.util.SortedMap;

/**
* Processing related interface
@@ -175,6 +182,18 @@ EncodedKeyComponent<EncodedKeyComponentType> processRowValsToUnsortedEncodedKeyComponent(
CloseableIndexed<ActualType> getSortedIndexedValues();


+default SortedValueDictionary getSortedValueLookups()
+{
+throw new UnsupportedOperationException("Column does not support value dictionaries.");
+}
+
+default void mergeNestedFields(SortedMap<String, FieldTypeInfo.MutableTypeSet> mergedFields)
+{
+mergedFields.put(
+NestedPathFinder.JSON_PATH_ROOT,
+new FieldTypeInfo.MutableTypeSet().add(getColumnCapabilities().toColumnType())
+);
+}
/**
* Get the minimum dimension value seen by this indexer.
*
@@ -237,9 +256,14 @@ ColumnValueSelector<?> makeColumnValueSelector(

ColumnCapabilities getColumnCapabilities();

-default ColumnCapabilities getHandlerCapabilities()
+default ColumnFormat getFormat()
{
-return getColumnCapabilities();
+return new CapabilitiesBasedFormat(
+ColumnCapabilitiesImpl.snapshot(
+getColumnCapabilities(),
+CapabilitiesBasedFormat.DIMENSION_CAPABILITY_MERGE_LOGIC
+)
+);
}

/**
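The getFormat() default above wraps a capabilities snapshot so existing indexers keep working unchanged, while the new standard-type and nested indexers can return a richer ColumnFormat. A hedged sketch of how a caller ties the pieces together; getColumnFormats() and ColumnFormat.getColumnSchema(name) both appear in this commit's diffs, but the helper itself is illustrative:

// hedged sketch, not code from this commit
import java.util.Map;

import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.incremental.IncrementalIndex;

class ColumnFormatSketch
{
  // recover the dimension schema for a column from the physical format that the
  // in-memory index reports, replacing the old ColumnCapabilities plumbing
  static DimensionSchema schemaFor(IncrementalIndex index, String column)
  {
    Map<String, ColumnFormat> formats = index.getColumnFormats();
    ColumnFormat format = formats.get(column);
    return format == null ? null : format.getColumnSchema(column);
  }
}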
===== changed file =====
@@ -788,7 +788,7 @@ private void registerColumnHolder(
*/
@VisibleForTesting
ColumnHolder deserializeColumn(
-String columnName, // columnName is not used in this method, but used in tests.
+String columnName,
ObjectMapper mapper,
ByteBuffer byteBuffer,
SmooshedFileMapper smooshedFiles
@@ -797,7 +797,7 @@ ColumnHolder deserializeColumn(
ColumnDescriptor serde = mapper.readValue(
SERIALIZER_UTILS.readString(byteBuffer), ColumnDescriptor.class
);
-return serde.read(byteBuffer, columnConfig, smooshedFiles);
+return serde.read(columnName, byteBuffer, columnConfig, smooshedFiles);
}
}

(remaining file diffs not shown; 123 files changed in total)