Skip to content

Commit efbadbe

Browse files
committed
Add support for S3 dag bundle
1 parent 0c41fa4 commit efbadbe

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

airflow/dag_processing/bundles/s3.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,25 +116,26 @@ def _delete_stale_local_files(self, current_s3_objects: List[Path]):
116116

117117
def _download_s3_object_if_changed(self, s3_bucket, s3_object, local_target_path: Path):
118118
should_download = False
119+
download_msg = ""
119120
if not local_target_path.exists():
120121
should_download = True
121-
self.log.debug(f"Local file {local_target_path} does not exist. Downloading.")
122+
download_msg = f"Local file {local_target_path} does not exist."
122123
else:
123124
local_stats = local_target_path.stat()
124125

125126
if s3_object.size != local_stats.st_size:
126127
should_download = True
127-
self.log.debug(f"S3 object size ({s3_object.size}) and local file size ({local_stats.st_size}) differ. Downloading.")
128+
download_msg = f"S3 object size ({s3_object.size}) and local file size ({local_stats.st_size}) differ."
128129

129130
s3_last_modified = s3_object.last_modified
130131
local_last_modified = datetime.fromtimestamp(local_stats.st_mtime, tz=timezone.utc)
131132
if s3_last_modified.replace(microsecond=0) != local_last_modified.replace(microsecond=0):
132133
should_download = True
133-
self.log.debug(f"S3 object last modified ({s3_last_modified}) and local file last modified ({local_last_modified}) differ. Downloading.")
134+
download_msg = f"S3 object last modified ({s3_last_modified}) and local file last modified ({local_last_modified}) differ."
134135

135136
if should_download:
136137
s3_bucket.download_file(s3_object.key, local_target_path)
137-
self.log.debug(f"Downloaded {s3_object.key} to {local_target_path.as_posix()}")
138+
self.log.debug(f"{download_msg} Downloaded {s3_object.key} to {local_target_path.as_posix()}")
138139
else:
139140
self.log.debug(
140141
f"Local file {local_target_path.as_posix()} is up-to-date with S3 object {s3_object.key}. Skipping download."

tests/dag_processing/bundles/test_s3.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,19 +205,29 @@ def test_refresh(self, s3_bucket, s3_client, caplog):
205205
level="DEBUG",
206206
regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*dag_03.py.*/s3/{bundle.name}/dag_03.py",
207207
)
208+
assert bundle.bare_repo_path.joinpath("dag_03.py").read_text() == "test data"
208209
bundle.bare_repo_path.joinpath("dag_should_be_deleted.py").write_text("test dag")
209210
bundle.bare_repo_path.joinpath("dag_should_be_deleted_folder").mkdir(exist_ok=True)
211+
s3_client.put_object(
212+
Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body="test data-changed".encode("utf-8")
213+
)
210214
bundle.refresh()
211215
assert caplog.text.count("Downloading DAGs from s3") == 4
212216
self.assert_log_matches_regex(
213217
caplog=caplog,
214218
level="DEBUG",
215-
regex=rf"Deleted stale empty directory.*dag_should_be_deleted_folder",
219+
regex=rf"S3 object size.*and local file size.*differ.*Downloaded.*dag_03.py.*",
220+
)
221+
assert bundle.bare_repo_path.joinpath("dag_03.py").read_text() == "test data-changed"
222+
self.assert_log_matches_regex(
223+
caplog=caplog,
224+
level="DEBUG",
225+
regex=rf"Deleted stale empty directory.*dag_should_be_deleted_folder.*",
216226
)
217227
self.assert_log_matches_regex(
218228
caplog=caplog,
219229
level="DEBUG",
220-
regex=rf"Deleted stale local file.*dag_should_be_deleted.py",
230+
regex=rf"Deleted stale local file.*dag_should_be_deleted.py.*",
221231
)
222232

223233
def assert_log_matches_regex(self, caplog, level, regex):
@@ -229,4 +239,4 @@ def assert_log_matches_regex(self, caplog, level, regex):
229239
break # Stop searching once a match is found
230240
assert (
231241
matched
232-
), f"No log message at level {level} matching regex '{regex}' found. Logged messages:\n{caplog.text}"
242+
), f"No log message at level {level} matching regex '{regex}' found."

0 commit comments

Comments
 (0)