@@ -51,8 +51,6 @@
from unit.elasticsearch.log.elasticmock import elasticmock
from unit.elasticsearch.log.elasticmock.utilities import SearchFailedException

-pytestmark = pytest.mark.db_test
-
ES_PROVIDER_YAML_FILE = AIRFLOW_PROVIDERS_ROOT_PATH / "elasticsearch" / "provider.yaml"


@@ -202,6 +200,7 @@ def test_client_with_patterns(self):
)
assert handler.index_patterns == patterns

+    @pytest.mark.db_test
def test_read(self, ti):
ts = pendulum.now()
logs, metadatas = self.es_task_handler.read(
@@ -227,6 +226,7 @@ def test_read(self, ti):
assert metadata["offset"] == "1"
assert timezone.parse(metadata["last_log_timestamp"]) > ts

+    @pytest.mark.db_test
def test_read_with_patterns(self, ti):
ts = pendulum.now()
with mock.patch.object(self.es_task_handler, "index_patterns", new="test_*,other_*"):
@@ -253,6 +253,7 @@ def test_read_with_patterns(self, ti):
assert metadata["offset"] == "1"
assert timezone.parse(metadata["last_log_timestamp"]) > ts

+    @pytest.mark.db_test
def test_read_with_patterns_no_match(self, ti):
ts = pendulum.now()
with mock.patch.object(self.es_task_handler, "index_patterns", new="test_other_*,test_another_*"):
@@ -276,6 +277,7 @@ def test_read_with_patterns_no_match(self, ti):
# last_log_timestamp won't change if no log lines read.
assert timezone.parse(metadata["last_log_timestamp"]) == ts

+    @pytest.mark.db_test
def test_read_with_missing_index(self, ti):
ts = pendulum.now()
with mock.patch.object(self.es_task_handler, "index_patterns", new="nonexistent,test_*"):
@@ -286,6 +288,7 @@ def test_read_with_missing_index(self, ti):
{"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False},
)

+    @pytest.mark.db_test
@pytest.mark.parametrize("seconds", [3, 6])
def test_read_missing_logs(self, seconds, create_task_instance):
"""
@@ -330,6 +333,7 @@ def test_read_missing_logs(self, seconds, create_task_instance):
assert metadatas[0]["offset"] == "0"
assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts

+    @pytest.mark.db_test
def test_read_with_match_phrase_query(self, ti):
similar_log_id = (
f"{TestElasticsearchTaskHandler.TASK_ID}-"
@@ -374,6 +378,7 @@ def test_read_with_match_phrase_query(self, ti):
assert metadata["offset"] == "1"
assert timezone.parse(metadata["last_log_timestamp"]) > ts

+    @pytest.mark.db_test
def test_read_with_none_metadata(self, ti):
logs, metadatas = self.es_task_handler.read(ti, 1)
if AIRFLOW_V_3_0_PLUS:
@@ -395,6 +400,7 @@ def test_read_with_none_metadata(self, ti):
assert metadata["offset"] == "1"
assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now()

+    @pytest.mark.db_test
def test_read_nonexistent_log(self, ti):
ts = pendulum.now()
# In ElasticMock, search is going to return all documents with matching index
@@ -420,6 +426,7 @@ def test_read_nonexistent_log(self, ti):
# last_log_timestamp won't change if no log lines read.
assert timezone.parse(metadata["last_log_timestamp"]) == ts

+    @pytest.mark.db_test
def test_read_with_empty_metadata(self, ti):
ts = pendulum.now()
logs, metadatas = self.es_task_handler.read(ti, 1, {})
@@ -466,6 +473,7 @@ def test_read_with_empty_metadata(self, ti):
# if not last_log_timestamp is provided.
assert timezone.parse(metadata["last_log_timestamp"]) > ts

+    @pytest.mark.db_test
def test_read_timeout(self, ti):
ts = pendulum.now().subtract(minutes=5)

@@ -498,6 +506,7 @@ def test_read_timeout(self, ti):
assert str(offset) == metadata["offset"]
assert timezone.parse(metadata["last_log_timestamp"]) == ts

+    @pytest.mark.db_test
def test_read_as_download_logs(self, ti):
ts = pendulum.now()
logs, metadatas = self.es_task_handler.read(
@@ -529,6 +538,7 @@ def test_read_as_download_logs(self, ti):
assert metadata["offset"] == "1"
assert timezone.parse(metadata["last_log_timestamp"]) > ts

+    @pytest.mark.db_test
def test_read_raises(self, ti):
with mock.patch.object(self.es_task_handler.log, "exception") as mock_exception:
with mock.patch.object(self.es_task_handler.client, "search") as mock_execute:
@@ -552,17 +562,20 @@ def test_read_raises(self, ti):
assert metadata["offset"] == "0"
assert not metadata["end_of_log"]

+    @pytest.mark.db_test
def test_set_context(self, ti):
self.es_task_handler.set_context(ti)
assert self.es_task_handler.mark_end_on_close

+    @pytest.mark.db_test
def test_set_context_w_json_format_and_write_stdout(self, ti):
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
self.es_task_handler.formatter = formatter
self.es_task_handler.write_stdout = True
self.es_task_handler.json_format = True
self.es_task_handler.set_context(ti)

+    @pytest.mark.db_test
def test_read_with_json_format(self, ti):
ts = pendulum.now()
formatter = logging.Formatter(
@@ -592,6 +605,7 @@ def test_read_with_json_format(self, ti):
else:
assert logs[0][0][1] == expected_message

+    @pytest.mark.db_test
def test_read_with_json_format_with_custom_offset_and_host_fields(self, ti):
ts = pendulum.now()
formatter = logging.Formatter(
@@ -624,6 +638,7 @@ def test_read_with_json_format_with_custom_offset_and_host_fields(self, ti):
else:
assert logs[0][0][1] == expected_message

+    @pytest.mark.db_test
def test_read_with_custom_offset_and_host_fields(self, ti):
ts = pendulum.now()
# Delete the existing log entry as it doesn't have the new offset and host fields
@@ -648,6 +663,7 @@ def test_read_with_custom_offset_and_host_fields(self, ti):
else:
assert self.test_message == logs[0][0][1]

+    @pytest.mark.db_test
def test_close(self, ti):
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
self.es_task_handler.formatter = formatter
@@ -664,6 +680,7 @@ def test_close(self, ti):
assert log_line.endswith(self.end_of_log_mark.strip())
assert self.es_task_handler.closed

+    @pytest.mark.db_test
def test_close_no_mark_end(self, ti):
ti.raw = True
self.es_task_handler.set_context(ti)
@@ -674,6 +691,7 @@ def test_close_no_mark_end(self, ti):
assert self.end_of_log_mark not in log_file.read()
assert self.es_task_handler.closed

+    @pytest.mark.db_test
def test_close_closed(self, ti):
self.es_task_handler.closed = True
self.es_task_handler.set_context(ti)
@@ -683,6 +701,7 @@ def test_close_closed(self, ti):
) as log_file:
assert len(log_file.read()) == 0

+    @pytest.mark.db_test
def test_close_with_no_handler(self, ti):
self.es_task_handler.set_context(ti)
self.es_task_handler.handler = None
@@ -693,6 +712,7 @@ def test_close_with_no_handler(self, ti):
assert len(log_file.read()) == 0
assert self.es_task_handler.closed

+    @pytest.mark.db_test
def test_close_with_no_stream(self, ti):
self.es_task_handler.set_context(ti)
self.es_task_handler.handler.stream = None
@@ -712,6 +732,7 @@ def test_close_with_no_stream(self, ti):
assert self.end_of_log_mark in log_file.read()
assert self.es_task_handler.closed

+    @pytest.mark.db_test
def test_render_log_id(self, ti):
assert self.es_task_handler._render_log_id(ti, 1) == self.LOG_ID

@@ -722,6 +743,7 @@ def test_render_log_id(self, ti):
clean_logical_date = self.es_task_handler._clean_date(datetime(2016, 7, 8, 9, 10, 11, 12))
assert clean_logical_date == "2016_07_08T09_10_11_000012"

+    @pytest.mark.db_test
@pytest.mark.parametrize(
"json_format, es_frontend, expected_url",
[
@@ -781,6 +803,7 @@ def test_supports_external_link(self, frontend, expected):
self.es_task_handler.frontend = frontend
assert self.es_task_handler.supports_external_link == expected

+    @pytest.mark.db_test
@mock.patch("sys.__stdout__", new_callable=StringIO)
def test_dynamic_offset(self, stdout_mock, ti, time_machine):
# arrange
@@ -843,6 +866,7 @@ def test_filename_template_for_backward_compatibility(self):
filename_template=None,
)

+    @pytest.mark.db_test
def test_write_to_es(self, ti):
self.es_task_handler.write_to_es = True
self.es_task_handler.json_format = True
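
For context, a minimal sketch (not part of this diff; file and test names are hypothetical) of the pytest marker mechanics the change relies on: a module-level pytestmark applies db_test to every test in the file, while per-test decorators mark only the tests that actually need the database, so the remaining tests still run under a marker filter such as pytest -m "not db_test".

# Hypothetical sketch of the per-test marker style this PR moves to.
# Previously the whole module was marked via:
#     pytestmark = pytest.mark.db_test
# which made every test in the file count as a db_test.
import pytest


@pytest.mark.db_test
def test_reads_logs_from_db():
    """Marked: deselected when running with -m 'not db_test'."""


def test_parses_log_id():
    """Unmarked: a pure unit test that runs without a database."""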