Skip to content

Commit

Permalink
Airlock API - instrument the airlock with notifications events (#2195)
Browse files Browse the repository at this point in the history
* Add notification topic

* implement notification sending

* fix tests

* CI errors fixes

* api version change

Co-authored-by: Anat Balzam <anatbalzam@microsoft.com>
  • Loading branch information
anatbal and anatbal authored Jul 5, 2022
1 parent 1c9305f commit 05c6f31
Show file tree
Hide file tree
Showing 18 changed files with 187 additions and 67 deletions.
1 change: 1 addition & 0 deletions api_app/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ SERVICE_BUS_STEP_RESULT_QUEUE=airlock-step-result
# Event grid configuration
# -------------------------
EVENT_GRID_STATUS_CHANGED_TOPIC_ENDPOINT=__CHANGE_ME__
EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_ENDPOINT=__CHANGE_ME__

# Logging and monitoring
# ----------------------
Expand Down
2 changes: 1 addition & 1 deletion api_app/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.3.36"
__version__ = "0.3.37"
6 changes: 5 additions & 1 deletion api_app/api/routes/airlock_resource_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from models.domain.airlock_review import AirlockReview
from db.repositories.airlock_requests import AirlockRequestRepository
from models.domain.airlock_request import AirlockRequest, AirlockRequestStatus
from event_grid.helpers import send_status_changed_event
from event_grid.event_sender import send_status_changed_event, send_airlock_notification_event
from models.domain.authentication import User

from resources import strings
Expand Down Expand Up @@ -35,6 +35,8 @@ async def save_and_publish_event_airlock_request(airlock_request: AirlockRequest
try:
logging.debug(f"Sending status changed event for airlock request item: {airlock_request.id}")
await send_status_changed_event(airlock_request)
# TODO - replace with the emails list
await send_airlock_notification_event(airlock_request, [], [])
except Exception as e:
airlock_request_repo.delete_item(airlock_request.id)
logging.error(f"Failed sending status_changed message: {e}")
Expand All @@ -55,6 +57,8 @@ async def update_status_and_publish_event_airlock_request(airlock_request: Airlo
try:
logging.debug(f"Sending status changed event for airlock request item: {airlock_request.id}")
await send_status_changed_event(updated_airlock_request)
# TODO - replace with the emails list
await send_airlock_notification_event(updated_airlock_request, [], [])
return updated_airlock_request
except Exception as e:
logging.error(f"Failed sending status_changed message: {e}")
Expand Down
1 change: 1 addition & 0 deletions api_app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

# Event grid configuration
EVENT_GRID_STATUS_CHANGED_TOPIC_ENDPOINT: str = config("EVENT_GRID_STATUS_CHANGED_TOPIC_ENDPOINT", default="")
EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_ENDPOINT: str = config("EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_ENDPOINT", default="")

# Managed identity configuration
MANAGED_IDENTITY_CLIENT_ID: str = config("MANAGED_IDENTITY_CLIENT_ID", default="")
Expand Down
37 changes: 37 additions & 0 deletions api_app/event_grid/event_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
from azure.eventgrid import EventGridEvent
from models.domain.events import StatusChangedData, AirlockNotificationData
from event_grid.helpers import publish_event
from core import config
from models.domain.airlock_request import AirlockRequest


async def send_status_changed_event(airlock_request: AirlockRequest):
request_id = airlock_request.id
status = airlock_request.status.value
request_type = airlock_request.requestType.value
short_workspace_id = airlock_request.workspaceId[-4:]

status_changed_event = EventGridEvent(
event_type="statusChanged",
data=StatusChangedData(request_id=request_id, status=status, type=request_type, workspace_id=short_workspace_id).__dict__,
subject=f"{request_id}/statusChanged",
data_version="2.0"
)
logging.info(f"Sending status changed event with request ID {request_id}, status: {status}")
await publish_event(status_changed_event, config.EVENT_GRID_STATUS_CHANGED_TOPIC_ENDPOINT)


async def send_airlock_notification_event(airlock_request: AirlockRequest, researchers_emails, owners_emails):
request_id = airlock_request.id
status = airlock_request.status.value
short_workspace_id = airlock_request.workspaceId[-4:]

airlock_notification = EventGridEvent(
event_type="airlockNotification",
data=AirlockNotificationData(request_id=request_id, event_type="status_changed", event_value=status, researchers_emails=researchers_emails, owners_emails=owners_emails, workspace_id=short_workspace_id).__dict__,
subject=f"{request_id}/airlockNotification",
data_version="2.0"
)
logging.info(f"Sending airlock notification event with request ID {request_id}, status: {status}")
await publish_event(airlock_notification, config.EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_ENDPOINT)
25 changes: 1 addition & 24 deletions api_app/event_grid/helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import logging
from azure.eventgrid import EventGridEvent
from azure.eventgrid.aio import EventGridPublisherClient
from azure.identity.aio import DefaultAzureCredential
from models.domain.airlock_request import AirlockRequest
from core import config
from contextlib import asynccontextmanager

Expand All @@ -17,29 +15,8 @@ async def default_credentials():
await credential.close()


async def _publish_event(event: EventGridEvent, topic_endpoint: str):
async def publish_event(event: EventGridEvent, topic_endpoint: str):
async with default_credentials() as credential:
client = EventGridPublisherClient(topic_endpoint, credential)
async with client:
await client.send([event])


async def send_status_changed_event(airlock_request: AirlockRequest):
request_id = airlock_request.id
status = airlock_request.status.value
request_type = airlock_request.requestType.value
short_workspace_id = airlock_request.workspaceId[-4:]

status_changed_event = EventGridEvent(
event_type="statusChanged",
data={
"request_id": request_id,
"status": status,
"type": request_type,
"workspace_id": short_workspace_id
},
subject=f"{request_id}/statusChanged",
data_version="2.0"
)
logging.info(f"Sending status changed event with request ID {request_id}, status: {status}")
await _publish_event(status_changed_event, config.EVENT_GRID_STATUS_CHANGED_TOPIC_ENDPOINT)
18 changes: 18 additions & 0 deletions api_app/models/domain/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import List
from models.domain.azuretremodel import AzureTREModel


class AirlockNotificationData(AzureTREModel):
request_id: str
event_type: str
event_value: str
researchers_emails: List[str]
owners_emails: List[str]
workspace_id: str


class StatusChangedData(AzureTREModel):
request_id: str
status: str
type: str
workspace_id: str
6 changes: 3 additions & 3 deletions api_app/tests_ma/test_api/test_routes/test_airlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def test_post_airlock_request_with_state_store_endpoint_not_responding_ret
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE

@patch("api.routes.airlock.AirlockRequestRepository.delete_item")
@patch("event_grid.helpers.send_status_changed_event", side_effect=HttpResponseError)
@patch("event_grid.event_sender.send_status_changed_event", side_effect=HttpResponseError)
async def test_post_airlock_request_with_event_grid_not_responding_returns_503(self, _, __, app, client, sample_airlock_request_input_data):
response = await client.post(app.url_path_for(strings.API_CREATE_AIRLOCK_REQUEST, workspace_id=WORKSPACE_ID), json=sample_airlock_request_input_data)
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
Expand Down Expand Up @@ -152,7 +152,7 @@ async def test_post_submit_airlock_request_with_state_store_endpoint_not_respond
@patch("api.routes.airlock.AirlockRequestRepository.read_item_by_id", return_value=sample_airlock_request_object())
@patch("api.routes.airlock.AirlockRequestRepository.update_airlock_request_status")
@patch("api.routes.airlock.AirlockRequestRepository.delete_item")
@patch("event_grid.helpers.send_status_changed_event", side_effect=HttpResponseError)
@patch("event_grid.event_sender.send_status_changed_event", side_effect=HttpResponseError)
async def test_post_submit_airlock_request_with_event_grid_not_responding_returns_503(self, _, __, ___, ____, app, client):
response = await client.post(app.url_path_for(strings.API_SUBMIT_AIRLOCK_REQUEST, workspace_id=WORKSPACE_ID, airlock_request_id=AIRLOCK_REQUEST_ID))
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
Expand Down Expand Up @@ -246,7 +246,7 @@ async def test_post_create_airlock_review_with_state_store_endpoint_not_respondi
@patch("api.routes.airlock.AirlockReviewRepository.create_airlock_review_item", return_value=sample_airlock_review_object())
@patch("api.routes.airlock.AirlockReviewRepository.save_item")
@patch("api.routes.airlock.AirlockRequestRepository.update_airlock_request_status")
@patch("event_grid.helpers.send_status_changed_event", side_effect=HttpResponseError)
@patch("event_grid.event_sender.send_status_changed_event", side_effect=HttpResponseError)
async def test_post_create_airlock_review_with_event_grid_not_responding_returns_503(self, _, __, ___, ____, _____, app, client, sample_airlock_review_input_data):
response = await client.post(app.url_path_for(strings.API_REVIEW_AIRLOCK_REQUEST, workspace_id=WORKSPACE_ID, airlock_request_id=AIRLOCK_REQUEST_ID), json=sample_airlock_review_input_data)
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
from mock import AsyncMock, patch, MagicMock

from models.domain.events import AirlockNotificationData, StatusChangedData
from api.routes.airlock_resource_helpers import save_airlock_review, save_and_publish_event_airlock_request, \
update_status_and_publish_event_airlock_request
from db.repositories.airlock_reviews import AirlockReviewRepository
Expand Down Expand Up @@ -47,18 +48,23 @@ def sample_airlock_request(status=AirlockRequestStatus.Draft):
def sample_status_changed_event(status="draft"):
status_changed_event = EventGridEvent(
event_type="statusChanged",
data={
"request_id": AIRLOCK_REQUEST_ID,
"status": status,
"type": AirlockRequestType.Import,
"workspace_id": WORKSPACE_ID[-4:]
},
data=StatusChangedData(request_id=AIRLOCK_REQUEST_ID, status=status, type=AirlockRequestType.Import, workspace_id=WORKSPACE_ID[-4:]).__dict__,
subject=f"{AIRLOCK_REQUEST_ID}/statusChanged",
data_version="2.0"
)
return status_changed_event


def sample_airlock_notification_event(status="draft"):
status_changed_event = EventGridEvent(
event_type="airlockNotification",
data=AirlockNotificationData(request_id=AIRLOCK_REQUEST_ID, event_type="status_changed", event_value=status, researchers_emails=[], owners_emails=[], workspace_id=WORKSPACE_ID[-4:]).__dict__,
subject=f"{AIRLOCK_REQUEST_ID}/airlockNotification",
data_version="2.0"
)
return status_changed_event


def sample_airlock_review(review_decision=AirlockReviewDecision.Approved):
airlock_review = AirlockReview(
id=AIRLOCK_REVIEW_ID,
Expand All @@ -77,6 +83,7 @@ async def test_save_and_publish_event_airlock_request_saves_item(event_grid_publ
airlock_request_mock = sample_airlock_request()
airlock_request_repo_mock.save_item = MagicMock(return_value=None)
status_changed_event_mock = sample_status_changed_event()
airlock_notification_event_mock = sample_airlock_notification_event()
event_grid_sender_client_mock = event_grid_publisher_client_mock.return_value
event_grid_sender_client_mock.send = AsyncMock()

Expand All @@ -87,10 +94,12 @@ async def test_save_and_publish_event_airlock_request_saves_item(event_grid_publ

airlock_request_repo_mock.save_item.assert_called_once_with(airlock_request_mock)

event_grid_sender_client_mock.send.assert_awaited_once()
assert event_grid_sender_client_mock.send.call_count == 2
# Since the eventgrid object has the update time attribute which differs, we only compare the data that was sent
actual_status_changed_event = event_grid_sender_client_mock.send.await_args[0][0][0]
assert (actual_status_changed_event.data == status_changed_event_mock.data)
actual_status_changed_event = event_grid_sender_client_mock.send.await_args_list[0].args[0][0]
assert actual_status_changed_event.data == status_changed_event_mock.data
actual_airlock_notification_event = event_grid_sender_client_mock.send.await_args_list[1].args[0][0]
assert actual_airlock_notification_event.data == airlock_notification_event_mock.data


async def test_save_and_publish_event_airlock_request_raises_503_if_save_to_db_fails(airlock_request_repo_mock):
Expand Down Expand Up @@ -129,6 +138,7 @@ async def test_update_status_and_publish_event_airlock_request_updates_item(even
airlock_request_mock = sample_airlock_request()
updated_airlock_request_mock = sample_airlock_request(status=AirlockRequestStatus.Submitted)
status_changed_event_mock = sample_status_changed_event(status="submitted")
airlock_notification_event_mock = sample_airlock_notification_event(status="submitted")
airlock_request_repo_mock.update_airlock_request_status = MagicMock(return_value=updated_airlock_request_mock)
event_grid_sender_client_mock = event_grid_publisher_client_mock.return_value
event_grid_sender_client_mock.send = AsyncMock()
Expand All @@ -142,10 +152,12 @@ async def test_update_status_and_publish_event_airlock_request_updates_item(even
airlock_request_repo_mock.update_airlock_request_status.assert_called_once()
assert (actual_updated_airlock_request == updated_airlock_request_mock)

event_grid_sender_client_mock.send.assert_awaited_once()
assert event_grid_sender_client_mock.send.call_count == 2
# Since the eventgrid object has the update time attribute which differs, we only compare the data that was sent
actual_status_changed_event = event_grid_sender_client_mock.send.await_args[0][0][0]
assert (actual_status_changed_event.data == status_changed_event_mock.data)
actual_status_changed_event = event_grid_sender_client_mock.send.await_args_list[0].args[0][0]
assert actual_status_changed_event.data == status_changed_event_mock.data
actual_airlock_notification_event = event_grid_sender_client_mock.send.await_args_list[1].args[0][0]
assert actual_airlock_notification_event.data == airlock_notification_event_mock.data


async def test_update_status_and_publish_event_airlock_request_raises_400_if_status_update_invalid(airlock_request_repo_mock):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ async def test_receiving_good_message(app, sb_client, logging_mock, airlock_requ
eg_client().send = AsyncMock()
expected_airlock_request = sample_airlock_request()
airlock_request_repo().get_airlock_request_by_id.return_value = expected_airlock_request
airlock_request_repo().update_airlock_request_status.return_value = sample_airlock_request(status=AirlockRequestStatus.InReview)
await receive_step_result_message_and_update_status(app)

airlock_request_repo().get_airlock_request_by_id.assert_called_once_with(test_sb_step_result_message["data"]["request_id"])
airlock_request_repo().update_airlock_request_status.assert_called_once_with(expected_airlock_request, test_sb_step_result_message["data"]["new_status"], expected_airlock_request.user)
eg_client().send.assert_called_once()
assert eg_client().send.call_count == 2
logging_mock.assert_not_called()
sb_client().get_queue_receiver().complete_message.assert_called_once_with(service_bus_received_message_mock)

Expand Down
11 changes: 11 additions & 0 deletions devops/scripts/setup_local_debugging.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ set -e
: "${AZURE_SUBSCRIPTION_ID?"Check AZURE_SUBSCRIPTION_ID is defined in ./templates/core/private.env"}"
: "${EVENT_GRID_STATUS_CHANGED_TOPIC_RESOURCE_ID?"Check EVENT_GRID_STATUS_CHANGED_TOPIC_RESOURCE_ID is defined in ./templates/core/private.env"}"
: "${EVENT_GRID_STATUS_CHANGED_TOPIC_ENDPOINT?"Check EVENT_GRID_STATUS_CHANGED_TOPIC_ENDPOINT is defined in ./templates/core/private.env"}"
: "${EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_ENDPOINT?"Check EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_ENDPOINT is defined in ./templates/core/private.env"}"

set -o pipefail
set -o nounset
Expand All @@ -23,6 +24,7 @@ fi

# extract eventgrid topic name from endpoint
EVENT_GRID_STATUS_CHANGED_TOPIC_NAME=$(echo "$EVENT_GRID_STATUS_CHANGED_TOPIC_ENDPOINT" | sed 's/https\?:\/\///'| awk -F"." '{print $1}')
EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_NAME=$(echo "$EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_ENDPOINT" | sed 's/https\?:\/\///'| awk -F"." '{print $1}')

echo "Adding local IP Address to ${COSMOSDB_ACCOUNT_NAME}. This may take a while . . . "
az cosmosdb update \
Expand All @@ -44,6 +46,15 @@ az eventgrid topic update \
--public-network-access enabled \
--inbound-ip-rules "${IPADDR}" allow


echo "Adding local IP Address to ${EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_NAME}."
az eventgrid topic update \
--resource-group "${RESOURCE_GROUP_NAME}" \
--name "${EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_NAME}" \
--public-network-access enabled \
--inbound-ip-rules "${IPADDR}" allow


# Get the object id of the currently logged-in identity
if [[ -n ${ARM_CLIENT_ID:-} ]]; then
# if environment includes a SP with subscription access, then we should use that.
Expand Down
38 changes: 38 additions & 0 deletions templates/core/terraform/airlock/eventgrid_topics.tf
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,44 @@ resource "azurerm_private_endpoint" "eg_scan_result" {
}
}

# Custom topic (for airlock notifications)
resource "azurerm_eventgrid_topic" "airlock_notification" {
name = local.notification_topic_name
location = var.location
resource_group_name = var.resource_group_name
public_network_access_enabled = false

identity {
type = "SystemAssigned"
}

tags = merge(var.tre_core_tags, {
Publishers = "airlock;custom notification service;"
})

lifecycle { ignore_changes = [tags] }
}

resource "azurerm_private_endpoint" "eg_airlock_notification" {
name = "pe-eg-airlock_notification-${var.tre_id}"
location = var.location
resource_group_name = var.resource_group_name
subnet_id = var.airlock_events_subnet_id
lifecycle { ignore_changes = [tags] }

private_dns_zone_group {
name = "private-dns-zone-group"
private_dns_zone_ids = [data.azurerm_private_dns_zone.eventgrid.id]
}

private_service_connection {
name = "psc-eg-${var.tre_id}"
private_connection_resource_id = azurerm_eventgrid_topic.airlock_notification.id
is_manual_connection = false
subresource_names = ["topic"]
}
}

## Subscriptions

resource "azurerm_eventgrid_event_subscription" "step_result" {
Expand Down
6 changes: 6 additions & 0 deletions templates/core/terraform/airlock/identity.tf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ resource "azurerm_role_assignment" "eventgrid_data_sender" {
principal_id = var.api_principal_id
}

resource "azurerm_role_assignment" "eventgrid_data_sender_notification" {
scope = azurerm_eventgrid_topic.airlock_notification.id
role_definition_name = "EventGrid Data Sender"
principal_id = var.api_principal_id
}

resource "azurerm_role_assignment" "sa_import_external" {
scope = azurerm_storage_account.sa_import_external.id
role_definition_name = "Contributor"
Expand Down
1 change: 1 addition & 0 deletions templates/core/terraform/airlock/locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ locals {
scan_result_topic_name = "evgt-airlock-scan-result-${local.topic_name_suffix}"
step_result_topic_name = "evgt-airlock-step-result-${local.topic_name_suffix}"
status_changed_topic_name = "evgt-airlock-status-changed-${local.topic_name_suffix}"
notification_topic_name = "evgt-airlock-notification-${local.topic_name_suffix}"

step_result_queue_name = "airlock-step-result"
status_changed_queue_name = "airlock-status-changed"
Expand Down
4 changes: 4 additions & 0 deletions templates/core/terraform/airlock/output.tf
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ output "event_grid_status_changed_topic_endpoint" {
value = azurerm_eventgrid_topic.status_changed.endpoint
}

output "event_grid_airlock_notification_topic_endpoint" {
value = azurerm_eventgrid_topic.airlock_notification.endpoint
}

output "service_bus_step_result_queue" {
value = azurerm_servicebus_queue.step_result.name
}
Expand Down
Loading

0 comments on commit 05c6f31

Please sign in to comment.