diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java index c2e39baefa91..b644b2d39887 100644 --- a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java @@ -39,7 +39,7 @@ class BaseIncrementalChangelogScan implements IncrementalChangelogScan { BaseIncrementalChangelogScan(Table table) { - this(table, table.schema(), new TableScanContext()); + this(table, table.schema(), TableScanContext.empty()); } private BaseIncrementalChangelogScan(Table table, Schema schema, TableScanContext context) { diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java index 5fd211f9c1f4..49c05e82985e 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java @@ -25,7 +25,7 @@ abstract class BaseMetadataTableScan extends BaseTableScan { private final MetadataTableType tableType; protected BaseMetadataTableScan(Table table, Schema schema, MetadataTableType tableType) { - super(table, schema, new TableScanContext()); + super(table, schema, TableScanContext.empty()); this.tableType = tableType; } diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index bb2e534ae739..b9ed4f8d67ce 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -70,13 +70,14 @@ public void refresh() { @Override public TableScan newScan() { - return new DataTableScan(this, schema(), new TableScanContext().reportWith(reporter)); + return new DataTableScan( + this, schema(), ImmutableTableScanContext.builder().metricsReporter(reporter).build()); } @Override public IncrementalAppendScan newIncrementalAppendScan() { return new BaseIncrementalAppendScan( - this, schema(), new TableScanContext().reportWith(reporter)); + this, schema(), ImmutableTableScanContext.builder().metricsReporter(reporter).build()); } @Override diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index 1983e0ddfce3..39b43cc413df 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -131,7 +131,7 @@ public static class PositionDeletesBatchScan extends SnapshotScan> implements BatchScan { protected PositionDeletesBatchScan(Table table, Schema schema) { - super(table, schema, new TableScanContext()); + super(table, schema, TableScanContext.empty()); } protected PositionDeletesBatchScan(Table table, Schema schema, TableScanContext context) { diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java index e12c2a8fc42b..87a2f59f6ce8 100644 --- a/core/src/main/java/org/apache/iceberg/TableScanContext.java +++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java @@ -20,8 +20,8 @@ import java.util.Collection; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutorService; +import javax.annotation.Nullable; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.metrics.LoggingMetricsReporter; @@ -30,349 +30,149 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.util.ThreadPools; +import org.immutables.value.Value; /** Context object with optional arguments for a TableScan. */ -final class TableScanContext { - private final Long snapshotId; - private final Expression rowFilter; - private final boolean ignoreResiduals; - private final boolean caseSensitive; - private final boolean colStats; - private final Schema projectedSchema; - private final Collection selectedColumns; - private final ImmutableMap options; - private final Long fromSnapshotId; - private final Long toSnapshotId; - private final ExecutorService planExecutor; - private final boolean fromSnapshotInclusive; - private final MetricsReporter metricsReporter; +@Value.Immutable +abstract class TableScanContext { - TableScanContext() { - this.snapshotId = null; - this.rowFilter = Expressions.alwaysTrue(); - this.ignoreResiduals = false; - this.caseSensitive = true; - this.colStats = false; - this.projectedSchema = null; - this.selectedColumns = null; - this.options = ImmutableMap.of(); - this.fromSnapshotId = null; - this.toSnapshotId = null; - this.planExecutor = null; - this.fromSnapshotInclusive = false; - this.metricsReporter = null; - } + @Nullable + public abstract Long snapshotId(); - private TableScanContext( - Long snapshotId, - Expression rowFilter, - boolean ignoreResiduals, - boolean caseSensitive, - boolean colStats, - Schema projectedSchema, - Collection selectedColumns, - ImmutableMap options, - Long fromSnapshotId, - Long toSnapshotId, - ExecutorService planExecutor, - boolean fromSnapshotInclusive, - MetricsReporter metricsReporter) { - this.snapshotId = snapshotId; - this.rowFilter = rowFilter; - this.ignoreResiduals = ignoreResiduals; - this.caseSensitive = caseSensitive; - this.colStats = colStats; - this.projectedSchema = projectedSchema; - this.selectedColumns = selectedColumns; - this.options = options; - this.fromSnapshotId = fromSnapshotId; - this.toSnapshotId = toSnapshotId; - this.planExecutor = planExecutor; - this.fromSnapshotInclusive = fromSnapshotInclusive; - this.metricsReporter = metricsReporter; + @Value.Default + public Expression rowFilter() { + return Expressions.alwaysTrue(); } - Long snapshotId() { - return snapshotId; + @Value.Default + public boolean ignoreResiduals() { + return false; } - TableScanContext useSnapshotId(Long scanSnapshotId) { - return new TableScanContext( - scanSnapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - colStats, - projectedSchema, - selectedColumns, - options, - fromSnapshotId, - toSnapshotId, - planExecutor, - fromSnapshotInclusive, - metricsReporter); + @Value.Default + public boolean caseSensitive() { + return true; } - Expression rowFilter() { - return rowFilter; + @Value.Default + public boolean returnColumnStats() { + return false; } - TableScanContext filterRows(Expression filter) { - return new TableScanContext( - snapshotId, - filter, - ignoreResiduals, - caseSensitive, - colStats, - projectedSchema, - selectedColumns, - options, - fromSnapshotId, - toSnapshotId, - planExecutor, - fromSnapshotInclusive, - metricsReporter); + @Nullable + public abstract Collection selectedColumns(); + + @Nullable + public abstract Schema projectedSchema(); + + @Value.Default + public Map options() { + return ImmutableMap.of(); } - boolean ignoreResiduals() { - return ignoreResiduals; + @Nullable + public abstract Long fromSnapshotId(); + + @Value.Default + public boolean fromSnapshotInclusive() { + return false; } - TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) { - return new TableScanContext( - snapshotId, - rowFilter, - shouldIgnoreResiduals, - caseSensitive, - colStats, - projectedSchema, - selectedColumns, - options, - fromSnapshotId, - toSnapshotId, - planExecutor, - fromSnapshotInclusive, - metricsReporter); + @Nullable + public abstract Long toSnapshotId(); + + @Value.Default + public ExecutorService planExecutor() { + return ThreadPools.getWorkerPool(); } - boolean caseSensitive() { - return caseSensitive; + @Value.Derived + boolean planWithCustomizedExecutor() { + return !planExecutor().equals(ThreadPools.getWorkerPool()); } - TableScanContext setCaseSensitive(boolean isCaseSensitive) { - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - isCaseSensitive, - colStats, - projectedSchema, - selectedColumns, - options, - fromSnapshotId, - toSnapshotId, - planExecutor, - fromSnapshotInclusive, - metricsReporter); + @Value.Default + public MetricsReporter metricsReporter() { + return LoggingMetricsReporter.instance(); } - boolean returnColumnStats() { - return colStats; + TableScanContext useSnapshotId(Long scanSnapshotId) { + return ImmutableTableScanContext.builder().from(this).snapshotId(scanSnapshotId).build(); } - TableScanContext shouldReturnColumnStats(boolean returnColumnStats) { - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - returnColumnStats, - projectedSchema, - selectedColumns, - options, - fromSnapshotId, - toSnapshotId, - planExecutor, - fromSnapshotInclusive, - metricsReporter); + TableScanContext filterRows(Expression filter) { + return ImmutableTableScanContext.builder().from(this).rowFilter(filter).build(); } - Collection selectedColumns() { - return selectedColumns; + TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) { + return ImmutableTableScanContext.builder() + .from(this) + .ignoreResiduals(shouldIgnoreResiduals) + .build(); } - TableScanContext selectColumns(Collection columns) { - Preconditions.checkState( - projectedSchema == null, "Cannot select columns when projection schema is set"); - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - colStats, - null, - columns, - options, - fromSnapshotId, - toSnapshotId, - planExecutor, - fromSnapshotInclusive, - metricsReporter); + TableScanContext setCaseSensitive(boolean isCaseSensitive) { + return ImmutableTableScanContext.builder().from(this).caseSensitive(isCaseSensitive).build(); } - Schema projectedSchema() { - return projectedSchema; + TableScanContext shouldReturnColumnStats(boolean returnColumnStats) { + return ImmutableTableScanContext.builder() + .from(this) + .returnColumnStats(returnColumnStats) + .build(); } - TableScanContext project(Schema schema) { + TableScanContext selectColumns(Collection columns) { Preconditions.checkState( - selectedColumns == null, "Cannot set projection schema when columns are selected"); - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - colStats, - schema, - null, - options, - fromSnapshotId, - toSnapshotId, - planExecutor, - fromSnapshotInclusive, - metricsReporter); + projectedSchema() == null, "Cannot select columns when projection schema is set"); + return ImmutableTableScanContext.builder().from(this).selectedColumns(columns).build(); } - Map options() { - return options; + TableScanContext project(Schema schema) { + Preconditions.checkState( + selectedColumns() == null, "Cannot set projection schema when columns are selected"); + return ImmutableTableScanContext.builder().from(this).projectedSchema(schema).build(); } TableScanContext withOption(String property, String value) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - builder.putAll(options); - builder.put(property, value); - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - colStats, - projectedSchema, - selectedColumns, - builder.build(), - fromSnapshotId, - toSnapshotId, - planExecutor, - fromSnapshotInclusive, - metricsReporter); - } - - Long fromSnapshotId() { - return fromSnapshotId; + return ImmutableTableScanContext.builder().from(this).putOptions(property, value).build(); } TableScanContext fromSnapshotIdExclusive(long id) { - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - colStats, - projectedSchema, - selectedColumns, - options, - id, - toSnapshotId, - planExecutor, - false, - metricsReporter); + return ImmutableTableScanContext.builder() + .from(this) + .fromSnapshotId(id) + .fromSnapshotInclusive(false) + .build(); } TableScanContext fromSnapshotIdInclusive(long id) { - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - colStats, - projectedSchema, - selectedColumns, - options, - id, - toSnapshotId, - planExecutor, - true, - metricsReporter); - } - - boolean fromSnapshotInclusive() { - return fromSnapshotInclusive; - } - - Long toSnapshotId() { - return toSnapshotId; + return ImmutableTableScanContext.builder() + .from(this) + .fromSnapshotId(id) + .fromSnapshotInclusive(true) + .build(); } TableScanContext toSnapshotId(long id) { - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - colStats, - projectedSchema, - selectedColumns, - options, - fromSnapshotId, - id, - planExecutor, - fromSnapshotInclusive, - metricsReporter); - } - - ExecutorService planExecutor() { - return Optional.ofNullable(planExecutor).orElseGet(ThreadPools::getWorkerPool); - } - - boolean planWithCustomizedExecutor() { - return planExecutor != null; + return ImmutableTableScanContext.builder().from(this).toSnapshotId(id).build(); } TableScanContext planWith(ExecutorService executor) { - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - colStats, - projectedSchema, - selectedColumns, - options, - fromSnapshotId, - toSnapshotId, - executor, - fromSnapshotInclusive, - metricsReporter); + return ImmutableTableScanContext.builder().from(this).planExecutor(executor).build(); } - MetricsReporter metricsReporter() { - return null == metricsReporter ? LoggingMetricsReporter.instance() : metricsReporter; + TableScanContext reportWith(MetricsReporter reporter) { + return ImmutableTableScanContext.builder() + .from(this) + .metricsReporter( + metricsReporter() instanceof LoggingMetricsReporter + ? reporter + : MetricsReporters.combine(metricsReporter(), reporter)) + .build(); } - TableScanContext reportWith(MetricsReporter reporter) { - return new TableScanContext( - snapshotId, - rowFilter, - ignoreResiduals, - caseSensitive, - colStats, - projectedSchema, - selectedColumns, - options, - fromSnapshotId, - toSnapshotId, - planExecutor, - fromSnapshotInclusive, - MetricsReporters.combine(metricsReporter, reporter)); + public static TableScanContext empty() { + return ImmutableTableScanContext.builder().build(); } } diff --git a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java index dfc14faa0b75..8dbdd9cf6b7c 100644 --- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java +++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java @@ -46,7 +46,7 @@ public TestScanPlanningAndReporting() { @Test public void noDuplicatesInScanContext() { - TableScanContext context = new TableScanContext(); + TableScanContext context = TableScanContext.empty(); assertThat(context.metricsReporter()).isInstanceOf(LoggingMetricsReporter.class); MetricsReporter first = report -> {};