Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IAM handlers to blobs #3311

Merged
merged 4 commits into from
Apr 19, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add IAM methods for blobs.
Closes #1679.
  • Loading branch information
tseaver committed Apr 18, 2017
commit f94c1afa6c5a106ba403b59f306f58db2913cb81
78 changes: 78 additions & 0 deletions storage/google/cloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from google.cloud.credentials import generate_signed_url
from google.cloud.exceptions import NotFound
from google.cloud.exceptions import make_exception
from google.cloud.iam import Policy
from google.cloud.storage._helpers import _PropertyMixin
from google.cloud.storage._helpers import _scalar_property
from google.cloud.storage.acl import ObjectACL
Expand Down Expand Up @@ -794,6 +795,83 @@ def create_resumable_upload_session(

return resumable_upload_session_url

def get_iam_policy(self, client=None):
"""Retrieve the IAM policy for the object.

See:
https://cloud.google.com/storage/docs/json_api/v1/objects/getIamPolicy

:type client: :class:`~google.cloud.storage.client.Client` or
``NoneType``
:param client: Optional. The client to use. If not passed, falls back
to the ``client`` stored on the current object's bucket.

:rtype: :class:`google.cloud.iam.Policy`
:returns: the policy instance, based on the resource returned from
the ``getIamPolicy`` API request.
"""
client = self._require_client(client)
info = client._connection.api_request(
method='GET',
path='%s/iam' % (self.path,),
_target_object=None)
return Policy.from_api_repr(info)

def set_iam_policy(self, policy, client=None):
"""Update the IAM policy for the bucket.

See:
https://cloud.google.com/storage/docs/json_api/v1/objects/setIamPolicy

:type policy: :class:`google.cloud.iam.Policy`
:param policy: policy instance used to update bucket's IAM policy.

:type client: :class:`~google.cloud.storage.client.Client` or
``NoneType``
:param client: Optional. The client to use. If not passed, falls back
to the ``client`` stored on the current bucket.

:rtype: :class:`google.cloud.iam.Policy`
:returns: the policy instance, based on the resource returned from
the ``setIamPolicy`` API request.
"""
client = self._require_client(client)
resource = policy.to_api_repr()
resource['resourceId'] = self.path
info = client._connection.api_request(
method='PUT',
path='%s/iam' % (self.path,),
data=resource,
_target_object=None)
return Policy.from_api_repr(info)

def test_iam_permissions(self, permissions, client=None):
"""API call: test permissions

See:
https://cloud.google.com/storage/docs/json_api/v1/objects/testIamPermissions

:type permissions: list of string
:param permissions: the permissions to check

:type client: :class:`~google.cloud.storage.client.Client` or
``NoneType``
:param client: Optional. The client to use. If not passed, falls back
to the ``client`` stored on the current bucket.

:rtype: list of string
:returns: the permissions returned by the ``testIamPermissions`` API
request.
"""
client = self._require_client(client)
query = {'permissions': permissions}
path = '%s/iam/testPermissions' % (self.path,)
resp = client._connection.api_request(
method='GET',
path=path,
query_params=query)
return resp.get('permissions', [])

def make_public(self, client=None):
"""Make this blob public giving all users read access.

Expand Down
138 changes: 138 additions & 0 deletions storage/tests/unit/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,144 @@ def test_create_resumable_upload_session_args(self):
self.assertEqual(
headers['Origin'], ORIGIN)

def test_get_iam_policy(self):
from six.moves.http_client import OK
from google.cloud.storage.iam import STORAGE_OWNER_ROLE
from google.cloud.storage.iam import STORAGE_EDITOR_ROLE
from google.cloud.storage.iam import STORAGE_VIEWER_ROLE
from google.cloud.iam import Policy

BLOB_NAME = 'blob-name'
PATH = '/b/name/o/%s' % (BLOB_NAME,)
ETAG = 'DEADBEEF'

This comment was marked as spam.

VERSION = 17
OWNER1 = 'user:phred@example.com'
OWNER2 = 'group:cloud-logs@google.com'
EDITOR1 = 'domain:google.com'
EDITOR2 = 'user:phred@example.com'
VIEWER1 = 'serviceAccount:1234-abcdef@service.example.com'
VIEWER2 = 'user:phred@example.com'
RETURNED = {
'resourceId': PATH,
'etag': ETAG,
'version': VERSION,
'bindings': [
{'role': STORAGE_OWNER_ROLE, 'members': [OWNER1, OWNER2]},
{'role': STORAGE_EDITOR_ROLE, 'members': [EDITOR1, EDITOR2]},
{'role': STORAGE_VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]},
],
}
after = ({'status': OK}, RETURNED)
EXPECTED = {
binding['role']: set(binding['members'])
for binding in RETURNED['bindings']}
connection = _Connection(after)
client = _Client(connection)
bucket = _Bucket(client=client)
blob = self._make_one(BLOB_NAME, bucket=bucket)

policy = blob.get_iam_policy()

self.assertIsInstance(policy, Policy)
self.assertEqual(policy.etag, RETURNED['etag'])
self.assertEqual(policy.version, RETURNED['version'])
self.assertEqual(dict(policy), EXPECTED)

kw = connection._requested
self.assertEqual(len(kw), 1)
self.assertEqual(kw[0]['method'], 'GET')
self.assertEqual(kw[0]['path'], '%s/iam' % (PATH,))

def test_set_iam_policy(self):
import operator
from six.moves.http_client import OK
from google.cloud.storage.iam import STORAGE_OWNER_ROLE
from google.cloud.storage.iam import STORAGE_EDITOR_ROLE
from google.cloud.storage.iam import STORAGE_VIEWER_ROLE
from google.cloud.iam import Policy

BLOB_NAME = 'blob-name'
PATH = '/b/name/o/%s' % (BLOB_NAME,)
ETAG = 'DEADBEEF'
VERSION = 17
OWNER1 = 'user:phred@example.com'
OWNER2 = 'group:cloud-logs@google.com'
EDITOR1 = 'domain:google.com'
EDITOR2 = 'user:phred@example.com'
VIEWER1 = 'serviceAccount:1234-abcdef@service.example.com'
VIEWER2 = 'user:phred@example.com'
BINDINGS = [
{'role': STORAGE_OWNER_ROLE, 'members': [OWNER1, OWNER2]},
{'role': STORAGE_EDITOR_ROLE, 'members': [EDITOR1, EDITOR2]},
{'role': STORAGE_VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]},
]
RETURNED = {
'etag': ETAG,
'version': VERSION,
'bindings': BINDINGS,
}
after = ({'status': OK}, RETURNED)
policy = Policy()
for binding in BINDINGS:
policy[binding['role']] = binding['members']

connection = _Connection(after)
client = _Client(connection)
bucket = _Bucket(client=client)
blob = self._make_one(BLOB_NAME, bucket=bucket)

returned = blob.set_iam_policy(policy)

self.assertEqual(returned.etag, ETAG)
self.assertEqual(returned.version, VERSION)
self.assertEqual(dict(returned), dict(policy))

kw = connection._requested
self.assertEqual(len(kw), 1)
self.assertEqual(kw[0]['method'], 'PUT')
self.assertEqual(kw[0]['path'], '%s/iam' % (PATH,))
sent = kw[0]['data']
self.assertEqual(sent['resourceId'], PATH)
self.assertEqual(len(sent['bindings']), len(BINDINGS))
key = operator.itemgetter('role')
for found, expected in zip(
sorted(sent['bindings'], key=key),
sorted(BINDINGS, key=key)):
self.assertEqual(found['role'], expected['role'])
self.assertEqual(
sorted(found['members']), sorted(expected['members']))

def test_test_iam_permissions(self):
from six.moves.http_client import OK
from google.cloud.storage.iam import STORAGE_OBJECTS_LIST
from google.cloud.storage.iam import STORAGE_BUCKETS_GET
from google.cloud.storage.iam import STORAGE_BUCKETS_UPDATE

BLOB_NAME = 'blob-name'
PATH = '/b/name/o/%s' % (BLOB_NAME,)
PERMISSIONS = [
STORAGE_OBJECTS_LIST,
STORAGE_BUCKETS_GET,
STORAGE_BUCKETS_UPDATE,
]
ALLOWED = PERMISSIONS[1:]
RETURNED = {'permissions': ALLOWED}
after = ({'status': OK}, RETURNED)
connection = _Connection(after)
client = _Client(connection)
bucket = _Bucket(client=client)
blob = self._make_one(BLOB_NAME, bucket=bucket)

allowed = blob.test_iam_permissions(PERMISSIONS)

self.assertEqual(allowed, ALLOWED)

kw = connection._requested
self.assertEqual(len(kw), 1)
self.assertEqual(kw[0]['method'], 'GET')
self.assertEqual(kw[0]['path'], '%s/iam/testPermissions' % (PATH,))
self.assertEqual(kw[0]['query_params'], {'permissions': PERMISSIONS})

def test_make_public(self):
from six.moves.http_client import OK
from google.cloud.storage.acl import _ACLEntity
Expand Down