Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Anna Epishova committed Apr 11, 2019
1 parent ef408f9 commit 42a1757
Show file tree
Hide file tree
Showing 9 changed files with 975 additions and 0 deletions.
237 changes: 237 additions & 0 deletions examples/iot-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Copyright (C) 2018 Google Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Maven coordinates (groupId:artifactId:version) and packaging of this example. -->
<groupId>com.google.cloud.pso</groupId>
<artifactId>IOTElasticsearch</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<inceptionYear>2019</inceptionYear>
<!-- Organization that owns the project. -->
<organization>
<name>Google</name>
<url>http://www.google.com/</url>
</organization>


<!-- Human-readable project name. -->
<name>IOTElasticsearch</name>

<properties>
  <!--
    Pin the encoding used to read sources and write reports. Without these Maven
    falls back to the platform default encoding and warns that the build is
    platform dependent.
  -->
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

  <!-- Versions of compile/runtime libraries referenced in the dependencies section. -->
  <beam.version>2.8.0</beam.version>
  <protobuf.java.version>3.6.0</protobuf.java.version>
  <guava.version>25.1-jre</guava.version>
  <hbase.version>1.1.5</hbase.version>
  <bigtable.version>1.3.0</bigtable.version>

  <!-- Versions of test-scoped libraries. -->
  <junit.jupiter.engine.version>5.1.0</junit.jupiter.engine.version>
  <mockito-core.version>2.8.9</mockito-core.version>
  <hamcrest.version>1.2.1</hamcrest.version>
  <truth.version>0.41</truth.version>

  <!-- Not referenced elsewhere in this pom; kept so external overrides keep working. -->
  <httpcomponents.core.version>4.4.5</httpcomponents.core.version>
  <httpcomponents.httpasyncclient.version>4.1.2</httpcomponents.httpasyncclient.version>
  <httpcomponents.httpclient.version>4.5.2</httpcomponents.httpclient.version>

  <!-- Build plugin versions. The jar and assembly entries are not referenced in this pom. -->
  <maven-compiler-plugin.version>3.6.2</maven-compiler-plugin.version>
  <maven-jar-plugin.version>3.1.0</maven-jar-plugin.version>
  <maven-assembly-plugin.version>3.1.0</maven-assembly-plugin.version>
  <maven-exec-plugin.version>1.5.0</maven-exec-plugin.version>

  <auto-value.version>1.6.2</auto-value.version>
  <json.version>20180813</json.version>

  <!-- Values substituted into the maven-compiler-plugin configuration below. -->
  <compiler.error.flag>-Werror</compiler.error.flag>
  <compiler.default.pkginfo.flag>-Xpkginfo:always</compiler.default.pkginfo.flag>
  <compiler.default.exclude>nothing</compiler.default.exclude>
  <java.version>1.8</java.version>
</properties>

<dependencies>
  <!-- JSON handling -->
  <dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1.1</version>
  </dependency>

  <!-- Apache Beam: IO connectors, extensions and runners, all pinned to beam.version -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-extensions-json-jackson</artifactId>
    <version>${beam.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>${beam.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
    <version>${beam.version}</version>
  </dependency>

  <!-- Supporting libraries with explicitly pinned versions -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
  </dependency>

  <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>${protobuf.java.version}</version>
  </dependency>

  <dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>${guava.version}</version>
  </dependency>

  <dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-dataflow</artifactId>
    <version>${bigtable.version}</version>
  </dependency>

  <dependency>
    <groupId>com.google.auto.value</groupId>
    <artifactId>auto-value</artifactId>
    <version>${auto-value.version}</version>
  </dependency>

  <dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>${json.version}</version>
  </dependency>

  <!-- Test-only dependencies (scope: test) -->
  <dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-engine</artifactId>
    <version>${junit.jupiter.engine.version}</version>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-core</artifactId>
    <version>${mockito-core.version}</version>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-core</artifactId>
    <version>${hamcrest.version}</version>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-library</artifactId>
    <version>${hamcrest.version}</version>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>com.google.truth</groupId>
    <artifactId>truth</artifactId>
    <version>${truth.version}</version>
    <scope>test</scope>
  </dependency>

</dependencies>

<build>
  <!--
    NOTE(review): no maven-surefire-plugin is configured in this build; confirm
    that the default version can discover the JUnit 5 tests this pom depends on.
  -->
  <plugins>
    <!-- Java compilation: lint everything, then switch off the categories not yet clean. -->
    <plugin>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>${maven-compiler-plugin.version}</version>
      <configuration>
        <source>${java.version}</source>
        <target>${java.version}</target>
        <compilerArgs>
          <arg>-Xlint:all</arg>
          <!-- Resolves to the compiler.error.flag property (-Werror by default). -->
          <arg>${compiler.error.flag}</arg>
          <!-- Silence "options" warnings so that cross-compilation is supported. -->
          <arg>-Xlint:-options</arg>
          <!-- Temporary lint suppressions, meant to be removed over time. -->
          <arg>-Xlint:-cast</arg>
          <arg>-Xlint:-deprecation</arg>
          <arg>-Xlint:-processing</arg>
          <arg>-Xlint:-rawtypes</arg>
          <arg>-Xlint:-serial</arg>
          <arg>-Xlint:-try</arg>
          <arg>-Xlint:-unchecked</arg>
          <arg>-Xlint:-varargs</arg>
          <!-- To display more warnings, enable the two args below. -->
          <!-- <arg>-Xmaxwarns</arg> -->
          <!-- <arg>10000</arg> -->
        </compilerArgs>
        <showWarnings>true</showWarnings>
        <!-- Another temporary override; intended to become true in due course. -->
        <showDeprecation>false</showDeprecation>
      </configuration>
      <executions>

        <!-- Extra settings applied only to the default compile execution. -->
        <execution>
          <id>default-compile</id>
          <goals>
            <goal>compile</goal>
          </goals>
          <phase>compile</phase>
          <configuration>
            <compilerArgs>
              <arg>${compiler.default.pkginfo.flag}</arg>
            </compilerArgs>
            <excludes>
              <exclude>${compiler.default.exclude}</exclude>
            </excludes>
          </configuration>
        </execution>
      </executions>
    </plugin>

    <!-- exec:java support, used to launch the pipeline main class from Maven. -->
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>${maven-exec-plugin.version}</version>
      <configuration>
        <cleanupDaemonThreads>false</cleanupDaemonThreads>
        <systemProperties>
          <!-- Points java.util.logging at the logging.properties file. -->
          <systemProperty>
            <key>java.util.logging.config.file</key>
            <value>logging.properties</value>
          </systemProperty>
        </systemProperties>
      </configuration>
    </plugin>

  </plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.google.cloud.pso;

import com.google.cloud.pso.coders.ErrorMessageCoder;
import com.google.cloud.pso.common.ErrorMessage;
import com.google.cloud.pso.common.ExtractKeyFn;
import com.google.cloud.pso.common.FailSafeValidate;
import com.google.cloud.pso.options.IOTElasticsearchOptions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

/*
mvn compile exec:java -Dexec.mainClass=com.google.cloud.pso.IOTElasticsearch \
-Dexec.args="--runner=DataflowRunner \
--project=$PROJECT \
--stagingLocation=$GS_STAGING \
--gcpTempLocation=$GS_TMP \
--inputSubscription=$PUBSUB_SUBSCRIPTION \
--rejectionTopic=$PUBSUB_REJECT_TOPIC \
--addresses=$ELASTIC_CLUSTER \
--index=$ELASTIC_INDEX \
--type=$ELASTIC_TYPE \
--idField=$ELASTIC_ID"
*/
/**
 * Beam pipeline that reads string messages from a Pub/Sub subscription, validates them with
 * {@link FailSafeValidate}, writes the messages that pass validation to an Elasticsearch index
 * and publishes the ones that fail to a Pub/Sub rejection topic.
 */
public class IOTElasticsearch {
  /** {@link TupleTag} to tag successfully validated messages. */
  private static final TupleTag<KV<String, String>> VALIDATION_SUCCESS_TAG =
      new TupleTag<KV<String, String>>() {};

  /** {@link TupleTag} to tag failed messages. */
  private static final TupleTag<ErrorMessage> FAILURE_TAG = new TupleTag<ErrorMessage>() {};

  /**
   * Command-line entry point: parses and validates the pipeline options, then starts the
   * pipeline.
   *
   * @param args command-line arguments, parsed into {@link IOTElasticsearchOptions}
   */
  public static void main(String[] args) {
    IOTElasticsearchOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(IOTElasticsearchOptions.class);

    run(options);
  }

  /**
   * Builds the pipeline from the supplied options and runs it.
   *
   * @param options pipeline options providing the Elasticsearch addresses, index and type, the
   *     input subscription, the rejection topic and the formatted id
   * @return the result returned by {@link Pipeline#run()}
   */
  public static PipelineResult run(IOTElasticsearchOptions options) {
    Pipeline pipeline = Pipeline.create(options);

    // Set the CustomCoder for the ErrorMessage class.
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForClass(ErrorMessage.class, ErrorMessageCoder.of());

    // The addresses option is a comma-separated list; surrounding whitespace is trimmed.
    String[] addresses =
        Lists.newArrayList(Splitter.on(",").trimResults().split(options.getAddresses()))
            .toArray(new String[0]);

    ElasticsearchIO.ConnectionConfiguration connection =
        ElasticsearchIO.ConnectionConfiguration.create(
            addresses, options.getIndex(), options.getType());

    // Read the raw messages from the input subscription.
    PCollection<String> inputMsgs =
        pipeline.apply(
            "ReadPubsubMessages",
            PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));

    // Validate messages and tag them with success and failure tags.
    PCollectionTuple validated =
        inputMsgs.apply(
            "Validate Messages",
            FailSafeValidate.newBuilder()
                .withSuccessTag(VALIDATION_SUCCESS_TAG)
                .withFailureTag(FAILURE_TAG)
                .withKeyPath(options.getFormattedId())
                .build());

    // Success branch: drop the key and write the value to Elasticsearch as a partial update.
    validated
        .get(VALIDATION_SUCCESS_TAG)
        .apply("Extract Json from KV", ParDo.of(new ExtractJson()))
        .apply(
            "Write to Elasticsearch Index",
            ElasticsearchIO.write()
                .withConnectionConfiguration(connection)
                .withIdFn(new ExtractKeyFn(options.getFormattedId()))
                .withUsePartialUpdate(true));

    // Failure branch: render each ErrorMessage as a string and publish it to the rejection
    // topic. The conversion step is named explicitly, like every other step in this pipeline.
    validated
        .get(FAILURE_TAG)
        .apply(
            "Convert Failed Messages to Strings",
            ParDo.of(
                new DoFn<ErrorMessage, String>() {
                  @ProcessElement
                  public void processElement(
                      @Element ErrorMessage message, OutputReceiver<String> out) {
                    out.output(message.toString());
                  }
                }))
        .apply("Write Failed Messages", PubsubIO.writeStrings().to(options.getRejectionTopic()));

    return pipeline.run();
  }

  /** {@link DoFn} that emits only the value of each incoming {@link KV}. */
  static class ExtractJson extends DoFn<KV<String, String>, String> {
    /**
     * Outputs the value part of the element.
     *
     * @param element key/value pair produced by the validation step
     * @param out receiver for the extracted value
     */
    @ProcessElement
    public void processElement(@Element KV<String, String> element, OutputReceiver<String> out) {
      out.output(element.getValue());
    }
  }
}
Loading

0 comments on commit 42a1757

Please sign in to comment.