Description
Use Case
Right now, it seems like prefect.context is not available inside of a cache_validator function. (At least `import prefect; logger = prefect.context.get("logger")` seems to return None for logger.)
I have two use cases in mind, one mundane and one that I think could be very powerful:
- It's useful to be able to log messages inside of a cache_validator function for debugging purposes
- It would allow for very dynamic cache validation driven off of Flow parameters, the Task being executed, etc.
Solution
For use case 2 above, imagine a data science workflow where the first portion of the flow is a series of feature engineering tasks. It would be very useful and powerful to be able to run the flow in one of three modes:
Mode 1 - generate feature matrices: perform feature engineering and write out a file for each large feature matrix, e.g. with dask.dataframe.to_parquet() to a cloud bucket, using a specific timestamp or label in the bucket/key name along with the name of the task, e.g. generate_vitals_features_201909141352.parquet
Mode 2 - use latest available set of feature matrices: treat those tasks as cached, thus skipping task execution, and instead read from the most recent existing parquet files
Mode 3 - use the set of feature matrices from a specific timestamp/label: again treat those tasks as cached, thus skipping task execution, but read from a specific set of parquet files (not the most recent one)
The selection of mode would be driven by a Parameter (or Parameters) to the Flow. Parameters are already available to cache_validator functions, but a cache_validator might also want to examine the name of the current task in order to query a cloud bucket to see if a parquet file that includes the task name already exists for that task, e.g. someone might have requested to run the Flow in Mode 2, but never ran in Mode 1 so no file is available, etc.
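The mode dispatch described above could be sketched as a plain function. This is a minimal illustration, not working Prefect code: it assumes a validator receives (state, inputs, parameters) plus the requested access to context (simulated here as a dict), and the bucket listing is a hypothetical stub with hard-coded data.

```python
def parquet_labels_for(task_name):
    # Hypothetical helper: in practice this would list the cloud bucket
    # for files named like f"{task_name}_{label}.parquet". Stubbed here
    # with fake data for illustration only.
    fake_bucket = {"generate_vitals_features": ["201909141352"]}
    return fake_bucket.get(task_name, [])

def feature_cache_validator(state, inputs, parameters, context=None):
    """Return True to treat the cached result as valid (skip the task)."""
    task_name = (context or {}).get("task_name", "")
    mode = (parameters or {}).get("feature_mode", "generate")
    labels = parquet_labels_for(task_name)

    if mode == "generate":            # Mode 1: always recompute
        return False
    if mode == "latest":              # Mode 2: reuse newest file, if any exists
        return bool(labels)
    if mode.startswith("label:"):     # Mode 3: reuse a specific labeled file
        return mode.split(":", 1)[1] in labels
    return False
```

Note how Mode 2 degrades gracefully: if the flow was never run in Mode 1, no file exists and the validator returns False, so the task executes normally.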
The advantage, of course, is that after the first Flow run (in Mode 1), subsequent runs could execute far more quickly. (There are some other potential optimizations like skipping intermediate reading of parquet files and only reading a final, merged large feature matrix, etc.)
Being able to supply our own cache_validator function is already very powerful, but with the addition of being able to examine context for task names we could write a common cache validator that could return True or False appropriately, etc.
This also would allow data scientists to focus on just the feature engineering code in their tasks yet get this persistence "for free" just by specifying a common "feature caching" result_handler and cache_validator.
Alternatives
We could place the code to write and read the feature matrices inside the tasks themselves, but that seems like a far less elegant and reusable approach. The existing ResultHandler and cache_validator plumbing is already very close to allowing us to implement this idea, and hopefully this would be a relatively small but powerful change. (BTW, if this is better as a Prefect Improvement Notice, feel free to treat it as one.)