[healthcare] testing: use fixture for fhir_store (#4028)
will mitigate #4025
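
The fix swaps per-test create/delete boilerplate for module-scoped pytest fixtures: the FHIR store is created once before the module's tests run and deleted once after, and because the teardown sits after a yield, it still runs when a test body fails partway, so stores are no longer leaked into later runs. A minimal sketch of the pattern, with stand-in names (the real fixtures below call fhir_stores.create_fhir_store and fhir_stores.delete_fhir_store):

    import uuid

    import pytest


    # Stand-ins for the sample's helper functions; illustrative only.
    def create_resource(resource_id):
        print("created {}".format(resource_id))
        return resource_id


    def delete_resource(resource_id):
        print("deleted {}".format(resource_id))


    resource_id = "test_resource-{}".format(uuid.uuid4())


    @pytest.fixture(scope="module")
    def shared_resource():
        # Setup: runs once, before the first test in this module.
        resource = create_resource(resource_id)

        yield resource  # every test naming the fixture receives this value

        # Teardown: runs once, after the last test in the module, even if a
        # test failed -- unlike an in-test clean-up block, which an earlier
        # failing assertion can skip.
        delete_resource(resource_id)


    def test_uses_resource(shared_resource):
        assert shared_resource == resource_id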
Takashi Matsuo authored Jun 9, 2020
1 parent ee5be6d commit a365fad
Showing 1 changed file with 47 additions and 56 deletions.
healthcare/api-client/fhir/fhir_stores_test.py
@@ -33,6 +33,7 @@
 
 dataset_id = "test_dataset_{}".format(uuid.uuid4())
 fhir_store_id = "test_fhir_store-{}".format(uuid.uuid4())
+test_fhir_store_id = "test_fhir_store-{}".format(uuid.uuid4())
 
 gcs_uri = os.environ["CLOUD_STORAGE_BUCKET"]
 RESOURCES = os.path.join(os.path.dirname(__file__), "resources")
@@ -53,6 +54,39 @@ def test_dataset():
     datasets.delete_dataset(service_account_json, project_id, cloud_region, dataset_id)
 
 
+@pytest.fixture(scope="module")
+def test_fhir_store():
+    resp = fhir_stores.create_fhir_store(
+        service_account_json, project_id, cloud_region, dataset_id,
+        test_fhir_store_id
+    )
+
+    yield resp
+
+    fhir_stores.delete_fhir_store(
+        service_account_json, project_id, cloud_region, dataset_id,
+        test_fhir_store_id
+    )
+
+
+@pytest.fixture(scope="module")
+def test_blob():
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(gcs_uri)
+    blob = bucket.blob(source_file_name)
+
+    blob.upload_from_filename(resource_file)
+
+    yield blob
+
+    # Clean up
+    try:
+        blob.delete()
+    # If blob not found, then it's already been deleted, so no need to clean up.
+    except exceptions.NotFound as e:
+        print(f'Ignoring 404: {str(e)}')
+
+
 def test_CRUD_fhir_store(test_dataset, capsys):
     fhir_stores.create_fhir_store(
         service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
@@ -79,110 +113,67 @@ def test_CRUD_fhir_store(test_dataset, capsys):
     assert "Deleted FHIR store" in out
 
 
-def test_patch_fhir_store(test_dataset, capsys):
-    fhir_stores.create_fhir_store(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
-    )
-
+def test_patch_fhir_store(test_dataset, test_fhir_store, capsys):
     fhir_stores.patch_fhir_store(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
-    )
-
-    # Clean up
-    fhir_stores.delete_fhir_store(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
+        service_account_json, project_id, cloud_region, dataset_id,
+        test_fhir_store_id
     )
 
     out, _ = capsys.readouterr()
 
     assert "Patched FHIR store" in out
 
 
-def test_import_fhir_store_gcs(test_dataset, capsys):
-    fhir_stores.create_fhir_store(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
-    )
-
-    storage_client = storage.Client()
-    bucket = storage_client.get_bucket(gcs_uri)
-    blob = bucket.blob(source_file_name)
-
-    blob.upload_from_filename(resource_file)
+def test_import_fhir_store_gcs(
+        test_dataset, test_fhir_store, test_blob, capsys):
 
     time.sleep(10)  # Give new blob time to propagate
 
     fhir_stores.import_fhir_resources(
         service_account_json,
         project_id,
         cloud_region,
         dataset_id,
-        fhir_store_id,
+        test_fhir_store_id,
         import_object,
     )
 
-    # Clean up
-    try:
-        blob.delete()
-    # If blob not found, then it's already been deleted, so no need to clean up.
-    except exceptions.NotFound:
-        pass
-
-    fhir_stores.delete_fhir_store(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
-    )
-
     out, _ = capsys.readouterr()
 
     assert "Imported FHIR resources" in out
 
 
-def test_export_fhir_store_gcs(test_dataset, capsys):
-    fhir_stores.create_fhir_store(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
-    )
-
+def test_export_fhir_store_gcs(test_dataset, test_fhir_store, capsys):
     fhir_stores.export_fhir_store_gcs(
         service_account_json,
         project_id,
         cloud_region,
         dataset_id,
-        fhir_store_id,
+        test_fhir_store_id,
         gcs_uri,
     )
 
-    # Clean up
-    fhir_stores.delete_fhir_store(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
-    )
-
     out, _ = capsys.readouterr()
 
     assert "Exported FHIR resources to bucket" in out
 
 
-def test_get_set_fhir_store_iam_policy(test_dataset, capsys):
-    fhir_stores.create_fhir_store(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
-    )
-
+def test_get_set_fhir_store_iam_policy(test_dataset, test_fhir_store, capsys):
     get_response = fhir_stores.get_fhir_store_iam_policy(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
+        service_account_json, project_id, cloud_region, dataset_id,
+        test_fhir_store_id
     )
 
     set_response = fhir_stores.set_fhir_store_iam_policy(
         service_account_json,
         project_id,
         cloud_region,
         dataset_id,
-        fhir_store_id,
+        test_fhir_store_id,
         "serviceAccount:python-docs-samples-tests@appspot.gserviceaccount.com",
         "roles/viewer",
     )
 
-    # Clean up
-    fhir_stores.delete_fhir_store(
-        service_account_json, project_id, cloud_region, dataset_id, fhir_store_id
-    )
-
     out, _ = capsys.readouterr()
 
     assert "etag" in get_response
