[Flink Runner] Add new Source classes that are based on FLIP-27 Source API. (apache#25525)
becketqin authored and ruslan-ikhsan committed Mar 10, 2023
1 parent a1230fc commit 9abcf54
Showing 29 changed files with 3,186 additions and 6 deletions.
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;

public class FlinkSourceCompat {

  public static Counter getNumRecordsInCounter(SourceReaderContext context) {
    return ((OperatorMetricGroup) context.metricGroup())
        .getIOMetricGroup()
        .getNumRecordsInCounter();
  }
}
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;

public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT>
    extends SplitEnumerator<SplitT, CheckpointT> {

  CheckpointT snapshotState(long checkpointId) throws Exception;
}
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Classes helping maintain backwards compatibility across Flink versions. */
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;

public class SourceTestCompat {

  /** A MetricGroup implementation which records the registered gauge. */
  public static class TestMetricGroup
      extends UnregisteredMetricGroups.UnregisteredOperatorMetricGroup {
    public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
    public final Counter numRecordsInCounter = new SimpleCounter();

    @Override
    public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT gauge) {
      registeredGauge.put(name, gauge);
      return gauge;
    }

    @Override
    public OperatorIOMetricGroup getIOMetricGroup() {
      return new OperatorIOMetricGroup(this) {
        @Override
        public Counter getNumRecordsInCounter() {
          return numRecordsInCounter;
        }
      };
    }
  }

  public interface ReaderOutputCompat<T> extends ReaderOutput<T> {
    void markActive();
  }

  public interface SourceOutputCompat<T> extends SourceOutput<T> {
    void markActive();
  }
}
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;

public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT>
    extends SplitEnumerator<SplitT, CheckpointT> {

  CheckpointT snapshotState() throws Exception;
}
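For context, the two copies of SplitEnumeratorCompat above live in different version-specific source directories: each declares whichever snapshotState signature the bundled Flink SplitEnumerator lacks (the no-arg form on older releases, the checkpointId form on Flink 1.14+), so an enumerator written against the compat interface compiles on both. Below is a minimal sketch of such an enumerator; the class name MyCompatEnumerator and the List-based checkpoint type are illustrative and not part of this commit.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat;
import org.apache.flink.api.connector.source.SourceSplit;

// Hypothetical enumerator, for illustration only.
public class MyCompatEnumerator<SplitT extends SourceSplit>
    implements SplitEnumeratorCompat<SplitT, List<SplitT>> {

  private final List<SplitT> unassignedSplits = new ArrayList<>();

  @Override
  public void start() {}

  @Override
  public void handleSplitRequest(int subtaskId, String requesterHostname) {}

  @Override
  public void addSplitsBack(List<SplitT> splits, int subtaskId) {
    unassignedSplits.addAll(splits);
  }

  @Override
  public void addReader(int subtaskId) {}

  // The checkpointId-based signature delegates to the no-arg one; whichever
  // method is missing from the base SplitEnumerator is supplied by the
  // compat interface, so both Flink API variants are satisfied.
  @Override
  public List<SplitT> snapshotState(long checkpointId) throws Exception {
    return snapshotState();
  }

  @Override
  public List<SplitT> snapshotState() throws Exception {
    return new ArrayList<>(unassignedSplits);
  }

  @Override
  public void close() {}
}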
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.metrics.Counter;

public class FlinkSourceCompat {

  public static Counter getNumRecordsInCounter(SourceReaderContext context) {
    return context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
  }
}
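The two FlinkSourceCompat variants paper over a metrics API difference between Flink releases: in the older variant SourceReaderContext.metricGroup() has to be cast to the runtime OperatorMetricGroup, while in the newer one it already exposes getIOMetricGroup() directly. The small helper below is an illustration of how reader code can use the compat method to count emitted records; it is not part of this commit and the class name is hypothetical.

import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.metrics.Counter;

// Hypothetical helper, for illustration only.
public class RecordCountingHelper {

  private final Counter numRecordsIn;

  public RecordCountingHelper(SourceReaderContext context) {
    // The compat class hides whether a cast is needed for this Flink version.
    this.numRecordsIn = FlinkSourceCompat.getNumRecordsInCounter(context);
  }

  public void recordEmitted() {
    numRecordsIn.inc();
  }
}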
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

public class SourceTestCompat {

  /** A MetricGroup implementation which records the registered gauge. */
  public static class TestMetricGroup extends UnregisteredMetricsGroup
      implements SourceReaderMetricGroup {
    public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
    public final Counter numRecordsInCounter = new SimpleCounter();

    @Override
    public OperatorIOMetricGroup getIOMetricGroup() {
      return new UnregisteredOperatorIOMetricGroup() {
        @Override
        public Counter getNumRecordsInCounter() {
          return numRecordsInCounter;
        }
      };
    }

    @Override
    public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT gauge) {
      registeredGauge.put(name, gauge);
      return gauge;
    }

    @Override
    public Counter getNumRecordsInErrorsCounter() {
      return new SimpleCounter();
    }

    @Override
    public void setPendingBytesGauge(Gauge<Long> pendingBytesGauge) {}

    @Override
    public void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge) {}
  }

  private static class UnregisteredOperatorIOMetricGroup extends UnregisteredMetricsGroup
      implements OperatorIOMetricGroup {
    @Override
    public Counter getNumRecordsInCounter() {
      return new SimpleCounter();
    }

    @Override
    public Counter getNumRecordsOutCounter() {
      return new SimpleCounter();
    }

    @Override
    public Counter getNumBytesInCounter() {
      return new SimpleCounter();
    }

    @Override
    public Counter getNumBytesOutCounter() {
      return new SimpleCounter();
    }
  }

  public interface ReaderOutputCompat<T> extends ReaderOutput<T> {}

  public interface SourceOutputCompat<T> extends SourceOutput<T> {}
}
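TestMetricGroup records every gauge passed to gauge() and backs getNumRecordsInCounter() with a single shared counter, so a test can observe what a source reader registers and reports. The test class below is an illustrative sketch and not part of this commit; it only exercises the code shown above plus standard JUnit assertions.

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;

import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat;
import org.apache.flink.metrics.Gauge;
import org.junit.Test;

// Hypothetical test, for illustration only.
public class TestMetricGroupUsageTest {

  @Test
  public void gaugesAndCountersAreObservable() {
    SourceTestCompat.TestMetricGroup metricGroup = new SourceTestCompat.TestMetricGroup();

    // Code under test would register a gauge through the metric group...
    Gauge<Long> pendingRecords = () -> 42L;
    metricGroup.gauge("pendingRecords", pendingRecords);
    // ...and count incoming records through the IO metric group.
    metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc(3);

    // The test can then assert on what was registered and reported.
    assertSame(pendingRecords, metricGroup.registeredGauge.get("pendingRecords"));
    assertEquals(3, metricGroup.numRecordsInCounter.getCount());
  }
}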
2 changes: 2 additions & 0 deletions runners/flink/flink_runner.gradle
@@ -179,6 +179,7 @@ dependencies {
  if (flink_version.compareTo("1.14") >= 0) {
    implementation "org.apache.flink:flink-runtime:$flink_version"
    implementation "org.apache.flink:flink-optimizer:$flink_version"
    implementation "org.apache.flink:flink-metrics-core:$flink_version"
    testImplementation "org.apache.flink:flink-runtime:$flink_version:tests"
    testImplementation "org.apache.flink:flink-rpc-akka:$flink_version"
  } else {
@@ -197,6 +198,7 @@ dependencies {
  testImplementation project(":sdks:java:io:google-cloud-platform")
  testImplementation library.java.jackson_dataformat_yaml
  testImplementation "org.apache.flink:flink-core:$flink_version:tests"
  testImplementation "org.apache.flink:flink-connector-test-utils:$flink_version"
  testImplementation project(":sdks:java:harness")
  testRuntimeOnly library.java.slf4j_simple
  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/** Util methods to help with serialization / deserialization. */
public class SerdeUtils {

  // Private constructor for a util class.
  private SerdeUtils() {}

  public static @Nonnull byte[] serializeObject(@Nullable Object obj) throws IOException {
    if (obj == null) {
      return new byte[0];
    }
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    oos.writeObject(obj);
    oos.close();
    return baos.toByteArray();
  }

  @SuppressWarnings("unchecked")
  public static @Nullable Object deserializeObject(byte[] serialized) throws IOException {
    if (serialized == null || serialized.length == 0) {
      return null;
    }
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
        ObjectInputStream ois = new ObjectInputStream(bais)) {
      return ois.readObject();
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  public static <T> SimpleVersionedSerializer<T> getNaiveObjectSerializer() {
    return new SimpleVersionedSerializer<T>() {
      @Override
      public int getVersion() {
        return 0;
      }

      @Override
      public byte[] serialize(T obj) throws IOException {
        return serializeObject(obj);
      }

      @Override
      @SuppressWarnings("unchecked")
      public T deserialize(int version, byte[] serialized) throws IOException {
        if (version > getVersion()) {
          throw new IOException(
              String.format(
                  "Received serialized object of version %d, which is higher than "
                      + "the highest supported version %d.",
                  version, getVersion()));
        }
        return (T) deserializeObject(serialized);
      }
    };
  }
}
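getNaiveObjectSerializer() wraps plain Java serialization in Flink's SimpleVersionedSerializer, which is sufficient for splits and enumerator checkpoints that are java.io.Serializable. A small illustrative round-trip follows; the example class is not part of this commit.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Hypothetical example, for illustration only.
public class SerdeUtilsExample {

  public static void main(String[] args) throws IOException {
    // T is inferred from the assignment target; the values only need to be Serializable.
    SimpleVersionedSerializer<List<String>> serializer = SerdeUtils.getNaiveObjectSerializer();

    List<String> checkpoint = Arrays.asList("split-0", "split-1");
    byte[] bytes = serializer.serialize(checkpoint);

    // deserialize() rejects versions newer than getVersion() and otherwise
    // falls back to plain Java deserialization.
    List<String> restored = serializer.deserialize(serializer.getVersion(), bytes);
    System.out.println(restored); // [split-0, split-1]
  }
}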