feat: optional term_filter for run-inference and latest-inference-cohort endpoint #201
- Add GET /institutions/{inst_id}/latest-inference-cohort endpoint returning
LatestInferenceCohortResponse (status, cohort_label, valid_student_count,
batch_name, reason). Always 200; status=invalid with reason when no batch,
missing config, missing cohort_term, or no cohort has course data.
- Define "most recent cohort" as most recent cohort term (e.g. Fall 2024,
Spring 2025). Require cohort_term column; support cohort+cohort_term or
entry_year+cohort_term. Use config preprocessing.selection.cohort_term to
filter allowed terms. Loop back to next cohort term when current has no
course data for students.
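The "loop back to the next cohort term" behavior described above might be sketched as below; `first_term_with_course_data` and its arguments are hypothetical names for illustration, not the PR's actual helpers:

```python
from typing import Optional


def first_term_with_course_data(
    ordered_terms: list[str],
    terms_with_course_data: set[str],
) -> Optional[str]:
    """Return the most recent cohort term that has course data.

    ordered_terms is assumed sorted most-recent-first; when the newest
    term has no course data we loop back to the next one, and so on.
    Returns None when no term qualifies (status=invalid upstream).
    """
    for term in ordered_terms:
        if term in terms_with_course_data:
            return term
    return None
```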
- Add DatabricksControl.read_volume_training_config to read config.toml from
institution bronze volume and return [preprocessing.selection] for cohort
term filtering and student_criteria.
- Add get_ordered_cohort_terms, filter_to_most_recent_cohort,
_resolve_latest_inference_cohort_from_dataframes, and helpers; integrate
apply_student_criteria_count with course-data restriction.
- Add unit tests for get_ordered_cohort_terms and filter_to_most_recent_cohort;
API tests for latest-inference-cohort (unauthorized, missing cohort/term/config,
valid, loop-back, missing student id, valid_count=0, missing criteria column);
apply_student_criteria_count tests; tighten response shape assertions.
- Remove unused imports (joinedload, LatestInferenceCohortResponse); apply black.
- Add optional cohort: list[str] to InferenceRunRequest and DatabricksInferenceRunRequest
- Validate cohort when provided: reject empty list and empty/whitespace-only labels
- Serialize cohort as JSON in job_parameters when set; omit key when None (backward compatible)
- Log user, cohorts, and job_run_id once in API after successful run
- Fix batch error message to use len(batch_result) instead of len(inst_result)
- Add tests: backward compat (no cohort), single/multiple cohorts, empty list, empty string, whitespace-only
- Centralize ENV_TO_VOLUME_SCHEMA in config; use in front_end_tables and databricks
- Add read_volume_training_config and _parse_config_toml_to_selection for volume TOML
- Refactor _resolve_latest_inference_cohort_from_dataframes into helpers (≤50 lines)
- Improve run_pdp_inference logging (%-style, exception context); guard None response.contents
- Rename res to inference_run_response in trigger_inference_run
- Add tests: get_latest_batch when no batch has both STUDENT and COURSE; read_volume_training_config error paths

Co-authored-by: Cursor <cursoragent@cursor.com>
…ection
- Add optional batch_name query param; resolve batch by name or latest with student+course
- Resolve model from single registered (valid, non-deleted) when model_name omitted
- Validate model exists for institution when model_name is provided
- Read training config from Databricks volumes for cohort selection
- Refactor endpoint into focused helpers; add mypy type-narrowing assertions
- Add tests for batch_name, model/batch validation, and registered-model resolution

Co-authored-by: Cursor <cursoragent@cursor.com>
- Add term_filter to InferenceRunRequest and DatabricksInferenceRunRequest
- Pass term_filter in job_params to Databricks job
- Update models router validation and tests
- Extend databricks tests and remove unused import

Co-authored-by: Cursor <cursoragent@cursor.com>
- Normalize selection config at inference entry (checkpoint_term -> cohort_term)
- Rename inference path to checkpoint term (constants, helpers, docstrings)
- Change LatestInferenceCohortResponse.cohort_label to cohort_labels (list)
- Extract _try_checkpoint_term_candidate, _ordered_checkpoint_terms_from_* for single-purpose helpers and 50-line limit; add _UNKNOWN_TERM_SORT_RANK
- Add tests: normalizer edge cases, cohort_term string/empty list, response contract; databricks checkpoint_term parse; type hint on test

Co-authored-by: Cursor <cursoragent@cursor.com>
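Ordering checkpoint terms like "FALL 2023-24" most-recent-first might be sketched as below; the exact season ranking in `_TERM_ORDER` and the unknown-term handling are assumptions mirroring the constants named in this commit:

```python
# Within one academic-year label, FALL comes before SPRING/SUMMER;
# the ranking below is an assumption, not the repo's actual values.
_TERM_ORDER = {"FALL": 1, "WINTER": 2, "SPRING": 3, "SUMMER": 4}
_UNKNOWN_TERM_SORT_RANK = -1  # unparseable terms sort last


def _term_sort_key(term: str) -> tuple[str, int]:
    parts = term.strip().upper().split(maxsplit=1)
    season = parts[0] if parts else ""
    year = parts[1] if len(parts) > 1 else ""
    return (year, _TERM_ORDER.get(season, _UNKNOWN_TERM_SORT_RANK))


def ordered_checkpoint_terms(terms: list[str]) -> list[str]:
    """Most recent first: later academic year wins, then later season."""
    return sorted(terms, key=_term_sort_key, reverse=True)
```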
```
) -> Dict[str, Any]:
    """Normalize selection_config so student_criteria uses column name cohort_term for inference.

    If checkpoint_term is present in student_criteria, set cohort_term to that value and
```

Review comment: Sorry, what is checkpoint_term here? Is it meant to be the term config field we added listing the terms for inference? If so, a couple of thoughts: it shouldn't be the same kind of value as the cohort_term field, which is simply FALL or SPRING; checkpoint_term also includes the year, so it will be like FALL 2023-24, e.g.
```
    return out
```

```
def _latest_inference_cohort_validation_response(
```

Review comment: We should talk about how to square this with the fact that we're using terms now; do we want both?
```
def _first_valid_from_ordered_terms(
    ordered_checkpoint_terms: List[Dict[str, Any]],
```

Review comment: Ah, I see you are doing both; I believe I will need to change the config to terms.
```
    remove checkpoint_term so downstream (get_ordered_*, apply_student_criteria_count)
    only see cohort_term. If both keys exist, prefer checkpoint_term.
    """
    criteria = selection_config.get("student_criteria")
```

Review comment: Let's make sure this is not the student criteria field.
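The normalization that this docstring describes could look roughly like the following minimal sketch; the key names follow the docstring, but the function name and body are assumptions, not the PR's actual implementation:

```python
from typing import Any


def normalize_selection_config(selection: dict[str, Any]) -> dict[str, Any]:
    """Rewrite student_criteria so checkpoint_term becomes cohort_term.

    If both keys exist, checkpoint_term wins, per the docstring under
    review. Downstream helpers then only ever see cohort_term.
    """
    out = dict(selection)
    criteria = out.get("student_criteria")
    if not isinstance(criteria, dict):
        return out  # nothing to normalize
    criteria = dict(criteria)
    if "checkpoint_term" in criteria:
        criteria["cohort_term"] = criteria.pop("checkpoint_term")
    out["student_criteria"] = criteria
    return out
```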
kaylawilding left a comment: Adding comments, but this is on hold.
changes
To review only the latest commit: see Checkpoint term support and cohort_labels below.
Run-inference: optional term_filter list (API → Databricks)

API (`routers/models.py`)
- `InferenceRunRequest`: added optional `term_filter: list[str] | None = None`.
- When `term_filter` is present: require at least one non-empty label; reject an empty list (400) and empty or whitespace-only labels (400), with logging before each raise.
- Build `DatabricksInferenceRunRequest` with `term_filter=req.term_filter` and pass it to `run_pdp_inference` via the `term_filter` field.
- Fixed the batch error message to use `len(batch_result)` instead of `len(inst_result)`.
- Renamed `res` to `inference_run_response` in `trigger_inference_run` for clarity.

Databricks (`databricks.py`)
- `DatabricksInferenceRunRequest`: added optional `term_filter: list[str] | None = None`.
- `run_pdp_inference()`: build a `job_params` dict; when `req.term_filter is not None`, set `job_params["term_filter"] = json.dumps(req.term_filter)`. Omit the key when `term_filter` is `None`.

Tests (`routers/models_test.py`)
- No `term_filter` in request → 200 and mock called with `term_filter=None`.
- Valid `term_filter` → 200 and mock receives the correct list.
- Invalid `term_filter` → 400 with the expected detail messages.

Latest-inference-cohort endpoint and config from Databricks
API (`routers/data.py`)
- New GET `/{inst_id}/latest-inference-cohort` returning `LatestInferenceCohortResponse` (`cohort_labels`, `valid_student_count`, `status`, `batch_name`, `reason`). Always returns 200; uses `status='invalid'` with a `reason` when there is no batch, missing config, or no cohort has course data.
- `model_name` (optional): if omitted, the institution must have exactly one registered model (valid, non-deleted); otherwise invalid. When provided, the model must exist for the institution or the endpoint returns invalid.
- `batch_name` (optional): when provided (URL-decoded), cohort selection uses that batch; when omitted, uses the most recent batch that has both STUDENT and COURSE files.
- Reads the selection config via `DatabricksControl.read_volume_training_config(inst.name, model_run_id)`, where `model_run_id` is the latest model version's run ID for the chosen model.
- New helpers: `_resolve_model_name_and_run_id_for_inference` (resolve model name, validate existence, fetch run ID), `_load_batch_and_dataframes_for_inference` (resolve batch, load config, read batch files, validate STUDENT/COURSE), `get_batch_for_inference`, `get_batch_by_name_with_student_and_course`, `_batch_has_student_and_course`, plus the existing cohort logic split into `_latest_inference_cohort_validation_response`, `_first_valid_from_ordered_terms`, and `_resolve_latest_inference_cohort_from_dataframes` (≤50 lines per function). Mypy type-narrowing assertions added where optional values are guaranteed after error checks.

Config (`config.py`)
- Added `ENV_TO_VOLUME_SCHEMA = {"DEV": "dev_sst_02", "STAGING": "staging_sst_01"}` for Databricks volume paths. Used by `databricks.read_volume_training_config` and `front_end_tables.get_model_cards` (replacing a local dict there).

Databricks (`databricks.py`)
- `read_volume_training_config(inst_name, model_run_id)`: reads from `/Volumes/{env_schema}/{slug}_silver/silver_volume/{model_run_id}/`, finds any `.toml` under that path that contains `[preprocessing.selection]`, and returns that section only (or `None` if missing/unreadable). Uses `ENV_TO_VOLUME_SCHEMA`; returns `None` for an unsupported ENV (e.g. LOCAL).
- New helpers: `_parse_config_toml_to_selection(raw: bytes)` for TOML parsing and section extraction, and `_find_selection_in_toml_under(w, directory_path, inst_name)` to list the directory and scan `.toml` files.
- Guard for `response.contents is None` before calling `.read()` (mypy-safe).

Front-end (`routers/front_end_tables.py`)
- `get_model_cards` now uses `ENV_TO_VOLUME_SCHEMA` from config instead of a local schema dict.

Tests
- `routers/data_test.py`: tests for `get_latest_inference_cohort` (unauthorized; no model_name and no models / models exist but none registered; missing cohort column/term/config; valid; valid with `batch_name`; model_name not found; batch_name not found; loop-back when the most recent term has no course data; no cohort has course data; missing student id column; zero students meeting criteria; student_criteria references a missing column); for `get_latest_batch_with_student_and_course` (returns the batch when it has STUDENT and COURSE; returns None when there is no batch or no batch has both); and for `apply_student_criteria_count`, `filter_to_most_recent_checkpoint_term`, `get_ordered_checkpoint_terms`.
- `databricks_test.py`: tests for `_parse_config_toml_to_selection` (valid TOML with the section; invalid/missing section); for `read_volume_training_config` (empty inst_name, empty model_run_id, ENV not DEV/STAGING, databricksify raises, WorkspaceClient raises, list raises, download raises, TOML missing `[preprocessing.selection]`, success when a `.toml` is found under the run dir). Added return type `-> list[DirectoryEntry]` to `_one_toml_entry` for mypy.

Checkpoint term support and cohort_labels (latest commit)
API (`routers/data.py`)
- Added `_normalize_selection_config_for_inference()` so pipeline config can use `checkpoint_term` in `student_criteria`; at inference entry it is normalized to `cohort_term` once. Downstream code and the DB column stay `cohort_term`.
- `_allowed_checkpoint_terms_from_config` now reads only `cohort_term` (the config is always normalized first).
- Renamed term helpers and constants: `get_ordered_checkpoint_terms`, `filter_to_most_recent_checkpoint_term`, `_try_checkpoint_term_candidate`, `_first_valid_from_ordered_terms`, `CHECKPOINT_TERM_REQUIRED_MSG`, `_TERM_ORDER`, `_UNKNOWN_TERM_SORT_RANK`. Column and schema remain `cohort_term`.
- `LatestInferenceCohortResponse.cohort_label` → `cohort_labels: list[str]`; all construction sites updated. The user-facing message for a missing column says "cohort term" (matches the column); other messages keep "checkpoint term."
- `filter_to_most_recent_checkpoint_term`; explicit fields on the early "Institution not found" response. Extracted `_try_checkpoint_term_candidate`, `_ordered_checkpoint_terms_from_cohort_column`, and `_ordered_checkpoint_terms_from_entry_year` for single-purpose helpers and the 50-line limit.

Databricks (`databricks.py`)
- `_parse_config_toml_to_selection`: accepts `checkpoint_term` in config (normalized to `cohort_term` by the API); behavior unchanged for `cohort_term`.

Tests
- `routers/data_test.py`: normalizer (valid; non-dict `student_criteria`); `checkpoint_term` in config; `cohort_term` as a string in config; `cohort_term` empty list (valid, count 0); response contract (`cohort_labels` list).
- `databricks_test.py`: `test_parse_config_toml_to_selection_accepts_checkpoint_term`; return type `-> None` on the other parse test.

context
Run-inference term_filter: callers need to be able to choose which cohorts (e.g. "fall 2024-25") to run inference on. This PR adds an optional `term_filter` list to the run-inference request and passes it through to the Databricks job so the pipeline can filter by those terms/cohorts. When `term_filter` is omitted, no `term_filter` key is sent to Databricks and behavior is unchanged (the pipeline uses its config default).

Latest-inference-cohort: the UI needs a single "latest inference-ready cohort" (label, valid student count, status) for an institution. This endpoint supports optional `model_name` and `batch_name` so the UI can show a specific model/batch or rely on defaults (single registered model; latest batch with student+course files). It reads the chosen model's training/preprocessing selection config from Databricks (silver volume, per model run) and returns the first cohort term that has course data and meets the config criteria. Refactors and tests align with project standards (e.g. function length, naming, error-path coverage).

Checkpoint term and cohort_labels: pipelines may specify the inference cohort via `checkpoint_term` in config; the API normalizes this to `cohort_term` at entry so the rest of the stack stays column-aligned. The response now exposes `cohort_labels` (list) for consistency and future multi-cohort use. The naming ("checkpoint term") is aligned with product language while keeping the DB column name `cohort_term`.

questions