Skip to content

Commit

Permalink
[kms] fix flaky test [(#3268)](#3268)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche authored and rsamborski committed Nov 8, 2022
1 parent 3bf98fd commit c8af3e8
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 42 deletions.
2 changes: 2 additions & 0 deletions kms/snippets/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pytest==5.3.2
gcp-devrel-py-tools==0.0.15
google-cloud-core
99 changes: 57 additions & 42 deletions kms/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import time
from os import environ

from google.api_core.exceptions import GoogleAPICallError
from google.api_core.exceptions import Aborted, GoogleAPICallError
from google.cloud import kms_v1
from google.cloud.kms_v1 import enums
from google.iam.v1.policy_pb2 import Policy
Expand All @@ -25,6 +25,8 @@

import snippets

from gcp_devrel.testing import eventually_consistent


def create_key_helper(key_id, purpose, algorithm, t):
try:
Expand All @@ -51,7 +53,7 @@ def setup_module(module):
except GoogleAPICallError:
# keyring already exists
pass
s = create_key_helper(t.symId,
s = create_key_helper(t.sym_id,
enums.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT,
enums.CryptoKeyVersion.CryptoKeyVersionAlgorithm.
GOOGLE_SYMMETRIC_ENCRYPTION,
Expand All @@ -69,9 +71,9 @@ class TestKMSSnippets:
keyring_path = '{}/keyRings/{}'.format(parent, keyring_id)
version = '1'

symId = 'symmetric'
sym_id = 'symmetric'

sym = '{}/cryptoKeys/{}'.format(keyring_path, symId)
sym = '{}/cryptoKeys/{}'.format(keyring_path, sym_id)
sym_version = '{}/cryptoKeyVersions/{}'.format(sym, version)

message = 'test message 123'
Expand All @@ -94,7 +96,7 @@ def test_create_key_ring(self):
@pytest.mark.skip(reason="Deleting keys isn't instant, so we should avoid \
creating a large number of them in our tests")
def test_create_crypto_key(self):
key_id = self.symId + '-test' + str(int(time.time()))
key_id = self.sym_id + '-test' + str(int(time.time()))
snippets.create_crypto_key(self.project_id, self.location,
self.keyring_id, key_id)
c = kms_v1.KeyManagementServiceClient()
Expand All @@ -108,30 +110,30 @@ def test_create_crypto_key(self):
def test_key_change_version_state(self):
client = kms_v1.KeyManagementServiceClient()
name = client.crypto_key_version_path(self.project_id, self.location,
self.keyring_id, self.symId,
self.keyring_id, self.sym_id,
self.version)
state_enum = enums.CryptoKeyVersion.CryptoKeyVersionState
# test disable
snippets.disable_crypto_key_version(self.project_id, self.location,
self.keyring_id, self.symId,
self.keyring_id, self.sym_id,
self.version)
response = client.get_crypto_key_version(name)
assert response.state == state_enum.DISABLED
# test destroy
snippets.destroy_crypto_key_version(self.project_id, self.location,
self.keyring_id, self.symId,
self.keyring_id, self.sym_id,
self.version)
response = client.get_crypto_key_version(name)
assert response.state == state_enum.DESTROY_SCHEDULED
# test restore
snippets.restore_crypto_key_version(self.project_id, self.location,
self.keyring_id, self.symId,
self.keyring_id, self.sym_id,
self.version)
response = client.get_crypto_key_version(name)
assert response.state == state_enum.DISABLED
# test re-enable
snippets.enable_crypto_key_version(self.project_id, self.location,
self.keyring_id, self.symId,
self.keyring_id, self.sym_id,
self.version)
response = client.get_crypto_key_version(name)
assert response.state == state_enum.ENABLED
Expand Down Expand Up @@ -171,48 +173,61 @@ def test_ring_policy(self):
# tests get/add/remove policy members
def test_key_policy(self):
# add member
snippets.add_member_to_crypto_key_policy(self.project_id,
self.location,
self.keyring_id,
self.symId,
self.member,
self.role)
policy = snippets.get_crypto_key_policy(self.project_id,
self.location,
self.keyring_id,
self.symId)
found = False
for b in list(policy.bindings):
if b.role == self.role and self.member in b.members:
found = True
assert found
snippets.add_member_to_crypto_key_policy(
self.project_id,
self.location,
self.keyring_id,
self.sym_id,
self.member,
self.role)

def check_policy():
policy = snippets.get_crypto_key_policy(
self.project_id,
self.location,
self.keyring_id,
self.sym_id)
found = False
for b in list(policy.bindings):
if b.role == self.role and self.member in b.members:
found = True
assert found
eventually_consistent.call(check_policy,
exceptions=(Aborted, AssertionError))
# remove member
snippets.remove_member_from_crypto_key_policy(self.project_id,
self.location,
self.keyring_id,
self.symId,
self.member,
self.role)
policy = snippets.get_crypto_key_policy(self.project_id,
self.location,
self.keyring_id,
self.symId)
found = False
for b in list(policy.bindings):
if b.role == self.role and self.member in b.members:
found = True
assert not found
snippets.remove_member_from_crypto_key_policy(
self.project_id,
self.location,
self.keyring_id,
self.sym_id,
self.member,
self.role)

def check_policy():
policy = snippets.get_crypto_key_policy(
self.project_id,
self.location,
self.keyring_id,
self.sym_id)
found = False
for b in list(policy.bindings):
if b.role == self.role and self.member in b.members:
found = True
assert not found
eventually_consistent.call(
check_policy,
exceptions=(Aborted, AssertionError))

def test_symmetric_encrypt_decrypt(self):
cipher_bytes = snippets.encrypt_symmetric(self.project_id,
self.location,
self.keyring_id,
self.symId,
self.sym_id,
self.message_bytes)
plain_bytes = snippets.decrypt_symmetric(self.project_id,
self.location,
self.keyring_id,
self.symId,
self.sym_id,
cipher_bytes)
assert plain_bytes == self.message_bytes
assert cipher_bytes != self.message_bytes
Expand Down

0 comments on commit c8af3e8

Please sign in to comment.