@@ -53,11 +53,12 @@
 
 import airflow
 from airflow import settings
-from airflow.configuration import conf
 from airflow.jobs import BaseJob
 from airflow.models import DAG, DagModel, DagRun, Log, SlaMiss, \
     TaskInstance, Variable, XCom
 from airflow.operators.python_operator import PythonOperator
+from airflow.version import version as airflow_version
+
 import dateutil.parser
 from sqlalchemy import and_, func
 from sqlalchemy.exc import ProgrammingError
@@ -66,6 +67,7 @@
 try:
     # airflow.utils.timezone is available from v1.10 onwards
     from airflow.utils import timezone
+
     now = timezone.utcnow
 except ImportError:
     now = datetime.utcnow
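A note on the fallback above: on pre-1.10 installs `now` resolves to `datetime.utcnow`, which returns a naive datetime, while `timezone.utcnow` is timezone-aware. A minimal sketch of the difference (illustrative only, assuming a 1.10+ environment):

    from datetime import datetime
    from airflow.utils import timezone

    naive = datetime.utcnow()   # tzinfo is None
    aware = timezone.utcnow()   # tzinfo is UTC
    # Comparing naive and aware datetimes raises TypeError, so the shim
    # keeps the rest of the DAG on a single `now` callable.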
@@ -79,9 +81,11 @@
 DAG_OWNER_NAME = "operations"
 # List of email addresses to send alerts to if this job fails
 ALERT_EMAIL_ADDRESSES = []
+# Airflow version used by the environment, in list form; the value stored
+# in airflow_version has a format like "1.10.15+composer"
+AIRFLOW_VERSION = airflow_version[:-len("+composer")].split(".")
 # Length of time to retain DB entries if not already provided in the conf.
 # If set to 30, the job will remove entries that are 30 days old or older.
-
 DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
     Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))
 # Prints the database entries which will be getting deleted; set to False
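To illustrate the AIRFLOW_VERSION parsing added above (a sketch, assuming a Composer-style version string; a vanilla Airflow install would not carry the "+composer" suffix, and the slice would then trim real version characters):

    airflow_version = "1.10.15+composer"   # assumed example value
    AIRFLOW_VERSION = airflow_version[:-len("+composer")].split(".")
    # -> ["1", "10", "15"], convenient for numeric checks such as:
    is_110_or_later = int(AIRFLOW_VERSION[1]) >= 10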
@@ -139,6 +143,7 @@
 # Check for TaskReschedule model
 try:
     from airflow.models import TaskReschedule
+
     DATABASE_OBJECTS.append({
         "airflow_db_model": TaskReschedule,
         "age_check_column": TaskReschedule.execution_date,
@@ -153,6 +158,7 @@
 # Check for TaskFail model
 try:
     from airflow.models import TaskFail
+
     DATABASE_OBJECTS.append({
         "airflow_db_model": TaskFail,
         "age_check_column": TaskFail.execution_date,
@@ -164,23 +170,10 @@
 except Exception as e:
     logging.error(e)
 
-# Check for RenderedTaskInstanceFields model
-try:
-    from airflow.models import RenderedTaskInstanceFields
-    DATABASE_OBJECTS.append({
-        "airflow_db_model": RenderedTaskInstanceFields,
-        "age_check_column": RenderedTaskInstanceFields.execution_date,
-        "keep_last": False,
-        "keep_last_filters": None,
-        "keep_last_group_by": None
-    })
-
-except Exception as e:
-    logging.error(e)
-
 # Check for ImportError model
 try:
     from airflow.models import ImportError
+
     DATABASE_OBJECTS.append({
         "airflow_db_model": ImportError,
         "age_check_column": ImportError.timestamp,
@@ -193,34 +186,6 @@
 except Exception as e:
     logging.error(e)
 
-# Check for celery executor
-airflow_executor = str(conf.get("core", "executor"))
-logging.info("Airflow Executor: " + str(airflow_executor))
-if (airflow_executor == "CeleryExecutor"):
-    logging.info("Including Celery Modules")
-    try:
-        from celery.backends.database.models import Task, TaskSet
-        DATABASE_OBJECTS.extend(({
-            "airflow_db_model": Task,
-            "age_check_column": Task.date_done,
-            "keep_last": False,
-            "keep_last_filters": None,
-            "keep_last_group_by": None,
-            "do_not_delete_by_dag_id": True
-        }, {
-            "airflow_db_model": TaskSet,
-            "age_check_column": TaskSet.date_done,
-            "keep_last": False,
-            "keep_last_filters": None,
-            "keep_last_group_by": None,
-            "do_not_delete_by_dag_id": True
-        }))
-
-    except Exception as e:
-        logging.error(e)
-
-session = settings.Session()
-
 default_args = {
     "owner": DAG_OWNER_NAME,
     "depends_on_past": False,
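For reference, every DATABASE_OBJECTS entry appended in the hunks above shares the same shape. A representative first entry for DagRun might look like this (a sketch based only on the keys visible in this diff; the keep_last filter shown is an assumption illustrating "retain the latest scheduled run per DAG"):

    DATABASE_OBJECTS = [{
        "airflow_db_model": DagRun,
        "age_check_column": DagRun.execution_date,
        "keep_last": True,
        # e.g. keep the last non-manually-triggered run of each DAG
        "keep_last_filters": [DagRun.external_trigger == False],
        "keep_last_group_by": DagRun.dag_id
    }]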
@@ -252,7 +217,7 @@ def print_configuration_function(**context):
         max_db_entry_age_in_days = dag_run_conf.get(
             "maxDBEntryAgeInDays", None)
     logging.info("maxDBEntryAgeInDays from dag_run.conf: " + str(dag_run_conf))
-    if (max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1):
+    if max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1:
         logging.info(
             "maxDBEntryAgeInDays conf variable isn't included or Variable " +
             "value is less than 1. Using Default '" +
@@ -266,7 +231,6 @@ def print_configuration_function(**context):
     logging.info("max_db_entry_age_in_days: " + str(max_db_entry_age_in_days))
     logging.info("max_date: " + str(max_date))
     logging.info("enable_delete: " + str(ENABLE_DELETE))
-    logging.info("session: " + str(session))
     logging.info("")
 
     logging.info("Setting max_execution_date to XCom for Downstream Processes")
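For context on the XCom hand-off this hunk logs: the value pushed here is read back in cleanup_function, and since XCom serializes it as a string, dateutil.parser (imported at the top of the file) converts it back to a datetime. A sketch of the round-trip, assuming the key name and task id used elsewhere in this DAG:

    # producer side (print_configuration_function)
    context["ti"].xcom_push(key="max_date", value=max_date.isoformat())
    # consumer side (cleanup_function)
    max_date = context["ti"].xcom_pull(
        task_ids="print_configuration", key="max_date")
    max_date = dateutil.parser.parse(max_date)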
@@ -280,7 +244,57 @@ def print_configuration_function(**context):
     dag=dag)
 
 
+def build_query(session, airflow_db_model, age_check_column, max_date,
+                keep_last, keep_last_filters=None, keep_last_group_by=None):
+    query = session.query(airflow_db_model).options(
+        load_only(age_check_column))
+
+    logging.info("INITIAL QUERY: " + str(query))
+
+    if not keep_last:
+        query = query.filter(age_check_column <= max_date)
+    else:
+        subquery = session.query(func.max(DagRun.execution_date))
+        # workaround for MySQL "table specified twice" issue
+        # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
+        if keep_last_filters is not None:
+            for entry in keep_last_filters:
+                subquery = subquery.filter(entry)
+
+            logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))
+
+        if keep_last_group_by is not None:
+            subquery = subquery.group_by(keep_last_group_by)
+            logging.info(
+                "SUB QUERY [keep_last_group_by]: " +
+                str(subquery))
+
+        subquery = subquery.from_self()
+
+        query = query.filter(
+            and_(age_check_column.notin_(subquery)),
+            and_(age_check_column <= max_date))
+
+    return query
+
+
+def print_query(query, airflow_db_model, age_check_column):
+    entries_to_delete = query.all()
+
+    logging.info("Query: " + str(query))
+    logging.info("Process will be Deleting the following " +
+                 str(airflow_db_model.__name__) + "(s):")
+    for entry in entries_to_delete:
+        date = str(entry.__dict__[str(age_check_column).split(".")[1]])
+        logging.info("\tEntry: " + str(entry) + ", Date: " + date)
+
+    logging.info("Process will be Deleting "
+                 + str(len(entries_to_delete)) + " "
+                 + str(airflow_db_model.__name__) + "(s)")
+
+
 def cleanup_function(**context):
+    session = settings.Session()
 
     logging.info("Retrieving max_execution_date from XCom")
     max_date = context["ti"].xcom_pull(
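A usage sketch for the new build_query helper (illustrative only; DagRun and max_date stand in for the values supplied by the params and XCom wiring shown elsewhere in this diff):

    session = settings.Session()
    query = build_query(session, DagRun, DagRun.execution_date, max_date,
                        keep_last=True, keep_last_group_by=DagRun.dag_id)
    print_query(query, DagRun, DagRun.execution_date)
    session.close()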
@@ -310,67 +324,34 @@ def cleanup_function(**context):
     logging.info("Running Cleanup Process...")
 
     try:
-        query = session.query(airflow_db_model).options(
-            load_only(age_check_column))
-
-        logging.info("INITIAL QUERY : " + str(query))
-
-        if keep_last:
-
-            subquery = session.query(func.max(DagRun.execution_date))
-            # workaround for MySQL "table specified twice" issue
-            # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
-            if keep_last_filters is not None:
-                for entry in keep_last_filters:
-                    subquery = subquery.filter(entry)
-
-                logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))
-
-            if keep_last_group_by is not None:
-                subquery = subquery.group_by(keep_last_group_by)
-                logging.info(
-                    "SUB QUERY [keep_last_group_by]: " +
-                    str(subquery))
-
-            subquery = subquery.from_self()
-
-            query = query.filter(
-                and_(age_check_column.notin_(subquery)),
-                and_(age_check_column <= max_date))
-
-        else:
-            query = query.filter(age_check_column <= max_date,)
-
-        if PRINT_DELETES:
-            entries_to_delete = query.all()
-
-            logging.info("Query: " + str(query))
-            logging.info("Process will be Deleting the following " +
-                         str(airflow_db_model.__name__) + "(s):")
-            for entry in entries_to_delete:
-                date = str(entry.__dict__[str(age_check_column).split(".")[1]])
-                logging.info("\tEntry: " + str(entry) + ", Date: " + date)
-
-            logging.info("Process will be Deleting "
-                         + str(len(entries_to_delete)) + " "
-                         + str(airflow_db_model.__name__) + "(s)")
+        if context["params"].get("do_not_delete_by_dag_id"):
+            query = build_query(session, airflow_db_model, age_check_column,
+                                max_date, keep_last, keep_last_filters,
+                                keep_last_group_by)
+            if PRINT_DELETES:
+                print_query(query, airflow_db_model, age_check_column)
+            if ENABLE_DELETE:
+                logging.info("Performing Delete...")
+                query.delete(synchronize_session=False)
+            session.commit()
         else:
-            logging.warn(
-                "You've opted to skip printing the db entries to be deleted. "
-                "Set PRINT_DELETES to True to show entries!!!")
-
-        if ENABLE_DELETE:
-            logging.info("Performing Delete...")
-            if context["params"].get("do_not_delete_by_dag_id"):
-                query.filter(age_check_column <= max_date).delete(synchronize_session=False)
+            dags = session.query(airflow_db_model.dag_id).distinct()
+            session.commit()
+
+            list_dags = [str(list(dag)[0]) for dag in dags]
+            for dag in list_dags:
+                query = build_query(session, airflow_db_model, age_check_column,
+                                    max_date, keep_last, keep_last_filters,
+                                    keep_last_group_by)
+                query = query.filter(airflow_db_model.dag_id == dag)
+                if PRINT_DELETES:
+                    print_query(query, airflow_db_model, age_check_column)
+                if ENABLE_DELETE:
+                    logging.info("Performing Delete...")
+                    query.delete(synchronize_session=False)
                 session.commit()
-            else:
-                dags = session.query(airflow_db_model.dag_id).distinct()
-                list_dags = [str(list(dag)[0]) for dag in dags]
-                for dag in list_dags:
-                    query.filter(age_check_column <= max_date).filter(airflow_db_model.dag_id == dag).delete(synchronize_session=False)
-                    session.commit()
-        else:
+
+        if not ENABLE_DELETE:
             logging.warn("You've opted to skip deleting the db entries. "
                          "Set ENABLE_DELETE to True to delete entries!!!")
 
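The delete calls in this hunk pass synchronize_session=False, which issues a single bulk DELETE without reconciling objects already loaded into the session; that is safe here because the session is committed and then closed without further reads. A minimal standalone sketch of the pattern (hypothetical Foo model and cutoff value, not part of this patch):

    session.query(Foo).filter(Foo.created_at <= cutoff).delete(
        synchronize_session=False)
    session.commit()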
@@ -379,12 +360,13 @@ def cleanup_function(**context):
     except ProgrammingError as e:
         logging.error(e)
         logging.error(
-            str(airflow_db_model) + " is not present in the metadata."
-            "Skipping...")
+            str(airflow_db_model) + " is not present in the metadata. "
+            "Skipping...")
+    finally:
+        session.close()
 
 
 for db_object in DATABASE_OBJECTS:
-
     cleanup_op = PythonOperator(
         task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
         python_callable=cleanup_function,
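The remaining lines of this loop fall outside the diff; in the upstream maintenance DAG each generated cleanup task is typically wired downstream of the configuration-printing task, along these lines (assumed, not shown in this patch):

    print_configuration.set_downstream(cleanup_op)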