Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more storage samples for the cloud client libraries. #432

Merged
merged 4 commits into from
Jul 29, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Adding a few more samples.
Change-Id: I0deadd241534cf7c45137182a5d0103f14074f0f
  • Loading branch information
Jon Wayne Parrott committed Jul 29, 2016
commit 4608ac46a3797e7f22c9d5e7b7cee148e5616587
2 changes: 1 addition & 1 deletion storage/cloud-client/encryption_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_upload_encrypted_blob(cloud_config):
def test_blob(cloud_config):
"""Provides a pre-existing blob in the test bucket."""
bucket = storage.Client().bucket(cloud_config.storage_bucket)
blob = bucket.blob('encrption_test_sigil')
blob = bucket.blob('encryption_test_sigil')
content = 'Hello, is it me you\'re looking for?'
blob.upload_from_string(
content,
Expand Down
66 changes: 66 additions & 0 deletions storage/cloud-client/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""

import argparse
import datetime

from gcloud import storage

Expand Down Expand Up @@ -126,6 +127,33 @@ def delete_blob(bucket_name, blob_name):
print('Blob {} deleted.'.format(blob_name))


def blob_metadata(bucket_name, blob_name):
"""Prints out a blob's metadata."""
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.get_blob(blob_name)

print('Blob: {}'.format(blob.name))
print('Bucket: {}'.format(blob.bucket.name))
print('Storage class: {}'.format(blob.storage_class))
print('ID: {}'.format(blob.id))
print('Size: {} bytes'.format(blob.size))
print('Updated: {}'.format(blob.updated))
print('Generation: {}'.format(blob.generation))
print('Metageneration: {}'.format(blob.metageneration))
print('Etag: {}'.format(blob.etag))
print('Owner: {}'.format(blob.owner))
print('Component count: {}'.format(blob.component_count))
print('Crc32c: {}'.format(blob.crc32c))
print('md5_hash: {}'.format(blob.md5_hash))
print('Cache-control: {}'.format(blob.cache_control))
print('Content-type: {}'.format(blob.content_type))
print('Content-disposition: {}'.format(blob.content_disposition))
print('Content-encoding: {}'.format(blob.content_encoding))
print('Content-language: {}'.format(blob.content_language))
print('Metadata: {}'.format(blob.metadata))


def make_blob_public(bucket_name, blob_name):
"""Makes a blob publicly accessible."""
storage_client = storage.Client()
Expand All @@ -138,6 +166,26 @@ def make_blob_public(bucket_name, blob_name):
blob.name, blob.public_url))


def generate_signed_url(bucket_name, blob_name):
"""Generates a signed URL for a blob.

Note that this method requires a service account key file. You can not use
this if you are using Application Default Credentials from Google Compute
Engine or from the Google Cloud SDK.
"""
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(blob_name)

url = blob.generate_signed_url(
# This URL is valid for 1 hour
expiration=datetime.timedelta(hours=1),
# Allow GET requests using this URL.
method='GET')

print('The signed url for {} is {}'.format(blob.name, url))


def rename_blob(bucket_name, blob_name, new_name):
"""Renames a blob."""
storage_client = storage.Client()
Expand Down Expand Up @@ -193,6 +241,18 @@ def copy_blob(bucket_name, blob_name, new_bucket_name, new_blob_name):
delete_parser = subparsers.add_parser('delete', help=delete_blob.__doc__)
delete_parser.add_argument('blob_name')

metadata_parser = subparsers.add_parser(
'metadata', help=blob_metadata.__doc__)
metadata_parser.add_argument('blob_name')

make_public_parser = subparsers.add_parser(
'make-public', help=make_blob_public.__doc__)
make_public_parser.add_argument('blob_name')

signed_url_parser = subparsers.add_parser(
'signed-url', help=generate_signed_url.__doc__)
signed_url_parser.add_argument('blob_name')

rename_parser = subparsers.add_parser('rename', help=rename_blob.__doc__)
rename_parser.add_argument('blob_name')
rename_parser.add_argument('new_name')
Expand Down Expand Up @@ -224,6 +284,12 @@ def copy_blob(bucket_name, blob_name, new_bucket_name, new_blob_name):
args.destination_file_name)
elif args.command == 'delete':
delete_blob(args.bucket_name, args.blob_name)
elif args.command == 'metadata':
blob_metadata(args.bucket_name, args.blob_name)
elif args.command == 'make-public':
make_blob_public(args.bucket_name, args.blob_name)
elif args.command == 'signed-url':
generate_signed_url(args.bucket_name, args.blob_name)
elif args.command == 'rename':
rename_blob(args.bucket_name, args.blob_name, args.new_name)
elif args.command == 'copy':
Expand Down
18 changes: 18 additions & 0 deletions storage/cloud-client/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ def test_download_blob(test_blob, cloud_config):
assert dest_file.read()


def test_blob_metadata(test_blob, cloud_config, capsys):
snippets.blob_metadata(cloud_config.storage_bucket, test_blob.name)
out, _ = capsys.readouterr()
assert test_blob.name in out


def test_delete_blob(test_blob, cloud_config):
snippets.delete_blob(
cloud_config.storage_bucket,
Expand All @@ -79,6 +85,18 @@ def test_make_blob_public(test_blob, cloud_config):
assert r.text == 'Hello, is it me you\'re looking for?'


def test_generate_signed_url(test_blob, cloud_config, capsys):
snippets.generate_signed_url(
cloud_config.storage_bucket,
test_blob.name)

out, _ = capsys.readouterr()
url = out.rsplit().pop()

r = requests.get(url)
assert r.text == 'Hello, is it me you\'re looking for?'


def test_rename_blob(test_blob, cloud_config):
bucket = storage.Client().bucket(cloud_config.storage_bucket)

Expand Down