Skip to content

Commit

Permalink
Merge pull request #292 from mozilla-services/tools
Browse files Browse the repository at this point in the history
feat: add spanner tools
  • Loading branch information
pjenvey authored Oct 21, 2019
2 parents 8b3bb4e + c085ff8 commit b9cd9ff
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 0 deletions.
11 changes: 11 additions & 0 deletions tools/spanner/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Spanner Tools and Scripts

These tools are supplemental scripts for working with the Google Cloud Platform. Follow [the general installation instructions](https://cloud.google.com/spanner/docs/getting-started/python/), as well as fetch the proper service account credentials file.

Remember, the `GOOGLE_APPLICATION_CREDENTIALS` environment variable should point to the absolute path location of your service account credential file.

e.g.
```bash
GOOGLE_APPLICATION_CREDENTIALS=`pwd`/keys/project-id-service-cred.json venv/bin/python purge_ttl.py
```
See each script for details about function and use.
55 changes: 55 additions & 0 deletions tools/spanner/purge_ttl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Purge Expired TTLs
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import os
from urllib import parse

from google.cloud import spanner

# Change these to match your install.

# Module-level Spanner client shared by all queries in this script.
# Credentials are picked up from GOOGLE_APPLICATION_CREDENTIALS (see README).
client = spanner.Client()


def from_env():
    """Determine the Spanner (instance_id, database_id) pair to operate on.

    Prefers a ``SYNC_DATABASE_URL`` of the form
    ``spanner://.../instances/<instance_id>/databases/<database_id>``.
    Falls back to the INSTANCE_ID / DATABASE_ID environment variables
    (with test defaults) when the URL is missing or unusable.

    Returns:
        Tuple of (instance_id, database_id) strings.
    """
    try:
        url = os.environ.get("SYNC_DATABASE_URL")
        if not url:
            raise Exception("no url")
        purl = parse.urlparse(url)
        if purl.scheme != "spanner":
            # Bug fix: previously a non-spanner scheme fell through the
            # `if` without binding instance_id/database_id, so the
            # `return` below raised NameError instead of using the
            # env-var fallback. Raising here routes it to the fallback.
            raise Exception("invalid url scheme: {}".format(url))
        path = purl.path.split("/")
        instance_id = path[-3]
        database_id = path[-1]
    except Exception as e:
        # Change these to reflect your Spanner instance install
        print("Exception {}".format(e))
        instance_id = os.environ.get("INSTANCE_ID", "spanner-test")
        database_id = os.environ.get("DATABASE_ID", "sync_stage")
    return (instance_id, database_id)


def spanner_read_data(request=None):
    """Purge expired rows from the ``batches`` and ``bso`` tables.

    Runs one partitioned DML delete per table against the database
    selected by from_env(), using the module-level Spanner client.

    Args:
        request: unused; present so this can double as an HTTP handler.

    Returns:
        A newline-joined, human-readable summary of rows removed.
    """
    instance_id, database_id = from_env()
    database = client.instance(instance_id).database(database_id)

    summary = ["For {}:{}".format(instance_id, database_id)]

    # Delete Batches
    removed = database.execute_partitioned_dml(
        'DELETE FROM batches WHERE expiry < CURRENT_TIMESTAMP()')
    summary.append("batches: removed {} rows".format(removed))

    # Delete BSOs
    removed = database.execute_partitioned_dml(
        'DELETE FROM bso WHERE expiry < CURRENT_TIMESTAMP()')
    summary.append("bso: removed {} rows".format(removed))

    return '\n'.join(summary)


if __name__ == "__main__":
    # Script entry point: run the purge once and print the summary.
    print(spanner_read_data())
216 changes: 216 additions & 0 deletions tools/spanner/write_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Preload Spanner Database
#
# Spanner increases efficiency when there is a minimum of 300G of
# data stored. This script preloads a minimal set of data to trigger
# that level of optimization.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import os
from urllib import parse

import random
import string
import uuid
from datetime import datetime, timedelta

import threading

from google.api_core.exceptions import AlreadyExists
from google.cloud import spanner
from google.cloud.spanner_v1 import param_types


# max batch size for this write is 2000, otherwise we run into:
"""google.api_core.exceptions.InvalidArgument: 400 The transaction
contains too many mutations. Insert and update operations count with
the multiplicity of the number of columns they affect. For example,
inserting values into one key column and four non-key columns count as
five mutations total for the insert. Delete and delete range
operations count as one mutation regardless of the number of columns
affected. The total mutation count includes any changes to indexes
that the transaction generates. Please reduce the number of writes, or
use fewer indexes. (Maximum number: 20000)
or
google.api_core.exceptions.ResourceExhausted: 429 Received message
larger than max (1000422248 vs. 104857600)
or
google.api_core.exceptions.InvalidArgument: 400 The transaction
exceeds the maximum total bytes-size that can be handled by
Spanner. Please reduce the size or number of the writes, or use fewer
indexes. (Maximum size: 104857600)
"""
# 1 Batch of 2K records with payload of 25K = 201_168_000B
# so, ~300G would need 2_982_582 batches
BATCH_SIZE = 2000
# Total number of threads to use
THREAD_COUNT = 16
# Number of batches per thread
BATCHES = 186412

# `100` is the bottom limit for reserved collections.
COLL_ID = 100

# The following can trigger OOMs
# PAYLOAD_SIZE = 2500000
# PAYLOAD_SIZE = 1000000
"""
google.api_core.exceptions.InvalidArgument: 400 The transaction exceeds
the maximum total bytes-size that can be handled by Spanner. Please reduce the
size or number of the writes, or use fewer indexes. (Maximum size: 104857600)
"""
# PAYLOAD_SIZE = 50000
PAYLOAD_SIZE = 25000
# fake a base64 like payload (digits + letters + "-_="), generated once at
# import time and shared by every record. Not strictly necessary, but may
# help ML routines.
PAYLOAD = ''.join(
    random.choice(
        string.digits + string.ascii_uppercase + string.ascii_lowercase + "-_="
    )
    for _ in range(PAYLOAD_SIZE))


def load(instance, db, fxa_uid, fxa_kid, coll_id):
    """Bulk-load fake bso rows for a single fake user.

    Creates (or reuses) the user_collections parent row, then writes
    BATCHES batches of BATCH_SIZE bso rows, each carrying the shared
    PAYLOAD with an expiry five years out.

    Args:
        instance: Spanner instance id (string).
        db: Spanner database id (string).
        fxa_uid: fake user id key component.
        fxa_kid: fake key id key component.
        coll_id: collection id to write the rows under.
    """
    name = threading.current_thread().getName()
    # One client per thread: Spanner sessions are not shared across threads.
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance)
    db = instance.database(db)
    print('{name} Db: {db}'.format(name=name, db=db))
    start = datetime.now()

    def create_user(txn):
        # Insert the parent user_collections row this user's bso rows
        # hang off; raises AlreadyExists if the user was created before.
        txn.execute_update(
            """\
            INSERT INTO user_collections
                (fxa_uid, fxa_kid, collection_id, modified)
            VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified)
            """,
            params=dict(
                fxa_uid=fxa_uid,
                fxa_kid=fxa_kid,
                collection_id=coll_id,
                modified=start
            ),
            param_types=dict(
                fxa_uid=param_types.STRING,
                fxa_kid=param_types.STRING,
                collection_id=param_types.INT64,
                modified=param_types.TIMESTAMP
            )
        )

    try:
        db.run_in_transaction(create_user)
        print('{name} Created user (fxa_uid: {uid}, fxa_kid: {kid})'.format(
            name=name, uid=fxa_uid, kid=fxa_kid))
    except AlreadyExists:
        # Bug fix: the original message used "{uid}}" / "{kid}}"; a lone
        # "}" after a closed field raises ValueError ("Single '}'
        # encountered in format string"), crashing the thread exactly
        # when the user already exists.
        print('{name} Existing user (fxa_uid: {uid}, fxa_kid: {kid})'.format(
            name=name, uid=fxa_uid, kid=fxa_kid))

    # Rough per-record byte estimate (original note: ~1892 bytes of
    # overhead -- TODO confirm against actual Spanner storage docs).
    rlen = 0

    print('{name} Loading..'.format(name=name))
    for j in range(BATCHES):
        records = []
        for i in range(BATCH_SIZE):
            # create a record
            record = (
                fxa_uid,
                fxa_kid,
                coll_id,
                str(uuid.uuid4()),
                None,
                PAYLOAD,
                start,
                start + timedelta(days=365 * 5)
            )
            # determine its approximate size: 4 bytes per char for each
            # string column (fxa_kid, id, payload) plus a flat 64-byte
            # allowance per remaining column.
            rlen = len(record[1]) * 4
            rlen += 64
            rlen += len(record[3]) * 4
            rlen += 64
            rlen += len(record[5]) * 4
            rlen += 64
            rlen += 64
            records.append(record)
        with db.batch() as batch:
            batch.insert(
                table='bso',
                columns=(
                    'fxa_uid',
                    'fxa_kid',
                    'collection_id',
                    'id',
                    'sortindex',
                    'payload',
                    'modified',
                    'expiry'
                ),
                values=records
            )
        # NOTE(review): `r` is the size of a single record, not the
        # whole batch -- the batch is roughly r * BATCH_SIZE bytes.
        print(
            ('{name} Wrote batch {b} of {bb}:'
             ' {c} records {r} bytes, {t}').format(
                name=name,
                b=j + 1,
                bb=BATCHES,
                c=BATCH_SIZE,
                r=rlen,
                t=datetime.now() - start))
    print('{name} Total: {t} (count: {c}, size: {s} in {sec})'.format(
        name=name,
        t=BATCHES,
        c=BATCHES * BATCH_SIZE,
        s=BATCHES * BATCH_SIZE * rlen,
        sec=datetime.now() - start
    ))


def from_env():
    """Determine the Spanner (instance_id, database_id) pair to operate on.

    Prefers a ``SYNC_DATABASE_URL`` of the form
    ``spanner://.../instances/<instance_id>/databases/<database_id>``.
    Falls back to the INSTANCE_ID / DATABASE_ID environment variables
    (with test defaults) when the URL is missing or unusable.

    Returns:
        Tuple of (instance_id, database_id) strings.
    """
    try:
        url = os.environ.get("SYNC_DATABASE_URL")
        if not url:
            raise Exception("no url")
        purl = parse.urlparse(url)
        if purl.scheme != "spanner":
            # Bug fix: previously a non-spanner scheme fell through the
            # `if` without binding instance_id/database_id, so the
            # `return` below raised NameError instead of using the
            # env-var fallback. Raising here routes it to the fallback.
            raise Exception("invalid url scheme: {}".format(url))
        path = purl.path.split("/")
        instance_id = path[-3]
        database_id = path[-1]
    except Exception as e:
        # Change these to reflect your Spanner instance install
        print("Exception {}".format(e))
        instance_id = os.environ.get("INSTANCE_ID", "spanner-test")
        database_id = os.environ.get("DATABASE_ID", "sync_stage")
    return (instance_id, database_id)


def loader():
    """Per-thread entry point: invent a fake user and bulk-load its rows.

    Each loader thread gets its own fake user to prevent some hotspot
    issues; uaids are prefixed ("DEADBEEF") for easy filtering later.
    """
    instance_id, database_id = from_env()
    # Fake FxA identifiers: recognizable prefix + random suffix.
    fake_uid = "DEADBEEF" + uuid.uuid4().hex[8:]
    fake_kid = "{:013d}-{}".format(22, fake_uid)
    thread_name = threading.current_thread().getName()
    print("{} -> Loading {} {}".format(thread_name, fake_uid, fake_kid))
    load(instance_id, database_id, fake_uid, fake_kid, COLL_ID)


def main():
    """Spawn THREAD_COUNT loader threads.

    Threads are started but not joined; being non-daemon, they keep the
    process alive until every loader finishes.
    """
    for idx in range(THREAD_COUNT):
        print("Starting thread {}".format(idx))
        worker = threading.Thread(
            name="loader_{}".format(idx),
            target=loader)
        worker.start()


if __name__ == '__main__':
    # Script entry point: fan out the loader threads.
    main()

0 comments on commit b9cd9ff

Please sign in to comment.