
Commit

Merge branch 'master' into master
boredabdel authored Apr 11, 2020
2 parents f5b2b8d + 32eea81 commit d4df6dd
Showing 94 changed files with 4,747 additions and 1,246 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -35,7 +35,9 @@ The examples folder contains example solutions across a variety of Google Cloud
* [Dataflow Scala Example: Kafka2Avro](examples/dataflow-scala-kafka2avro) - Example to read objects from Kafka, and persist them encoded in Avro in Google Cloud Storage, using Dataflow with SCIO.
* [Dataflow Streaming Benchmark](examples/dataflow-streaming-benchmark) - Utility to publish randomized fake JSON messages to a Cloud Pub/Sub topic at a configured QPS.
* [Dataflow Template Pipelines](https://github.com/GoogleCloudPlatform/DataflowTemplates) - Pre-implemented Dataflow template pipelines for solving common data tasks on Google Cloud Platform.
* [Dataproc GCS Connector](examples/dataproc-gcs-connector) - Install and test unreleased features on the GCS Connector for Dataproc.
* [Dataproc Persistent History Server for Ephemeral Clusters](examples/dataproc-persistent-history-server) - Example of writing logs from an ephemeral cluster to GCS and using a separate single node cluster to look at Spark and YARN History UIs.
* [Dialogflow Webhook Example](examples/dialogflow-webhook-example) - Webhook example for Dialogflow in Python.
* [DLP API Examples](examples/dlp) - Examples of the DLP API usage.
* [GCE Access to Google AdminSDK](examples/gce-to-adminsdk) - Example to help manage access to Google's AdminSDK using GCE's service account identity.
* [Home Appliance Status Monitoring from Smart Power Readings](examples/e2e-home-appliance-status-monitoring) - An end-to-end demo system featuring a suite of Google Cloud Platform products such as IoT Core, ML Engine, BigQuery, etc.
@@ -23,6 +23,7 @@
import csv
import logging
import os
import sys

import apache_beam as beam
from apache_beam.io.gcp import bigquery
2 changes: 2 additions & 0 deletions examples/dataproc-gcs-connector/.gitignore
@@ -0,0 +1,2 @@
.terraform
.tfstate*
89 changes: 89 additions & 0 deletions examples/dataproc-gcs-connector/README.md
@@ -0,0 +1,89 @@
# Testing GCS Connector for Dataproc

The Google Cloud Storage connector for Hadoop (HCFS) enables running [Apache Hadoop](http://hadoop.apache.org/) or [Apache Spark](http://spark.apache.org/) jobs directly on data in [GCS](https://cloud.google.com/storage) by implementing the Hadoop FileSystem interface. The connector is developed as an [open source project](https://github.com/GoogleCloudDataproc/hadoop-connectors) and comes preinstalled on Cloud Dataproc clusters.
To install a released version of the connector, see the [installation instructions](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/INSTALL.md).

In some cases you may want to test features that have not been released yet but are already in the master branch (or in a development branch if you are preparing a PR).
The script `build_gcs_connector.sh` follows the steps below to build the GCS connector from source and install it on a Dataproc cluster for testing purposes.

### Building the GCS connector for Dataproc on GCP

The script `build_gcs_connector.sh` will install the GCS Connector on a Dataproc cluster on GCP using Terraform. Before running this script, you will need to update the environment variables at the top of the script as shown below:

```bash
PROJECT_ID=gcs-connector
REGION=us-central1
NETWORK_NAME=dataingestion-net
DATAPROC_CLUSTER_NAME=dataproc-cluster
DATAPROC_SUBNET=dataproc-subnet
HADOOP_VERSION=hadoop2-2.2.0
```

You will also need to update the `HADOOP_VERSION` variable at the top of `connectors.sh`, the initialization script for the Dataproc cluster, as shown below:

```bash
HADOOP_VERSION=hadoop2-2.2.0
```

Finally, for testing, you will need to update the two environment variables at the top of `test_gcs_connector.sh` as shown below:

```bash
export YOUR_BUCKET=output-examples
export YOUR_CLUSTER=dataproc-cluster
```

Once all of the above variables are updated, the main script `build_gcs_connector.sh` will build the GCS connector JAR file, upload it to a GCS bucket, create a Dataproc cluster using the GCS connector, and then test the Dataproc cluster. To do this, run:

```bash
./build_gcs_connector.sh
```

The steps below outline the process executed by the script:

### 1. Clone the Hadoop connectors

Clone the Hadoop connector locally from the [GoogleCloudDataproc/hadoop-connectors](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/README.md) GitHub repository.
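
The script does this with a plain `git clone`:

```bash
git clone https://github.com/GoogleCloudDataproc/hadoop-connectors
```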

### 2. Build the shaded JAR file and upload to GCS

Build the JAR file by running the following commands from the main directory `hadoop-connectors/` of the cloned project:

```bash
# with Hadoop 2 and YARN support:
./mvnw -P hadoop2 clean package
# with Hadoop 3 and YARN support:
./mvnw -P hadoop3 clean package
```

To verify test coverage for a specific Hadoop version, run the following commands from the main directory `hadoop-connectors/`:

```bash
# with Hadoop 2 and YARN support:
./mvnw -P hadoop2 -P coverage clean verify
# with Hadoop 3 and YARN support:
./mvnw -P hadoop3 -P coverage clean verify
```

The connector JAR file can be found in `hadoop-connectors/gcs/target/`. Use the `-shaded.jar` file, as it contains all of the connector's dependencies.
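
For example, after a Hadoop 2 build you can confirm that the shaded JAR was produced (the exact file name depends on the connector version):

```bash
ls hadoop-connectors/gcs/target/gcs-connector-hadoop2-*-shaded.jar
```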

Terraform will create a GCS bucket and upload this JAR to the bucket at `gs://gcs-connector-init_actions/gcs-connector-${HADOOP_VERSION}-shaded.jar` so the Dataproc cluster will be able to access it.
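
For reference, the upload Terraform performs is roughly equivalent to the following manual commands (a sketch only, not part of the script; the bucket name and JAR path follow the variables above):

```bash
# Manual equivalent of the Terraform bucket and object resources (sketch only).
gsutil mb -p "${PROJECT_ID}" -l "${REGION}" gs://gcs-connector-init_actions/
gsutil cp "hadoop-connectors/gcs/target/gcs-connector-${HADOOP_VERSION}-shaded.jar" \
  gs://gcs-connector-init_actions/
```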

### 3. Provide the Dataproc initialization script (with Terraform)

When creating the [Google Cloud Dataproc](https://cloud.google.com/dataproc) cluster, you can specify an [initialization action](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/init-actions) to install the specified version of the [GCS connector](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/README.md) on the cluster. The initialization action for updating the GCS connector on Dataproc is based on the `connectors.sh` script from the [GoogleCloudDataproc/initialization-actions](https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/connectors) GitHub repository.

The `connectors.sh` file in this repository will be the initialization script for the Dataproc cluster. Terraform will rename this script to `dataproc-init-script.sh` before uploading it to the `gs://gcs-connector-init_actions/` GCS bucket. It will then specify `dataproc-init-script.sh` as an initialization action for the Dataproc cluster.
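
For intuition, the cluster Terraform creates is comparable to one created with a `gcloud` command along these lines (a hedged sketch using the variables above, not the actual Terraform configuration):

```bash
# Hypothetical gcloud equivalent of the Terraform-managed cluster (sketch only).
gcloud dataproc clusters create "${DATAPROC_CLUSTER_NAME}" \
  --project="${PROJECT_ID}" \
  --region="${REGION}" \
  --subnet="${DATAPROC_SUBNET}" \
  --initialization-actions="gs://gcs-connector-init_actions/dataproc-init-script.sh"
```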

### 4. Run Terraform

From the main directory of this project `gcs-connector-poc/`, the script will run the following commands to build the Dataproc cluster using the GCS connector.

```bash
cd terraform
terraform init
terraform apply
```

### 5. Test the Dataproc cluster

The script `test_gcs_connector.sh` will test the GCS Connector on your Dataproc cluster. This script will create a table `Names` in [HDFS](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html#Introduction), load data from a public GCS directory into the table, and then write the data from the `Names` table into a new table in the specified bucket (`$YOUR_BUCKET`) in GCS. If the GCS Connector is set up correctly, the job will succeed and the list of sample names from the public directory will be printed.
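
As a rough illustration of what such a test involves, a Hive job against the cluster might look like the sketch below; the table schema, queries, and output path are hypothetical stand-ins for what `test_gcs_connector.sh` actually runs:

```bash
# Hypothetical sketch only -- the real queries and source data path live in test_gcs_connector.sh.
gcloud dataproc jobs submit hive \
  --cluster="${YOUR_CLUSTER}" \
  --region=us-central1 \
  --execute="
    CREATE TABLE IF NOT EXISTS Names (name STRING);
    -- Copy the table out to GCS through the connector.
    CREATE EXTERNAL TABLE IF NOT EXISTS NamesCopy (name STRING)
      LOCATION 'gs://${YOUR_BUCKET}/names_copy/';
    INSERT OVERWRITE TABLE NamesCopy SELECT * FROM Names;
    SELECT * FROM Names LIMIT 10;"
```
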
67 changes: 67 additions & 0 deletions examples/dataproc-gcs-connector/build_gcs_connector.sh
@@ -0,0 +1,67 @@
#!/bin/bash
# Copyright 2019 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Update variables
PROJECT_ID=gcs-connector
REGION=us-central1
NETWORK_NAME=dataingestion-net
DATAPROC_CLUSTER_NAME=dataproc-cluster
DATAPROC_SUBNET=dataproc-subnet
HADOOP_VERSION=hadoop2-2.2.0

# Use environment variables to set Terraform variables
export TF_VAR_project_id=${PROJECT_ID}
export TF_VAR_region=${REGION}
export TF_VAR_network_name=${NETWORK_NAME}
export TF_VAR_dataproc_cluster=${DATAPROC_CLUSTER_NAME}
export TF_VAR_dataproc_subnet=${DATAPROC_SUBNET}
export TF_VAR_hadoop_version=${HADOOP_VERSION}

echo "Cloning https://github.com/GoogleCloudDataproc/hadoop-connectors"
git clone https://github.com/GoogleCloudDataproc/hadoop-connectors

cd hadoop-connectors || exit

echo "Building JAR file"
if [[ $HADOOP_VERSION == *"hadoop2"* ]]; then
  if ! ./mvnw -P hadoop2 clean package; then
    echo 'Error building JAR file from https://github.com/GoogleCloudDataproc/hadoop-connectors'
    exit 1
  fi
elif [[ $HADOOP_VERSION == *"hadoop3"* ]]; then
  if ! ./mvnw -P hadoop3 clean package; then
    echo 'Error building JAR file from https://github.com/GoogleCloudDataproc/hadoop-connectors'
    exit 1
  fi
else
  echo "Unsupported Hadoop version: ${HADOOP_VERSION}. See https://github.com/GoogleCloudDataproc/hadoop-connectors for supported versions."
  exit 1
fi

cd ..

echo "Running Terraform to build Dataproc cluster"
cd terraform || exit
terraform init
terraform apply -auto-approve

cd ..

echo "Running test script on Dataproc cluster"
chmod u+x test_gcs_connector.sh
./test_gcs_connector.sh
211 changes: 211 additions & 0 deletions examples/dataproc-gcs-connector/connectors.sh
@@ -0,0 +1,211 @@
#!/bin/bash
# Copyright 2019 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Installs any of the Cloud Storage, Hadoop BigQuery and/or Spark BigQuery connectors
# onto a Cloud Dataproc cluster.

set -euxo pipefail

# Update $HADOOP_VERSION
HADOOP_VERSION=hadoop2-2.2.0

readonly VM_CONNECTORS_HADOOP_DIR=/usr/lib/hadoop/lib
readonly VM_CONNECTORS_DATAPROC_DIR=/usr/local/share/google/dataproc/lib

declare -A MIN_CONNECTOR_VERSIONS
MIN_CONNECTOR_VERSIONS=(
  ["bigquery"]="0.12.0"
  ["gcs"]="1.8.0"
  ["spark-bigquery"]="0.12.0-beta")

readonly BIGQUERY_CONNECTOR_VERSION=$(/usr/share/google/get_metadata_value attributes/bigquery-connector-version || true)
readonly GCS_CONNECTOR_VERSION=$(/usr/share/google/get_metadata_value attributes/gcs-connector-version || true)
readonly SPARK_BIGQUERY_CONNECTOR_VERSION=$(/usr/share/google/get_metadata_value attributes/spark-bigquery-connector-version || true)

readonly BIGQUERY_CONNECTOR_URL=$(/usr/share/google/get_metadata_value attributes/bigquery-connector-url || true)
readonly GCS_CONNECTOR_URL=$(/usr/share/google/get_metadata_value attributes/gcs-connector-url || true)
readonly SPARK_BIGQUERY_CONNECTOR_URL=$(/usr/share/google/get_metadata_value attributes/spark-bigquery-connector-url || true)

UPDATED_GCS_CONNECTOR=false

is_worker() {
  local role
  role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
  if [[ $role != Master ]]; then
    return 0
  fi
  return 1
}

# Prints the smaller of two dotted version strings.
min_version() {
  echo -e "$1\n$2" | sort -r -t'.' -n -k1,1 -k2,2 -k3,3 | tail -n1
}

get_connector_url() {
  # Connector names have changed as of certain versions.
  #
  # bigquery + gcs connectors:
  #   bigquery-connector version < 0.13.5 / gcs-connector version < 1.9.5:
  #     gs://hadoop-lib/${name}/${name}-connector-${version}-hadoop2.jar
  #
  #   bigquery-connector version >= 0.13.5 / gcs-connector version >= 1.9.5:
  #     gs://hadoop-lib/${name}/${name}-connector-hadoop2-${version}.jar
  #
  # spark-bigquery-connector:
  #   gs://spark-lib/bigquery/${name}-with-dependencies_${scala_version}-${version}.jar
  local -r name=$1
  local -r version=$2

  if [[ $name == spark-bigquery ]]; then
    # DATAPROC_VERSION is an environment variable set on the cluster.
    # We will use this to determine the appropriate connector to use
    # based on the Scala version.
    if [[ $(min_version "$DATAPROC_VERSION" 1.5) != 1.5 ]]; then
      local -r scala_version=2.11
    else
      local -r scala_version=2.12
    fi

    local -r jar_name="spark-bigquery-with-dependencies_${scala_version}-${version}.jar"

    echo "gs://spark-lib/bigquery/${jar_name}"
    return
  fi

  if [[ $name == gcs && $(min_version "$version" 1.9.5) != 1.9.5 ]] ||
    [[ $name == bigquery && $(min_version "$version" 0.13.5) != 0.13.5 ]]; then
    local -r jar_name="${name}-connector-${version}-hadoop2.jar"
  else
    local -r jar_name="${name}-connector-hadoop2-${version}.jar"
  fi

  echo "gs://hadoop-lib/${name}/${jar_name}"
}

validate_version() {
  local name=$1    # connector name: "bigquery", "gcs" or "spark-bigquery"
  local version=$2 # connector version
  local min_valid_version=${MIN_CONNECTOR_VERSIONS[$name]}
  if [[ "$(min_version "$min_valid_version" "$version")" != "$min_valid_version" ]]; then
    echo "ERROR: ${name}-connector version should be greater than or equal to $min_valid_version, but was $version"
    return 1
  fi
}

update_connector_url() {
  local -r name=$1
  local -r url=$2

  if [[ $name == gcs ]]; then
    UPDATED_GCS_CONNECTOR=true
  fi

  if [[ -d ${VM_CONNECTORS_DATAPROC_DIR} ]]; then
    local vm_connectors_dir=${VM_CONNECTORS_DATAPROC_DIR}
  else
    local vm_connectors_dir=${VM_CONNECTORS_HADOOP_DIR}
  fi

  # Remove the old connector if it exists
  if [[ $name == spark-bigquery ]]; then
    find "${vm_connectors_dir}/" -name "${name}*.jar" -delete
  else
    find "${vm_connectors_dir}/" -name "${name}-connector-*.jar" -delete
  fi

  # UPDATED this line to pull the locally built GCS connector
  gsutil cp "gs://gcs-connector-init_actions/gcs-connector-${HADOOP_VERSION}-shaded.jar" "${vm_connectors_dir}/"

  local -r jar_name=${url##*/}

  # Update or create version-less connector link
  ln -s -f "${vm_connectors_dir}/${jar_name}" "${vm_connectors_dir}/${name}-connector.jar"
}

update_connector_version() {
  local -r name=$1    # connector name: "bigquery", "gcs" or "spark-bigquery"
  local -r version=$2 # connector version

  # Validate the new connector version
  validate_version "$name" "$version"

  local -r connector_url=$(get_connector_url "$name" "$version")

  update_connector_url "$name" "$connector_url"
}

update_connector() {
  local -r name=$1
  local -r version=$2
  local -r url=$3

  if [[ -n $version && -n $url ]]; then
    echo "ERROR: Both connector version and URL are specified for the same connector"
    exit 1
  fi

  if [[ -n $version ]]; then
    update_connector_version "$name" "$version"
  fi

  if [[ -n $url ]]; then
    update_connector_url "$name" "$url"
  fi
}

if [[ -z $BIGQUERY_CONNECTOR_VERSION && -z $BIGQUERY_CONNECTOR_URL ]] &&
  [[ -z $GCS_CONNECTOR_VERSION && -z $GCS_CONNECTOR_URL ]] &&
  [[ -z $SPARK_BIGQUERY_CONNECTOR_VERSION && -z $SPARK_BIGQUERY_CONNECTOR_URL ]]; then
  echo "ERROR: None of the connector versions or URLs are specified"
  exit 1
fi

update_connector "bigquery" "$BIGQUERY_CONNECTOR_VERSION" "$BIGQUERY_CONNECTOR_URL"
update_connector "gcs" "$GCS_CONNECTOR_VERSION" "$GCS_CONNECTOR_URL"
update_connector "spark-bigquery" "$SPARK_BIGQUERY_CONNECTOR_VERSION" "$SPARK_BIGQUERY_CONNECTOR_URL"

if [[ $UPDATED_GCS_CONNECTOR != true ]]; then
  echo "GCS connector wasn't updated - no need to restart any services"
  exit 0
fi

# Restart the YARN NodeManager service on worker nodes so they can pick up the updated GCS connector
if is_worker; then
  systemctl kill -s KILL hadoop-yarn-nodemanager
fi

# Restarts the Dataproc Agent after successful initialization
# WARNING: this function relies on undocumented and not officially supported Dataproc Agent
# "sentinel" files to determine successful Agent initialization and is not guaranteed
# to work in the future. Use at your own risk!
restart_dataproc_agent() {
  # Because the Dataproc Agent should be restarted after initialization, we need to wait until
  # it creates a sentinel file that signals initialization completion (success or failure)
  while [[ ! -f /var/lib/google/dataproc/has_run_before ]]; do
    sleep 1
  done
  # If the Dataproc Agent didn't create a sentinel file that signals initialization
  # failure, then initialization succeeded and the agent should be restarted
  if [[ ! -f /var/lib/google/dataproc/has_failed_before ]]; then
    systemctl kill -s KILL google-dataproc-agent
  fi
}
export -f restart_dataproc_agent

# Schedule an asynchronous Dataproc Agent restart so it will use the updated connectors.
# It cannot be restarted synchronously because the Dataproc Agent should be restarted
# after its initialization, including init actions execution, has completed.
bash -c restart_dataproc_agent &
disown