Skip to content

Commit

Permalink
CloudwatchTaskHandler reads timestamp from Cloudwatch events (#15173)
Browse files Browse the repository at this point in the history
* format Cloudwatch timestamp using default template and utc timezone
  • Loading branch information
Pavel Hlushchanka authored May 14, 2021
1 parent 634c12d commit 1467046
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
11 changes: 10 additions & 1 deletion airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime

import watchtower

try:
Expand Down Expand Up @@ -118,10 +120,17 @@ def get_cloudwatch_logs(self, stream_name: str) -> str:
log_group=self.log_group, log_stream_name=stream_name, start_from_head=True
)
)
return '\n'.join([event['message'] for event in events])

return '\n'.join([self._event_to_str(event) for event in events])
except Exception: # pylint: disable=broad-except
msg = 'Could not read remote logs from log_group: {} log_stream: {}.'.format(
self.log_group, stream_name
)
self.log.exception(msg)
return msg

def _event_to_str(self, event: dict) -> str:
event_dt = datetime.utcfromtimestamp(event['timestamp'] / 1000.0)
formatted_event_dt = event_dt.strftime('%Y-%m-%d %H:%M:%S,%f')[:-3]
message = event['message']
return f'[{formatted_event_dt}] {message}'
32 changes: 26 additions & 6 deletions tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,21 @@ def test_write(self):
handler.handle(message)
mock_emit.assert_has_calls([call(message) for message in messages])

def test_event_to_str(self):
handler = self.cloudwatch_task_handler
events = [
{'timestamp': 1617400267123, 'message': 'First'},
{'timestamp': 1617400367456, 'message': 'Second'},
{'timestamp': 1617400467789, 'message': 'Third'},
]
assert [handler._event_to_str(event) for event in events] == (
[
'[2021-04-02 21:51:07,123] First',
'[2021-04-02 21:52:47,456] Second',
'[2021-04-02 21:54:27,789] Third',
]
)

def test_read(self):
# Confirmed via AWS Support call:
# CloudWatch events must be ordered chronologically otherwise
Expand All @@ -122,17 +137,22 @@ def test_read(self):
self.remote_log_group,
self.remote_log_stream,
[
{'timestamp': 10000, 'message': 'First'},
{'timestamp': 20000, 'message': 'Second'},
{'timestamp': 30000, 'message': 'Third'},
{'timestamp': 1617400267123, 'message': 'First'},
{'timestamp': 1617400367456, 'message': 'Second'},
{'timestamp': 1617400467789, 'message': 'Third'},
],
)

expected = (
'*** Reading remote log from Cloudwatch log_group: {} log_stream: {}.\nFirst\nSecond\nThird\n'
msg_template = '*** Reading remote log from Cloudwatch log_group: {} log_stream: {}.\n{}\n'
events = '\n'.join(
[
'[2021-04-02 21:51:07,123] First',
'[2021-04-02 21:52:47,456] Second',
'[2021-04-02 21:54:27,789] Third',
]
)
assert self.cloudwatch_task_handler.read(self.ti) == (
[[('', expected.format(self.remote_log_group, self.remote_log_stream))]],
[[('', msg_template.format(self.remote_log_group, self.remote_log_stream, events))]],
[{'end_of_log': True}],
)

Expand Down

0 comments on commit 1467046

Please sign in to comment.