1 change: 1 addition & 0 deletions v2/pom.xml
@@ -578,6 +578,7 @@
<module>kafka-to-gcs</module>
<module>pubsub-to-elasticsearch</module>
<module>file-format-conversion</module>
<module>pubsub-to-mongodb</module>
</modules>

</project>
113 changes: 113 additions & 0 deletions v2/pubsub-to-mongodb/README.md
@@ -0,0 +1,113 @@
# PubSub to MongoDB Dataflow Template

The [PubSubToMongoDB](src/main/java/com/google/cloud/teleport/v2/templates/PubSubToMongoDB.java) pipeline
ingests data from a PubSub subscription, optionally applies a JavaScript UDF if one is supplied, and writes the data to MongoDB.

## Getting Started

### Requirements
* Java 8
* Maven
* An existing PubSub subscription
* A running MongoDB host

### Building Template
This is a Flex Template, meaning that the pipeline code is containerized and the container is
used to launch the Dataflow pipeline.

#### Building Container Image
* Set the pipeline variables.
```sh
API_ROOT_URL=https://dataflow.googleapis.com
PROJECT_NAME=my-project
BUCKET_NAME=my-bucket
INPUT_SUBSCRIPTION=projects/${PROJECT_NAME}/subscriptions/my-subscription
TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT_NAME}/templates:launch"
MONGODB_DATABASE_NAME=testdb
MONGODB_HOSTNAME=my-host:port
MONGODB_COLLECTION_NAME=testCollection
DEADLETTERTABLE=${PROJECT_NAME}:my_dataset.deadletter_table_name
```
* Set the containerization variables.
```sh
IMAGE_NAME=my-image-name
TARGET_GCR_IMAGE=gcr.io/${PROJECT_NAME}/${IMAGE_NAME}
BASE_CONTAINER_IMAGE=my-base-container-image
BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
APP_ROOT=/path/to/app-root
COMMAND_SPEC=/path/to/command-spec
```

* Build and push the image to Google Container Registry.
```sh
mvn clean package \
-Dimage=${TARGET_GCR_IMAGE} \
-Dbase-container-image=${BASE_CONTAINER_IMAGE} \
-Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
-Dapp-root=${APP_ROOT} \
-Dcommand-spec=${COMMAND_SPEC}
```

#### Creating Image Spec

* Create a file in Cloud Storage containing the path to the container image in Google Container Registry.
```json
{
"docker_template_spec": {
"docker_image": "gcr.io/project/my-image-name"
}
}
```
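One way to produce this file locally is sketched below. The file name `pubsub-to-mongodb-image-spec.json` matches the path used in the launch request later in this README; the default values for `PROJECT_NAME` and `IMAGE_NAME` are placeholders, not values required by the template.

```sh
# Placeholder defaults; replace with your own project and image name.
PROJECT_NAME="${PROJECT_NAME:-my-project}"
IMAGE_NAME="${IMAGE_NAME:-my-image-name}"
SPEC_FILE=pubsub-to-mongodb-image-spec.json

# Write the image spec to a local file.
cat > "${SPEC_FILE}" <<EOF
{
  "docker_template_spec": {
    "docker_image": "gcr.io/${PROJECT_NAME}/${IMAGE_NAME}"
  }
}
EOF
```

The file can then be copied to the bucket, for example with `gsutil cp "${SPEC_FILE}" "gs://${BUCKET_NAME}/"`.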

### Testing Template

The template unit tests can be run using:
```sh
mvn test
```

### Executing Template

The template requires the following parameters:
* mongoDBUri: List of MongoDB nodes to connect to, ex: my-node1:port
* database: The MongoDB database in which the collection exists, ex: my-db
* collection: The collection in the MongoDB database to write the documents to, ex: my-collection
* inputSubscription: PubSub subscription to read from, ex: projects/my-project/subscriptions/my-subscription
* deadletterTable: Deadletter table for failed inserts in form: project-id:dataset.table

The template has the following optional parameters:
* batchSize: Batch size in number of documents. Default: 1024
* batchSizeBytes: Batch size in number of bytes. Default: 5242880 (5 MB)
* javascriptTextTransformGcsPath: GCS path to the JavaScript UDF source. The UDF is the preferred transformation option if supplied. Default: null
* javascriptTextTransformFunctionName: Name of the JavaScript UDF function. Default: null
* maxConnectionIdleTime: Maximum connection idle time, e.g. 10000. Default: 60000
* sslEnabled: Specify whether SSL is enabled. Default: false
* ignoreSSLCertificate: Specify whether to ignore the SSL certificate. Default: false
* withOrdered: Enable ordered bulk insertions. Default: true
* withSSLInvalidHostnameAllowed: Enable InvalidHostnameAllowed for the SSL connection. Default: false
* Will override type provided.
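
A UDF for `javascriptTextTransformGcsPath` might look like the sketch below. It assumes the convention used by other Dataflow templates, in which the function receives each message payload as a JSON string and returns a JSON string; this README does not confirm that contract. The file name `transform.js`, the function name `transform`, and the added `processed` field are illustrative only.

```sh
# Write a hypothetical UDF to a local file (all names are illustrative).
cat > transform.js <<'EOF'
/**
 * Assumed contract: inJson is the message payload as a JSON string;
 * the return value is the JSON string handed to the rest of the pipeline.
 */
function transform(inJson) {
  var doc = JSON.parse(inJson);
  doc.processed = true; // example enrichment, not required by the template
  return JSON.stringify(doc);
}
EOF
```

After the file is copied to Cloud Storage, `javascriptTextTransformGcsPath` would point at its `gs://` path and `javascriptTextTransformFunctionName` would be set to `transform`.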

Template can be executed using the following API call:
```sh
API_ROOT_URL="https://dataflow.googleapis.com"
TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT_NAME}/templates:launch"
JOB_NAME="pubsub-to-mongodb-`date +%Y%m%d-%H%M%S-%N`"
time curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
"${TEMPLATES_LAUNCH_API}"`\
`"?validateOnly=false"` \
`"&dynamicTemplate.gcsPath=gs://${BUCKET_NAME}/pubsub-to-mongodb-image-spec.json"` \
`"&dynamicTemplate.stagingLocation=gs://${BUCKET_NAME}/staging" \
-d'{
"jobName":"'$JOB_NAME'",
"parameters": {
"inputSubscription":"'$INPUT_SUBSCRIPTION'",
"database":"'$MONGODB_DATABASE_NAME'",
"collection":"'$MONGODB_COLLECTION_NAME'",
"mongoDBUri":"'$MONGODB_HOSTNAME'",
"deadletterTable":"'$DEADLETTERTABLE'"
}
}'
```
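
Optional parameters go in the same `parameters` map as the required ones. As a sketch, the request body below adds `batchSize` and `sslEnabled`; the values are arbitrary examples, the file name `launch-request.json` is hypothetical, and the values are written as strings because the launch API's `parameters` field is a string-to-string map.

```sh
# Placeholder defaults so the snippet runs on its own; the variable names
# are the ones set earlier in this README.
JOB_NAME="pubsub-to-mongodb-$(date +%Y%m%d-%H%M%S)"
INPUT_SUBSCRIPTION="${INPUT_SUBSCRIPTION:-projects/my-project/subscriptions/my-subscription}"
MONGODB_DATABASE_NAME="${MONGODB_DATABASE_NAME:-testdb}"
MONGODB_COLLECTION_NAME="${MONGODB_COLLECTION_NAME:-testCollection}"
MONGODB_HOSTNAME="${MONGODB_HOSTNAME:-my-host:port}"
DEADLETTERTABLE="${DEADLETTERTABLE:-my-project:my_dataset.deadletter_table_name}"

# Write the request body, with two optional parameters added as examples.
cat > launch-request.json <<EOF
{
  "jobName": "${JOB_NAME}",
  "parameters": {
    "inputSubscription": "${INPUT_SUBSCRIPTION}",
    "database": "${MONGODB_DATABASE_NAME}",
    "collection": "${MONGODB_COLLECTION_NAME}",
    "mongoDBUri": "${MONGODB_HOSTNAME}",
    "deadletterTable": "${DEADLETTERTABLE}",
    "batchSize": "512",
    "sslEnabled": "true"
  }
}
EOF
```

The file can be passed to the `curl` call with `-d @launch-request.json` in place of the inline body.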
73 changes: 73 additions & 0 deletions v2/pubsub-to-mongodb/pom.xml
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Copyright (C) 2019 Google Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->


<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.google.cloud.teleport.v2</groupId>
<artifactId>dynamic-templates</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>pubsub-to-mongodb</artifactId>

<name>PubSub To MongoDB</name>
<description>
Stream data from Google PubSub to a hosted MongoDB Database
</description>

<properties>
<checkstyle.version>8.7</checkstyle.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-mongodb</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.teleport.v2</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
<directory>${mvn-target-dir}</directory>
<plugins>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>