Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle request cancellation in Airlock processor #2584

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ FEATURES:

ENHANCEMENTS:

*
* Cancelling an Airlock request triggers deletion of the request container and files ([#2584](https://github.com/microsoft/AzureTRE/pull/2584))

BUG FIXES:

Expand Down
4 changes: 2 additions & 2 deletions airlock_processor/BlobCreatedTrigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def main(msg: func.ServiceBusMessage,
id=str(uuid.uuid4()),
data={"blob_to_delete": copied_from[-1]}, # last container in copied_from is the one we just copied from
subject=request_id,
event_type="Airlock.ToDelete",
event_type="Airlock.DataDeletion",
event_time=datetime.datetime.utcnow(),
data_version="1.0"
data_version=constants.DATA_DELETION_EVENT_DATA_VERSION
)
)
162 changes: 106 additions & 56 deletions airlock_processor/StatusChangedQueueTrigger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Optional

import azure.functions as func
import datetime
Expand All @@ -14,7 +15,8 @@

class RequestProperties(BaseModel):
request_id: str
status: str
new_status: str
previous_status: Optional[str]
type: str
workspace_id: str

Expand All @@ -28,50 +30,46 @@ def __init__(self, source_account_name: str, dest_account_name: str):
self.dest_account_name = dest_account_name


def main(msg: func.ServiceBusMessage, outputEvent: func.Out[func.EventGridOutputEvent]):
def main(msg: func.ServiceBusMessage, stepResultEvent: func.Out[func.EventGridOutputEvent], dataDeletionEvent: func.Out[func.EventGridOutputEvent]):
try:
request_properties = extract_properties(msg)
request_files = get_request_files(request_properties) if request_properties.status == constants.STAGE_SUBMITTED else None
handle_status_changed(request_properties, outputEvent, request_files)
request_files = get_request_files(request_properties) if request_properties.new_status == constants.STAGE_SUBMITTED else None
handle_status_changed(request_properties, stepResultEvent, dataDeletionEvent, request_files)

except NoFilesInRequestException:
set_output_event_to_report_failure(outputEvent, request_properties, failure_reason=constants.NO_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason=constants.NO_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
except TooManyFilesInRequestException:
set_output_event_to_report_failure(outputEvent, request_properties, failure_reason=constants.TOO_MANY_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason=constants.TOO_MANY_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
except Exception:
set_output_event_to_report_failure(outputEvent, request_properties, failure_reason=constants.UNKNOWN_REASON_MESSAGE, request_files=request_files)
set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason=constants.UNKNOWN_REASON_MESSAGE, request_files=request_files)


def handle_status_changed(request_properties: RequestProperties, outputEvent: func.Out[func.EventGridOutputEvent], request_files):
new_status = request_properties.status
def handle_status_changed(request_properties: RequestProperties, stepResultEvent: func.Out[func.EventGridOutputEvent], dataDeletionEvent: func.Out[func.EventGridOutputEvent], request_files):
new_status = request_properties.new_status
previous_status = request_properties.previous_status
req_id = request_properties.request_id
ws_id = request_properties.workspace_id
request_type = request_properties.type

logging.info('Processing request with id %s. new status is "%s", type is "%s"', req_id, new_status, request_type)

try:
tre_id = os.environ["TRE_ID"]
except KeyError as e:
logging.error(f'Missing environment variable: {e}')
raise

if new_status == constants.STAGE_DRAFT and request_type == constants.IMPORT_TYPE:
account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id
if new_status == constants.STAGE_DRAFT:
account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, short_workspace_id=ws_id)
blob_operations.create_container(account_name, req_id)
return

if new_status == constants.STAGE_DRAFT and request_type == constants.EXPORT_TYPE:
account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + ws_id
blob_operations.create_container(account_name, req_id)
if new_status == constants.STAGE_CANCELLED:
storage_account_name = get_storage_account(previous_status, request_type, ws_id)
container_to_delete_url = blob_operations.get_blob_url(account_name=storage_account_name, container_name=req_id)
set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url=container_to_delete_url)
return

if new_status == constants.STAGE_SUBMITTED:
set_output_event_to_report_request_files(outputEvent, request_properties, request_files)
set_output_event_to_report_request_files(stepResultEvent, request_properties, request_files)

if (is_require_data_copy(new_status)):
logging.info('Request with id %s. requires data copy between storage accounts', req_id)
containers_metadata = get_source_dest_for_copy(new_status, request_type, ws_id)
containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, short_workspace_id=ws_id)
eladiw marked this conversation as resolved.
Show resolved Hide resolved
blob_operations.create_container(containers_metadata.dest_account_name, req_id)
blob_operations.copy_data(containers_metadata.source_account_name,
containers_metadata.dest_account_name, req_id)
Expand Down Expand Up @@ -104,78 +102,130 @@ def is_require_data_copy(new_status: str):
return False


def get_source_dest_for_copy(new_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata:
def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata:
# sanity
if is_require_data_copy(new_status) is False:
raise Exception("Given new status is not supported")

try:
tre_id = os.environ["TRE_ID"]
except KeyError as e:
logging.error(f'Missing environment variable: {e}')
raise

request_type = request_type.lower()
if request_type != constants.IMPORT_TYPE and request_type != constants.EXPORT_TYPE:
msg = "Airlock request type must be either '{}' or '{}".format(str(constants.IMPORT_TYPE),
str(constants.EXPORT_TYPE))
logging.error(msg)
raise Exception(msg)

source_account_name = get_storage_account(previous_status, request_type, short_workspace_id)
dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, short_workspace_id)
return ContainersCopyMetadata(source_account_name, dest_account_name)


def get_storage_account(status: str, request_type: str, short_workspace_id: str) -> str:
    """Return the name of the storage account holding a request's data while it is in `status`.

    Account names are a fixed per-stage prefix plus either the TRE id or the short
    workspace id, mirroring how the accounts are provisioned per request type.
    Raises if no account is defined for the given status/type combination.
    """
    tre_id = _get_tre_id()

    # All of these stages share the single "in progress" storage account.
    in_progress_statuses = [constants.STAGE_IN_REVIEW, constants.STAGE_SUBMITTED,
                            constants.STAGE_APPROVAL_INPROGRESS, constants.STAGE_REJECTION_INPROGRESS,
                            constants.STAGE_BLOCKING_INPROGRESS]

    if request_type == constants.IMPORT_TYPE:
        accounts_by_status = {
            constants.STAGE_DRAFT: constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id,
            constants.STAGE_APPROVED: constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id,
            constants.STAGE_REJECTED: constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id,
            constants.STAGE_BLOCKED_BY_SCAN: constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id,
        }
        accounts_by_status.update(
            {s: constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id for s in in_progress_statuses})
        if status in accounts_by_status:
            return accounts_by_status[status]

    if request_type == constants.EXPORT_TYPE:
        accounts_by_status = {
            constants.STAGE_DRAFT: constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + short_workspace_id,
            constants.STAGE_APPROVED: constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id,
            constants.STAGE_REJECTED: constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id,
            constants.STAGE_BLOCKED_BY_SCAN: constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id,
        }
        accounts_by_status.update(
            {s: constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id for s in in_progress_statuses})
        if status in accounts_by_status:
            return accounts_by_status[status]

    error_message = f"Missing current storage account definition for status '{status}' and request type '{request_type}'."
    logging.error(error_message)
    raise Exception(error_message)


def get_storage_account_destination_for_copy(new_status: str, request_type: str, short_workspace_id: str) -> str:
tre_id = _get_tre_id()

if request_type == constants.IMPORT_TYPE:
if new_status == constants.STAGE_SUBMITTED:
source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
elif new_status == constants.STAGE_REJECTION_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id
else:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id

if request_type == constants.EXPORT_TYPE:
if new_status == constants.STAGE_SUBMITTED:
source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + short_workspace_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
elif new_status == constants.STAGE_REJECTION_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id

return ContainersCopyMetadata(source_account_name, dest_account_name)
yuvalyaron marked this conversation as resolved.
Show resolved Hide resolved
error_message = f"Missing copy destination storage account definition for status '{new_status}' and request type '{request_type}'."
logging.error(error_message)
raise Exception(error_message)


def set_output_event_to_report_failure(outputEvent, request_properties, failure_reason, request_files):
def set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason, request_files):
logging.exception(f"Failed processing Airlock request with ID: '{request_properties.request_id}', changing request status to '{constants.STAGE_FAILED}'.")
outputEvent.set(
stepResultEvent.set(
func.EventGridOutputEvent(
id=str(uuid.uuid4()),
data={"completed_step": request_properties.status, "new_status": constants.STAGE_FAILED, "request_id": request_properties.request_id, "request_files": request_files, "error_message": failure_reason},
data={"completed_step": request_properties.new_status, "new_status": constants.STAGE_FAILED, "request_id": request_properties.request_id, "request_files": request_files, "error_message": failure_reason},
subject=request_properties.request_id,
event_type="Airlock.StepResult",
event_time=datetime.datetime.utcnow(),
data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))


def set_output_event_to_report_request_files(outputEvent, request_properties, request_files):
outputEvent.set(
def set_output_event_to_report_request_files(stepResultEvent, request_properties, request_files):
logging.info(f'Sending file enumeration result for request with ID: {request_properties.request_id} result: {request_files}')
stepResultEvent.set(
func.EventGridOutputEvent(
id=str(uuid.uuid4()),
data={"completed_step": request_properties.status, "request_id": request_properties.request_id, "request_files": request_files},
data={"completed_step": request_properties.new_status, "request_id": request_properties.request_id, "request_files": request_files},
subject=request_properties.request_id,
event_type="Airlock.StepResult",
event_time=datetime.datetime.utcnow(),
data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))


def get_request_files(request_properties):
containers_metadata = get_source_dest_for_copy(request_properties.status, request_properties.type, request_properties.workspace_id)
storage_account_name = containers_metadata.source_account_name
def set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url):
logging.info(f'Sending container deletion event for request ID: {request_properties.request_id}. container URL: {container_url}')
dataDeletionEvent.set(
func.EventGridOutputEvent(
id=str(uuid.uuid4()),
data={"blob_to_delete": container_url},
eladiw marked this conversation as resolved.
Show resolved Hide resolved
subject=request_properties.request_id,
event_type="Airlock.DataDeletion",
event_time=datetime.datetime.utcnow(),
data_version=constants.DATA_DELETION_EVENT_DATA_VERSION
)
)


def get_request_files(request_properties: RequestProperties):
    """Enumerate the files in the request's container, located in the storage
    account that corresponds to the request's previous status."""
    account = get_storage_account(request_properties.previous_status,
                                  request_properties.type,
                                  request_properties.workspace_id)
    return blob_operations.get_request_files(account_name=account,
                                             request_id=request_properties.request_id)


def _get_tre_id():
    """Return the TRE_ID environment variable; log and re-raise KeyError if unset."""
    try:
        return os.environ["TRE_ID"]
    except KeyError as e:
        logging.error(f'Missing environment variable: {e}')
        raise
9 changes: 8 additions & 1 deletion airlock_processor/StatusChangedQueueTrigger/function.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@
},
{
"type": "eventGrid",
"name": "outputEvent",
"name": "stepResultEvent",
"topicEndpointUri": "EVENT_GRID_STEP_RESULT_TOPIC_URI_SETTING",
"topicKeySetting": "EVENT_GRID_STEP_RESULT_TOPIC_KEY_SETTING",
"direction": "out"
},
{
"type": "eventGrid",
"name": "dataDeletionEvent",
"topicEndpointUri": "EVENT_GRID_TO_DELETE_TOPIC_URI_SETTING",
"topicKeySetting": "EVENT_GRID_TO_DELETE_TOPIC_KEY_SETTING",
"direction": "out"
}
]
}
7 changes: 6 additions & 1 deletion airlock_processor/ToDeleteTrigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def delete_blob_and_container_if_last_blob(blob_url: str):
credential=credential)
container_client = blob_service_client.get_container_client(container_name)

if not blob_name:
logging.info(f'No specific blob specified, deleting the entire container: {container_name}')
container_client.delete_container()
return

# If it's the only blob in the container, we need to delete the container too
# Check how many blobs are in the container (note: this exhausts the generator)
blobs_num = sum(1 for _ in container_client.list_blobs())
Expand All @@ -33,7 +38,7 @@ def delete_blob_and_container_if_last_blob(blob_url: str):

def main(msg: func.ServiceBusMessage):
body = msg.get_body().decode('utf-8')
logging.info(f'Python ServiceBus queue trigger processed mesage: {body}')
logging.info(f'Python ServiceBus queue trigger processed message: {body}')
json_body = json.loads(body)

blob_url = json_body["data"]["blob_to_delete"]
Expand Down
2 changes: 1 addition & 1 deletion airlock_processor/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.6"
__version__ = "0.4.7"
4 changes: 4 additions & 0 deletions airlock_processor/shared_code/blob_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,7 @@ def get_blob_info_from_topic_and_subject(topic: str, subject: str):
def get_blob_info_from_blob_url(blob_url: str) -> Tuple[str, str, str]:
    """Split a blob URL into (account_name, container_name, blob_name).

    Example: https://stalimappws663d.blob.core.windows.net/50866a82-d13a-4fd5-936f-deafdf1022ce/test_blob.txt
    -> ("stalimappws663d", "50866a82-d13a-4fd5-936f-deafdf1022ce", "test_blob.txt")

    Raises AttributeError (via .groups() on None) if the URL does not match.
    """
    # Dots in the host suffix are escaped so they match a literal ".",
    # not any character — the unescaped form could accept malformed URLs.
    return re.search(r'https://(.*?)\.blob\.core\.windows\.net/(.*?)/(.*?)$', blob_url).groups()


def get_blob_url(account_name: str, container_name: str, blob_name='') -> str:
    """Build the URL for a blob; with the default empty blob_name the result is
    the container URL (note it keeps a trailing slash)."""
    return ''.join([get_account_url(account_name), container_name, '/', blob_name])
1 change: 1 addition & 0 deletions airlock_processor/shared_code/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@

# Event Grid
STEP_RESULT_EVENT_DATA_VERSION = "1.0"
DATA_DELETION_EVENT_DATA_VERSION = "1.0"

NO_THREATS = "No threats found"
17 changes: 16 additions & 1 deletion airlock_processor/tests/shared_code/test_blob_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from unittest import TestCase
from unittest.mock import MagicMock, patch

from shared_code.blob_operations import get_blob_info_from_topic_and_subject, get_blob_info_from_blob_url, copy_data
from shared_code.blob_operations import get_blob_info_from_topic_and_subject, get_blob_info_from_blob_url, copy_data, get_blob_url
from exceptions import TooManyFilesInRequestException, NoFilesInRequestException


Expand Down Expand Up @@ -75,3 +75,18 @@ def test_copy_data_adds_copied_from_metadata(self, _, mock_blob_service_client):

# Check that copied_from field was set correctly in the metadata
dest_blob_client_mock.start_copy_from_url.assert_called_with(f"{source_url}?sas", metadata=dest_metadata)

def test_get_blob_url_should_return_blob_url(self):
    # A full blob URL is the account endpoint plus container and blob name.
    result = get_blob_url("account", "container", "blob")
    self.assertEqual(result, "https://account.blob.core.windows.net/container/blob")

def test_get_blob_url_without_blob_name_should_return_container_url(self):
    # Omitting blob_name yields the container URL, which keeps a trailing slash.
    result = get_blob_url("account", "container")
    self.assertEqual(result, "https://account.blob.core.windows.net/container/")
Loading