Merged
11 changes: 11 additions & 0 deletions airflow/config_templates/airflow_local_settings.py
@@ -248,6 +248,17 @@
}

DEFAULT_LOGGING_CONFIG['handlers'].update(STACKDRIVER_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith('oss://'):
OSS_REMOTE_HANDLERS = {
'task': {
'class': 'airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'oss_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
}
DEFAULT_LOGGING_CONFIG['handlers'].update(OSS_REMOTE_HANDLERS)
elif ELASTICSEARCH_HOST:
ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get('elasticsearch', 'LOG_ID_TEMPLATE')
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get('elasticsearch', 'END_OF_LOG_MARK')
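For context: this new branch only activates when remote logging is enabled and the remote base log folder uses the oss:// scheme. A minimal sketch of the settings that exercise it, expressed as Airflow's standard AIRFLOW__{SECTION}__{KEY} environment overrides (bucket and path are placeholders, not part of this PR):

import os

# Enable remote logging and point it at an OSS bucket (placeholder values).
os.environ['AIRFLOW__LOGGING__REMOTE_LOGGING'] = 'True'
os.environ['AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER'] = 'oss://my-bucket/airflow/logs'
os.environ['AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID'] = 'oss_default'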
96 changes: 94 additions & 2 deletions airflow/providers/alibaba/cloud/hooks/oss.py
@@ -89,10 +89,13 @@ class OSSHook(BaseHook):
conn_type = 'oss'
hook_name = 'OSS'

def __init__(self, region, oss_conn_id='oss_default', *args, **kwargs) -> None:
def __init__(self, region: Optional[str] = None, oss_conn_id='oss_default', *args, **kwargs) -> None:
self.oss_conn_id = oss_conn_id
self.oss_conn = self.get_connection(oss_conn_id)
self.region = region
if region is None:
self.region = self.get_default_region()
else:
self.region = region
super().__init__(*args, **kwargs)

def get_conn(self) -> "Connection":
@@ -117,6 +120,7 @@ def parse_oss_url(ossurl: str) -> tuple:

return bucket_name, key

@provide_bucket_name
@unify_bucket_name_and_key
def object_exists(self, key: str, bucket_name: Optional[str] = None) -> bool:
"""
@@ -143,8 +147,10 @@ def get_bucket(self, bucket_name: Optional[str] = None) -> oss2.api.Bucket:
:rtype: oss2.api.Bucket
"""
auth = self.get_credential()
assert self.region is not None
return oss2.Bucket(auth, 'http://oss-' + self.region + '.aliyuncs.com', bucket_name)

@provide_bucket_name
@unify_bucket_name_and_key
def load_string(self, key: str, content: str, bucket_name: Optional[str] = None) -> None:
"""
@@ -159,6 +165,7 @@ def load_string(self, key: str, content: str, bucket_name: Optional[str] = None)
except Exception as e:
raise AirflowException(f"Errors: {e}")

@provide_bucket_name
@unify_bucket_name_and_key
def upload_local_file(
self,
@@ -178,6 +185,7 @@ def upload_local_file(
except Exception as e:
raise AirflowException(f"Errors when upload file: {e}")

@provide_bucket_name
@unify_bucket_name_and_key
def download_file(
self,
@@ -201,6 +209,7 @@ def download_file(
return None
return local_file

@provide_bucket_name
@unify_bucket_name_and_key
def delete_object(
self,
@@ -219,6 +228,7 @@
self.log.error(e)
raise AirflowException(f"Errors when deleting: {key}")

@provide_bucket_name
@unify_bucket_name_and_key
def delete_objects(
self,
@@ -269,6 +279,73 @@ def create_bucket(
self.log.error(e)
raise AirflowException(f"Errors when create bucket: {bucket_name}")

@provide_bucket_name
@unify_bucket_name_and_key
def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int) -> None:
"""
        Append a string to an existing remote object

:param bucket_name: the name of the bucket
:param content: content to be appended
:param key: oss bucket key
:param pos: position of the existing file where the content will be appended
"""
self.log.info("Write oss bucket. key: %s, pos: %s", key, pos)
try:
self.get_bucket(bucket_name).append_object(key, pos, content)
except Exception as e:
self.log.error(e)
            raise AirflowException(f"Error appending string to object: {key}")

@provide_bucket_name
@unify_bucket_name_and_key
def read_key(self, bucket_name: Optional[str], key: str) -> str:
"""
        Read the content of an oss remote object with the specified key

:param bucket_name: the name of the bucket
:param key: oss bucket key
"""
self.log.info("Read oss key: %s", key)
try:
return self.get_bucket(bucket_name).get_object(key).read().decode("utf-8")
except Exception as e:
self.log.error(e)
            raise AirflowException(f"Error reading bucket object: {key}")

@provide_bucket_name
@unify_bucket_name_and_key
def head_key(self, bucket_name: Optional[str], key: str) -> oss2.models.HeadObjectResult:
"""
        Get metadata of the specified remote object

:param bucket_name: the name of the bucket
:param key: oss bucket key
"""
self.log.info("Head Object oss key: %s", key)
try:
return self.get_bucket(bucket_name).head_object(key)
except Exception as e:
self.log.error(e)
            raise AirflowException(f"Error getting metadata of bucket object: {key}")

@provide_bucket_name
@unify_bucket_name_and_key
def key_exist(self, bucket_name: Optional[str], key: str) -> bool:
"""
Find out whether the specified key exists in the oss remote storage

:param bucket_name: the name of the bucket
:param key: oss bucket key
"""
self.log.info('Looking up oss bucket %s for bucket key %s ...', bucket_name, key)
try:
return self.get_bucket(bucket_name).object_exists(key)
except Exception as e:
self.log.error(e)
            raise AirflowException(f"Error checking existence of bucket object: {key}")

def get_credential(self) -> oss2.auth.Auth:
extra_config = self.oss_conn.extra_dejson
auth_type = extra_config.get('auth_type', None)
@@ -285,3 +362,18 @@ def get_credential(self) -> oss2.auth.Auth:
return oss2.Auth(oss_access_key_id, oss_access_key_secret)
else:
raise Exception("Unsupported auth_type: " + auth_type)

def get_default_region(self) -> Optional[str]:
extra_config = self.oss_conn.extra_dejson
auth_type = extra_config.get('auth_type', None)
if not auth_type:
            raise Exception("No auth_type specified in extra_config.")

if auth_type == 'AK':
default_region = extra_config.get('region', None)
if not default_region:
raise Exception("No region is specified for connection: " + self.oss_conn_id)
else:
raise Exception("Unsupported auth_type: " + auth_type)

return default_region
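A minimal usage sketch of the updated hook, assuming an 'oss_default' connection whose extra carries AK credentials and a region as introduced above (bucket and key names are illustrative):

from airflow.providers.alibaba.cloud.hooks.oss import OSSHook

# region is now resolved from the connection extra when not passed explicitly
hook = OSSHook(oss_conn_id='oss_default')
bucket_name, key = OSSHook.parse_oss_url('oss://my-bucket/airflow/logs/demo.log')
hook.load_string(key, 'hello oss', bucket_name=bucket_name)
print(hook.read_key(bucket_name, key))   # presumably prints 'hello oss'
print(hook.key_exist(bucket_name, key))  # True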
16 changes: 16 additions & 0 deletions airflow/providers/alibaba/cloud/log/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
186 changes: 186 additions & 0 deletions airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -0,0 +1,186 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import sys

if sys.version_info >= (3, 8):
from functools import cached_property
else:
from cached_property import cached_property

from airflow.configuration import conf
from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin


class OSSTaskHandler(FileTaskHandler, LoggingMixin):
"""
    OSSTaskHandler is a Python log handler that handles and reads
    task instance logs. It extends the Airflow FileTaskHandler and
    uploads to and reads from OSS remote storage.
"""

def __init__(self, base_log_folder, oss_log_folder, filename_template):
self.log.info("Using oss_task_handler for remote logging...")
super().__init__(base_log_folder, filename_template)
(self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)
self.log_relative_path = ''
self._hook = None
self.closed = False
self.upload_on_close = True

@cached_property
def hook(self):
remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
self.log.info("remote_conn_id: %s", remote_conn_id)
try:
return OSSHook(oss_conn_id=remote_conn_id)
except Exception as e:
self.log.error(e, exc_info=True)
self.log.error(
'Could not create an OSSHook with connection id "%s". '
                'Please make sure that airflow[alibaba] is installed and '
'the OSS connection exists.',
remote_conn_id,
)

def set_context(self, ti):
super().set_context(ti)
        # Local location and remote location are needed to open and
        # upload the local log file to OSS remote storage.
self.log_relative_path = self._render_filename(ti, ti.try_number)
self.upload_on_close = not ti.raw

# Clear the file first so that duplicate data is not uploaded
# when re-using the same path (e.g. with rescheduled sensors)
if self.upload_on_close:
with open(self.handler.baseFilename, 'w'):
pass

def close(self):
"""Close and upload local log file to remote storage OSS."""
        # When the application exits, the system shuts down all handlers by
        # calling their close method. Here we check if the logger is already
        # closed to prevent uploading the log to remote storage multiple
        # times when `logging.shutdown` is called.
if self.closed:
return

super().close()

if not self.upload_on_close:
return

local_loc = os.path.join(self.local_base, self.log_relative_path)
remote_loc = self.log_relative_path
if os.path.exists(local_loc):
            # read the local log file so its contents can be uploaded
with open(local_loc) as logfile:
log = logfile.read()
self.oss_write(log, remote_loc)

# Mark closed so we don't double write if close is called twice
self.closed = True

def _read(self, ti, try_number, metadata=None):
"""
Read logs of given task instance and try_number from OSS remote storage.
        If that fails, read the log from the task instance's host machine.

:param ti: task instance object
:param try_number: task instance try_number to read logs from
:param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
"""
# Explicitly getting log relative path is necessary as the given
        # task instance might be different from the one passed in
        # to the set_context method.
log_relative_path = self._render_filename(ti, try_number)
remote_loc = log_relative_path

if self.oss_log_exists(remote_loc):
# If OSS remote file exists, we do not fetch logs from task instance
# local machine even if there are errors reading remote logs, as
# returned remote_log will contain error messages.
remote_log = self.oss_read(remote_loc, return_error=True)
log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
return log, {'end_of_log': True}
else:
return super()._read(ti, try_number)

def oss_log_exists(self, remote_log_location):
"""
Check if remote_log_location exists in remote storage

:param remote_log_location: log's location in remote storage
:return: True if location exists else False
"""
oss_remote_log_location = self.base_folder + '/' + remote_log_location
        try:
            return self.hook.key_exist(self.bucket_name, oss_remote_log_location)
        except Exception:
            return False

def oss_read(self, remote_log_location, return_error=False):
"""
        Return the log found at the remote_log_location, or an error message
        if reading fails.

:param remote_log_location: the log's location in remote storage
        :param return_error: if True, return a string error message if an
            error occurs; otherwise return None on error.
"""
try:
oss_remote_log_location = self.base_folder + '/' + remote_log_location
self.log.info("read remote log: %s", oss_remote_log_location)
return self.hook.read_key(self.bucket_name, oss_remote_log_location)
except Exception:
msg = f'Could not read logs from {oss_remote_log_location}'
self.log.exception(msg)
# return error if needed
if return_error:
return msg

def oss_write(self, log, remote_log_location, append=True):
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.

:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
"""
oss_remote_log_location = self.base_folder + '/' + remote_log_location
pos = 0
        # oss_log_exists() prefixes base_folder itself, so pass the relative path
        if append and self.oss_log_exists(remote_log_location):
head = self.hook.head_key(self.bucket_name, oss_remote_log_location)
pos = head.content_length
self.log.info("log write pos is: %s", str(pos))
try:
self.log.info("writing remote log: %s", oss_remote_log_location)
self.hook.append_string(self.bucket_name, log, oss_remote_log_location, pos)
except Exception:
self.log.exception(
'Could not write logs to %s, log write pos is: %s, Append is %s',
oss_remote_log_location,
str(pos),
str(append),
)
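oss_write derives pos from head_key().content_length because OSS appends are positional: each append must start exactly at the current object length. A sketch of the underlying oss2 behavior the handler relies on (endpoint, bucket, and key are placeholders):

import oss2

auth = oss2.Auth('<ACCESS_KEY_ID>', '<ACCESS_KEY_SECRET>')
bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'my-bucket')

# the first append creates an appendable object at position 0
result = bucket.append_object('airflow/logs/demo.log', 0, 'first chunk\n')
# subsequent appends must pass the current object length (next_position)
bucket.append_object('airflow/logs/demo.log', result.next_position, 'second chunk\n')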
3 changes: 3 additions & 0 deletions airflow/providers/alibaba/provider.yaml
@@ -57,3 +57,6 @@ hook-class-names: # deprecated - to be removed after providers add dependency o
connection-types:
- hook-class-name: airflow.providers.alibaba.cloud.hooks.oss.OSSHook
connection-type: oss

logging:
- airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler
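The logging entry lets Airflow's provider manager discover the new handler class. A quick sanity check, assuming an initialized Airflow environment with the oss:// settings shown earlier:

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

print(DEFAULT_LOGGING_CONFIG['handlers']['task']['class'])
# expected: airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler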
3 changes: 2 additions & 1 deletion airflow/utils/db.py
@@ -436,7 +436,8 @@ def create_default_connections(session: Session = NEW_SESSION):
extra='''{
"auth_type": "AK",
"access_key_id": "<ACCESS_KEY_ID>",
"access_key_secret": "<ACCESS_KEY_SECRET>"}
"access_key_secret": "<ACCESS_KEY_SECRET>",
"region": "<YOUR_OSS_REGION>"}
''',
),
session,
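The same default connection can also be created programmatically; a sketch using Airflow's session helper, with placeholder values matching the diff above:

import json
from airflow.models.connection import Connection
from airflow.utils.session import create_session

conn = Connection(
    conn_id='oss_default',
    conn_type='oss',
    extra=json.dumps({
        "auth_type": "AK",
        "access_key_id": "<ACCESS_KEY_ID>",
        "access_key_secret": "<ACCESS_KEY_SECRET>",
        "region": "<YOUR_OSS_REGION>",
    }),
)
with create_session() as session:
    session.add(conn)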