ErrorConverters.java
@@ -24,6 +24,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -191,6 +192,50 @@ public abstract static class Builder {
}
}

/**
* The {@link WritePubsubMessageErrors} class is a transform which can be used to write messages
* which failed processing to an error records table. Each record saved to the error table is
* enriched with the timestamp of that record and the details of the error, including an error
* message and stacktrace for debugging.
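*
* <p>A minimal usage sketch; the table reference and schema variable below are illustrative,
* not part of the template:
*
* <pre>{@code
* WriteResult errorResult =
*     failedRecords.apply(
*         "WritePubsubMessageErrors",
*         WritePubsubMessageErrors.newBuilder()
*             .setErrorRecordsTable("my-project:my_dataset.error_records")
*             .setErrorRecordsTableSchema(deadletterTableSchemaJson)
*             .build());
* }</pre>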
*/
@AutoValue
public abstract static class WritePubsubMessageErrors
extends PTransform<PCollection<FailsafeElement<PubsubMessage, String>>, WriteResult> {

public static Builder newBuilder() {
return new AutoValue_ErrorConverters_WritePubsubMessageErrors.Builder();
}

public abstract String getErrorRecordsTable();

public abstract String getErrorRecordsTableSchema();

@Override
public WriteResult expand(
PCollection<FailsafeElement<PubsubMessage, String>> failedRecords) {

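// Convert each failed record into a TableRow, then append the rows to the error records
// table, creating it from the supplied JSON schema if it does not already exist.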
return failedRecords
.apply("FailedRecordToTableRow", ParDo.of(new FailedPubsubMessageToTableRowFn()))
.apply(
"WriteFailedRecordsToBigQuery",
BigQueryIO.writeTableRows()
.to(getErrorRecordsTable())
.withJsonSchema(getErrorRecordsTableSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
}

/** Builder for {@link WritePubsubMessageErrors}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setErrorRecordsTable(String errorRecordsTable);

public abstract Builder setErrorRecordsTableSchema(String errorRecordsTableSchema);

public abstract WritePubsubMessageErrors build();
}
}

/**
* The {@link FailedMessageToTableRowFn} converts Kafka messages which have failed processing into
* {@link TableRow} objects which can be output to a dead-letter table.
@@ -237,4 +282,45 @@ public void processElement(ProcessContext context) {
context.output(failedRow);
}
}

/**
* The {@link FailedPubsubMessageToTableRowFn} converts PubSub messages which have failed
* processing into {@link TableRow} objects which can be output to a dead-letter table.
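*
* <p>Each output row carries the fields {@code timestamp}, {@code errorMessage},
* {@code stacktrace}, {@code payloadString}, and {@code payloadBytes}, populated from the
* failed element and its processing context.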
*/
public static class FailedPubsubMessageToTableRowFn
extends DoFn<FailsafeElement<PubsubMessage, String>, TableRow> {

/**
* The formatter used to convert timestamps into a BigQuery compatible <a
* href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp-type">format</a>.
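* For example, an element timestamp of {@code 2020-01-01T23:59:59.123Z} is formatted as
* {@code 2020-01-01 23:59:59.123000}.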
*/
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

@ProcessElement
public void processElement(ProcessContext context) {
FailsafeElement<PubsubMessage, String> failsafeElement = context.element();
PubsubMessage pubsubMessage = failsafeElement.getOriginalPayload();
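// Prefer the message payload; fall back to the attribute map's string form when the
// payload is empty so the error row still captures something useful.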
String message =
    pubsubMessage.getPayload().length > 0
        ? new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8)
        : pubsubMessage.getAttributeMap().toString();

// Format the timestamp for insertion
String timestamp =
TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));

// Build the table row
TableRow failedRow =
new TableRow()
.set("timestamp", timestamp)
.set("errorMessage", failsafeElement.getErrorMessage())
.set("stacktrace", failsafeElement.getStacktrace())
.set("payloadString", message)
.set("payloadBytes", message.getBytes(StandardCharsets.UTF_8));

context.output(failedRow);
}
}
}
JavascriptTextTransformer.java
@@ -275,9 +275,9 @@ public static <T> Builder<T> newBuilder() {
return new AutoValue_JavascriptTextTransformer_FailsafeJavascriptUdf.Builder<>();
}

- private Counter successCounter = Metrics.counter(FailsafeJavascriptUdf.class, "SuccessFailsafeUdfCounter");
+ private Counter successCounter = Metrics.counter(FailsafeJavascriptUdf.class, "udf-transform-success-count");

- private Counter failedCounter = Metrics.counter(FailsafeJavascriptUdf.class, "FailedFailsafeUdfCounter");
+ private Counter failedCounter = Metrics.counter(FailsafeJavascriptUdf.class, "udf-transform-failed-count");

/** Builder for {@link FailsafeJavascriptUdf}. */
@AutoValue.Builder
1 change: 1 addition & 0 deletions v2/pom.xml
@@ -574,6 +574,7 @@
<module>common</module>
<module>csv-to-elasticsearch</module>
<module>bigquery-to-elasticsearch</module>
<module>pubsub-to-elasticsearch</module>
</modules>

</project>
114 changes: 114 additions & 0 deletions v2/pubsub-to-elasticsearch/README.md
@@ -0,0 +1,114 @@
# PubSub to Elasticsearch Dataflow Template

The [PubSubToElasticsearch](src/main/java/com/google/cloud/teleport/v2/templates/PubSubToElasticsearch.java) pipeline
ingests data from a PubSub subscription, optionally applies a JavaScript UDF if one is supplied, and writes the data to Elasticsearch.

## Getting Started

### Requirements
* Java 8
* Maven
* An existing PubSub subscription
* One or more operational Elasticsearch hosts

### Building Template
This is a Flex Template, meaning that the pipeline code will be containerized and the container will be
used to launch the Dataflow pipeline.

#### Building Container Image
* Set environment variables.
```sh
export PROJECT=my-project
export IMAGE_NAME=my-image-name
export BUCKET_NAME=<bucket-name>
export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java8-template-launcher-base
export BASE_CONTAINER_IMAGE_VERSION=latest
export APP_ROOT=/template/<template-class>
export COMMAND_SPEC=${APP_ROOT}/resources/pubsub-to-elasticsearch-command-spec.json
export NODE_ADDRESSES=comma-separated-list-nodes
export SUBSCRIPTION=my-subscription
export INDEX=my-index
export DOCUMENT_TYPE=my-type
export DEADLETTER_TABLE=my-project:my-dataset.my-deadletter-table
```

* Build and push the image to Google Container Registry

```sh
mvn clean package \
-Dimage=${TARGET_GCR_IMAGE} \
-Dbase-container-image=${BASE_CONTAINER_IMAGE} \
-Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
-Dapp-root=${APP_ROOT} \
-Dcommand-spec=${COMMAND_SPEC}
```

#### Creating Image Spec

* Create a file in Cloud Storage containing the path to the container image in Google Container Registry.
```json
{
"docker_template_spec": {
"docker_image": "gcr.io/project/my-image-name"
}
}
```

### Testing Template

The template unit tests can be run using:
```sh
mvn test
```

### Executing Template

The template requires the following parameters:
* nodeAddresses: List of Elasticsearch nodes to connect to, ex: http://my-node1,http://my-node2
* index: The index toward which the requests will be issued, ex: my-index
* documentType: The document type toward which the requests will be issued, ex: my-document-type
* inputSubscription: PubSub subscription to read from, ex: projects/my-project/subscriptions/my-subscription
* deadletterTable: Deadletter table for failed inserts in form: project-id:dataset.table

The template has the following optional parameters:
* batchSize: Batch size in number of documents. Default: 1000
* batchSizeBytes: Batch size in number of bytes. Default: 5242880 (5 MB)
* javascriptTextTransformGcsPath: Cloud Storage path to the JavaScript UDF source. If supplied, the UDF is the preferred option for transformation. Default: null
* javascriptTextTransformFunctionName: Name of the JavaScript UDF function. Default: null
* maxRetryAttempts: Max retry attempts, must be > 0. Default: no retries
* maxRetryDuration: Max retry duration in milliseconds, must be > 0. Default: no retries
* usePartialUpdates: Set to true to issue partial updates. Default: false
* idFnPath: Path to the JavaScript file containing the function to extract the Id from a document, ex: gs://path/to/idFn.js. Default: null
* idFnName: Name of the JavaScript function to extract the Id from a document. Default: null
* indexFnPath: Path to the JavaScript file containing the function to extract the Index from a document, ex: gs://path/to/indexFn.js. Default: null
* indexFnName: Name of the JavaScript function to extract the Index from a document. Overrides the index provided. Default: null
* typeFnPath: Path to the JavaScript file containing the function to extract the Type from a document, ex: gs://path/to/typeFn.js. Default: null
* typeFnName: Name of the JavaScript function to extract the Type from a document. Overrides the document type provided. Default: null

The template can be executed using the following API call:
```sh
API_ROOT_URL="https://dataflow.googleapis.com"
TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT}/templates:launch"
JOB_NAME="pubsub-to-elasticsearch-`date +%Y%m%d-%H%M%S-%N`"
time curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
"${TEMPLATES_LAUNCH_API}"`
`"?validateOnly=false"`
`"&dynamicTemplate.gcsPath=gs://${BUCKET_NAME}/pubsub-to-elasticsearch-image-spec.json"`
`"&dynamicTemplate.stagingLocation=gs://${BUCKET_NAME}/staging" \
-d '{
      "jobName":"'$JOB_NAME'",
      "parameters": {
        "inputSubscription":"'$SUBSCRIPTION'",
        "nodeAddresses":"'$NODE_ADDRESSES'",
        "index":"'$INDEX'",
        "documentType":"'$DOCUMENT_TYPE'",
        "deadletterTable":"'$DEADLETTER_TABLE'"
      }
    }'
```
62 changes: 62 additions & 0 deletions v2/pubsub-to-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Copyright (C) 2019 Google Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.google.cloud.teleport.v2</groupId>
<artifactId>dynamic-templates</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>pubsub-to-elasticsearch</artifactId>

<description>
Stream data from Google PubSub to a hosted Elasticsearch Server
</description>

<dependencies>
<dependency>
<groupId>com.google.cloud.teleport.v2</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

<build>
<directory>${mvn-target-dir}</directory>
<plugins>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
