Skip to content

Commit

Permalink
[AIRFLOW-1333] Enable copy function for Google Cloud Storage Hook
Browse files Browse the repository at this point in the history
Closes apache#2385 from yk5/gcs_hook_copy
  • Loading branch information
yk5 authored and criccomini committed Jun 23, 2017
1 parent 4841e3e commit e2c3831
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions airflow/contrib/hooks/gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,52 @@ def get_conn(self):
http_authorized = self._authorize()
return build('storage', 'v1', http=http_authorized)


# pylint:disable=redefined-builtin
def copy(self, source_bucket, source_object, destination_bucket=None,
destination_object=None):
"""
Copies an object from a bucket to another, with renaming if requested.
destination_bucket or destination_object can be omitted, in which case
source bucket/object is used, but not both.
:param bucket: The bucket of the object to copy from.
:type bucket: string
:param object: The object to copy.
:type object: string
:param destination_bucket: The destination of the object to copied to.
Can be omitted; then the same bucket is used.
:type destination_bucket: string
:param destination_object: The (renamed) path of the object if given.
Can be omitted; then the same name is used.
"""
destination_bucket = destination_bucket or source_bucket
destination_object = destination_object or source_object
if (source_bucket == destination_bucket and
source_object == destination_object):
raise ValueError(
'Either source/destination bucket or source/destination object '
'must be different, not both the same: bucket=%s, object=%s' %
(source_bucket, source_object))
if not source_bucket or not source_object:
raise ValueError('source_bucket and source_object cannot be empty.')

service = self.get_conn()
try:
service \
.objects() \
.copy(sourceBucket=source_bucket, sourceObject=source_object,
destinationBucket=destination_bucket,
destinationObject=destination_object, body='') \
.execute()
return True
except errors.HttpError as ex:
if ex.resp['status'] == '404':
return False
raise


# pylint:disable=redefined-builtin
def download(self, bucket, object, filename=False):
"""
Expand Down Expand Up @@ -157,6 +203,7 @@ def delete(self, bucket, object, generation=None):
"""
Delete an object if versioning is not enabled for the bucket, or if generation
parameter is used.
:param bucket: name of the bucket, where the object resides
:type bucket: string
:param object: name of the object to delete
Expand All @@ -181,6 +228,7 @@ def delete(self, bucket, object, generation=None):
def list(self, bucket, versions=None, maxResults=None, prefix=None):
"""
List all objects from the bucket with the give string prefix in name
:param bucket: bucket name
:type bucket: string
:param versions: if true, list all versions of the objects
Expand Down

0 comments on commit e2c3831

Please sign in to comment.