@@ -67,6 +67,7 @@
# not exist, the task handler should use the log_id_template attribute instead.
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")

TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]

VALID_ES_CONFIG_KEYS = set(inspect.signature(elasticsearch.Elasticsearch.__init__).parameters.keys())
# Remove `self` from the valid set of kwargs
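The collapsed lines below this comment presumably perform the discard and later filter user-supplied kwargs; a minimal sketch of the introspection pattern, with filter_es_kwargs as a hypothetical helper name:

import inspect

import elasticsearch

# Allow-list built from the real client constructor's signature.
VALID_ES_CONFIG_KEYS = set(inspect.signature(elasticsearch.Elasticsearch.__init__).parameters)
VALID_ES_CONFIG_KEYS.discard("self")  # `self` is never a user-supplied kwarg

def filter_es_kwargs(raw: dict) -> dict:
    # Hypothetical helper: keep only kwargs Elasticsearch.__init__ accepts.
    return {k: v for k, v in raw.items() if k in VALID_ES_CONFIG_KEYS}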
@@ -160,11 +161,11 @@ def __init__(
es_kwargs = es_kwargs or {}
if es_kwargs == "default_es_kwargs":
es_kwargs = get_es_kwargs_from_config()
host = self.format_url(host)
self.host = self.format_url(host)
super().__init__(base_log_folder)
self.closed = False

self.client = elasticsearch.Elasticsearch(host, **es_kwargs)
self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
# In airflow.cfg, the Elasticsearch host must include the scheme, e.g. http://dockerhostXxxx:9200

self.frontend = frontend
@@ -287,7 +288,7 @@ def _clean_date(value: datetime | None) -> str:
def _group_logs_by_host(self, response: ElasticSearchResponse) -> dict[str, list[Hit]]:
grouped_logs = defaultdict(list)
for hit in response:
key = getattr_nested(hit, self.host_field, None) or "default_host"
key = getattr_nested(hit, self.host_field, None) or self.host
grouped_logs[key].append(hit)
return grouped_logs

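With this change, hits that lack the configured host field are grouped under the handler's connection host rather than the literal string "default_host". A minimal sketch of the resulting behavior, using plain dicts in place of Hit objects and assuming host_field="host" and host="http://localhost:9200":

from collections import defaultdict

def group_logs_by_host(hits: list[dict], host_field: str = "host", fallback: str = "http://localhost:9200") -> dict:
    # Mirrors _group_logs_by_host: a missing host field falls back to the
    # connection host instead of "default_host".
    grouped = defaultdict(list)
    for hit in hits:
        key = hit.get(host_field) or fallback
        grouped[key].append(hit)
    return grouped

hits = [{"host": "workernode-1", "event": "a"}, {"event": "b"}]
assert sorted(group_logs_by_host(hits)) == ["http://localhost:9200", "workernode-1"]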
@@ -382,8 +383,13 @@ def concat_logs(hits: list[Hit]) -> str:
StructuredLogMessage(event="::endgroup::"),
]

# Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects
message = header + [
StructuredLogMessage(event=concat_logs(hits)) for hits in logs_by_host.values()
StructuredLogMessage(
**{k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
)
for hits in logs_by_host.values()
for hit in hits
]
else:
message = [
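A minimal sketch of the new flattening step above, using a plain dict in place of a Hit (the to_dict() shape is an assumption based on the fields the tests index):

TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]

# One raw hit as hit.to_dict() might return it; keys outside the
# allow-list, such as "offset" and "log_id", are dropped.
raw_hit = {
    "timestamp": "2023-07-09T07:47:43.907+0000",
    "event": "Starting attempt 1 of 1",
    "level": "info",
    "logger": "airflow.task",
    "offset": 1688888863907337472,
    "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
}
kwargs = {k: v for k, v in raw_hit.items() if k.lower() in TASK_LOG_FIELDS}
assert sorted(kwargs) == ["event", "level", "logger", "timestamp"]
# StructuredLogMessage(**kwargs) then yields one structured line per hit.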
@@ -117,7 +117,12 @@ def setup_method(self, method):
self.index_name = "test_index"
self.doc_type = "log"
self.test_message = "some random stuff"
self.body = {"message": self.test_message, "log_id": self.LOG_ID, "offset": 1}
self.body = {
"message": self.test_message,
"log_id": self.LOG_ID,
"offset": 1,
"event": self.test_message,
}
self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=1)

def teardown_method(self):
@@ -210,7 +215,7 @@ def test_read(self, ti):
if AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[0].event == "::group::Log message source details"
assert logs[0].sources == ["default_host"]
assert logs[0].sources == ["http://localhost:9200"]
assert logs[1].event == "::endgroup::"
assert logs[2].event == "some random stuff"

@@ -238,7 +243,7 @@ def test_read_with_patterns(self, ti):
if AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[0].event == "::group::Log message source details"
assert logs[0].sources == ["default_host"]
assert logs[0].sources == ["http://localhost:9200"]
assert logs[1].event == "::endgroup::"
assert logs[2].event == "some random stuff"

@@ -365,7 +370,7 @@ def test_read_with_match_phrase_query(self, ti):
if AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[0].event == "::group::Log message source details"
assert logs[0].sources == ["default_host"]
assert logs[0].sources == ["http://localhost:9200"]
assert logs[1].event == "::endgroup::"
assert logs[2].event == "some random stuff"

@@ -388,7 +393,7 @@ def test_read_with_none_metadata(self, ti):
if AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[0].event == "::group::Log message source details"
assert logs[0].sources == ["default_host"]
assert logs[0].sources == ["http://localhost:9200"]
assert logs[1].event == "::endgroup::"
assert logs[2].event == "some random stuff"

@@ -438,7 +443,7 @@ def test_read_with_empty_metadata(self, ti):
if AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[0].event == "::group::Log message source details"
assert logs[0].sources == ["default_host"]
assert logs[0].sources == ["http://localhost:9200"]
assert logs[1].event == "::endgroup::"
assert logs[2].event == "some random stuff"

@@ -528,7 +533,7 @@ def test_read_as_download_logs(self, ti):
if AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[0].event == "::group::Log message source details"
assert logs[0].sources == ["default_host"]
assert logs[0].sources == ["http://localhost:9200"]
assert logs[1].event == "::endgroup::"
assert logs[2].event == "some random stuff"

@@ -593,6 +598,7 @@ def test_read_with_json_format(self, ti):

self.body = {
"message": self.test_message,
"event": self.test_message,
"log_id": f"{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1",
"offset": 1,
"asctime": "2020-12-24 19:25:00,962",
@@ -606,12 +612,13 @@ def test_read_with_json_format(self, ti):
logs, _ = self.es_task_handler.read(
ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
)
expected_message = "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - "
if AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[2].event == expected_message
assert logs[2].event == self.test_message
else:
assert logs[0][0][1] == expected_message
assert (
logs[0][0][1] == "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - "
)

@pytest.mark.db_test
def test_read_with_json_format_with_custom_offset_and_host_fields(self, ti):
@@ -626,6 +633,7 @@ def test_read_with_json_format_with_custom_offset_and_host_fields(self, ti):

self.body = {
"message": self.test_message,
"event": self.test_message,
"log_id": f"{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1",
"log": {"offset": 1},
"host": {"name": "somehostname"},
@@ -640,12 +648,13 @@ def test_read_with_json_format_with_custom_offset_and_host_fields(self, ti):
logs, _ = self.es_task_handler.read(
ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
)
expected_message = "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - "
if AIRFLOW_V_3_0_PLUS:
logs = list(logs)
assert logs[2].event == expected_message
assert logs[2].event == self.test_message
else:
assert logs[0][0][1] == expected_message
assert (
logs[0][0][1] == "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - "
)

@pytest.mark.db_test
def test_read_with_custom_offset_and_host_fields(self, ti):
@@ -658,6 +667,7 @@ def test_read_with_custom_offset_and_host_fields(self, ti):

self.body = {
"message": self.test_message,
"event": self.test_message,
"log_id": self.LOG_ID,
"log": {"offset": 1},
"host": {"name": "somehostname"},
@@ -670,7 +680,7 @@ def test_read_with_custom_offset_and_host_fields(self, ti):
if AIRFLOW_V_3_0_PLUS:
pass
else:
assert self.test_message == logs[0][0][1]
assert logs[0][0][1] == "some random stuff"

@pytest.mark.db_test
def test_close(self, ti):
@@ -26,6 +26,7 @@
from datetime import datetime
from operator import attrgetter
from typing import TYPE_CHECKING, Any, Literal, cast
from urllib.parse import urlparse

import pendulum
from opensearchpy import OpenSearch
@@ -57,6 +58,7 @@

USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]


def getattr_nested(obj, item, default):
@@ -174,6 +176,7 @@ def __init__(
self.write_stdout = write_stdout
self.json_format = json_format
self.json_fields = [label.strip() for label in json_fields.split(",")]
self.host = self.format_url(host)
self.host_field = host_field
self.offset_field = offset_field
self.index_patterns = index_patterns
@@ -184,7 +187,6 @@
http_auth=(username, password),
**os_kwargs,
)
# client = OpenSearch(hosts=[{"host": host, "port": port}], http_auth=(username, password), use_ssl=True, verify_certs=True, ca_cert="/opt/airflow/root-ca.pem", ssl_assert_hostname = False, ssl_show_warn = False)
self.formatter: logging.Formatter
self.handler: logging.FileHandler | logging.StreamHandler
self._doc_type_map: dict[Any, Any] = {}
@@ -425,8 +427,13 @@ def concat_logs(hits: list[Hit]):
StructuredLogMessage(event="::endgroup::"),
]

# Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects
message = header + [
StructuredLogMessage(event=concat_logs(hits)) for hits in logs_by_host.values()
StructuredLogMessage(
**{k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
)
for hits in logs_by_host.values()
for hit in hits
]
else:
message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] # type: ignore[misc]
@@ -583,7 +590,7 @@ def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> type[Hit]:
def _group_logs_by_host(self, response: OpensearchResponse) -> dict[str, list[Hit]]:
grouped_logs = defaultdict(list)
for hit in response:
key = getattr_nested(hit, self.host_field, None) or "default_host"
key = getattr_nested(hit, self.host_field, None) or self.host
grouped_logs[key].append(hit)
return grouped_logs

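The grouping key is resolved by getattr_nested, whose body is collapsed above; given the attrgetter import, it plausibly looks like this sketch:

from operator import attrgetter

def getattr_nested(obj, item, default):
    # Resolve dotted paths such as "host.name" -> obj.host.name,
    # returning `default` when any attribute on the path is missing.
    try:
        return attrgetter(item)(obj)
    except AttributeError:
        return default

class _Host:  # stand-in for a Hit's nested host object
    name = "somehostname"

class _Hit:
    host = _Host()

assert getattr_nested(_Hit(), "host.name", None) == "somehostname"
assert getattr_nested(_Hit(), "missing.field", None) is None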
@@ -621,3 +628,21 @@ def get_external_log_url(self, task_instance, try_number) -> str:
def log_name(self) -> str:
"""The log name."""
return self.LOG_NAME

@staticmethod
def format_url(host: str) -> str:
"""
Format the given host string, prefixing it with 'http://' when no scheme is present, and verify that the result is a valid URL.

:param host: The host string to format and validate.
"""
parsed_url = urlparse(host)

if parsed_url.scheme not in ("http", "https"):
host = "http://" + host
parsed_url = urlparse(host)

if not parsed_url.netloc:
raise ValueError(f"'{host}' is not a valid URL.")

return host
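A quick illustration of format_url's behavior, assuming the enclosing class is OpensearchTaskHandler (outputs inferred from the code above):

# Scheme added when missing; https URLs pass through; an empty host fails.
assert OpensearchTaskHandler.format_url("localhost:9200") == "http://localhost:9200"
assert OpensearchTaskHandler.format_url("https://es.example.com:9200") == "https://es.example.com:9200"
try:
    OpensearchTaskHandler.format_url("")
except ValueError:
    pass  # raises: "'http://' is not a valid URL."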
providers/opensearch/tests/unit/opensearch/conftest.py (6 additions, 77 deletions)
@@ -63,100 +63,29 @@ def sample_log_response(self):
"hits": [
{
"_id": "jdeZT4kBjAZqZnexVUxk",
"_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
"_score": 2.482621,
"_source": {
"@timestamp": "2023-07-13T14:13:15.140Z",
"asctime": "2023-07-09T07:47:43.907+0000",
"container": {"id": "airflow"},
"dag_id": "example_bash_operator",
"ecs": {"version": "8.0.0"},
"execution_date": "2023_07_09T07_47_32_000000",
"filename": "taskinstance.py",
"input": {"type": "log"},
"levelname": "INFO",
"lineno": 1144,
"log": {
"file": {
"path": "/opt/airflow/Documents/GitHub/airflow/logs/"
"dag_id=example_bash_operator'"
"/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
},
"offset": 0,
},
"offset": 1688888863907337472,
"log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
"message": "Dependencies all met for "
"dep_context=non-requeueable deps "
"ti=<TaskInstance: "
"example_bash_operator.run_after_loop "
"owen_run_run [queued]>",
"message": "Some Message 1",
"event": "Some Message 1",
"task_id": "run_after_loop",
"try_number": "1",
"offset": 0,
},
"_type": "_doc",
},
{
"_id": "qteZT4kBjAZqZnexVUxl",
"_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
"_score": 2.482621,
"_source": {
"@timestamp": "2023-07-13T14:13:15.141Z",
"asctime": "2023-07-09T07:47:43.917+0000",
"container": {"id": "airflow"},
"dag_id": "example_bash_operator",
"ecs": {"version": "8.0.0"},
"execution_date": "2023_07_09T07_47_32_000000",
"filename": "taskinstance.py",
"input": {"type": "log"},
"levelname": "INFO",
"lineno": 1347,
"log": {
"file": {
"path": "/opt/airflow/Documents/GitHub/airflow/logs/"
"dag_id=example_bash_operator"
"/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
},
"offset": 988,
},
"offset": 1688888863917961216,
"log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
"message": "Starting attempt 1 of 1",
"task_id": "run_after_loop",
"try_number": "1",
},
"_type": "_doc",
},
{
"_id": "v9eZT4kBjAZqZnexVUx2",
"_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
"_score": 2.482621,
"_source": {
"@timestamp": "2023-07-13T14:13:15.143Z",
"asctime": "2023-07-09T07:47:43.928+0000",
"container": {"id": "airflow"},
"dag_id": "example_bash_operator",
"ecs": {"version": "8.0.0"},
"execution_date": "2023_07_09T07_47_32_000000",
"filename": "taskinstance.py",
"input": {"type": "log"},
"levelname": "INFO",
"lineno": 1368,
"log": {
"file": {
"path": "/opt/airflow/Documents/GitHub/airflow/logs/"
"dag_id=example_bash_operator"
"/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
},
"offset": 1372,
},
"offset": 1688888863928218880,
"log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
"message": "Executing <Task(BashOperator): "
"run_after_loop> on 2023-07-09 "
"07:47:32+00:00",
"message": "Another Some Message 2",
"event": "Another Some Message 2",
"task_id": "run_after_loop",
"try_number": "1",
"offset": 1,
},
"_type": "_doc",
},