Skip to content

Commit

Permalink
Split providers out of the main "airflow/" tree into a UV workspace p…
Browse files Browse the repository at this point in the history
…roject (#42505) (#42624)

This is only a partial split so far. It moves all the code and tests, but
leaves the creation of `core/` to a separate PR as this is already large
enough.

In addition to the straight file rename the other changes I had to make here
are:

- Some mypy/typing fixes.

  Mypy can be fragile about what it picks up when, so maybe some of those
  changes were caused by that. But the typing changes aren't large.

- Improve typing in common.sql type stub

  Again, likely a mypy file oddity, but the types should be safe

- Removed the `check-providers-init-file-missing` check

  This isn't needed now that airflow/providers shouldn't exist at all in the
  main tree.

- Create a "dev.tests_common" package that contains helper files and common
  pytest fixtures

  Since the provider tests are no longer under tests/ they don't automatically
  share the fixtures from the parent `tests/conftest.py` so they needed
  extracted.

  Ditto for `tests.test_utils` -- they can't be easily imported in provider
  tests anymore, so they are moved to a more explicit shared location.

In future we should switch how the CI image is built to make better use of UV
caching than our own approach as that would remvoe a lot of custom code.

Co-authored-by: Ash Berlin-Taylor <ash@apache.org>
Co-authored-by: Ryan Hatter <25823361+RNHTTR@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 11, 2024
1 parent 39f8e1d commit a052d9e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ def _change_state(
self.running.remove(key)
except KeyError:
self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
return

# If we don't have a TI state, look it up from the db. event_buffer expects the TI state
if state is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,6 @@ def process_status(
(pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version)
)
elif status == "Succeeded":
# We get multiple events once the pod hits a terminal state, and we only want to
# send it along to the scheduler once.
# If our event type is DELETED, or the pod has a deletion timestamp, we've already
# seen the initial Succeeded event and sent it along to the scheduler.
if event["type"] == "DELETED" or pod.metadata.deletion_timestamp:
self.log.info(
"Skipping event for Succeeded pod %s - event for this pod already sent to executor",
pod_name,
)
return
self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string)
self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version))
elif status == "Running":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,21 @@ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_
finally:
executor.end()

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_change_state_key_not_in_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()
try:
key = ("dag_id", "task_id", "run_id", "try_number1")
executor.running = set()
executor._change_state(key, State.SUCCESS, "pod_name", "default")
assert executor.event_buffer.get(key) is None
assert executor.running == set()
finally:
executor.end()

@pytest.mark.db_test
@pytest.mark.parametrize(
"multi_namespace_mode_namespace_list, watchers_keys",
Expand Down Expand Up @@ -1858,14 +1873,6 @@ def test_process_status_succeeded(self):
# We don't know the TI state, so we send in None
self.assert_watcher_queue_called_once_with_state(None)

def test_process_status_succeeded_dedup_timestamp(self):
self.pod.status.phase = "Succeeded"
self.pod.metadata.deletion_timestamp = timezone.utcnow()
self.events.append({"type": "MODIFIED", "object": self.pod})

self._run()
self.watcher.watcher_queue.put.assert_not_called()

@pytest.mark.parametrize(
"ti_state",
[
Expand Down

0 comments on commit a052d9e

Please sign in to comment.