Skip to content

Commit

Permalink
Kafka-14748: Relax non-null FK left-join requirement (apache#14107)
Browse files Browse the repository at this point in the history
Relax non-null FK left-join requirement.

Testing Strategy: Inject extractor which returns null on first or second element.

Reviewers: Walker Carlson <[email protected]>
  • Loading branch information
florin-akermann authored and ex172000 committed Dec 15, 2023
1 parent c855b8a commit 6165f53
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 119 deletions.
4 changes: 0 additions & 4 deletions docs/streams/developer-guide/dsl-api.html
Original file line number Diff line number Diff line change
Expand Up @@ -2542,10 +2542,6 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key
<blockquote>
<div>
<ul class="simple">
<li>
Records for which the <code class="docutils literal"><span class="pre">foreignKeyExtractor</span></code> produces <code class="docutils literal"><span class="pre">null</span></code> are ignored and do not trigger a join.
If you want to join with <code class="docutils literal"><span class="pre">null</span></code> foreign keys, use a suitable sentinel value to do so (i.e. <code class="docutils literal"><span class="pre">"NULL"</span></code> for a String field, or <code class="docutils literal"><span class="pre">-1</span></code> for an auto-incrementing integer field).
</li>
<li>Input records with a <code class="docutils
literal"><span class="pre">null</span></code>
value are interpreted as <em>tombstones</em>
Expand Down
31 changes: 16 additions & 15 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,7 @@ <h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"><
<h3><a id="streams_api_changes_370" href="#streams_api_changes_370">Streams API changes in 3.7.0</a></h3>
<p>
IQv2 supports <code>RangeQuery</code> that allows to specify unbounded, bounded, or half-open key-ranges, which return data in ascending (byte[]-lexicographical) order (per partition).
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a> extends this functionality by adding <code>.withDescendingKeys()<code> to allow user to receive data in descending order.
</p>

<h3><a id="streams_api_changes_360" href="#streams_api_changes_360">Streams API changes in 3.6.0</a></h3>
<p>
Rack aware task assignment was introduced in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>.
Rack aware task assignment can be enabled for <code>StickyTaskAssignor</code> or <code>HighAvailabilityTaskAssignor</code> to compute task assignments which can minimize cross rack traffic under certain conditions.
For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
</p>

<p>
IQv2 supports a <code>RangeQuery</code> that allows to specify unbounded, bounded, or half-open key-ranges. Users have to use <code>withUpperBound(K)</code>, <code>withLowerBound(K)</code>,
or <code>withNoBounds()</code> to specify half-open or unbounded ranges, but cannot use <code>withRange(K lower, K upper)</code> for the same.
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds">KIP-941</a> closes this gap by allowing to pass in <code>null</code>
as upper and lower bound (with semantics "no bound") to simplify the usage of the <code>RangeQuery</code> class.
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a> extends this functionality by adding <code>.withDescendingKeys()</code> to allow user to receive data in descending order.
</p>

<p>
Expand Down Expand Up @@ -198,6 +184,21 @@ <h3><a id="streams_api_changes_360" href="#streams_api_changes_360">Streams API
</code>
</pre>
</p>

<h3><a id="streams_api_changes_360" href="#streams_api_changes_360">Streams API changes in 3.6.0</a></h3>
<p>
Rack aware task assignment was introduced in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>.
Rack aware task assignment can be enabled for <code>StickyTaskAssignor</code> or <code>HighAvailabilityTaskAssignor</code> to compute task assignments which can minimize cross rack traffic under certain conditions.
For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
</p>

<p>
IQv2 supports a <code>RangeQuery</code> that allows to specify unbounded, bounded, or half-open key-ranges. Users have to use <code>withUpperBound(K)</code>, <code>withLowerBound(K)</code>,
or <code>withNoBounds()</code> to specify half-open or unbounded ranges, but cannot use <code>withRange(K lower, K upper)</code> for the same.
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds">KIP-941</a> closes this gap by allowing to pass in <code>null</code>
as upper and lower bound (with semantics "no bound") to simplify the usage of the <code>RangeQuery</code> class.
</p>

<h3><a id="streams_api_changes_350" href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
<p>
A new state store type, versioned key-value stores, was introduced in
Expand Down
21 changes: 10 additions & 11 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2237,7 +2237,7 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* extract is null, then the right hand side of the result will be null.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
Expand All @@ -2254,8 +2254,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* extract is null, then the right hand side of the result will be null.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <VR> the value type of the result {@code KTable}
Expand All @@ -2279,9 +2279,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* extract is null, then the right hand side of the result will be null.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param <VR> the value type of the result {@code KTable}
Expand All @@ -2301,7 +2300,7 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* extract is null, then the right hand side of the result will be null.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
Expand All @@ -2321,8 +2320,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* extract is null, then the right hand side of the result will be null.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
Expand Down Expand Up @@ -2350,8 +2349,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* extract is null, then the right hand side of the result will be null.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class CombinedKey<KF, KP> {
private final KP primaryKey;

CombinedKey(final KF foreignKey, final KP primaryKey) {
Objects.requireNonNull(foreignKey, "foreignKey can't be null");
Objects.requireNonNull(primaryKey, "primaryKey can't be null");
this.foreignKey = foreignKey;
this.primaryKey = primaryKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ public void process(final Record<CombinedKey<KO, K>, Change<ValueAndTimestamp<Su
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}

final ValueAndTimestamp<VO> foreignValueAndTime = foreignValues.get(record.key().getForeignKey());
final ValueAndTimestamp<VO> foreignValueAndTime =
record.key().getForeignKey() == null ?
null :
foreignValues.get(record.key().getForeignKey());

final long resultTimestamp =
foreignValueAndTime == null ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,8 @@ public void init(final ProcessorContext<CombinedKey<KO, K>, Change<ValueAndTimes

@Override
public void process(final Record<KO, SubscriptionWrapper<K>> record) {
if (record.key() == null) {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().getInstruction())) {
dropRecord();
return;
}
if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
Expand All @@ -97,7 +85,22 @@ public void process(final Record<KO, SubscriptionWrapper<K>> record) {
//from older SubscriptionWrapper versions to newer versions.
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}
context().forward(
record.withKey(new CombinedKey<>(record.key(), record.value().getPrimaryKey()))
.withValue(inferChange(record))
.withTimestamp(record.timestamp())
);
}

private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> inferChange(final Record<KO, SubscriptionWrapper<K>> record) {
if (record.key() == null) {
return new Change<>(ValueAndTimestamp.make(record.value(), record.timestamp()), null);
} else {
return inferBasedOnState(record);
}
}

private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> inferBasedOnState(final Record<KO, SubscriptionWrapper<K>> record) {
final Bytes subscriptionKey = keySchema.toBytes(record.key(), record.value().getPrimaryKey());

final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(record.value(), record.timestamp());
Expand All @@ -110,14 +113,23 @@ public void process(final Record<KO, SubscriptionWrapper<K>> record) {
} else {
store.put(subscriptionKey, newValue);
}
final Change<ValueAndTimestamp<SubscriptionWrapper<K>>> change = new Change<>(newValue, oldValue);
// note: key is non-nullable
// note: newValue is non-nullable
context().forward(
record.withKey(new CombinedKey<>(record.key(), record.value().getPrimaryKey()))
.withValue(change)
.withTimestamp(newValue.timestamp())
);
return new Change<>(newValue, oldValue);
}

private void dropRecord() {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
}
};
}
Expand Down
Loading

0 comments on commit 6165f53

Please sign in to comment.