[Storage] Refactor storage and fix data transfer service (skypilot-org#1239)

* Refactor storage and fix data transfer service

* fix UX for the data transfer

* UX fixes

* Address comments
Michaelvll authored and ewzeng committed Oct 24, 2022
1 parent 525b7c7 commit c513324
Showing 2 changed files with 69 additions and 33 deletions.
64 changes: 44 additions & 20 deletions sky/data/data_transfer.py
@@ -15,24 +15,29 @@
- All combinations of Azure Transfer
- GCS -> S3
"""
from datetime import datetime
import json
import subprocess
from typing import Any
import time

import colorama

from sky import clouds
from sky import sky_logging
from sky.adaptors import aws, gcp
from sky.backends import backend_utils
from sky.utils import ux_utils

logger = sky_logging.init_logger(__name__)

S3Store = Any
GcsStore = Any
MAX_POLLS = 120000
POLL_INTERVAL = 1


def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
"""Creates a one-time transfer from Amazon S3 to Google Cloud Storage.
Can be viewed from: https://console.cloud.google.com/transfer/cloud
It will block until the transfer is complete.
Args:
s3_bucket_name: str; Name of the Amazon S3 Bucket
@@ -56,24 +61,11 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
_add_bucket_iam_member(gs_bucket_name, 'roles/storage.admin',
'serviceAccount:' + storage_account['accountEmail'])

starttime = datetime.utcnow()
transfer_job = {
'description': f'Transferring data from S3 Bucket \
{s3_bucket_name} to GCS Bucket {gs_bucket_name}',
'status': 'ENABLED',
'projectId': project_id,
'schedule': {
'scheduleStartDate': {
'day': starttime.day,
'month': starttime.month,
'year': starttime.year,
},
'scheduleEndDate': {
'day': starttime.day,
'month': starttime.month,
'year': starttime.year,
},
},
'transferSpec': {
'awsS3DataSource': {
'bucketName': s3_bucket_name,
@@ -88,8 +80,40 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
}
}

result = storagetransfer.transferJobs().create(body=transfer_job).execute()
logger.info(f'AWS -> GCS Transfer Job: {json.dumps(result, indent=4)}')
response = storagetransfer.transferJobs().create(
body=transfer_job).execute()
operation = storagetransfer.transferJobs().run(jobName=response['name'],
body={
'projectId': project_id
}).execute()

logger.info(f'{colorama.Fore.GREEN}Transfer job scheduled: '
f'{colorama.Style.RESET_ALL}'
f's3://{s3_bucket_name} -> gs://{gs_bucket_name} ')
logger.debug(json.dumps(operation, indent=4))
logger.info('Waiting for the transfer to finish')
start = time.time()
with backend_utils.safe_console_status('Transferring'):
for _ in range(MAX_POLLS):
result = (storagetransfer.transferOperations().get(
name=operation['name']).execute())
if 'error' in result:
with ux_utils.print_exception_no_traceback():
raise RuntimeError(result['error'])

if 'done' in result and result['done']:
break
time.sleep(POLL_INTERVAL)
else:
# If we get here, we timed out.
logger.info(
f'Transfer timed out after {(time.time() - start) / 3600:.2f} '
'hours. Please check the status of the transfer job in the GCP '
'Storage Transfer Service console at '
'https://cloud.google.com/storage-transfer-service')
return
logger.info(
f'Transfer finished in {(time.time() - start) / 60:.2f} minutes.')
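
Note: the polling loop above relies on Python's for/else idiom. The else branch runs only when the loop exhausts all MAX_POLLS iterations without hitting break, which is exactly the timeout case. A minimal standalone sketch of the same pattern, with a hypothetical check_done callable standing in for the Storage Transfer operation lookup (not part of the diff):

import time
from typing import Callable

MAX_POLLS = 120000      # mirrors the module-level constant above
POLL_INTERVAL = 1       # seconds between polls

def wait_until_done(check_done: Callable[[], bool]) -> bool:
    """Polls check_done() until it returns True or the poll budget runs out."""
    for _ in range(MAX_POLLS):
        if check_done():
            break               # skips the for loop's else clause below
        time.sleep(POLL_INTERVAL)
    else:
        # Runs only when the loop completed every iteration without break,
        # i.e. the operation did not finish within the budget (timeout).
        return False
    return True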


def gcs_to_s3(gs_bucket_name: str, s3_bucket_name: str) -> None:
@@ -114,4 +138,4 @@ def _add_bucket_iam_member(bucket_name: str, role: str, member: str) -> None:

bucket.set_iam_policy(policy)

logger.info(f'Added {member} with role {role} to {bucket_name}.')
logger.debug(f'Added {member} with role {role} to {bucket_name}.')
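
Usage note (not part of the diff): a hedged sketch of how the updated helper might be invoked. The bucket names are placeholders, and credentials for both clouds are assumed to be configured so the sky.adaptors modules can reach AWS and GCP.

# Illustrative only; bucket names are hypothetical and both buckets must
# already exist with AWS and GCP credentials configured.
from sky.data import data_transfer

data_transfer.s3_to_gcs(s3_bucket_name='my-source-bucket',
                        gs_bucket_name='my-dest-bucket')
# Blocks until the Storage Transfer Service job finishes, or until the
# MAX_POLLS * POLL_INTERVAL budget (about 33 hours at the defaults) runs out.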
38 changes: 25 additions & 13 deletions sky/data/storage.py
@@ -40,10 +40,32 @@


class StoreType(enum.Enum):
"""Enum for the different types of stores."""
S3 = 'S3'
GCS = 'GCS'
AZURE = 'AZURE'

@classmethod
def from_cloud(cls, cloud: clouds.Cloud) -> 'StoreType':
if isinstance(cloud, clouds.AWS):
return StoreType.S3
elif isinstance(cloud, clouds.GCP):
return StoreType.GCS
elif isinstance(cloud, clouds.Azure):
return StoreType.AZURE

raise ValueError(f'Unsupported cloud for StoreType: {cloud}')

@classmethod
def from_store(cls, store: 'AbstractStore') -> 'StoreType':
if isinstance(store, S3Store):
return StoreType.S3
elif isinstance(store, GcsStore):
return StoreType.GCS
else:
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Unknown store type: {store}')
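
A small sketch of how the new classmethods might be exercised (illustrative only; it assumes the module is importable as sky.data.storage and that the clouds objects can be constructed directly):

# Illustrative mapping checks; not part of the diff.
from sky import clouds
from sky.data import storage

assert storage.StoreType.from_cloud(clouds.AWS()) == storage.StoreType.S3
assert storage.StoreType.from_cloud(clouds.GCP()) == storage.StoreType.GCS
# from_store dispatches on the concrete AbstractStore subclass instead, e.g.
# storage.StoreType.from_store(s3_store) would return storage.StoreType.S3.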


class StorageMode(enum.Enum):
MOUNT = 'MOUNT'
@@ -76,16 +98,6 @@ def get_store_prefix(storetype: StoreType) -> str:
raise ValueError(f'Unknown store type: {storetype}')


def _get_storetype_from_store(store: 'Storage') -> StoreType:
if isinstance(store, S3Store):
return StoreType.S3
elif isinstance(store, GcsStore):
return StoreType.GCS
else:
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Unknown store type: {store}')


class AbstractStore:
"""AbstractStore abstracts away the different storage types exposed by
different clouds.
@@ -312,11 +324,11 @@ def __repr__(self):
f'\n\tstores={self.sky_stores})')

def add_store(self, store: AbstractStore) -> None:
storetype = _get_storetype_from_store(store)
storetype = StoreType.from_store(store)
self.sky_stores[storetype] = store.get_metadata()

def remove_store(self, store: AbstractStore) -> None:
storetype = _get_storetype_from_store(store)
storetype = StoreType.from_store(store)
if storetype in self.sky_stores:
del self.sky_stores[storetype]

@@ -618,7 +630,7 @@ def add_store(self, store_type: Union[str, StoreType]) -> AbstractStore:

def _add_store(self, store: AbstractStore, is_reconstructed: bool = False):
# Adds a store object to the storage
store_type = _get_storetype_from_store(store)
store_type = StoreType.from_store(store)
self.stores[store_type] = store
# If store initialized and is sky managed, add to state
if store.is_sky_managed:
