diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java index 8e47f4f2bc9c..e0a3199d0c69 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java @@ -178,7 +178,12 @@ public CoderProviderForCoder(TypeDescriptor type, Coder coder) { @Override public Coder coderFor(TypeDescriptor type, List> componentCoders) throws CannotProvideCoderException { - if (!this.type.equals(type)) { + boolean isTypeEqual = this.type.equals(type); + boolean isAutoValueConcrete = + type.getRawType().getName().contains("AutoValue_") + && this.type.getRawType().isAssignableFrom(type.getRawType()); + + if (!isTypeEqual && !isAutoValueConcrete) { throw new CannotProvideCoderException( String.format( "Unable to provide coder for %s, this factory can only provide coders for %s", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java index 52718fcde2af..782a77cde685 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java @@ -88,6 +88,14 @@ public Coder coderFor( Class clazz = typeDescriptor.getRawType(); DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class); + if (defaultAnnotation == null) { + // check if the superclass has DefaultCoder annotation if the class is generated using + // AutoValue + if (clazz.getName().contains("AutoValue_")) { + clazz = clazz.getSuperclass(); + defaultAnnotation = clazz.getAnnotation(DefaultCoder.class); + } + } if (defaultAnnotation == null) { throw new CannotProvideCoderException( String.format("Class %s does not have a @DefaultCoder annotation.", clazz.getName())); diff --git a/sdks/java/io/splunk/build.gradle b/sdks/java/io/splunk/build.gradle index dd1b15e10dde..41a7a409e890 100644 --- a/sdks/java/io/splunk/build.gradle +++ b/sdks/java/io/splunk/build.gradle @@ -37,6 +37,7 @@ dependencies { implementation library.java.joda_time implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre + implementation library.java.commons_io testImplementation library.java.junit testImplementation group: 'org.mock-server', name: 'mockserver-junit-rule', version: '5.10.0' testImplementation group: 'org.mock-server', name: 'mockserver-client-java', version: '5.10.0' diff --git a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java index 6c5537990bdf..f34fcb7c4e0e 100644 --- a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java +++ b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java @@ -22,9 +22,11 @@ import com.google.api.client.http.ByteArrayContent; import com.google.api.client.http.GZipEncoding; import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpBackOffIOExceptionHandler; import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler; import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler.BackOffRequired; import com.google.api.client.http.HttpContent; +import com.google.api.client.http.HttpIOExceptionHandler; import com.google.api.client.http.HttpMediaType; import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpRequestFactory; @@ -139,6 +141,9 @@ HttpResponse execute(List events) throws IOException { responseHandler.setBackOffRequired(BackOffRequired.ON_SERVER_ERROR); request.setUnsuccessfulResponseHandler(responseHandler); + HttpIOExceptionHandler ioExceptionHandler = + new HttpBackOffIOExceptionHandler(getConfiguredBackOff()); + request.setIOExceptionHandler(ioExceptionHandler); setHeaders(request, token()); return request.execute(); @@ -180,6 +185,10 @@ void close() throws IOException { */ private void setHeaders(HttpRequest request, String token) { request.getHeaders().setAuthorization(String.format(AUTHORIZATION_SCHEME, token)); + + if (enableGzipHttpCompression()) { + request.getHeaders().setContentEncoding("gzip"); + } } /** diff --git a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java index 7dd78e1754b4..177900a2d09a 100644 --- a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java +++ b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java @@ -20,9 +20,9 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; +import com.google.gson.JsonObject; import com.google.gson.annotations.SerializedName; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.coders.DefaultCoder; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -39,7 +39,7 @@ *
  • index * */ -@DefaultSchema(AutoValueSchema.class) +@DefaultCoder(SplunkEventCoder.class) @AutoValue public abstract class SplunkEvent { @@ -59,6 +59,8 @@ public static Builder newBuilder() { public abstract @Nullable String index(); + public abstract @Nullable JsonObject fields(); + public abstract @Nullable String event(); /** A builder class for creating a {@link SplunkEvent}. */ @@ -75,6 +77,8 @@ public abstract static class Builder { abstract Builder setIndex(String index); + abstract Builder setFields(JsonObject fields); + abstract Builder setEvent(String event); abstract String event(); @@ -136,6 +140,17 @@ public Builder withIndex(String index) { return setIndex(index); } + /** + * Assigns fields value to the event metadata. + * + * @param fields fields value to assign + */ + public Builder withFields(JsonObject fields) { + checkNotNull(fields, "withFields(fields) called with null input."); + + return setFields(fields); + } + /** * Assigns the event payload to be sent to the HEC endpoint. * diff --git a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventCoder.java b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventCoder.java new file mode 100644 index 000000000000..35d5314ae9ee --- /dev/null +++ b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventCoder.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.splunk; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.commons.io.IOUtils; + +/** A {@link org.apache.beam.sdk.coders.Coder} for {@link SplunkEvent} objects. */ +public class SplunkEventCoder extends AtomicCoder { + + private static final SplunkEventCoder SPLUNK_EVENT_CODER = new SplunkEventCoder(); + + private static final TypeDescriptor TYPE_DESCRIPTOR = + new TypeDescriptor() {}; + private static final StringUtf8Coder STRING_UTF_8_CODER = StringUtf8Coder.of(); + private static final NullableCoder STRING_NULLABLE_CODER = + NullableCoder.of(STRING_UTF_8_CODER); + private static final NullableCoder LONG_NULLABLE_CODER = + NullableCoder.of(BigEndianLongCoder.of()); + + private static final Gson GSON = new Gson(); + + // Version markers must be >= 2. + private static final int VERSION_3 = 3; + + public static SplunkEventCoder of() { + return SPLUNK_EVENT_CODER; + } + + public static CoderProvider getCoderProvider() { + return CoderProviders.forCoder(TYPE_DESCRIPTOR, SplunkEventCoder.of()); + } + + @Override + @SuppressWarnings("nullness") + public void encode(SplunkEvent value, OutputStream out) throws IOException { + out.write(VERSION_3); + + LONG_NULLABLE_CODER.encode(value.time(), out); + STRING_NULLABLE_CODER.encode(value.host(), out); + STRING_NULLABLE_CODER.encode(value.source(), out); + STRING_NULLABLE_CODER.encode(value.sourceType(), out); + STRING_NULLABLE_CODER.encode(value.index(), out); + String fields = value.fields() == null ? null : value.fields().toString(); + STRING_NULLABLE_CODER.encode(fields, out); + STRING_UTF_8_CODER.encode(value.event(), out); + } + + @Override + public SplunkEvent decode(InputStream in) throws CoderException, IOException { + SplunkEvent.Builder builder = SplunkEvent.newBuilder(); + + int v = in.read(); + + // Versions 1 and 2 of this coder had no version marker field, but 1st byte in the serialized + // data was always 0 or 1 (present/not present indicator for a nullable field). + // So here we assume if the first byte is >= 2 then it's the version marker. + + if (v >= 2) { + decodeWithVersion(v, in, builder); + } else { + // It's impossible to distinguish between V1 and V2 without re-reading portions of the input + // stream twice (and without the version marker), so we must have a ByteArrayInputStream copy, + // which is guaranteed to support mark()/reset(). + + ByteArrayOutputStream os = new ByteArrayOutputStream(); + os.write(v); + IOUtils.copy(in, os); + ByteArrayInputStream streamCopy = new ByteArrayInputStream(os.toByteArray()); + + decodeVersion1or2(streamCopy, builder); + } + + return builder.build(); + } + + private void decodeWithVersion(int version, InputStream in, SplunkEvent.Builder builder) + throws IOException { + + decodeCommonFields(in, builder); + + if (version >= VERSION_3) { + String fields = STRING_NULLABLE_CODER.decode(in); + if (fields != null) { + builder.withFields(GSON.fromJson(fields, JsonObject.class)); + } + + String event = STRING_UTF_8_CODER.decode(in); + builder.withEvent(event); + } + } + + private void decodeVersion1or2(ByteArrayInputStream in, SplunkEvent.Builder builder) + throws IOException { + + decodeCommonFields(in, builder); + + in.mark(Integer.MAX_VALUE); + + // The following fields may be different between V1 and V2. + + // V1 format: <... common fields...> + // V2 format: <... common fields...> + // + + // We try to read this as V2 first. If any exception, fall back to V1. + + // Note: it's impossible to incorrectly parse V1 data with V2 decoder (potentially causing + // corrupted fields in the message). If we try that and the 1st byte is: + // - 2 or more: decoding fails because V2 expects it to be either 0 or 1 (present indicator). + // - 1: this means the "event" string length is 1, so we have only 1 more byte in the stream. + // V2 decoding fails with EOF assuming 1 is the "fields" string length and reading + // at least 1 more byte. + // - 0: this means the "event" string is empty, so we have no more bytes in the stream. + // V2 decoding fails with EOF assuming 0 is the "fields" string length and reading + // the next "event" field. + + JsonObject fields = null; + String event; + + try { + // Assume V2 first. + String fieldsString = STRING_NULLABLE_CODER.decode(in); + if (fieldsString != null) { + fields = GSON.fromJson(fieldsString, JsonObject.class); + } + event = STRING_UTF_8_CODER.decode(in); + } catch (CoderException e) { + // If failed, reset the stream and parse as V1. + in.reset(); + event = STRING_UTF_8_CODER.decode(in); + } + + if (fields != null) { + builder.withFields(fields); + } + builder.withEvent(event); + } + + private void decodeCommonFields(InputStream in, SplunkEvent.Builder builder) throws IOException { + Long time = LONG_NULLABLE_CODER.decode(in); + if (time != null) { + builder.withTime(time); + } + + String host = STRING_NULLABLE_CODER.decode(in); + if (host != null) { + builder.withHost(host); + } + + String source = STRING_NULLABLE_CODER.decode(in); + if (source != null) { + builder.withSource(source); + } + + String sourceType = STRING_NULLABLE_CODER.decode(in); + if (sourceType != null) { + builder.withSourceType(sourceType); + } + + String index = STRING_NULLABLE_CODER.decode(in); + if (index != null) { + builder.withIndex(index); + } + } + + @Override + public TypeDescriptor getEncodedTypeDescriptor() { + return TYPE_DESCRIPTOR; + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException( + this, "SplunkEvent can hold arbitrary instances, which may be non-deterministic."); + } +} diff --git a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java index 8ec2a064ee0d..615d4e932f4d 100644 --- a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java +++ b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java @@ -33,8 +33,9 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; -import java.time.Instant; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MatchResult; @@ -53,8 +54,11 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InetAddresses; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InternetDomainName; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.slf4j.Logger; @@ -70,7 +74,7 @@ }) abstract class SplunkEventWriter extends DoFn, SplunkWriteError> { - private static final Integer DEFAULT_BATCH_COUNT = 1; + private static final Integer DEFAULT_BATCH_COUNT = 10; private static final Boolean DEFAULT_DISABLE_CERTIFICATE_VALIDATION = false; private static final Boolean DEFAULT_ENABLE_BATCH_LOGS = true; private static final Boolean DEFAULT_ENABLE_GZIP_HTTP_COMPRESSION = true; @@ -98,6 +102,13 @@ abstract class SplunkEventWriter extends DoFn, SplunkWr private static final String COUNT_STATE_NAME = "count"; private static final String TIME_ID_NAME = "expiry"; + private static final Pattern URL_PATTERN = Pattern.compile("^http(s?)://([^:]+)(:[0-9]+)?$"); + + @VisibleForTesting + protected static final String INVALID_URL_FORMAT_MESSAGE = + "Invalid url format. Url format should match PROTOCOL://HOST[:PORT], where PORT is optional. " + + "Supported Protocols are http and https. eg: http://hostname:8088"; + @StateId(BUFFER_STATE_NAME) private final StateSpec> buffer = StateSpecs.bag(); @@ -139,6 +150,7 @@ static Builder newBuilder() { public void setup() { checkArgument(url().isAccessible(), "url is required for writing events."); + checkArgument(isValidUrlFormat(url().get()), INVALID_URL_FORMAT_MESSAGE); checkArgument(token().isAccessible(), "Access token is required for writing events."); // Either user supplied or default batchCount. @@ -287,7 +299,7 @@ private void flush( response = publisher.execute(events); if (!response.isSuccessStatusCode()) { - UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime); + UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime)); FAILED_WRITES.inc(countState.read()); int statusCode = response.getStatusCode(); if (statusCode >= 400 && statusCode < 500) { @@ -305,7 +317,7 @@ private void flush( events, response.getStatusMessage(), response.getStatusCode(), receiver); } else { - SUCCESSFUL_WRITE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime); + SUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime)); SUCCESS_WRITES.inc(countState.read()); VALID_REQUESTS.inc(); SUCCESSFUL_WRITE_BATCH_SIZE.update(countState.read()); @@ -321,7 +333,7 @@ private void flush( e.getStatusCode(), e.getContent(), e.getStatusMessage()); - UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime); + UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime)); FAILED_WRITES.inc(countState.read()); int statusCode = e.getStatusCode(); if (statusCode >= 400 && statusCode < 500) { @@ -336,7 +348,7 @@ private void flush( } catch (IOException ioe) { LOG.error("Error writing to Splunk: {}", ioe.getMessage()); - UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime); + UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime)); FAILED_WRITES.inc(countState.read()); INVALID_REQUESTS.inc(); @@ -350,8 +362,21 @@ private void flush( bufferState.clear(); countState.clear(); - if (response != null) { - response.disconnect(); + // We've observed cases where errors at this point can cause the pipeline to keep retrying + // the same events over and over (e.g. from Dataflow Runner's Pub/Sub implementation). Since + // the events have either been published or wrapped for error handling, we can safely + // ignore this error, though there may or may not be a leak of some type depending on + // HttpResponse's implementation. However, any potential leak would still happen if we let + // the exception fall through, so this isn't considered a major issue. + try { + if (response != null) { + response.ignore(); + } + } catch (IOException e) { + LOG.warn( + "Error ignoring response from Splunk. Messages should still have published, but there" + + " might be a connection leak.", + e); } } } @@ -426,6 +451,26 @@ public static byte[] getCertFromGcsAsBytes(String filePath) throws IOException { } } + @VisibleForTesting + static boolean isValidUrlFormat(String url) { + Matcher matcher = URL_PATTERN.matcher(url); + if (matcher.find()) { + String host = matcher.group(2); + return InetAddresses.isInetAddress(host) || InternetDomainName.isValid(host); + } + return false; + } + + /** + * Converts Nanoseconds to Milliseconds. + * + * @param ns time in nanoseconds + * @return time in milliseconds + */ + private static long nanosToMillis(long ns) { + return Math.round(((double) ns) / 1e6); + } + @AutoValue.Builder abstract static class Builder { @@ -458,6 +503,9 @@ abstract Builder setDisableCertificateValidation( */ Builder withUrl(ValueProvider url) { checkArgument(url != null, "withURL(url) called with null input."); + if (url.isAccessible()) { + checkArgument(isValidUrlFormat(url.get()), INVALID_URL_FORMAT_MESSAGE); + } return setUrl(url); } @@ -469,6 +517,7 @@ Builder withUrl(ValueProvider url) { */ Builder withUrl(String url) { checkArgument(url != null, "withURL(url) called with null input."); + checkArgument(isValidUrlFormat(url), INVALID_URL_FORMAT_MESSAGE); return setUrl(ValueProvider.StaticValueProvider.of(url)); } diff --git a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java index bd1e716951d4..2127cc55752d 100644 --- a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java +++ b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java @@ -159,7 +159,6 @@ public PCollection expand(PCollection input) { .withRootCaCertificatePath(rootCaCertificatePath()) .withEnableBatchLogs(enableBatchLogs()) .withEnableGzipHttpCompression(enableGzipHttpCompression()); - ; SplunkEventWriter writer = builder.build(); LOG.info("SplunkEventWriter configured"); diff --git a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventCoderTest.java b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventCoderTest.java new file mode 100644 index 000000000000..8267e406960a --- /dev/null +++ b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventCoderTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.splunk; + +import static org.junit.Assert.assertEquals; + +import com.google.gson.JsonObject; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; +import org.junit.Test; + +/** Unit tests for {@link SplunkEventCoder} class. */ +public class SplunkEventCoderTest { + + /** + * Test whether {@link SplunkEventCoder} is able to encode/decode a {@link SplunkEvent} correctly. + * + * @throws IOException + */ + @Test + public void testEncodeDecode() throws IOException { + + String event = "test-event"; + String host = "test-host"; + String index = "test-index"; + String source = "test-source"; + String sourceType = "test-source-type"; + Long time = 123456789L; + + SplunkEvent actualEvent = + SplunkEvent.newBuilder() + .withEvent(event) + .withHost(host) + .withIndex(index) + .withSource(source) + .withSourceType(sourceType) + .withTime(time) + .build(); + + SplunkEventCoder coder = SplunkEventCoder.of(); + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + coder.encode(actualEvent, bos); + try (ByteArrayInputStream bin = new ByteArrayInputStream(bos.toByteArray())) { + SplunkEvent decodedEvent = coder.decode(bin); + assertEquals(decodedEvent, actualEvent); + } + } + } + + /** + * Test whether {@link SplunkEventCoder} is able to encode/decode a {@link SplunkEvent} with + * metadata 'fields'. + * + * @throws IOException + */ + @Test + public void testEncodeDecodeFields() throws IOException { + + String event = "test-event"; + JsonObject fields = new JsonObject(); + fields.addProperty("test-key", "test-value"); + + SplunkEvent actualEvent = SplunkEvent.newBuilder().withEvent(event).withFields(fields).build(); + + SplunkEventCoder coder = SplunkEventCoder.of(); + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + coder.encode(actualEvent, bos); + try (ByteArrayInputStream bin = new ByteArrayInputStream(bos.toByteArray())) { + SplunkEvent decodedEvent = coder.decode(bin); + assertEquals(decodedEvent, actualEvent); + } + } + } + + /** + * Tests whether {@link SplunkEventCoder} is able to decode a {@link SplunkEvent} encoded using + * the older coder version 1 (commit f0ff6cc). + */ + @Test + public void testBackwardsCompatibility_canDecodeVersion1() throws IOException, DecoderException { + + SplunkEvent expectedEvent = + SplunkEvent.newBuilder() + .withEvent("e") + .withHost("h") + .withIndex("i") + .withSource("s") + .withSourceType("st") + .withTime(1234L) + .build(); + + String hex = "0100000000000004d2010168010173010273740101690165"; + SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex)); + + assertEquals(expectedEvent, actualEvent); + } + + /** + * Tests whether {@link SplunkEventCoder} is able to decode a {@link SplunkEvent} encoded using + * the older coder version 1 (commit f0ff6cc) and having an empty "event" field. + * + *

    An empty field is encoded as 00, which may look like the present/not present + * marker for the "fields" field in V2. + */ + @Test + public void testBackwardsCompatibility_canDecodeVersion1withEmptyEvent() + throws IOException, DecoderException { + + SplunkEvent expectedEvent = + SplunkEvent.newBuilder() + .withEvent("") + .withHost("h") + .withIndex("i") + .withSource("s") + .withSourceType("st") + .withTime(1234L) + .build(); + + String hex = "0100000000000004d20101680101730102737401016900"; + SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex)); + + assertEquals(expectedEvent, actualEvent); + } + + /** + * Tests whether {@link SplunkEventCoder} is able to decode a {@link SplunkEvent} encoded using + * the older coder version 1 (commit f0ff6cc) and having the "event" field of length 1. + * + *

    This is a special case when "event" is of length 1 and the first character code is 00. This + * is encoded as byte sequence 01 00 by V1 coder, which can be treated as an empty "fields" field + * by V2 decoder. + */ + @Test + public void testBackwardsCompatibility_canDecodeVersion1withEventLength1() + throws IOException, DecoderException { + + SplunkEvent expectedEvent = + SplunkEvent.newBuilder() + .withEvent(new String(new byte[] {0}, StandardCharsets.UTF_8)) + .withHost("h") + .withIndex("i") + .withSource("s") + .withSourceType("st") + .withTime(1234L) + .build(); + + String hex = "0100000000000004d2010168010173010273740101690100"; + SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex)); + + assertEquals(expectedEvent, actualEvent); + } + + /** + * Tests whether {@link SplunkEventCoder} is able to decode a {@link SplunkEvent} encoded using + * the older coder version 2 (commit 5e53040), without the newly added "fields" field. + */ + @Test + public void testBackwardsCompatibility_canDecodeVersion2() throws IOException, DecoderException { + + SplunkEvent expectedEvent = + SplunkEvent.newBuilder() + .withEvent("e") + .withHost("h") + .withIndex("i") + .withSource("s") + .withSourceType("st") + .withTime(1234L) + .build(); + + String hex = "0100000000000004d201016801017301027374010169000165"; + SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex)); + + assertEquals(expectedEvent, actualEvent); + } + + /** + * Tests whether {@link SplunkEventCoder} is able to decode a {@link SplunkEvent} encoded using + * the older coder version 2 (commit 5e53040), with the newly added "fields" field. + */ + @Test + public void testBackwardsCompatibility_canDecodeVersion2withFields() + throws IOException, DecoderException { + + JsonObject fields = new JsonObject(); + fields.addProperty("k", "v"); + + SplunkEvent expectedEvent = + SplunkEvent.newBuilder() + .withEvent("e") + .withHost("h") + .withIndex("i") + .withSource("s") + .withSourceType("st") + .withTime(1234L) + .withFields(fields) + .build(); + + String hex = "0100000000000004d20101680101730102737401016901097b226b223a2276227d0165"; + SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex)); + + assertEquals(expectedEvent, actualEvent); + } + + private static InputStream fromHex(String hex) throws DecoderException { + byte[] b = Hex.decodeHex(hex); + return new ByteArrayInputStream(b); + } +} diff --git a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java index 29769526d248..749086bac435 100644 --- a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java +++ b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import com.google.gson.JsonObject; import org.junit.Test; /** Unit tests for {@link SplunkEvent} class. */ @@ -34,6 +35,8 @@ public void testEquals() { String source = "test-source"; String sourceType = "test-source-type"; Long time = 123456789L; + JsonObject fields = new JsonObject(); + fields.addProperty("test-key", "test-value"); SplunkEvent actualEvent = SplunkEvent.newBuilder() @@ -43,6 +46,7 @@ public void testEquals() { .withSource(source) .withSourceType(sourceType) .withTime(time) + .withFields(fields) .create(); assertEquals( @@ -53,6 +57,7 @@ public void testEquals() { .withSource(source) .withSourceType(sourceType) .withTime(time) + .withFields(fields) .create(), actualEvent); diff --git a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java index 3633844ab6d2..f4d8c1a5e137 100644 --- a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java +++ b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.splunk; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -62,6 +63,21 @@ public static void setup() { private MockServerClient mockServerClient; + @Test + public void testMissingURLProtocol() { + assertFalse(SplunkEventWriter.isValidUrlFormat("test-url")); + } + + @Test + public void testInvalidURL() { + assertFalse(SplunkEventWriter.isValidUrlFormat("http://1.2.3")); + } + + @Test + public void testValidURL() { + assertTrue(SplunkEventWriter.isValidUrlFormat("http://test-url")); + } + @Test public void eventWriterMissingURL() { @@ -71,13 +87,51 @@ public void eventWriterMissingURL() { assertTrue(thrown.getMessage().contains("url needs to be provided")); } + @Test + public void eventWriterMissingURLProtocol() { + Exception thrown = + assertThrows( + IllegalArgumentException.class, + () -> SplunkEventWriter.newBuilder().withUrl("test-url").build()); + + assertTrue(thrown.getMessage().contains(SplunkEventWriter.INVALID_URL_FORMAT_MESSAGE)); + } + + /** Test building {@link SplunkEventWriter} with an invalid URL. */ + @Test + public void eventWriterInvalidURL() { + Exception thrown = + assertThrows( + IllegalArgumentException.class, + () -> SplunkEventWriter.newBuilder().withUrl("http://1.2.3").build()); + + assertTrue(thrown.getMessage().contains(SplunkEventWriter.INVALID_URL_FORMAT_MESSAGE)); + } + + /** + * Test building {@link SplunkEventWriter} with the 'services/collector/event' path appended to + * the URL. + */ + @Test + public void eventWriterFullEndpoint() { + Exception thrown = + assertThrows( + IllegalArgumentException.class, + () -> + SplunkEventWriter.newBuilder() + .withUrl("http://test-url:8088/services/collector/event") + .build()); + + assertTrue(thrown.getMessage().contains(SplunkEventWriter.INVALID_URL_FORMAT_MESSAGE)); + } + @Test public void eventWriterMissingToken() { Exception thrown = assertThrows( NullPointerException.class, - () -> SplunkEventWriter.newBuilder().withUrl("test-url").build()); + () -> SplunkEventWriter.newBuilder().withUrl("http://test-url").build()); assertTrue(thrown.getMessage().contains("token needs to be provided")); } @@ -86,7 +140,7 @@ public void eventWriterMissingToken() { public void eventWriterDefaultBatchCountAndValidation() { SplunkEventWriter writer = - SplunkEventWriter.newBuilder().withUrl("test-url").withToken("test-token").build(); + SplunkEventWriter.newBuilder().withUrl("http://test-url").withToken("test-token").build(); assertNull(writer.inputBatchCount()); assertNull(writer.disableCertificateValidation()); @@ -99,7 +153,7 @@ public void eventWriterCustomBatchCountAndValidation() { Boolean certificateValidation = false; SplunkEventWriter writer = SplunkEventWriter.newBuilder() - .withUrl("test-url") + .withUrl("http://test-url") .withToken("test-token") .withInputBatchCount(StaticValueProvider.of(batchCount)) .withDisableCertificateValidation(StaticValueProvider.of(certificateValidation)) @@ -144,7 +198,6 @@ public void successfulSplunkWriteSingleBatchTest() { PCollection actual = pipeline .apply("Create Input data", Create.of(testEvents)) - // .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), SplunkEventCoder.of()))) .apply( "SplunkEventWriter", ParDo.of( @@ -200,7 +253,6 @@ public void successfulSplunkWriteMultiBatchTest() { PCollection actual = pipeline .apply("Create Input data", Create.of(testEvents)) - // .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), SplunkEventCoder.of()))) .apply( "SplunkEventWriter", ParDo.of( @@ -246,7 +298,6 @@ public void failedSplunkWriteSingleBatchTest() { PCollection actual = pipeline .apply("Create Input data", Create.of(testEvents)) - // .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), SplunkEventCoder.of()))) .apply( "SplunkEventWriter", ParDo.of( diff --git a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java index 32c98513ea24..d2cfd59aace2 100644 --- a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java +++ b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.splunk; +import com.google.gson.JsonObject; import java.util.List; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -63,7 +64,8 @@ public void successfulSplunkIOMultiBatchNoParallelismTest() { int testPort = mockServerRule.getPort(); String url = Joiner.on(':').join("http://localhost", testPort); String token = "test-token"; - + JsonObject fields = new JsonObject(); + fields.addProperty("customfield", 1); List testEvents = ImmutableList.of( SplunkEvent.newBuilder() @@ -73,6 +75,7 @@ public void successfulSplunkIOMultiBatchNoParallelismTest() { .withSource("test-source-1") .withSourceType("test-source-type-1") .withTime(12345L) + .withFields(fields) .create(), SplunkEvent.newBuilder() .withEvent("test-event-2") @@ -81,11 +84,12 @@ public void successfulSplunkIOMultiBatchNoParallelismTest() { .withSource("test-source-2") .withSourceType("test-source-type-2") .withTime(12345L) + .withFields(fields) .create()); PCollection actual = pipeline - .apply("Create Input data", Create.of(testEvents)) // .withCoder(SplunkEventCoder.of())) + .apply("Create Input data", Create.of(testEvents)) .apply( "SplunkIO", SplunkIO.write(url, token).withParallelism(1).withBatchCount(testEvents.size())); @@ -132,7 +136,7 @@ public void successfulSplunkIOMultiBatchParallelismTest() { PCollection actual = pipeline - .apply("Create Input data", Create.of(testEvents)) // .withCoder(SplunkEventCoder.of())) + .apply("Create Input data", Create.of(testEvents)) .apply( "SplunkIO", SplunkIO.write(url, token) @@ -182,7 +186,7 @@ public void successfulSplunkIOSingleBatchParallelismTest() { PCollection actual = pipeline - .apply("Create Input data", Create.of(testEvents)) // .withCoder(SplunkEventCoder.of())) + .apply("Create Input data", Create.of(testEvents)) .apply( "SplunkIO", SplunkIO.write(url, token).withParallelism(testParallelism).withBatchCount(1));