forked from GoogleCloudPlatform/python-docs-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsnippets_test.py
More file actions
179 lines (145 loc) · 6.15 KB
/
snippets_test.py
File metadata and controls
179 lines (145 loc) · 6.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python
# Copyright 2017 Google, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
import os
import random
import string
import googleapiclient.discovery
import snippets
PROJECT = os.environ['GCLOUD_PROJECT']
# Your Google Cloud Platform Key Location
LOCATION = 'global'
# Your Google Cloud Platform KeyRing name
KEY_RING = ''.join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(12))
# Your Google Cloud Platform CryptoKey name
CRYPTO_KEY = ''.join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(12))
# Your Google Cloud Platform CryptoKeyVersion name
VERSION = 1
# A member to add to our IAM policy
MEMBER = 'user:ryanmats@google.com'
# The role we want our new member to have for our IAM policy
ROLE = 'roles/owner'
def test_create_key_ring(capsys):
snippets.create_key_ring(PROJECT, LOCATION, KEY_RING)
out, _ = capsys.readouterr()
expected = 'Created KeyRing projects/{}/locations/{}/keyRings/{}.'.format(
PROJECT, LOCATION, KEY_RING)
assert expected in out
def test_create_crypto_key(capsys):
snippets.create_crypto_key(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY)
out, _ = capsys.readouterr()
expected = (
'Created CryptoKey projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}.'
.format(PROJECT, LOCATION, KEY_RING, CRYPTO_KEY))
assert expected in out
def test_encrypt_decrypt(capsys, tmpdir):
# Write to a plaintext file.
tmpdir.join('in.txt').write('SampleText')
# Construct temporary files.
plaintext_file = tmpdir.join('in.txt')
encrypted_file = tmpdir.join('out.txt')
decrypted_file = tmpdir.join('out2.txt')
# Encrypt text and then decrypt it.
snippets.encrypt(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY,
str(plaintext_file), str(encrypted_file))
snippets.decrypt(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY,
str(encrypted_file), str(decrypted_file))
# Make sure the decrypted text matches the original text.
decrypted_text = decrypted_file.read()
assert decrypted_text == 'SampleText'
# Make sure other output is as expected.
out, _ = capsys.readouterr()
assert 'Saved ciphertext to {}.'.format(str(encrypted_file)) in out
assert 'Saved plaintext to {}.'.format(str(decrypted_file)) in out
def test_disable_crypto_key_version(capsys):
snippets.disable_crypto_key_version(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION)
out, _ = capsys.readouterr()
expected = (
'CryptoKeyVersion projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/'
'cryptoKeyVersions/{}\'s state has been set to {}.'
.format(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION,
'DISABLED'))
assert expected in out
def test_enable_crypto_key_version(capsys):
snippets.enable_crypto_key_version(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION)
out, _ = capsys.readouterr()
expected = (
'CryptoKeyVersion projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/'
'cryptoKeyVersions/{}\'s state has been set to {}.'
.format(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION,
'ENABLED'))
assert expected in out
def test_destroy_crypto_key_version(capsys):
snippets.destroy_crypto_key_version(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION)
out, _ = capsys.readouterr()
expected = (
'CryptoKeyVersion projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/'
'cryptoKeyVersions/{}\'s state has been set to {}.'
.format(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION,
'DESTROY_SCHEDULED'))
assert expected in out
def test_restore_crypto_key_version(capsys):
snippets.restore_crypto_key_version(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION)
out, _ = capsys.readouterr()
expected = (
'CryptoKeyVersion projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/'
'cryptoKeyVersions/{}\'s state has been set to {}.'
.format(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION,
'DISABLED'))
assert expected in out
def test_add_member_to_crypto_key_policy(capsys):
snippets.add_member_to_crypto_key_policy(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, MEMBER, ROLE)
out, _ = capsys.readouterr()
expected = (
'Member {} added with role {} to policy for CryptoKey {} in KeyRing {}'
.format(MEMBER, ROLE, CRYPTO_KEY, KEY_RING))
assert expected in out
kms_client = googleapiclient.discovery.build('cloudkms', 'v1')
parent = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
PROJECT, LOCATION, KEY_RING, CRYPTO_KEY)
crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys()
policy_request = crypto_keys.getIamPolicy(resource=parent)
policy_response = policy_request.execute()
assert 'bindings' in policy_response.keys()
bindings = policy_response['bindings']
found_member_role_pair = False
for binding in bindings:
if binding['role'] == ROLE:
for user in binding['members']:
if user == MEMBER:
found_member_role_pair = True
assert found_member_role_pair
def test_get_key_ring_policy(capsys):
snippets.get_key_ring_policy(PROJECT, LOCATION, KEY_RING)
out, _ = capsys.readouterr()
expected_roles_exist = (
'Printing IAM policy for resource projects/{}/locations/{}/keyRings/{}'
':'.format(PROJECT, LOCATION, KEY_RING))
expected_no_roles = (
'No roles found for resource projects/{}/locations/{}/keyRings/{}.'
.format(PROJECT, LOCATION, KEY_RING))
assert (expected_roles_exist in out) or (expected_no_roles in out)