Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 103 additions & 13 deletions playground/infrastructure/cd_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import os
import shutil
from pathlib import Path
from google.cloud import storage
from typing import List
from helper import Example

from api.v1.api_pb2 import Sdk
from config import Config, PrecompiledExample
from helper import Example, get_statuses
from grpc_client import GRPCClient


class CDHelper:
Expand All @@ -28,27 +37,108 @@ def store_examples(self, examples: List[Example]):
"""
Store beam examples and their output in the Google Cloud.
"""
self._run_code(examples)
self._save_to_cloud(examples)
asyncio.run(self._get_outputs(examples))
self._save_to_cloud_storage(examples)
self._clear_temp_folder()

def _run_code(self, examples: List[Example]):
async def _get_outputs(self, examples: List[Example]):
    """
    Run beam examples and keep their output.

    Call the backend to start code processing for the examples.
    Then receive code output.

    Args:
        examples: beam examples that should be run
    """
    # Run the examples' code and wait until all of them have finished executing.
    await get_statuses(examples)
    client = GRPCClient()
    # Fetch all run outputs concurrently; gather preserves input order,
    # so zip pairs each output with its originating example.
    tasks = [client.get_run_output(example.pipeline_id) for example in examples]
    outputs = await asyncio.gather(*tasks)
    for output, example in zip(outputs, examples):
        example.output = output

def _save_to_cloud_storage(self, examples: List[Example]):
    """
    Save examples, outputs and meta to bucket

    Args:
        examples: precompiled examples
    """
    self._storage_client = storage.Client()
    self._bucket = self._storage_client.bucket(Config.BUCKET_NAME)
    for example in examples:
        # Materialize the example's files on the local filesystem first,
        # then push each produced file to its bucket destination.
        local_files = self._write_to_local_fs(example)
        for bucket_path, local_path in local_files.items():
            self._upload_blob(source_file=local_path, destination_blob_name=bucket_path)

def _save_to_cloud(self, examples: List[Example]):
def _write_to_local_fs(self, example: Example):
    """
    Write code of an example, output and meta info to the filesystem (in temp folder)

    Args:
        example: example object

    Returns: dict {path_at_the_bucket:path_at_the_os}

    """
    path_to_object_folder = os.path.join(Config.TEMP_FOLDER, example.pipeline_id, Sdk.Name(example.sdk),
                                         example.tag.name)
    Path(path_to_object_folder).mkdir(parents=True, exist_ok=True)

    file_names = dict()
    code_path = self._get_gcs_object_name(sdk=example.sdk, base_folder_name=example.tag.name,
                                          file_name=example.tag.name)
    output_path = self._get_gcs_object_name(sdk=example.sdk, base_folder_name=example.tag.name,
                                            file_name=example.tag.name,
                                            extension=PrecompiledExample.OUTPUT_EXTENSION)
    meta_path = self._get_gcs_object_name(sdk=example.sdk, base_folder_name=example.tag.name,
                                          file_name=PrecompiledExample.META_NAME,
                                          extension=PrecompiledExample.META_EXTENSION)
    file_names[code_path] = example.code
    file_names[output_path] = example.output
    # NOTE(review): relies on Tag exposing NamedTuple's private `_asdict` — confirm before changing Tag.
    file_names[meta_path] = str(example.tag._asdict())
    for file_name, file_content in file_names.items():
        local_file_path = os.path.join(Config.TEMP_FOLDER, example.pipeline_id, file_name)
        # Explicit encoding: default `open` encoding is platform-dependent, which
        # would make the uploaded bytes differ between environments.
        with open(local_file_path, "w", encoding="utf-8") as file:
            file.write(file_content)
        file_names[file_name] = local_file_path  # don't need content anymore, instead save the local path
    return file_names

def _get_gcs_object_name(self, sdk: Sdk, base_folder_name: str, file_name: str, extension: str = None):
    """
    Get the path where file will be stored at the bucket.

    Args:
        sdk: sdk of the example
        file_name: name of the example
        base_folder_name: name of the folder where example is stored (eq. to example name)
        extension: extension of the file; when omitted, the SDK's source extension is used

    Returns: file name
    """
    sdk_name = Sdk.Name(sdk)
    # Fall back to the SDK's default source-file extension when none is given.
    file_extension = Config.EXTENSIONS[sdk_name] if extension is None else extension
    return os.path.join(sdk_name, base_folder_name, f"{file_name}.{file_extension}")

def _upload_blob(self, source_file: str, destination_blob_name: str):
    """
    Upload a file to the bucket.

    Args:
        source_file: name of the file to be stored
        destination_blob_name: "storage-object-name"
    """
    target_blob = self._bucket.blob(destination_blob_name)
    target_blob.upload_from_filename(source_file)
    # Disable caching for this object so clients always fetch fresh content.
    target_blob.cache_control = Config.NO_STORE
    # patch() persists the metadata change (cache_control) on the server side.
    target_blob.patch()
    logging.info("File uploaded to {}.".format(destination_blob_name))

def _clear_temp_folder(self):
    """
    Remove temporary folder with source files.
    """
    # Guard the removal: shutil.rmtree raises if the path does not exist
    # (e.g. when no examples were written in this run).
    if os.path.exists(Config.TEMP_FOLDER):
        shutil.rmtree(Config.TEMP_FOLDER)
1 change: 1 addition & 0 deletions playground/infrastructure/ci_cd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os

Expand Down
2 changes: 1 addition & 1 deletion playground/infrastructure/ci_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def verify_examples(self, examples: List[Example]):
2. Group code of examples by their SDK.
3. Run processing for all examples to verify examples' code.
"""
get_statuses(examples)
await get_statuses(examples)
await self._verify_examples_status(examples)

async def _verify_examples_status(self, examples: List[Example]):
Expand Down
14 changes: 13 additions & 1 deletion playground/infrastructure/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@
@dataclass(frozen=True)
class Config:
    """Immutable configuration constants for the playground CI/CD helpers."""
    # Backend gRPC server address; overridable via environment for deployments.
    SERVER_ADDRESS = os.getenv("SERVER_ADDRESS", "localhost:8080")
    # Maps example file extensions to their SDK enum values.
    SUPPORTED_SDK = {"java": SDK_JAVA, "go": SDK_GO, "py": SDK_PYTHON}
    BUCKET_NAME = "test_public_bucket_akvelon"
    TEMP_FOLDER = "temp"
    # Maps SDK enum names back to source-file extensions.
    EXTENSIONS = {"SDK_JAVA": "java", "SDK_GO": "go", "SDK_PYTHON": "py"}
    # Cache-Control header value for uploaded objects.
    NO_STORE = "no-store"
    # Statuses that mean an example failed at some stage of processing.
    ERROR_STATUSES = [STATUS_VALIDATION_ERROR, STATUS_ERROR, STATUS_PREPARATION_ERROR, STATUS_COMPILE_ERROR,
                      STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR]
    BEAM_PLAYGROUND_TITLE = "Beam-playground:\n"
    BEAM_PLAYGROUND = "Beam-playground"
    # Seconds to wait between status-polling rounds.
    PAUSE_DELAY = 10


@dataclass(frozen=True)
Expand All @@ -36,3 +41,10 @@ class TagFields:
DESCRIPTION: str = "description"
MULTIFILE: str = "multifile"
CATEGORIES: str = "categories"


@dataclass(frozen=True)
class PrecompiledExample:
    """File-layout constants for a precompiled example stored in the bucket."""
    OUTPUT_EXTENSION = "output"  # extension of the example's run-output file
    META_NAME = "meta"  # base name of the meta-information file
    META_EXTENSION = "info"  # extension of the meta file, i.e. stored as "meta.info"
1 change: 1 addition & 0 deletions playground/infrastructure/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ grpcio==1.41.1
mock==4.0.3
protobuf==3.19.1
pytest==6.2.5
pytest-mock==3.6.1
PyYAML==6.0
79 changes: 71 additions & 8 deletions playground/infrastructure/test_cd_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,79 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import os
import shutil

import pytest

from cd_helper import CDHelper
from api.v1.api_pb2 import SDK_JAVA, STATUS_UNSPECIFIED
from config import Config
from helper import Example, Tag


@pytest.fixture
def delete_temp_folder():
    """
    Remove the temp folder used by tests that store files.

    Yields control to the test first; the cleanup below runs on teardown,
    after the test body has finished.
    """
    yield delete_temp_folder
    if os.path.exists(Config.TEMP_FOLDER):
        shutil.rmtree(Config.TEMP_FOLDER)


@pytest.fixture
def upload_blob():
    """
    Fake method for mocking.

    NOTE(review): referenced directly as a `return_value` when patching
    `CDHelper._upload_blob` rather than injected as a fixture — confirm intent.
    Returns: None
    """
    pass


def test__get_gcs_object_name():
    """
    Test getting the path where file will be stored at the bucket
    """
    helper = CDHelper()
    # Default extension comes from the SDK; an explicit one overrides it.
    assert helper._get_gcs_object_name(SDK_JAVA, "base_folder", "file") == "SDK_JAVA/base_folder/file.java"
    assert helper._get_gcs_object_name(SDK_JAVA, "base_folder", "file", "output") == \
        "SDK_JAVA/base_folder/file.output"


def test__write_to_local_fs(delete_temp_folder):
    """
    Test writing code of an example, output and meta info to the filesystem (in temp folder)
    Args:
        delete_temp_folder: python fixture to clean up temp folder after method execution
    """
    meta = {"name": "name", "description": "description", "multifile": False,
            "categories": ["category-1", "category-2"]}
    example = Example("name", "pipeline_id", SDK_JAVA, "filepath", "code_of_example",
                      "output_of_example", STATUS_UNSPECIFIED, Tag(**meta))
    expected = {
        "SDK_JAVA/name/name.java": "temp/pipeline_id/SDK_JAVA/name/name.java",
        "SDK_JAVA/name/name.output": "temp/pipeline_id/SDK_JAVA/name/name.output",
        "SDK_JAVA/name/meta.info": "temp/pipeline_id/SDK_JAVA/name/meta.info",
    }
    assert CDHelper()._write_to_local_fs(example) == expected


def test__save_to_cloud_storage(mocker):
    """
    Test saving examples, outputs and meta to bucket
    Args:
        mocker: mocker fixture from pytest-mock
    """
    # Stub out the GCS upload and the local file writing so the test
    # exercises only the orchestration logic of _save_to_cloud_storage.
    upload_blob_mock = mocker.patch(
        "cd_helper.CDHelper._upload_blob",
        return_value=upload_blob
    )
    write_to_os_mock = mocker.patch(
        "cd_helper.CDHelper._write_to_local_fs",
        return_value={"": ""}
    )
    example = Example("name", "pipeline_id", SDK_JAVA, "filepath", "code_of_example",
                      "output_of_example", STATUS_UNSPECIFIED, None)

    CDHelper()._save_to_cloud_storage([example])
    write_to_os_mock.assert_called_with(example)
    upload_blob_mock.assert_called_with(source_file="", destination_blob_name="")