Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions airflow/providers/elasticsearch/log/es_task_handler.py
@@ -345,26 +345,24 @@ def _es_read(self, log_id: str, offset: int | str) -> ElasticSearchResponse | No
         :meta private:
         """
         query: dict[Any, Any] = {
-            "query": {
-                "bool": {
-                    "filter": [{"range": {self.offset_field: {"gt": int(offset)}}}],
-                    "must": [{"match_phrase": {"log_id": log_id}}],
-                }
+            "bool": {
+                "filter": [{"range": {self.offset_field: {"gt": int(offset)}}}],
+                "must": [{"match_phrase": {"log_id": log_id}}],
             }
         }

         try:
-            max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]  # type: ignore
+            max_log_line = self.client.count(index=self.index_patterns, query=query)["count"]  # type: ignore
         except NotFoundError as e:
             self.log.exception("The target index pattern %s does not exist", self.index_patterns)
             raise e

         if max_log_line != 0:
             try:
-                query.update({"sort": [self.offset_field]})
-                res = self.client.search(  # type: ignore
+                res = self.client.search(
                     index=self.index_patterns,
-                    body=query,
+                    query=query,
+                    sort=[self.offset_field],
                     size=self.MAX_LINE_PER_PAGE,
                     from_=self.MAX_LINE_PER_PAGE * self.PAGE,
                 )
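For context, a minimal sketch of the elasticsearch-py 8.x calling convention this hunk adopts: top-level query=, sort=, size= and from_= keyword arguments replace the single body= dict. The host URL, index pattern, field names, and values below are illustrative placeholders, not taken from this PR.

# Minimal sketch of the 8.x client API used above; endpoint, index pattern,
# and log_id value are placeholders for illustration only.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # hypothetical local cluster

query = {
    "bool": {
        "filter": [{"range": {"offset": {"gt": 0}}}],
        "must": [{"match_phrase": {"log_id": "example-dag-example-task"}}],
    }
}

# The 8.x client accepts the query dict directly; no "query" wrapper and no body= argument.
total = es.count(index="airflow-logs-*", query=query)["count"]
if total:
    res = es.search(
        index="airflow-logs-*",
        query=query,
        sort=["offset"],  # sort moves out of the request body into its own kwarg
        size=100,
        from_=0,
    )
    for hit in res["hits"]["hits"]:
        print(hit["_source"])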
2 changes: 1 addition & 1 deletion airflow/providers/elasticsearch/provider.yaml
@@ -57,7 +57,7 @@ versions:
 dependencies:
   - apache-airflow>=2.5.0
   - apache-airflow-providers-common-sql>=1.3.1
-  - elasticsearch>8,<9
+  - elasticsearch>=8.10,<9

 integrations:
   - integration-name: Elasticsearch
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
@@ -371,7 +371,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow>=2.5.0",
"elasticsearch>8,<9"
"elasticsearch>=8.10,<9"
],
"cross-providers-deps": [
"common.sql"
24 changes: 12 additions & 12 deletions tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -325,8 +325,8 @@ def get_source(self, index, doc_type, id, params=None):
"track_scores",
"version",
)
def count(self, index=None, doc_type=None, body=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index, body)
def count(self, index=None, doc_type=None, query=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index, query=query)
searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
i = 0
for searchable_index in searchable_indexes:
@@ -372,10 +372,10 @@ def count(self, index=None, doc_type=None, body=None, params=None, headers=None)
"track_scores",
"version",
)
def search(self, index=None, doc_type=None, body=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index, body)
def search(self, index=None, doc_type=None, query=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index, query=query)

matches = self._find_match(index, doc_type, body)
matches = self._find_match(index, doc_type, query=query)

result = {
"hits": {"total": len(matches), "max_score": 1.0},
@@ -442,11 +442,11 @@ def suggest(self, body, index=None):
             ]
         return result_dict

-    def _find_match(self, index, doc_type, body):
-        searchable_indexes = self._normalize_index_to_list(index, body)
+    def _find_match(self, index, doc_type, query):
+        searchable_indexes = self._normalize_index_to_list(index, query=query)
         searchable_doc_types = self._normalize_doc_type_to_list(doc_type)

-        must = body["query"]["bool"]["must"][0]  # only support one must
+        must = query["bool"]["must"][0]  # only support one must

         matches = []
         for searchable_index in searchable_indexes:
@@ -472,7 +472,7 @@ def match_must_phrase(document, matches, must):
                     matches.append(document)

     # Check index(es) exists.
-    def _validate_search_targets(self, targets, body):
+    def _validate_search_targets(self, targets, query):
         # TODO: support allow_no_indices query parameter
         matches = set()
         for target in targets:
@@ -482,10 +482,10 @@ def _validate_search_targets(self, targets, body):
elif "*" in target:
matches.update(fnmatch.filter(self.__documents_dict, target))
elif target not in self.__documents_dict:
raise MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", body=body)
raise MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", query=query)
return matches

def _normalize_index_to_list(self, index, body):
def _normalize_index_to_list(self, index, query):
# Ensure to have a list of index
if index is None:
searchable_indexes = self.__documents_dict.keys()
@@ -498,7 +498,7 @@ def _normalize_index_to_list(self, index, body):
             raise ValueError("Invalid param 'index'")

         generator = (target for index in searchable_indexes for target in index.split(","))
-        return list(self._validate_search_targets(generator, body))
+        return list(self._validate_search_targets(generator, query=query))

     @staticmethod
     def _normalize_doc_type_to_list(doc_type):
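As a hedged illustration of the shape change the mock now expects (the log_id value here is made up): the dict passed as query= starts at the "bool" level, whereas the old body= dict wrapped it under a top-level "query" key, which is why _find_match now reads query["bool"]["must"][0] rather than body["query"]["bool"]["must"][0].

# Illustrative only: the same search expressed in the old body shape and the new query shape.
old_body = {"query": {"bool": {"must": [{"match_phrase": {"log_id": "example"}}]}}}
new_query = {"bool": {"must": [{"match_phrase": {"log_id": "example"}}]}}

assert new_query == old_body["query"]
assert new_query["bool"]["must"][0] == {"match_phrase": {"log_id": "example"}}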
@@ -213,12 +213,12 @@ def _escape(value):
 class MissingIndexException(NotFoundError):
     """Exception representing a missing index."""

-    def __init__(self, msg, body):
+    def __init__(self, msg, query):
         self.msg = msg
-        self.body = body
+        self.query = query

     def __str__(self):
-        return f"IndexMissingException[[{self.msg}] missing] with body {self.body}"
+        return f"IndexMissingException[[{self.msg}] missing] with query {self.query}"


 class SearchFailedException(NotFoundError):