363 changes: 363 additions & 0 deletions examples/aga-standalone-bq-spark.ipynb
{
"cells": [
{
"cell_type": "markdown",
"source": [
"# BigQuery + Spark + Neo4j AGA\n"
],
"metadata": {
"id": "OlAd6uQkNIU3"
}
},
{
"cell_type": "markdown",
"source": [
"## Setup"
],
"metadata": {
"id": "ACEMXLnpGFuh"
}
},
{
"cell_type": "markdown",
"source": [
"We need to do a little setup before we can run this notebook.\n",
"In order to allow the spark workers connect to our session we need to create a new `NAT` network router that routes the workers traffice to the internet.\n",
"\n",
"```shell\n",
"# 1) Cloud Router\n",
"gcloud compute routers create nat-router --network=YOUR_VPC --region=REGION\n",
"\n",
"# 2) (Optional) reserve a static egress IP to allowlist at the third-party\n",
"gcloud compute addresses create spark-egress --region=REGION\n",
"\n",
"# 3) Cloud NAT config (use static IP if you reserved one)\n",
"gcloud compute routers nats create spark-nat \\\n",
" --router=nat-router --router-region=REGION \\\n",
" --nat-all-subnet-ip-ranges \\\n",
" --auto-allocate-nat-external-ips\n",
"```"
],
"metadata": {
"id": "pQOufV-lEczj"
}
},
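{
"cell_type": "markdown",
"source": [
"Once the gateway is in place, you can sanity-check it before starting the session. This is a minimal sketch, assuming the `nat-router` and `spark-nat` names from the commands above:\n",
"\n",
"```shell\n",
"gcloud compute routers nats describe spark-nat \\\n",
"    --router=nat-router --router-region=REGION\n",
"```"
],
"metadata": {}
},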
{
"cell_type": "code",
"source": [
"%pip install graphdatascience==1.16"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "AcD9BL1hb04_",
"executionInfo": {
"status": "ok",
"timestamp": 1761742505180,
"user_tz": -60,
"elapsed": 5476,
"user": {
"displayName": "",
"userId": ""
}
},
"outputId": "044002a0-292b-4d60-dcc9-3cef2f8d293a"
},
"outputs": [],
"execution_count": null
},
{
"cell_type": "markdown",
"source": [
"## Create a Spark session"
],
"metadata": {
"id": "sBUd0iF5YsbS"
}
},
{
"cell_type": "code",
"source": [
"from google.cloud.dataproc_spark_connect import DataprocSparkSession\n",
"from google.cloud.dataproc_v1 import Session\n",
"\n",
"session = Session()\n",
"session.environment_config.execution_config.subnetwork_uri = \"projects/team-graph-analytics/regions/europe-west2/subnetworks/default\"\n",
"spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()\n",
"spark.addArtifacts(\"graphdatascience==1.16\", pypi=True)"
],
"metadata": {
"id": "p2NN8wwF7YRa",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 177
},
"executionInfo": {
"status": "ok",
"timestamp": 1761742286558,
"user_tz": -60,
"elapsed": 108523,
"user": {
"displayName": "",
"userId": ""
}
},
"outputId": "8e0dfb94-3c64-46bf-cba8-5516591db9c1"
},
"outputs": [],
"execution_count": null
},
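{
"cell_type": "markdown",
"source": [
"As a quick sanity check (a minimal sketch, not required for the rest of the notebook), we can confirm the remote session answers a trivial query:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# A trivial job that runs on the Dataproc-backed session\n",
"spark.range(5).count()"
],
"metadata": {},
"outputs": [],
"execution_count": null
},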
{
"cell_type": "markdown",
"source": [
"## Load data\n",
"\n",
"Connect to the Big Query Dataset and make it accessible to PySpark"
],
"metadata": {
"id": "TFrNuh06V3i7"
}
},
{
"cell_type": "code",
"source": [
"# Load data from BigQuery\n",
"trips_table = spark.read.format('bigquery') \\\n",
" .option('table', 'bigquery-public-data.new_york.citibike_trips') \\\n",
" .load()\n",
"trips_table.createOrReplaceTempView('trips')"
],
"metadata": {
"id": "9AyZuQMpZE92",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 46
},
"executionInfo": {
"status": "ok",
"timestamp": 1761742475388,
"user_tz": -60,
"elapsed": 9423,
"user": {
"displayName": "",
"userId": ""
}
},
"outputId": "b6246315-6582-41e3-e61a-9f373ffbb5cd"
},
"outputs": [],
"execution_count": null
},
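{
"cell_type": "markdown",
"source": [
"We can peek at the two columns the projection will use. This is just a quick check that the view is queryable; it assumes nothing beyond the public dataset's schema:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Inspect a few (start, end) station pairs from the public Citi Bike dataset\n",
"spark.sql(\"SELECT start_station_id, end_station_id FROM trips LIMIT 5\").show()"
],
"metadata": {},
"outputs": [],
"execution_count": null
},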
{
"cell_type": "markdown",
"source": [
"## Creating a session\n",
"\n"
],
"metadata": {
"id": "VCRfM48LZ9px"
}
},
{
"cell_type": "code",
"source": [
"from graphdatascience.session import AuraAPICredentials, GdsSessions, CloudLocation, SessionMemory\n",
"from datetime import timedelta\n",
"\n",
"# you can also use AuraAPICredentials.from_env() to load credentials from environment variables\n",
"api_credentials = AuraAPICredentials(\n",
" client_id=\"\",\n",
" client_secret=\"\",\n",
" # If your account is a member of several project, you must also specify the project ID to use\n",
" project_id=\"\",\n",
")\n",
"\n",
"sessions = GdsSessions(api_credentials=api_credentials)\n",
"\n",
"# Create a GDS session!\n",
"gds = sessions.get_or_create(\n",
" session_name=\"trips\",\n",
" memory=SessionMemory.m_16GB,\n",
" ttl=timedelta(minutes=30),\n",
" cloud_location=CloudLocation(\"gcp\", \"europe-west1\"),\n",
")"
],
"metadata": {
"id": "ioUJZdSjcFJf"
},
"outputs": [],
"execution_count": null
},
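{
"cell_type": "markdown",
"source": [
"To verify the session is up, we can list the sessions visible to our credentials (a minimal sketch, assuming the client's `GdsSessions.list()` helper):"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# The \"trips\" session created above should appear in this list\n",
"sessions.list()"
],
"metadata": {},
"outputs": [],
"execution_count": null
},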
{
"cell_type": "markdown",
"source": [
"## Graph projections"
],
"metadata": {
"id": "CeTSJz_JGW16"
}
},
{
"cell_type": "code",
"source": [
"arrow_client = gds._query_runner._query_runner._gds_arrow_client\n",
"arrow_client.create_graph_from_triplets(\"trips\", \"neo4j\")\n"
],
"metadata": {
"id": "amReY8Xactah"
},
"outputs": [],
"execution_count": null
},
{
"cell_type": "code",
"source": [
"import pyarrow\n",
"def upload_batch(iterator):\n",
" for batch in iterator:\n",
" arrow_client.upload_triplets(\"trips\", [batch])\n",
" yield pyarrow.RecordBatch.from_pydict({})"
],
"metadata": {
"id": "88-t13CxkkkE"
},
"outputs": [],
"execution_count": null
},
{
"cell_type": "code",
"source": [
"# Total number of sales broken down by product in descending order\n",
"spark.sql(\"\"\"\n",
" SELECT start_station_id AS sourceNode, end_station_id AS targetNode FROM trips LIMIT 10000000\n",
"\"\"\").mapInArrow(upload_batch, \"\").show()"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 140
},
"id": "yPnxlpyFlrHe",
"executionInfo": {
"status": "ok",
"timestamp": 1761743727293,
"user_tz": -60,
"elapsed": 28772,
"user": {
"displayName": "",
"userId": ""
}
},
"outputId": "e6566086-43c2-4585-9179-5096d3c1f8b2"
},
"outputs": [],
"execution_count": null
},
{
"cell_type": "code",
"source": [
"arrow_client.triplet_load_done(\"trips\")"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "ujdFtl5iE2Gb",
"executionInfo": {
"status": "ok",
"timestamp": 1761743185856,
"user_tz": -60,
"elapsed": 1003,
"user": {
"displayName": "",
"userId": ""
}
},
"outputId": "1dec7c0f-3a62-4eea-d4f5-4738b3a4611c"
},
"outputs": [],
"execution_count": null
},
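{
"cell_type": "markdown",
"source": [
"Optionally, we can confirm the projection landed using the standard graph catalog call:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# The \"trips\" graph should now appear in the session's graph catalog\n",
"gds.graph.list()"
],
"metadata": {},
"outputs": [],
"execution_count": null
},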
{
"cell_type": "markdown",
"source": [
"## Running an algorithm"
],
"metadata": {
"id": "3n1JHbbPAXD-"
}
},
{
"cell_type": "code",
"source": [
"from graphdatascience import Graph\n",
"G = gds.graph.get(\"trips\")\n",
"gds.degree.stream(G)"
],
"metadata": {
"id": "UXzIHwHb8PWB",
"executionInfo": {
"status": "ok",
"timestamp": 1761743455379,
"user_tz": -60,
"elapsed": 145,
"user": {
"displayName": "",
"userId": ""
}
},
"colab": {
"base_uri": "https://localhost:8080/",
"height": 680
},
"outputId": "3bc49938-66e7-49dd-8945-228b9f4dd43e"
},
"outputs": [],
"execution_count": null
},
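{
"cell_type": "markdown",
"source": [
"The stream call returns a pandas `DataFrame`, so standard pandas operations apply. A small sketch, assuming the usual `nodeId`/`score` columns, to surface the busiest stations:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Top 10 stations by degree, i.e. the most connected endpoints in the trip graph\n",
"degrees = gds.degree.stream(G)\n",
"degrees.sort_values(\"score\", ascending=False).head(10)"
],
"metadata": {},
"outputs": [],
"execution_count": null
},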
{
"cell_type": "markdown",
"source": [
"# Cleaning up\n",
"\n",
"To clean up all Google Cloud resources used in this project, you can [shut down the Google Cloud project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.\n",
"\n",
"Otherwise, you can delete the individual resources you created in this tutorial:"
],
"metadata": {
"id": "Szt3be79N53A"
}
},
{
"cell_type": "code",
"source": [
"# Stop the Spark session and release all resources\n",
"sessions.delete(session_name=\"trips\")\n",
"spark.stop()"
],
"metadata": {
"id": "CcUvIx6-N7TN"
},
"outputs": [],
"execution_count": null
}
],
"metadata": {
"colab": {
"provenance": [],
"toc_visible": true,
"cell_execution_strategy": "setup",
"name": "BigQuery + Spark + AGA"
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}