Skip to content

Commit

Permalink
Added a notebook with PipelineDP demo(#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored Jan 11, 2022
1 parent 3e91a09 commit 93a982b
Showing 1 changed file with 317 additions and 0 deletions.
317 changes: 317 additions & 0 deletions examples/restaurant_visits.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "PipelineDP demo",
"provenance": [],
"collapsed_sections": []
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"source": [
"<table class=\"tfo-notebook-buttons\" align=\"left\">\n",
" <td>\n",
" <a target=\"_blank\" href=\"https://colab.research.google.com/github/OpenMined/PipelineDP/blob/main/examples/restaurant_visits.ipynb\"><img src=\"https://www.tensorflow.org/images/colab_logo_32px.png\" />Run in Google Colab</a>\n",
" </td>\n",
" <td>\n",
" <a target=\"_blank\" href=\"https://github.com/OpenMined/PipelineDP/blob/main/examples/restaurant_visits.ipynb\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" />View source on GitHub</a>\n",
" </td>\n",
"</table>"
],
"metadata": {
"id": "bW1gifIe0pUt"
}
},
{
"cell_type": "code",
"metadata": {
"id": "E8yzpKYNbHTF",
"cellView": "form"
},
"source": [
"#@title Install dependencies and download data\n",
"import os\n",
"os.chdir('/content')\n",
"!git clone https://github.com/OpenMined/PipelineDP.git\n",
"!pip install -r PipelineDP/requirements.dev.txt\n",
"\n",
"import sys\n",
"sys.path.insert(0,'/content/PipelineDP')\n",
"\n",
"#Download restaurant dataset from github\n",
"!wget https://raw.githubusercontent.com/google/differential-privacy/main/examples/go/data/week_data.csv\n",
"\n",
"from IPython.display import clear_output\n",
"clear_output()"
],
"execution_count": 6,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "HB0I_itwba17",
"cellView": "form"
},
"source": [
"#@title Import libraries\n",
"import apache_beam as beam\n",
"from apache_beam.runners.portability import fn_api_runner\n",
"from apache_beam.runners.interactive import interactive_runner\n",
"from apache_beam.runners.interactive.interactive_beam import *\n",
"import pyspark\n",
"from dataclasses import dataclass\n",
"import pipeline_dp\n",
"\n",
"import pandas as pd\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt"
],
"execution_count": 7,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"cellView": "form",
"id": "Mimkjqt9h9gr"
},
"source": [
"#@title Load data to DataFrame\n",
"df = pd.read_csv('week_data.csv')\n",
"df.rename(inplace=True, columns={'VisitorId' : 'user_id', 'Time entered' : 'enter_time', 'Time spent (minutes)' : 'spent_minutes', 'Money spent (euros)' : 'spent_money', 'Day' : 'day'})\n",
"rows = [index_row[1] for index_row in df.iterrows()]"
],
"execution_count": 8,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"The dataset considered in this Colab is a simulated dataset of visits of some restaurant during a 7 day period. Let's have a look at this dataset."
],
"metadata": {
"id": "Do-Kw9uuhk82"
}
},
{
"cell_type": "code",
"metadata": {
"id": "RYlCWuPDiFXM"
},
"source": [
"# View data\n",
"df.head()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"The goal of this Colab is to show how to compute with DP number of visits and amount of money spent for each day. \n",
"\n",
"It can be computed w/o differential privacy in the following way."
],
"metadata": {
"id": "e2SOjo8qiNnw"
}
},
{
"cell_type": "code",
"metadata": {
"id": "YYOmEroMXCqG"
},
"source": [
"# Computing w/o DP.\n",
"df[['day', 'spent_money']].groupby(by='day').agg({\"spent_money\": [len, np.sum]})"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"PipelineDP can be run on different pipeline frameworks. Currently Apache Beam and Apache Spark are supported. Also it's possible to run w/o any frameworks, locally. \n",
"\n",
"Let us at first see how to run computations locally."
],
"metadata": {
"id": "SMUQhchLwQ0N"
}
},
{
"cell_type": "code",
"metadata": {
"id": "k_Nu7EO3jPI1"
},
"source": [
"#Local demo\n",
"\n",
"# 1. Create PipelineOperations\n",
"# PipelineOperations is an object which encapsulates pipeline framework.\n",
"ops = pipeline_dp.LocalPipelineOperations()\n",
"\n",
"# 2. Set the total privacy budget.\n",
"# BudgetAccountant automatically splits over all DP aggregations.\n",
"# Atm only NaiveBudgetAccountant is implemented. In future some \n",
"# advance budget composition Accountant will be implemented.\n",
"budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,\n",
" total_delta=1e-6)\n",
"\n",
"# 3. Create a DPEngine instance.\n",
"# DPEngine object performs all DP aggregations.\n",
"dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)\n",
"\n",
"# 4. Specify which DP aggregated metrics to compute.\n",
"# AggregateParams constains specifications of metrics that we would like \n",
"# to compute.\n",
"params = pipeline_dp.AggregateParams(\n",
" noise_kind=pipeline_dp.NoiseKind.LAPLACE,\n",
" metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM],\n",
" max_partitions_contributed=7,\n",
" max_contributions_per_partition=2,\n",
" low=0,\n",
" high=100,\n",
" public_partitions=list(range(1, 9)))\n",
"\n",
"# 5. Specify how to extract privacy_id, partition_key and value from an\n",
"# element of movie view collection.\n",
"data_extractors = pipeline_dp.DataExtractors(\n",
" partition_extractor=lambda row: row.day,\n",
" privacy_id_extractor=lambda row: row.user_id,\n",
" value_extractor=lambda row: row.spent_money)\n",
"\n",
"# 6. Build computational graph for aggregation\n",
"# This opertation is lazy.\n",
"# In case if we'd like to compute metrics on different variable\n",
"# we can make multiple calls of aggregate().\n",
"dp_result = dp_engine.aggregate(rows, params, data_extractors)\n",
"\n",
"# 7. Compute budget per each DP operation. \n",
"# This should be called after computation graph is constructed.\n",
"budget_accountant.compute_budgets()\n",
"\n",
"# 8. Run computation.\n",
"# dp_result is iterable, creating list form it, forces pipeline to run.\n",
"dp_result = list(dp_result)\n",
"\n",
"print(dp_result)"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Let's see how to transform code above in order to run it on different frameworks.\n",
"\n",
"At first let's extract the code to a function:"
],
"metadata": {
"id": "3Xi3CItEyqLf"
}
},
{
"cell_type": "code",
"metadata": {
"id": "7QLfcZg7V-Zt"
},
"source": [
"# Framework independent function\n",
"def run_pipeline(data, ops):\n",
" budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-7)\n",
"\n",
" dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)\n",
"\n",
" params = pipeline_dp.AggregateParams(noise_kind = pipeline_dp.NoiseKind.LAPLACE,\n",
" metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM],\n",
" max_partitions_contributed=7,\n",
" max_contributions_per_partition=2,\n",
" low=0,\n",
" high=100)\n",
"\n",
" data_extractors = pipeline_dp.DataExtractors(privacy_id_extractor = lambda row: row.user_id,\n",
" partition_extractor = lambda row: row.day,\n",
" value_extractor = lambda row: row.spent_money)\n",
"\n",
" dp_result = dp_engine.aggregate(data, params, data_extractors)\n",
"\n",
" budget_accountant.compute_budgets()\n",
"\n",
" return dp_result"
],
"execution_count": 12,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Let's use `run_pipeline` to get result with local run:"
],
"metadata": {
"id": "4q3VWJ-my4GU"
}
},
{
"cell_type": "code",
"source": [
"# Local demo with run_pipeline\n",
"ops = pipeline_dp.LocalPipelineOperations()\n",
"dp_result = list(run_pipeline(rows, ops))\n",
"print(dp_result)"
],
"metadata": {
"id": "yKWq-3xyzP9k"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Let's use `run_pipeline` to run locally with Apache Beam: "
],
"metadata": {
"id": "IYFm6KZXz9_o"
}
},
{
"cell_type": "code",
"metadata": {
"id": "zmoDdEwWaZk9"
},
"source": [
"# Beam demo\n",
"runner = fn_api_runner.FnApiRunner() # local runner\n",
"\n",
"with beam.Pipeline(runner=runner) as pipeline:\n",
" beam_data = pipeline | beam.Create(rows)\n",
" ops = pipeline_dp.BeamOperations()\n",
" dp_result = run_pipeline(beam_data, ops) \n",
" dp_result | beam.Map(print)\n"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"TODO: create Apache Spark Demo with `run_pipeline`"
],
"metadata": {
"id": "iQIZ_F2ry46e"
}
}
]
}

0 comments on commit 93a982b

Please sign in to comment.