Using Spark SQL Streaming with Pub/Sub Lite

The samples in this directory show how to read messages from and write messages to Pub/Sub Lite from an Apache Spark cluster created with Cloud Dataproc using the Pub/Sub Lite Spark Connector.

Get the connector's uber jar from the public Cloud Storage location gs://spark-lib/pubsublite/, or download it from the connector's Maven page. The uber jar has a "with-dependencies" suffix. Include it on the driver and executor classpaths when you submit a Spark job, typically with the --jars flag.
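If you prefer to script job submission, the following minimal Python sketch assembles (but does not run) the same gcloud command used by the submit commands later in this README, with the uber jar passed through --jars. The helper name build_submit_command and all region, cluster, project, and topic values are hypothetical placeholders.

```python
import shlex

# Uber jar path taken from the submit commands in this README.
CONNECTOR_JAR = (
    "gs://spark-lib/pubsublite/"
    "pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar"
)


def build_submit_command(script, region, cluster, script_args, jar=CONNECTOR_JAR):
    """Assemble (but do not run) a gcloud PySpark job-submission command."""
    return [
        "gcloud", "dataproc", "jobs", "submit", "pyspark", script,
        f"--region={region}",
        f"--cluster={cluster}",
        # Dataproc adds --jars entries to the driver and executor classpaths.
        f"--jars={jar}",
        "--driver-log-levels=root=INFO",
        "--properties=spark.master=yarn",
        # Arguments after the bare "--" go to the PySpark script, not to gcloud.
        "--",
        *script_args,
    ]


if __name__ == "__main__":
    cmd = build_submit_command(
        "spark_streaming_to_pubsublite_example.py",
        region="us-central1",  # hypothetical Dataproc region
        cluster="my-cluster",  # hypothetical cluster ID
        script_args=[
            "--project_number=123456789",  # hypothetical project number
            "--location=us-central1-a",  # hypothetical Pub/Sub Lite location
            "--topic_id=my-topic",  # hypothetical topic ID
        ],
    )
    print(shlex.join(cmd))
```

Once the values are real, you could pass the list to subprocess.run(cmd, check=True) instead of printing it.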

Before you begin

  1. Install the Cloud SDK.

    Note: This is not required in Cloud Shell because Cloud Shell has the Cloud SDK pre-installed.

  2. Create a new Google Cloud project on the New Project page or with the gcloud command-line tool.

    export PROJECT_ID=your-google-cloud-project-id
    gcloud projects create $PROJECT_ID

    Or use an existing Google Cloud project.

    export PROJECT_ID=$(gcloud config get-value project)
  3. Enable billing.

  4. Set up the Cloud SDK to use your Google Cloud project.

    gcloud init
  5. Enable the APIs: Pub/Sub Lite, Dataproc, Cloud Storage.

  6. Create a Pub/Sub Lite topic and subscription in a supported location.

    export TOPIC_ID=your-topic-id
    export SUBSCRIPTION_ID=your-subscription-id
    export PUBSUBLITE_LOCATION=your-location
    
    gcloud pubsub lite-topics create $TOPIC_ID \
      --location=$PUBSUBLITE_LOCATION \
      --partitions=2 \
      --per-partition-bytes=30GiB
    
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION_ID \
      --location=$PUBSUBLITE_LOCATION \
      --topic=$TOPIC_ID
  7. Create a Cloud Storage bucket.

    export BUCKET_ID=your-gcs-bucket-id
    
    gsutil mb gs://$BUCKET_ID
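The topic created in step 6 provisions storage per partition, so its total storage is the number of partitions times the per-partition size: with the values used there, 2 × 30 GiB = 60 GiB. A small Python sketch of that arithmetic follows; the unit table and helper names are illustrative assumptions, not part of gcloud.

```python
# Binary (IEC) multiples, matching the "30GiB" notation used above.
UNITS = {"B": 1, "KiB": 2**10, "MiB": 2**20, "GiB": 2**30, "TiB": 2**40}


def parse_size(text):
    """Convert a size string such as '30GiB' into a number of bytes."""
    # Try longer suffixes first so 'GiB' is not mistaken for plain 'B'.
    for unit in sorted(UNITS, key=len, reverse=True):
        if text.endswith(unit):
            return int(text[: -len(unit)]) * UNITS[unit]
    raise ValueError(f"unrecognized size: {text!r}")


def total_topic_storage(partitions, per_partition_bytes):
    """Total topic storage: number of partitions x per-partition size."""
    return partitions * parse_size(per_partition_bytes)


if __name__ == "__main__":
    total = total_topic_storage(2, "30GiB")
    print(total, "bytes =", total // 2**30, "GiB")  # 64424509440 bytes = 60 GiB
```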

Python setup

  1. Install Python and virtualenv.

  2. Clone the python-docs-samples repository.

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
  3. Navigate to the sample code directory.

    cd python-docs-samples/pubsublite/spark-connector
  4. Create a virtual environment and activate it.

    python -m venv env
    source env/bin/activate

    Once you are finished with the tutorial, you can deactivate the virtualenv and go back to your global Python environment by running deactivate.

  5. Install the required packages.

    python -m pip install -U -r requirements.txt --use-pep517

    The --use-pep517 flag is needed with pip 23.1 or later because of the setup.py install deprecation, unless you build the Spark JARs yourself and install PySpark from its source distribution (see the Spark documentation).

Creating a Spark cluster on Dataproc

  1. Go to Cloud Console for Dataproc.

  2. Go to Clusters, then Create Cluster.

    Note: Under Versioning, choose Dataproc image version 1.5 for Spark 2.4.8, or image version 2.0 for Spark 3. The latest connector works with Spark 3; see the connector's compatibility notes. In addition, under Manage security (optional), you must enable the cloud-platform scope for the cluster by checking "Allow API access to all Google Cloud services in the same project" under Project access.

    The equivalent gcloud command is shown below; the --enable-component-gateway argument is optional and enables the Component Gateway:

    export CLUSTER_ID=your-cluster-id
    export DATAPROC_REGION=your-dataproc-region
    
    gcloud dataproc clusters create $CLUSTER_ID \
      --region $DATAPROC_REGION \
      --image-version 2.0-debian10 \
      --scopes 'https://www.googleapis.com/auth/cloud-platform' \
      --enable-component-gateway
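If you script cluster creation, a small pre-flight check can encode the image-version guidance from the note above. This Python sketch knows only the two image versions this README mentions; the helper names are hypothetical, and other image versions should be checked against the Dataproc release notes.

```python
# Only these two entries come from this README's versioning note.
SPARK_FOR_IMAGE = {"1.5": "2.4.8", "2.0": "3"}


def spark_version_for(image_version):
    """Map an image version such as '2.0-debian10' to its Spark version."""
    major_minor = image_version.split("-", 1)[0]  # '2.0-debian10' -> '2.0'
    try:
        return SPARK_FOR_IMAGE[major_minor]
    except KeyError:
        raise ValueError(f"no guidance for image version {image_version!r}") from None


def supports_latest_connector(image_version):
    """The latest connector works with Spark 3, per the note above."""
    return spark_version_for(image_version).startswith("3")


if __name__ == "__main__":
    print(spark_version_for("2.0-debian10"))          # 3
    print(supports_latest_connector("1.5-debian10"))  # False
```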

Writing to Pub/Sub Lite

spark_streaming_to_pubsublite_example.py creates a streaming source of consecutive numbers with timestamps for 60 seconds and writes them to a Pub/Sub Lite topic.
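The write job receives the project number, location, and topic ID as separate script arguments (the values after -- in the command below). Assuming the script works like the published sample, it combines them into a full topic path of the form projects/{project_number}/locations/{location}/topics/{topic_id} for the connector. A minimal Python sketch of that step; the function names and example values are hypothetical.

```python
import argparse


def topic_path(project_number, location, topic_id):
    """Full Pub/Sub Lite topic path built from the three script arguments."""
    return f"projects/{project_number}/locations/{location}/topics/{topic_id}"


def parse_args(argv=None):
    # Mirrors the arguments passed after "--" in the write-job command.
    parser = argparse.ArgumentParser()
    parser.add_argument("--project_number", required=True)
    parser.add_argument("--location", required=True)
    parser.add_argument("--topic_id", required=True)
    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_args(
        ["--project_number=123456789", "--location=us-central1-a", "--topic_id=my-topic"]
    )
    print(topic_path(args.project_number, args.location, args.topic_id))
    # projects/123456789/locations/us-central1-a/topics/my-topic
```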

To submit a write job:

export PROJECT_NUMBER=$(gcloud projects list --filter="projectId:$PROJECT_ID" --format="value(PROJECT_NUMBER)")

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC_ID

Visit the job URL in the command output, or open the Jobs page of Dataproc in the Cloud Console, to monitor the job's progress.

You should see INFO logging like the following in the output:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
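To check progress from saved driver logs, you could total the committed counts from lines like the one above. This Python sketch assumes the log wording shown here, which may differ between connector versions; the epochId values in the example are made up.

```python
import re

# Pattern for the commit line shown above; the wording is an assumption
# based on that single example line.
COMMIT_RE = re.compile(r"PslStreamWriter: Committed (\d+) messages for epochId (\S+)")


def total_committed(log_lines):
    """Sum the message counts reported by PslStreamWriter commit lines."""
    total = 0
    for line in log_lines:
        match = COMMIT_RE.search(line)
        if match:
            total += int(match.group(1))
    return total


if __name__ == "__main__":
    sample = [  # hypothetical log excerpt
        "INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId 0",
        "INFO some.other.Logger: unrelated line",
        "INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId 1",
    ]
    print(total_committed(sample))  # 2
```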

Reading from Pub/Sub Lite

spark_streaming_from_pubsublite_example.py reads messages from a Pub/Sub Lite subscription as DataFrame rows and prints them to the console.

To submit a read job:

gcloud dataproc jobs submit pyspark spark_streaming_from_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION_ID

Here is example output:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
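The subscription column holds the full subscription path, which the console output above truncates. Assuming the standard form projects/{project_number}/locations/{location}/subscriptions/{subscription_id}, this Python sketch splits such a value back into its parts, for example when post-processing collected rows. The function name and example path are hypothetical.

```python
import re

# Assumed full-path format of the subscription column.
SUBSCRIPTION_PATH_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/subscriptions/(?P<subscription>[^/]+)$"
)


def split_subscription_path(path):
    """Return (project_number, location, subscription_id) from a full path."""
    match = SUBSCRIPTION_PATH_RE.match(path)
    if match is None:
        raise ValueError(f"not a subscription path: {path!r}")
    return match.group("project"), match.group("location"), match.group("subscription")


if __name__ == "__main__":
    print(split_subscription_path(
        "projects/123456789/locations/us-central1-a/subscriptions/my-sub"
    ))  # ('123456789', 'us-central1-a', 'my-sub')
```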