diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java index 406e72c751db..4a8a467bb49a 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.InvalidProtocolBufferException; +import org.checkerframework.checker.nullness.qual.Nullable; @SuppressWarnings({"rawtypes"}) public class ExpansionServiceSchemaTransformProvider implements TransformProvider { @@ -49,9 +50,9 @@ public class ExpansionServiceSchemaTransformProvider implements TransformProvide private Map schemaTransformProviders = new HashMap<>(); - private static ExpansionServiceSchemaTransformProvider transformProvider = null; + private static @Nullable ExpansionServiceSchemaTransformProvider transformProvider = null; - private void loadSchemaTransforms() { + private ExpansionServiceSchemaTransformProvider() { try { for (org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider schemaTransformProvider : ServiceLoader.load( @@ -68,10 +69,6 @@ private void loadSchemaTransforms() { } } - private ExpansionServiceSchemaTransformProvider() { - loadSchemaTransforms(); - } - public static ExpansionServiceSchemaTransformProvider of() { if (transformProvider == null) { transformProvider = new ExpansionServiceSchemaTransformProvider(); @@ -142,6 +139,10 @@ public PTransform getTransform(FunctionSpec spec) { String identifier = payload.getIdentifier(); org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider provider = schemaTransformProviders.get(identifier); + if (provider == null) { + throw new IllegalArgumentException( + "Could not find a SchemaTransform with identifier " + identifier); + } Schema configSchemaFromRequest = SchemaTranslation.schemaFromProto((payload.getConfigurationSchema()));