From 42a17579534f1adb6d9a143a5d3aa66bae66d40d Mon Sep 17 00:00:00 2001 From: Anna Epishova Date: Thu, 11 Apr 2019 11:29:31 -0400 Subject: [PATCH] Initial commit --- examples/iot-elasticsearch/pom.xml | 237 ++++++++++++++++++ .../google/cloud/pso/IOTElasticsearch.java | 118 +++++++++ .../cloud/pso/coders/ErrorMessageCoder.java | 66 +++++ .../google/cloud/pso/common/ErrorMessage.java | 69 +++++ .../google/cloud/pso/common/ExtractKeyFn.java | 62 +++++ .../cloud/pso/common/FailSafeValidate.java | 142 +++++++++++ .../pso/options/IOTElasticsearchOptions.java | 66 +++++ .../cloud/pso/common/ExtractKeyFnTest.java | 66 +++++ .../pso/common/FailSafeValidateTest.java | 149 +++++++++++ 9 files changed, 975 insertions(+) create mode 100644 examples/iot-elasticsearch/pom.xml create mode 100644 examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/IOTElasticsearch.java create mode 100644 examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/coders/ErrorMessageCoder.java create mode 100644 examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/ErrorMessage.java create mode 100644 examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/ExtractKeyFn.java create mode 100644 examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/FailSafeValidate.java create mode 100644 examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/options/IOTElasticsearchOptions.java create mode 100644 examples/iot-elasticsearch/src/test/java/com/google/cloud/pso/common/ExtractKeyFnTest.java create mode 100644 examples/iot-elasticsearch/src/test/java/com/google/cloud/pso/common/FailSafeValidateTest.java diff --git a/examples/iot-elasticsearch/pom.xml b/examples/iot-elasticsearch/pom.xml new file mode 100644 index 0000000000..0c0155647c --- /dev/null +++ b/examples/iot-elasticsearch/pom.xml @@ -0,0 +1,237 @@ + + + + 4.0.0 + + com.google.cloud.pso + IOTElasticsearch + 1.0.0 + jar + 2019 + + Google + http://www.google.com/ + + + + IOTElasticsearch + + + 2.8.0 + 3.6.0 + 25.1-jre + 1.1.5 + 1.3.0 + 5.1.0 + 2.8.9 + 1.2.1 + 0.41 + 4.4.5 + 4.1.2 + 4.5.2 + 3.6.2 + 3.1.0 + 3.1.0 + 1.5.0 + 1.6.2 + 20180813 + -Werror + -Xpkginfo:always + nothing + 1.8 + + + + + com.googlecode.json-simple + json-simple + 1.1.1 + + + + org.apache.beam + beam-sdks-java-io-google-cloud-platform + ${beam.version} + + + + org.apache.beam + beam-sdks-java-extensions-json-jackson + ${beam.version} + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${beam.version} + + + + org.apache.beam + beam-runners-direct-java + ${beam.version} + + + + org.apache.beam + beam-sdks-java-io-elasticsearch + ${beam.version} + + + + org.apache.hbase + hbase-client + ${hbase.version} + + + + com.google.protobuf + protobuf-java + ${protobuf.java.version} + + + + com.google.guava + guava + ${guava.version} + + + + com.google.cloud.bigtable + bigtable-hbase-dataflow + ${bigtable.version} + + + + com.google.auto.value + auto-value + ${auto-value.version} + + + + org.json + json + ${json.version} + + + + + org.junit.jupiter + junit-jupiter-engine + ${junit.jupiter.engine.version} + test + + + + org.mockito + mockito-core + ${mockito-core.version} + test + + + + org.hamcrest + hamcrest-core + ${hamcrest.version} + test + + + + org.hamcrest + hamcrest-library + ${hamcrest.version} + test + + + + com.google.truth + truth + ${truth.version} + test + + + + + + + + maven-compiler-plugin + ${maven-compiler-plugin.version} + + ${java.version} + ${java.version} + + -Xlint:all + ${compiler.error.flag} + + -Xlint:-options + + -Xlint:-cast + -Xlint:-deprecation + -Xlint:-processing + -Xlint:-rawtypes + -Xlint:-serial + -Xlint:-try + -Xlint:-unchecked + -Xlint:-varargs + + + + + true + + false + + + + + default-compile + + compile + + compile + + + ${compiler.default.pkginfo.flag} + + + ${compiler.default.exclude} + + + + + + + + org.codehaus.mojo + exec-maven-plugin + ${maven-exec-plugin.version} + + false + + + java.util.logging.config.file + logging.properties + + + + + + + + diff --git a/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/IOTElasticsearch.java b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/IOTElasticsearch.java new file mode 100644 index 0000000000..72fbba9d9b --- /dev/null +++ b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/IOTElasticsearch.java @@ -0,0 +1,118 @@ +package com.google.cloud.pso; + +import com.google.cloud.pso.coders.ErrorMessageCoder; +import com.google.cloud.pso.common.ErrorMessage; +import com.google.cloud.pso.common.ExtractKeyFn; +import com.google.cloud.pso.common.FailSafeValidate; +import com.google.cloud.pso.options.IOTElasticsearchOptions; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; + +/* +mvn compile exec:java -Dexec.mainClass=com.google.cloud.pso.IOTElasticsearch \ +-Dexec.args="--runner=DataflowRunner \ +--project=$PROJECT \ +--stagingLocation=$GS_STAGING \ +--gcpTempLocation=$GS_TMP \ +--inputSubscription=$PUBSUB_SUBSCRIPTION \ +--rejectionTopic=$PUBSUB_REJECT_TOPIC \ +--addresses=$ELASTIC_CLUSTER \ +--index=$ELASTIC_INDEX \ +--type=$ELASTIC_TYPE \ +--idField=$ELASTIC_ID" + */ +public class IOTElasticsearch { + /** + * {@link TupleTag> to tag succesfully validated + * messages. + */ + private static final TupleTag> VALIDATION_SUCCESS_TAG = + new TupleTag>() {}; + + /** + * {@link TupleTag> to tag failed messages. + */ + private static final TupleTag FAILURE_TAG = new TupleTag() {}; + + public static void main(String[] args) { + IOTElasticsearchOptions options = + PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(IOTElasticsearchOptions.class); + + run(options); + } + + public static PipelineResult run(IOTElasticsearchOptions options) { + Pipeline pipeline = Pipeline.create(options); + + // Set the CustomCoder for the ErrorMessage class. + CoderRegistry coderRegistry = pipeline.getCoderRegistry(); + coderRegistry.registerCoderForClass(ErrorMessage.class, ErrorMessageCoder.of()); + + String[] addresses = + Lists.newArrayList(Splitter.on(",").trimResults().split(options.getAddresses())) + .toArray(new String[0]); + + ElasticsearchIO.ConnectionConfiguration connection = + ElasticsearchIO.ConnectionConfiguration.create( + addresses, options.getIndex(), options.getType()); + + // Run pipeline + PCollection inputMsgs = pipeline + .apply("ReadPubsubMessages", + PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + + // Validate messages and tag them with success and failure tags. + PCollectionTuple validated = inputMsgs + .apply( + "Validate Messages", + FailSafeValidate.newBuilder() + .withSuccessTag(VALIDATION_SUCCESS_TAG) + .withFailureTag(FAILURE_TAG) + .withKeyPath(options.getFormattedId()) + .build()); + + validated + .get(VALIDATION_SUCCESS_TAG) + .apply("Extract Json from KV", ParDo.of(new ExtractJson())) + .apply("Write to Elasticsearch Index", + ElasticsearchIO.write() + .withConnectionConfiguration(connection) + .withIdFn(new ExtractKeyFn(options.getFormattedId())) + .withUsePartialUpdate(true)); + + + validated + .get(FAILURE_TAG) + .apply(ParDo.of(new DoFn() { // a DoFn as an anonymous inner class instance + @ProcessElement + public void processElement(@Element ErrorMessage message, OutputReceiver out) { + out.output(message.toString()); + } + })) + .apply("Write Failed Messages", + PubsubIO.writeStrings().to(options.getRejectionTopic())); + + return pipeline.run(); + } + + static class ExtractJson extends DoFn, String> { + @ProcessElement + public void processElement(@Element KV element, OutputReceiver out) { + out.output(element.getValue()); + } + } +} diff --git a/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/coders/ErrorMessageCoder.java b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/coders/ErrorMessageCoder.java new file mode 100644 index 0000000000..eebe2d48e0 --- /dev/null +++ b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/coders/ErrorMessageCoder.java @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.pso.coders; + +import com.google.cloud.pso.common.ErrorMessage; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** {@link org.apache.beam.sdk.coders.Coder} for {@link ErrorMessage} */ +public class ErrorMessageCoder extends CustomCoder { + + private static final NullableCoder STRING_CODER = NullableCoder.of(StringUtf8Coder.of()); + private static final StringUtf8Coder JSON_PAYLOAD_CODER = StringUtf8Coder.of(); + private static final StringUtf8Coder MESSAGE_CODER = StringUtf8Coder.of(); + + private static final ErrorMessageCoder ERROR_MESSAGE_CODER_INSTANCE = new ErrorMessageCoder(); + + private ErrorMessageCoder() {} + + public static ErrorMessageCoder of() { + return ERROR_MESSAGE_CODER_INSTANCE; + } + + @Override + public void encode(ErrorMessage value, OutputStream outStream) throws IOException { + if (value == null) { + throw new CoderException("The ErrorMessageCoder cannot encode a null object!"); + } + + JSON_PAYLOAD_CODER.encode(value.jsonPayload(), outStream); + MESSAGE_CODER.encode(value.errorMessage(), outStream); + STRING_CODER.encode(value.errorStackTrace(), outStream); + } + + @Override + public ErrorMessage decode(InputStream inStream) throws IOException { + + String jsonPayload = JSON_PAYLOAD_CODER.decode(inStream); + String message = MESSAGE_CODER.decode(inStream); + String stackTrace = STRING_CODER.decode(inStream); + + ErrorMessage.Builder builder = + ErrorMessage.newBuilder().withJsonPayload(jsonPayload).withErrorMessage(message); + + return (stackTrace == null) ? builder.build() : builder.withErrorStackTrace(stackTrace).build(); + } +} diff --git a/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/ErrorMessage.java b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/ErrorMessage.java new file mode 100644 index 0000000000..ae90369de5 --- /dev/null +++ b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/ErrorMessage.java @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.pso.common; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * A class to hold messages that fail validation either because the json is not well formed or + * because the key element required to be present within the json is missing. The class encapsulates + * the original payload in addition to a error message and an optional stack trace to help with + * identifying the root cause in a subsequent reprocessing attempt/debugging. + */ +@AutoValue +public abstract class ErrorMessage { + public static Builder newBuilder() { + return new AutoValue_ErrorMessage.Builder(); + } + + public abstract String jsonPayload(); + + public abstract String errorMessage(); + + @Nullable + public abstract String errorStackTrace(); + + @AutoValue.Builder + public abstract static class Builder { + abstract Builder setJsonPayload(String jsonPayload); + + abstract Builder setErrorMessage(String errorMessage); + + abstract Builder setErrorStackTrace(String errorStackTrace); + + public abstract ErrorMessage build(); + + public Builder withJsonPayload(String jsonPayload) { + checkArgument(jsonPayload != null, "withJsonPayload(jsonPayload) called with null value."); + return setJsonPayload(jsonPayload); + } + + public Builder withErrorMessage(String errorMessage) { + checkArgument(errorMessage != null, "withErrorMessage(errorMessage) called with null value."); + return setErrorMessage(errorMessage); + } + + public Builder withErrorStackTrace(String errorStackTrace) { + checkArgument( + errorStackTrace != null, "withErrorStackTrace(errorStackTrace) called with null value."); + return setErrorStackTrace(errorStackTrace); + } + } +} diff --git a/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/ExtractKeyFn.java b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/ExtractKeyFn.java new file mode 100644 index 0000000000..4a2c2ac84f --- /dev/null +++ b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/ExtractKeyFn.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.pso.common; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + +/** + * Implementation of {@link ElasticsearchIO.Write.FieldValueExtractFn} to extract Elasticsearch ID + * field from JSON message. + */ +public class ExtractKeyFn implements ElasticsearchIO.Write.FieldValueExtractFn { + + private String jsonKeyPath; + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final Logger LOG = LoggerFactory.getLogger(ExtractKeyFn.class); + + public ExtractKeyFn(String jsonKeyPath) { + this.jsonKeyPath = jsonKeyPath; + } + + @Override + public String apply(JsonNode input) { + String fieldValue; + if (this.jsonKeyPath.isEmpty()) { + fieldValue = UUID.randomUUID().toString(); + } + else { + try { + JsonNode valueAtPath = input.at(jsonKeyPath); + if (valueAtPath.isMissingNode()) { + throw new RuntimeException("Unable to extract id field: " + jsonKeyPath); + } + fieldValue = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(valueAtPath); + } catch (JsonProcessingException e) { + LOG.error("Unable to parse json input: " + input.asText()); + throw new RuntimeException(e); + } + } + return fieldValue; + } +} diff --git a/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/FailSafeValidate.java b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/FailSafeValidate.java new file mode 100644 index 0000000000..15eaa65820 --- /dev/null +++ b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/common/FailSafeValidate.java @@ -0,0 +1,142 @@ +/* + * Copyright (C) 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.pso.common; + +import com.fasterxml.jackson.core.JsonPointer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.google.auto.value.AutoValue; +import com.google.common.base.Throwables; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * A {@link PTransform} that validates a {@link PCollection} and performs the following + * validation: 1. The string is a well formed json document. 2. The well formed json document + * contains key element specified as a json pointer The PTransform returns a {@link + * PCollectionTuple} tagged with the {@link TupleTag} provided. + */ +@AutoValue +public abstract class FailSafeValidate extends PTransform, PCollectionTuple> { + + public static Builder newBuilder() { + return new AutoValue_FailSafeValidate.Builder().setObjectReader((new ObjectMapper()).reader()); + } + + abstract TupleTag> successTag(); + + abstract TupleTag failureTag(); + + abstract ObjectReader objectReader(); + + abstract ExtractKeyFn extractKeyFn(); + + @Override + public PCollectionTuple expand(PCollection input) { + return input.apply( + "ValidateAndTagMessages", + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(ProcessContext context) { + String input = context.element(); + + try { + JsonNode jsonNode = objectReader().readTree(input); + +// if (extractKeyFn().apply() == "") { +// context.output(successTag(), KV.of(key, input)); +// } +// else { + + String key = null; + + try { + key = extractKeyFn().apply(jsonNode); + + } catch (RuntimeException e) { + ErrorMessage em = + ErrorMessage.newBuilder() + .withJsonPayload(input) + .withErrorMessage("Missing key element") + .build(); + context.output(failureTag(), em); + } + + if (key != null) { + context.output(successTag(), KV.of(key, input)); + } + //} + + } catch (IOException e) { + ErrorMessage em = + ErrorMessage.newBuilder() + .withJsonPayload(input) + .withErrorMessage("Invalid Json message") + .withErrorStackTrace(Throwables.getStackTraceAsString(e)) + .build(); + + context.output(failureTag(), em); + } + } + }) + .withOutputTags(successTag(), TupleTagList.of(failureTag()))); + } + + @AutoValue.Builder + public abstract static class Builder { + abstract Builder setSuccessTag(TupleTag> successTag); + + abstract Builder setFailureTag(TupleTag failureTag); + + abstract Builder setObjectReader(ObjectReader objectReader); + + abstract Builder setExtractKeyFn(ExtractKeyFn extractKeyFn); + + public abstract FailSafeValidate build(); + + public Builder withKeyPath(String keyPath) { + checkArgument(keyPath != null, "withKeyPath(keyPath) called with null value."); + try { + JsonPointer.compile(keyPath); + } catch (IllegalArgumentException e) { + throw new RuntimeException(e); + } + return setExtractKeyFn(new ExtractKeyFn(keyPath)); + } + + public Builder withSuccessTag(TupleTag> successTag) { + checkArgument(successTag != null, "withSuccessTag(successTag) called with null value."); + return setSuccessTag(successTag); + } + + public Builder withFailureTag(TupleTag failureTag) { + checkArgument(failureTag != null, "withFailureTag(failureTag) called with null value."); + return setFailureTag(failureTag); + } + } +} \ No newline at end of file diff --git a/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/options/IOTElasticsearchOptions.java b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/options/IOTElasticsearchOptions.java new file mode 100644 index 0000000000..09ee07f0e9 --- /dev/null +++ b/examples/iot-elasticsearch/src/main/java/com/google/cloud/pso/options/IOTElasticsearchOptions.java @@ -0,0 +1,66 @@ +package com.google.cloud.pso.options; + +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.options.*; + + +public interface IOTElasticsearchOptions extends DataflowPipelineOptions { + + @Description("The Cloud Pub/Sub topic to publish rejected messages to. " + + "The name should be in the format of " + + "projects//topics/.") + @Validation.Required + String getRejectionTopic(); + void setRejectionTopic(String rejectionTopic); + + @Description("The Cloud Pub/Sub subscription to consume from. " + + "The name should be in the format of " + + "projects//subscriptions/.") + @Validation.Required + String getInputSubscription(); + void setInputSubscription(String inputSubscription); + + @Description("Elasticsearch cluster address(es).") + @Validation.Required + String getAddresses(); + void setAddresses(String addresses); + + @Description("Name of the Elasticsearch index.") + @Validation.Required + String getIndex(); + void setIndex(String index); + + @Description("Name of the Elasticsearch type.") + @Validation.Required + String getType(); + void setType(String type); + + @Description( + "Field in json message which is used as Elasticseaarch document id. " + + "This field can be nested using / to identify a path in json message. " + + "If no path provided the field is considered as a root node in json message.") + @Default.String("") + String getIdField(); + void setIdField(String idField); + + @Description("Returns formatted idField. " + + "It adds leading / if the user omitted it while specifying idField option.") + @Hidden + @Default.InstanceFactory(FormattedIdFactory.class) + String getFormattedId(); + void setFormattedId(String formattedId); + + /** + * Returns formatted idField. It adds leading / if the user omitted it while specifying idField option. + */ + public static class FormattedIdFactory implements DefaultValueFactory { + @Override + public String create(PipelineOptions options) { + String idField = options.as(IOTElasticsearchOptions.class).getIdField(); + if (!idField.isEmpty() && idField.charAt(0) != '/') { + idField = "/" + idField; + } + return idField; + } + } +} \ No newline at end of file diff --git a/examples/iot-elasticsearch/src/test/java/com/google/cloud/pso/common/ExtractKeyFnTest.java b/examples/iot-elasticsearch/src/test/java/com/google/cloud/pso/common/ExtractKeyFnTest.java new file mode 100644 index 0000000000..c97d277bfd --- /dev/null +++ b/examples/iot-elasticsearch/src/test/java/com/google/cloud/pso/common/ExtractKeyFnTest.java @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.pso.common; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import java.io.IOException; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Test class for {@link com.google.cloud.pso.common.ExtractKeyFn} */ +@RunWith(JUnit4.class) +public class ExtractKeyFnTest { + + private static final String ID_KEY = "/sku"; + private static final String ID_VALUE = "123"; + private static final String TEST_JSON = "{ \"sku\": 123}"; + private ObjectMapper mapper; + + @Before + public void setup() { + mapper = new ObjectMapper(); + } + + @Test + public void testJsonWithValidId() throws IOException { + ExtractKeyFn extractKeyFn = new ExtractKeyFn(ID_KEY); + JsonNode input = mapper.readTree(TEST_JSON); + String actual = extractKeyFn.apply(input); + assertThat(actual).isEqualTo(ID_VALUE); + } + + @Test + public void testJsonWithInvalidId() throws IOException { + ExtractKeyFn extractKeyFn = new ExtractKeyFn("/invalid"); + JsonNode input = mapper.readTree(TEST_JSON); + + RuntimeException thrown = + assertThrows( + RuntimeException.class, + () -> { + String actual = extractKeyFn.apply(input); + }); + + assertThat(thrown).hasMessageThat().contains("Unable to extract id field: /invalid"); + } +} diff --git a/examples/iot-elasticsearch/src/test/java/com/google/cloud/pso/common/FailSafeValidateTest.java b/examples/iot-elasticsearch/src/test/java/com/google/cloud/pso/common/FailSafeValidateTest.java new file mode 100644 index 0000000000..e79c4bd363 --- /dev/null +++ b/examples/iot-elasticsearch/src/test/java/com/google/cloud/pso/common/FailSafeValidateTest.java @@ -0,0 +1,149 @@ +/* + * Copyright (C) 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.pso.common; + +import com.google.cloud.pso.coders.ErrorMessageCoder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import static com.google.common.truth.Truth.assertThat; + +/** Test class for {@link FailSafeValidate} */ +@RunWith(JUnit4.class) +public class FailSafeValidateTest { + + private static final String VALID_JSON = "{ \"sku\": 123 }"; + private static final String MALFORMED_JSON = "{ \"sku\": 666 "; + private static final String INCORRECT_KEY_JSON = "{ \"xyz\": 123 }"; + + private static final String KEY_PATH = "/sku"; + private static final TupleTag> SUCCESS_TAG = + new TupleTag>() {}; + private static final TupleTag FAILURE_TAG = new TupleTag() {}; + + private static final ErrorMessage MALFORMED_ERROR_MESSAGE = + ErrorMessage.newBuilder() + .withJsonPayload(MALFORMED_JSON) + .withErrorMessage("Invalid Json message") + .build(); + + private static final ErrorMessage INCORRECT_KEY_ERROR_MESSAGE = + ErrorMessage.newBuilder() + .withJsonPayload(INCORRECT_KEY_JSON) + .withErrorMessage("Missing key element") + .build(); + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Before + public void setup() { + pipeline.getCoderRegistry().registerCoderForClass(ErrorMessage.class, ErrorMessageCoder.of()); + } + + @Test + @Category(NeedsRunner.class) + public void failSafeMessagesAreCorrectlyTaggedTest() { + + FailSafeValidate transform = + FailSafeValidate.newBuilder() + .withSuccessTag(SUCCESS_TAG) + .withFailureTag(FAILURE_TAG) + .withKeyPath(KEY_PATH) + .build(); + + PCollectionTuple tuple = pipeline.apply(Create.of(VALID_JSON, MALFORMED_JSON)).apply(transform); + + PCollection> successActual = tuple.get(SUCCESS_TAG); + PCollection failureActual = tuple.get(FAILURE_TAG); + + PAssert.thatSingleton(successActual).isEqualTo(KV.of("123", VALID_JSON)); + PAssert.thatSingleton(failureActual).satisfies(new MalformedErrorMessageCheckFn()); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void onlyBadMessagesTest() { + + FailSafeValidate transform = + FailSafeValidate.newBuilder() + .withSuccessTag(SUCCESS_TAG) + .withFailureTag(FAILURE_TAG) + .withKeyPath(KEY_PATH) + .build(); + + PCollectionTuple tuple = pipeline.apply(Create.of(MALFORMED_JSON)).apply(transform); + + PCollection> successActual = tuple.get(SUCCESS_TAG); + PCollection failureActual = tuple.get(FAILURE_TAG); + + PAssert.that(successActual).empty(); + PAssert.thatSingleton(failureActual).satisfies(new MalformedErrorMessageCheckFn()); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void incorrectKeyTest() { + + FailSafeValidate transform = + FailSafeValidate.newBuilder() + .withSuccessTag(SUCCESS_TAG) + .withFailureTag(FAILURE_TAG) + .withKeyPath("/incorrect") + .build(); + + PCollectionTuple tuple = pipeline.apply(Create.of(INCORRECT_KEY_JSON)).apply(transform); + + PCollection> successActual = tuple.get(SUCCESS_TAG); + PCollection failureActual = tuple.get(FAILURE_TAG); + + PAssert.that(successActual).empty(); + PAssert.thatSingleton(failureActual).isEqualTo(INCORRECT_KEY_ERROR_MESSAGE); + + pipeline.run(); + } + + private static class MalformedErrorMessageCheckFn + implements SerializableFunction { + + @Override + public Void apply(ErrorMessage input) { + + assertThat(input.jsonPayload()).isEqualTo(MALFORMED_ERROR_MESSAGE.jsonPayload()); + assertThat(input.errorMessage()).isEqualTo(MALFORMED_ERROR_MESSAGE.errorMessage()); + assertThat(input.errorStackTrace()) + .contains("com.fasterxml.jackson.core.io.JsonEOFException"); + return null; + } + } +}