feat(core): add per-mapper max_fan_out override for partition fan-out cap#67184
Draft
Lee-W wants to merge 28 commits into
Draft
feat(core): add per-mapper max_fan_out override for partition fan-out cap#67184Lee-W wants to merge 28 commits into
Lee-W wants to merge 28 commits into
Conversation
this is still not ideal, but at least it's not super wrong now
…t ordering
StartOfWeekMapper and StartOfQuarterMapper now derive their decode_downstream
regex from output_format itself, so users can re-order strftime directives
and {name} placeholders (e.g. "Q{quarter}/%Y") without having to override
decode_downstream. Malformed output_format — empty {}, non-identifier
placeholder names, duplicate %X directives, duplicate {name} placeholders —
raises ValueError at mapper construction instead of an opaque re.error from
deep inside a scheduler tick or UI route.
…ag_runs list Drop the SQL "count distinct assets with any log" subquery and always compute total_received via the Python rollup-aware helper. The list endpoint previously returned different numbers for the same APDR depending on whether the caller filtered by dag_id (rollup-aware, counts upstream window keys) or queried globally (SQL approximation, counts assets with any log) — same field, different semantics, very confusing for any UI consumer. The N+1 cost of per-Dag timetable loads was already paid in the global branch for total_required, so adding a single batched log fetch keeps the existing query budget while making the contract identical across both views. _compute_received_count now skips asset_ids that are no longer required (active=False) so the relaxed log query doesn't over-count.
StartOfWeekMapper now always uses ISO weeks (Monday) and StartOfMonthMapper always emits the 1st of the month. Custom fiscal boundaries can still be expressed by pairing a user-defined source mapper with the existing windows.
The next_run_assets and partitioned_dag_runs endpoints used to load and deserialize the full timetable on every request just to read mapper attributes (is_rollup) and required-key counts. Cache mapper metadata per asset on DagModel during Dag sync via a new ``partition_mapper_info`` JSON column, so the UI resolves mapper attributes from the cache and only loads the timetable when ``to_upstream`` evaluation for rollup mappers is actually needed.
``partition_mapper_info`` now iterates every asset in ``asset_condition`` and uses ``get_partition_mapper``, so a Dag configured with ``default_partition_mapper=RollupMapper(...)`` (the primary documented pattern) is correctly reported as rollup. Previously the list was built from ``partition_mapper_config`` only, leaving ``has_rollup_mappers`` False and silently disabling rollup UI behaviour. Also: extract the shared ``load_partitioned_timetable`` helper and log on deserialization failure; coerce NULL ``source_partition_key`` to ``""`` in the scheduler to match the UI normalisation.
Old serialized rows or hand-crafted partial dicts caused a KeyError on DagModel.is_rollup_asset and has_rollup_mappers. Switch to .get() with a False default so the read side is resilient to schema evolution.
Add docstrings explaining accepted strftime directives, round-trip requirements, and why regex compilation happens eagerly at construction time rather than lazily inside the scheduler loop.
Covers the previously untested MonthWindow case in test_window_serialize_round_trip. Uses input_format="%Y-%m-%d" instead of "%Y-%m" to prevent 29 day keys from collapsing to the same value and masking decode failures.
DayWindow always generates 24 naive hourly steps. When paired with a local-timezone source mapper, spring-forward gaps make one expected upstream key unattainable so the rollup can never complete; fall-back causes the extra hour to be excluded from the expected set. Add a warning block to DayWindow's docstring, two tests (one pinning the naive-24 invariant, one xfail documenting the spring-forward under-yield), and a Known Limitations section to the AIP-76 newsfragment.
Clarify that inactive assets are filtered from the UI progress query but their PartitionedAssetKeyLog rows are preserved, so re-activating an asset automatically resumes rollup accumulation without data loss.
… time Add PartitionMapper.__init_subclass__ that raises TypeError when a subclass overrides exactly one side of the decode/encode pair. An unpaired override silently breaks RollupMapper.to_upstream by producing non-str members, causing the scheduler's upstream-window check to never satisfy and leaving the Dag run held forever with no diagnostic. MRO-based comparison (cls.method is not PartitionMapper.method) is used rather than __dict__ lookup so intermediate base classes such as _BaseTemporalMapper are handled correctly.
A bad partition mapper previously wrote a new Log row on every scheduler tick (once per second), flooding the audit log. Add a process-level _partition_audit_seen set to SchedulerJobRunner that deduplicates by (dag_id, asset_name, asset_uri): after the first entry the scheduler still logs the exception at ERROR level each tick (useful for ops) but stops inserting into the Log table. The set resets on restart, so one fresh entry is written after a config fix and re-deploy. Also add three scheduler-side evidence tests: - audit log deduplication across two consecutive ticks - rollup survives a simulated scheduler restart with partial key arrival - duplicate PAKL rows do not prevent rollup completion (set semantics)
Replace the hardcoded MAX_PARTITION_DAG_RUNS_PER_TICK=500 with a new [scheduler] max_partition_dag_runs_to_create_per_loop config option (default 500). The value is read once in SchedulerJobRunner.__init__ alongside the other self._* conf reads, per the invariant that all conf access stays out of the scheduler loop.
Composes upstream_mapper + window + (optional) downstream_mapper, symmetric to RollupMapper. New [scheduler] partition_fanout_max_keys caps the downstream keys per upstream event.
… cap Add a `max_fan_out: int | None = None` parameter to both the SDK and core `PartitionMapper` base classes, threaded through every subclass constructor that defines its own `__init__`. The validator rejects 0, negatives, floats, strings, and booleans (bool is an int subclass — excluded explicitly). Serialization is updated in two parallel paths: - Per-class `serialize()` / `deserialize()` methods (custom-mapper path). - `encoders.py` singledispatch overrides (built-in-mapper path). Both paths write `max_fan_out` only when non-None, preserving byte-identical output for default-constructed mappers (zero-bloat contract). In `manager.py`, the single-shot `max_downstream_keys` read is deleted; the mapper object is retained after the `to_downstream` call so its `max_fan_out` attribute is readable at the boundary check. The effective cap is computed per target-Dag as either the mapper's own cap or the global `[scheduler] partition_fanout_max_keys`. The `Log.extra` string uses `max_fan_out=N` when the per-mapper cap trips, and keeps the existing `[scheduler] partition_fanout_max_keys=N` wording when the global trips. Co-Authored-By: wei.lee@astronomer.io
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why
a per-mapper cap so each
PartitionMapperinstance can override the global[scheduler] partition_fanout_max_keys(implemented in #66030)Lives on the mapper rather than the Dag because one Dag can bind several mappers
with very different fan-out profiles.
closes: #65760
What
max_fan_out: int | None = NonetoPartitionMapperbase (SDK + core). Validator rejects0, negatives, non-int, andbool.None→ global; positive int → override.__init__, and through both serialization paths (per-class.serialize()and theencoders.pysingledispatch overrides). Field emitted only when non-None — pre-change payloads round-trip byte-identical.assets/manager.py: read the cap per target Dag, swap acap_sourcefragment into the audit-logextra(max_fan_out=Nvs. existing[scheduler] partition_fanout_max_keys=N).Was generative AI tooling used to co-author this PR?
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.