Introduce EdgeDBManager: Independent Provider Specific Database Schema Management#61155
Conversation
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
Outdated
Show resolved
Hide resolved
jscheffl
left a comment
There was a problem hiding this comment.
Cool! This looks good and reasonable. I would prefer the solution w/o needed changes in core. Only thing we need to consider is that if DB manager is not registered the modules might fail - in case of such we should have a good error prompting telling the user that DB manager needs to be added to configuration to create structures. As well as add this to documentation.
Root cause of the discussion was that DB locks failed to be ackquired as API server (each instance!) attempts to lock but needs to day a full DB lock. How is this with Alembic? Does this check before locking if the DB is already in target state? Or woul it be at the end the same problem with Alembic that an exclusive lock (incl all problems) is needed?
Implement EdgeDBManager to integrate Edge3 provider with Airflow's external database manager system, enabling independent schema version control for Edge3 tables separate from core Airflow migrations. This enables Edge3 provider to manage its database schema evolution independently from core Airflow, allowing for provider-specific version control and migration management. The infrastructure is ready for use once initial migration files are generated and the legacy _check_db_schema() approach in EdgeExecutor.start() is removed.
9b4121c to
b73bf2a
Compare
Wire up the full initdb/create_db_from_orm lifecycle in EdgeDBManager
so table creation is handled through proper Alembic migrations instead
of ad-hoc metadata.create_all() in EdgeExecutor.start().
- Create initial Alembic migration (0001_3_0_0) for edge_worker,
edge_job, and edge_logs tables with if_not_exists=True
- Replace _check_db_schema() + metadata.create_all() in
EdgeExecutor.start() with EdgeDBManager.initdb()
- Remove legacy _check_db_schema() (Airflow 2.x workaround)
- Add edge3 support to check_revision_heads_map.py pre-commit script
- Add check-revision-heads-map-edge3 pre-commit hook
- Rename migrations README to README.md
- Add tests for create_db_from_orm, initdb, and revision_heads_map
The test_project_structure check requires every provider source module to have a corresponding test file. Add test_env.py for the migrations env.py module with tests for version table name and metadata contents.
b73bf2a to
a7a494c
Compare
Add a fast-path check_migration() call before acquiring the global advisory lock in EdgeExecutor.start(). This avoids unnecessary lock contention when multiple API server instances start simultaneously and the database is already at the target migration state.
Since edge3 tables are managed via separate _edge_metadata (removed from Base.metadata), the test framework's initdb no longer creates them. Add a session-scoped autouse fixture in conftest.py that creates the tables once for all edge3 tests.
If EdgeDBManager is not in external_db_managers config and someone runs airflow db init, the edge tables won't be created. However EdgeExecutor.start() will try to create them when executor starts anyway. Should we add a warning instead of an error? The only things you miss without the config registration are:
I think a warning there will inform the user of the recommended setup. For the db lock, I have added fast-path check before acquiring the lock. If proper db init/migrate with EdgeDBManager has occurred, multiple server instances can start without contention. |
- Guard _create_edge_tables conftest fixture against settings.engine
being None when non-DB tests (CLI) run in the same session
- Patch _check_valid_db_connection in test_list_edge_workers since
the test env sql_alchemy_conn equals the default value
dc3ec3f to
0f69647
Compare
jscheffl
left a comment
There was a problem hiding this comment.
Approving to unblock.
Some small sanity questions, please check prior merge.
One nit on pytest, please add another small assert.
One question: How does alembic behave if the three tables exist before and the version table is missing? Will just the version table be added and the rest be kept?
(Just did one test, started this PR from a existing setup of main, that was good. Also starting with empty DB was working)
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
Outdated
Show resolved
Hide resolved
...iders/edge3/src/airflow/providers/edge3/migrations/versions/0001_3_0_0_create_edge_tables.py
Outdated
Show resolved
Hide resolved
You're safe. op.create_table() calls use if_not_exists=True , So the behavior when tables exist but the version table is missing:
|
| EdgeWorkerModel.__table__.to_metadata(_edge_metadata) | ||
| EdgeJobModel.__table__.to_metadata(_edge_metadata) | ||
| EdgeLogsModel.__table__.to_metadata(_edge_metadata) | ||
|
|
||
| # Remove edge tables from Airflow's core metadata to prevent validation conflicts | ||
| # The tables are now managed exclusively through _edge_metadata | ||
| Base.metadata.remove(EdgeWorkerModel.__table__) | ||
| Base.metadata.remove(EdgeJobModel.__table__) | ||
| Base.metadata.remove(EdgeLogsModel.__table__) |
There was a problem hiding this comment.
Sorry, I'm late to the review but we could have avoided this by not using airflow's Base. I wrote a sample app to demonstrate how to wire up the DB manager here: https://github.com/ephraimbuddy/ticketing and here's the Base implementation. https://github.com/ephraimbuddy/ticketing/blob/59cf87c4f400251291df7be59fb235401c75bbc8/ticketing/models.py#L26
There was a problem hiding this comment.
Never too late... it is software and can always be changed later... @dheerajturaga that ^^^totally makes sense, can you make a follow-up?
There was a problem hiding this comment.
Thanks for the guidance @ephraimbuddy ! let me work on a follow-up.
…a Management (apache#61155) * Add EdgeDBManager for provider-specific database migrations Implement EdgeDBManager to integrate Edge3 provider with Airflow's external database manager system, enabling independent schema version control for Edge3 tables separate from core Airflow migrations. This enables Edge3 provider to manage its database schema evolution independently from core Airflow, allowing for provider-specific version control and migration management. The infrastructure is ready for use once initial migration files are generated and the legacy _check_db_schema() approach in EdgeExecutor.start() is removed. * Add table creation support to EdgeDBManager via Alembic migration Wire up the full initdb/create_db_from_orm lifecycle in EdgeDBManager so table creation is handled through proper Alembic migrations instead of ad-hoc metadata.create_all() in EdgeExecutor.start(). - Create initial Alembic migration (0001_3_0_0) for edge_worker, edge_job, and edge_logs tables with if_not_exists=True - Replace _check_db_schema() + metadata.create_all() in EdgeExecutor.start() with EdgeDBManager.initdb() - Remove legacy _check_db_schema() (Airflow 2.x workaround) - Add edge3 support to check_revision_heads_map.py pre-commit script - Add check-revision-heads-map-edge3 pre-commit hook - Rename migrations README to README.md - Add tests for create_db_from_orm, initdb, and revision_heads_map * Add missing test file for edge3 migrations env module The test_project_structure check requires every provider source module to have a corresponding test file. Add test_env.py for the migrations env.py module with tests for version table name and metadata contents. * Skip DB lock acquisition when edge3 migrations are already current Add a fast-path check_migration() call before acquiring the global advisory lock in EdgeExecutor.start(). This avoids unnecessary lock contention when multiple API server instances start simultaneously and the database is already at the target migration state. * Add session-scoped fixture to create edge3 tables for tests Since edge3 tables are managed via separate _edge_metadata (removed from Base.metadata), the test framework's initdb no longer creates them. Add a session-scoped autouse fixture in conftest.py that creates the tables once for all edge3 tests. * Fix edge3 test failures - Guard _create_edge_tables conftest fixture against settings.engine being None when non-DB tests (CLI) run in the same session - Patch _check_valid_db_connection in test_list_edge_workers since the test env sql_alchemy_conn equals the default value * Jens suggestions
Implement EdgeDBManager to integrate Edge3 provider with Airflow's
external database manager system, enabling independent schema version
control for Edge3 tables separate from core Airflow migrations.
This enables Edge3 provider to manage its database schema evolution
independently from core Airflow, allowing for provider-specific version
control and migration management. The infrastructure is ready for use
once initial migration files are generated and the legacy _check_db_schema()
approach in EdgeExecutor.start() is removed.
With this we should now be able to user airflow db framework to init, reset, migrate
Alternate approach to #60752 keeping changes local to edge3 provider
Reference: https://github.com/ephraimbuddy/ticketing
Impact: This critical architectural evolution is necessary for the success of AIP-67
and future scalability.
Was generative AI tooling used to co-author this PR?
Claude Code