Cannot Set Index Pattern on Elasticsearch as a Log Handler #16828

@imamdigmi

Description

Apache Airflow version: 2.0.0

Kubernetes version (if you are using kubernetes) (use kubectl version): Client Version: version.Info{Major:"1", Minor:"20", GitVersion:"v1.20.2", GitCommit:"faecb196815e248d3ecfb03c680a4507229c2a56", GitTreeState:"clean", BuildDate:"2021-01-13T13:28:09Z", GoVersion:"go1.15.5", Compiler:"gc", Platform:"linux/amd64"} Server Version: version.Info{Major:"1", Minor:"18+", GitVersion:"v1.18.8-aliyun.1", GitCommit:"94f1dc8", GitTreeState:"", BuildDate:"2021-01-10T02:57:47Z", GoVersion:"go1.13.15", Compiler:"gc", Platform:"linux/amd64"}

Environment:

  • Cloud provider or hardware configuration: Alibaba Cloud
  • OS (e.g. from /etc/os-release): Debian GNU/Linux 10 (buster)
  • Kernel (e.g. uname -a): Linux airflow-webserver-fb89b7f8b-fgzvv 3.10.0-1160.11.1.el7.x86_64 #1 SMP Fri Dec 18 16:34:56 UTC 2020 x86_64 GNU/Linux
  • Install tools: Helm (Custom)
  • Others: None

What happened:
My Airflow deployment uses fluent-bit to capture the stdout logs from the Airflow containers and ship the log messages to Elasticsearch on a remote machine. That part works well: I can see the logs through Kibana. But Airflow itself cannot display the logs because of this error:

ERROR - Exception on /get_logs_with_metadata [GET]
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)  
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/views.py", line 1054, in get_logs_with_metadata
    logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/log/log_reader.py", line 58, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 217, in read
    log, metadata = self._read(task_instance, try_number_element, metadata)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 160, in _read
    logs = self.es_read(log_id, offset, metadata)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 233, in es_read
    max_log_line = search.count()
  File "/home/airflow/.local/lib/python3.8/site-packages/elasticsearch_dsl/search.py", line 701, in count
    return es.count(index=self._index, body=d, **self._params)["count"]
  File "/home/airflow/.local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line 84, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line 528, in count
    return self.transport.perform_request(
  File "/home/airflow/.local/lib/python3.8/site-packages/elasticsearch/transport.py", line 351, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/home/airflow/.local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py", line 261, in perform_request
    self._raise_error(response.status, raw_data)
  File "/home/airflow/.local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line 181, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
elasticsearch.exceptions.AuthorizationException: AuthorizationException(403, 'security_exception', 'no permissions for [indices:data/read/search] and User [name=airflow, backend_roles=[], request

But when I debug with the following code, I can retrieve the logs:

import elasticsearch

es = elasticsearch.Elasticsearch(['...'], **es_kwargs)  # same connection kwargs the Airflow handler uses
es.search(index="airflow-*", body=dsl)  # scoped to the airflow-* indices, this succeeds

When I look into the source code of the Elasticsearch provider, I see that no index pattern is defined for the query:

search = Search(using=self.client).query('match_phrase', log_id=log_id).sort('offset')

So I assume the issue is that the airflow user has insufficient permission to search across all indices, which the unscoped query requires. How can I set an index pattern so that Airflow only reads certain indices?
Thank you!
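
For reference, elasticsearch_dsl already lets a Search be scoped to specific indices, so exposing that in the handler looks like the missing piece. Below is only a rough sketch of the difference; the host and log_id values are made up for illustration and are not from my setup:

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

# Placeholder connection and log_id, for illustration only
client = Elasticsearch(["http://elasticsearch.example.com:9200"])
log_id = "example_dag-example_task-2021-01-01T00:00:00+00:00-1"

# What the provider builds today: no index given, so count() needs read
# permission on every index in the cluster
unscoped = Search(using=client).query("match_phrase", log_id=log_id).sort("offset")

# The same query limited to the indices fluent-bit writes to, which is all
# the airflow user is allowed to read
scoped = (
    Search(using=client, index="airflow-*")
    .query("match_phrase", log_id=log_id)
    .sort("offset")
)

print(scoped.count())  # succeeds where the unscoped count() raises the 403 above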

What you expected to happen: The Airflow configuration should offer an option to set an Elasticsearch index pattern, so that Airflow queries only certain indices rather than every index on the Elasticsearch server.
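
As a rough sketch of what that could look like (the index_patterns option and the _all fallback below are assumptions, not settings that exist in Airflow 2.0.0), the handler could read a pattern from the [elasticsearch] section and only fall back to searching everything when nothing is configured:

from airflow.configuration import conf
from elasticsearch_dsl import Search

# Assumed new option under [elasticsearch]; "_all" keeps the current
# behaviour of searching every index when it is not set
INDEX_PATTERN = conf.get("elasticsearch", "index_patterns", fallback="_all")


def build_log_search(client, log_id):
    """Build the provider's log query, restricted to the configured pattern."""
    return (
        Search(using=client, index=INDEX_PATTERN)
        .query("match_phrase", log_id=log_id)
        .sort("offset")
    )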

How to reproduce it: Click the Log button in the task instance popup modal to open the logs page.

Anything else we need to know: The problem occurs every time.
