-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Closed
Labels
Description
Discussed in #31832
Originally posted by michalc June 10, 2023
Should something like the below be in the codebase? It's a simple handler for storing Airflow task logs in Redis, enforcing both a maximum number of entries per try and an expiry time for the logs.
Happy to raise a PR (I guessed a lot at how things should be, so I suspect this can be improved upon).
```python
class RedisHandler(logging.Handler):
    """Logging handler that appends formatted records to a capped, expiring Redis list.

    Every emitted record is pushed onto the list stored at ``key``; the list
    is then trimmed to its newest ``max_lines`` entries and the key's expiry
    is reset to ``ttl_seconds``. The three commands are queued on a single
    pipeline and sent with one ``execute()`` call.

    Args:
        client: Object exposing ``pipeline()``, e.g. a ``redis.Redis`` client.
        key: Name of the Redis list that receives the records.
        max_lines: Maximum number of entries kept in the list (newest win).
        ttl_seconds: Expiry in seconds, re-applied to the key on every emit.

    Raises:
        ValueError: If ``max_lines`` or ``ttl_seconds`` is not positive.
    """

    def __init__(self, client, key, max_lines=10000, ttl_seconds=60 * 60 * 24 * 28):
        # A cap of 0 would become ``start=-0`` (i.e. 0) in LTRIM and keep the
        # whole list, silently disabling the limit -- reject it up front.
        if max_lines < 1:
            raise ValueError("max_lines must be a positive integer")
        if ttl_seconds < 1:
            raise ValueError("ttl_seconds must be a positive integer")
        super().__init__()
        self.client = client
        self.key = key
        self.max_lines = max_lines
        self.ttl_seconds = ttl_seconds

    def emit(self, record):
        """Append ``record`` to the Redis list, then re-apply the cap and expiry.

        Failures (formatting or Redis errors) are passed to
        ``handleError`` instead of propagating, so a Redis outage does not
        raise into the code that issued the log call.
        """
        try:
            pipe = self.client.pipeline()
            pipe.rpush(self.key, self.format(record))
            # Negative start index keeps only the last ``max_lines`` entries.
            pipe.ltrim(self.key, start=-self.max_lines, end=-1)
            pipe.expire(self.key, time=self.ttl_seconds)
            pipe.execute()
        except Exception:
            self.handleError(record)
class RedisTaskHandler(FileTaskHandler, LoggingMixin):
    """
    Task-instance log handler backed by Redis.

    Extends airflow's FileTaskHandler: the records of each try are written
    to, and read back from, a Redis list whose key is the rendered log
    filename for that task instance and try number.
    """

    trigger_should_wrap = True

    def __init__(self, base_log_folder: str, redis_url):
        """Initialise the base handler and create a Redis client from ``redis_url``."""
        super().__init__(base_log_folder)
        # No per-try handler exists until set_context() installs one.
        self.handler = None
        self.client = redis.Redis.from_url(redis_url)

    def _read(self, ti, try_number, metadata=None):
        """Return the full stored log for one try together with read metadata.

        All entries of the Redis list for ``ti``/``try_number`` are fetched,
        joined with newlines and decoded as UTF-8. The returned metadata
        always marks the log as complete; ``metadata`` is not consulted.
        """
        redis_key = self._render_filename(ti, try_number)
        entries = self.client.lrange(redis_key, start=0, end=-1)
        # Joined as bytes, so this presumes the client returns undecoded
        # entries (no decode_responses) -- confirm if the client setup changes.
        joined = b"\n".join(entries)
        return joined.decode("utf-8"), {"end_of_log": True}

    def set_context(self, ti):
        """Point this handler at the Redis list for ``ti``'s current try."""
        super().set_context(ti)
        redis_key = self._render_filename(ti, ti.try_number)
        # Replaces whatever handler the base class installed in set_context().
        self.handler = RedisHandler(self.client, redis_key)
        self.handler.setFormatter(self.formatter)
```