Skip to content

Commit

Permalink
[Flink Runner] Add UseDataStreamForBatch option to Flink runner to en…
Browse files Browse the repository at this point in the history
…able batch execution on DataStream API (apache#28614)

Co-authored-by: Jiangjie Qin <[email protected]>
Co-authored-by: tvalentyn <[email protected]>
  • Loading branch information
3 people authored and minxhe committed Nov 25, 2024
1 parent 6f9c14b commit f40f3b6
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 24 deletions.
62 changes: 41 additions & 21 deletions .test-infra/jenkins/job_PreCommit_Java_InfluxDb_IO_Direct.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,48 @@
* limitations under the License.
*/

import PrecommitJobBuilder
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;

PrecommitJobBuilder builder = new PrecommitJobBuilder(
scope: this,
nameBase: 'Java_InfluxDb_IO_Direct',
gradleTasks: [
':sdks:java:io:influxdb:build',
],
gradleSwitches: [
'-PdisableSpotlessCheck=true',
'-PdisableCheckStyle=true'
], // spotless checked in separate pre-commit
triggerPathPatterns: [
'^sdks/java/core/src/main/.*$',
'^sdks/java/io/common/.*$',
'^sdks/java/io/influxdb/.*$',
],
timeoutMins: 60,
)
builder.build {
publishers {
archiveJunit('**/build/test-results/**/*.xml')
/** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
public abstract class AbstractStreamOperatorCompat<OutputT>
extends AbstractStreamOperator<OutputT> {

/**
* Getter for timeServiceManager, which has been made private in Flink 1.11.
*
* @return Time service manager.
*/
protected InternalTimeServiceManager<?> getTimeServiceManagerCompat() {
return getTimeServiceManager()
.orElseThrow(() -> new IllegalStateException("Time service manager is not set."));
}

/**
* This call has been removed from {@link AbstractStreamOperator} in Flink 1.12.
*
* <p>{@link InternalTimeServiceManagerImpl#numProcessingTimeTimers()}
*/
protected int numProcessingTimeTimers() {
return getTimeServiceManager()
.map(
manager -> {
InternalTimeServiceManager<?> tsm = getTimeServiceManagerCompat();
if (tsm instanceof InternalTimeServiceManagerImpl) {
final InternalTimeServiceManagerImpl<?> cast =
(InternalTimeServiceManagerImpl<?>) getTimeServiceManagerCompat();
return cast.numProcessingTimeTimers();
} else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
return 0;
} else {
throw new IllegalStateException(
String.format(
"Unknown implementation of InternalTimerServiceManager. %s", tsm));
}
})
.orElse(0);
}

/** Release all of the operator's resources. */
Expand Down
Loading

0 comments on commit f40f3b6

Please sign in to comment.