Add filtered index and promotion steps to data refresh #4833
Conversation
as part of the Data Refresh.

TODO: We'll swap out the create and populate filtered index DAG to use this instead
of the one defined in the legacy_data_refresh.
Noting that there is a separate issue for this (and moreover, the filtered index is going away).
es.create_index(
    index_config={
        "index": temp_index_name,
        "body": index_settings(data_refresh_config.media_type),
This change ensures that the data refresh always creates a new index based off the hard-coded index configuration in es_mapping.py, rather than by copying the configuration of the existing index.
This is how the ingestion server does it currently, and was just a miss in the previous PR. It's necessary to ensure that temporary or experimental changes to the indices (for example, created using our create_new_es_index DAGs or similar) are not carried forward in the data refresh without making the changes permanent via a code change/PR.
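For illustration, the shape of that helper is roughly as follows -- this is a heavily simplified, hypothetical sketch, and the real es_mapping.py copied from the ingestion server is far more detailed:

def index_settings(media_type: str) -> dict:
    # Hypothetical, heavily simplified stand-in for the hard-coded config;
    # the real mappings and analysis settings are much larger.
    mappings = {
        "image": {"properties": {"title": {"type": "text"}, "tags": {"type": "nested"}}},
        "audio": {"properties": {"title": {"type": "text"}, "duration": {"type": "integer"}}},
    }
    return {
        "settings": {"index": {"number_of_shards": 1, "number_of_replicas": 0}},
        "mappings": mappings[media_type],
    }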
# only include Slack airflow connections
done < <(env | grep "^AIRFLOW_CONN_SLACK*")
# only include Slack airflow connections and the Sensitive Terms connection
done < <(env | grep "^AIRFLOW_CONN_SLACK*\|AIRFLOW_CONN_SENSITIVE_TERMS")
This change is necessary to allow testing with the real sensitive terms list when desired, since it uses https.
@@ -116,6 +117,7 @@ def create_api():
    _api.add_route("/healthcheck", HealthcheckResource())
    _api.add_route("/task", IndexingJobResource(task_tracker))
    _api.add_route("/task/{task_id}", TaskStatusResource(task_tracker))
    _api.add_static_route("/static", (Path(".") / "static").absolute())
The indexer worker serves the mock sensitive terms list, similar to how the ingestion server currently does. This is okay because in local development, the indexer worker is always running, and in production we use the real sensitive terms list rather than the mock one. It is helpful to have the mock list served this way to make the local filtered_index steps behave as similarly as possible to the way they do in production, just using a different endpoint.
If we wanted to do it a different way (to avoid adding this functionality to the indexer worker), we could put the file into minio and still retrieve it over HTTP. According to this guide about using minio to host a website, it shouldn't be too difficult to do later on.
For the sake of simplicity, though, would it be better if the DAG just swapped the location it reads the sensitive terms from based on an additional environment variable? Like:
if USE_MOCK_SENSITIVE_TERMS:
    return ["running", "water", "bird"]
else:
    # retrieve the regular list
I know it's not as seamless as just swapping out the HTTP connection string in the environment file, but it avoids a bit of this hoop-jumping required to make the DAG code "seamless". I guess really we've just hidden the seam somewhere else by doing it this way 😛 (Also: because I wrote the original code for this in the ingestion server, I can say I think it was a bit cheeky/too clever to do it this way and wish I'd done this if/else there to begin with!)
We couldn't use the HttpOperator if we wanted to do that, or else we'd have to add branching tasks to get around selecting the right task per environment. Not a big deal but I'd prefer to keep the operators clean. We could use minio, something similar to what @AetherUnbound did in #4836 maybe 🤔
We couldn't use the HttpOperator if we wanted to do that, or else we'd have to add branching tasks to get around selecting the right task per environment
Oh 😕 We can't call the HttpOperator conditionally in a Python @task operator? I figured something like this would be possible:
@task
def get_sensitive_terms(**context):
    if USE_MOCK_SENSITIVE_TERMS:
        return ["running", "water", "bird"]
    else:
        http_op = HttpOperator(...)
        return http_op.execute(context=context)
We might be able to initialize the operator inside a @task, but it's probably easier to use the HttpHook inside a @task than have an operator-in-operator situation by using HttpOperator.
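Something like this rough, untested sketch is what I have in mind (the connection id and endpoint here are just placeholders):

from airflow.decorators import task
from airflow.providers.http.hooks.http import HttpHook


@task
def get_sensitive_terms() -> list[str]:
    # Placeholder conn_id and endpoint; the real values would come from
    # however the sensitive terms connection ends up being configured.
    hook = HttpHook(method="GET", http_conn_id="sensitive_terms")
    response = hook.run(endpoint="static/mock_sensitive_terms.txt")
    return [line.strip() for line in response.text.splitlines() if line.strip()]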
I think I'm fine going with the new static route approach suggested here, but @stacimc do you mind adding a comment about what the route is for, and that it's only intended to be used for local testing?
Ah, maybe I'm thinking of the hook.
All good, I was just suggesting alternatives if wanted, this approach obviously has worked fine for us 🙂
It will go away in #3336 anyway, which I didn't know when I wrote my original comment or I would have mentioned it!
@sarayourfriend I added you as an additional reviewer because you expressed interest in this PR (and your feedback is always appreciated 😄) but don't feel obligated!
This is such a great example of the fact that productivity should not be measured by line count :) A couple of changed lines needed so much debugging and explanation. Thank you for the great detailed notes, as always. It's great to have the local environment so close to production.
Far out! I haven't tested this locally, just read over the code and process. Very cool! It's so exciting to have this laid out in the graph view, it really helps show the complexity of the data refresh, while also giving a clearer narrative to the different steps. I'm really glad we went forward with this project to remove the ingestion server.
If you'd like me to test this out locally, I'm happy to do so as well, just let me know. Otherwise, I'll leave it to the catalogue reviewers to do that part of it. For my part, this is very cool and exciting.
# Connect to sensitive terms list used for filtered index creation in the data refresh.
# For local development, the mock terms list is served by the catalog indexer worker. Change to the
# full production URL in order to use the production sensitive terms list.
AIRFLOW_CONN_SENSITIVE_TERMS="http://catalog_indexer_worker:8003/static/mock_sensitive_terms.txt"
@stacimc will we need an infrastructure change to point this to the correct live location?
It's not required, since we can set this through the Admin UI (as we have with several other connections/variables). We could make an infrastructure change to set it as an environment variable though; actually, I'm not clear how we've drawn the line between which variables should be configured where 🤔 What do you think?
Using the Admin UI sounds good, I believe that's in line with decisions we've made in the past (https://github.com/WordPress/openverse-infrastructure/issues/215). I forgot that environment variables aren't necessary for configuring these. I realised that I didn't personally have a clear picture of environment variables vs Airflow variables/connections, and that some of our code muddies that distinction... and presents problems for advancements in our Airflow configuration, management, and execution. I've opened #4863 to address those inconsistencies.
I've started a discussion here to work towards an approach more in line with the rest of our applications (using infrastructure as code).
I think this is fine for now! One disadvantage of declaring this in the Admin UI is that the password (if needed) for the services isn't codified in the same way in our infrastructure repo. Noting that for now; I'll add more thorough thoughts when I get a chance to look at that discussion.
logger = logging.getLogger(__name__)


class TableIndex(NamedTuple):
Question: is the usage of NamedTuple instead of dataclasses due to a limitation in Airflow? I seem to recall limitations in task return types. Otherwise, do you have a preference for NamedTuples over dataclasses? And if so, I'd be curious to know the reason 🙂
I believe anything JSON-serializable can be passed through XComs; if I'm not mistaken, a dataclass would work. No strong preference beyond following convention in the project, and given that we don't require anything more complex, I don't think the pros/cons of either option are particularly significant in this use case.
+1 to what Staci said, according to the documentation:
They can have any (serializable) value
I'm assuming "serializable" in this case means JSON serialization. I was able to return a NamedTuple in the add_rekognition_labels DAG and Airflow hydrated it back into the NamedTuple just fine. In practice (via the UI), it looks like it serializes it to a list.

Dataclasses, although they have more functionality, are a little heavier-weight. If we essentially just need an object to attach a bunch of data to and reference it more easily/programmatically/in a typed way than a dictionary, NamedTuple is preferred. That said, I did just find this documentation on dataclasses!! 🤩:
As mentioned TaskFlow uses XCom to pass variables to each task. This requires that variables that are used as arguments need to be able to be serialized. Airflow out of the box supports all built-in types (like int or str) and it supports objects that are decorated with @dataclass or @attr.define.
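For illustration, something like this (untested, with made-up field names) should round-trip through XCom either way:

from dataclasses import dataclass
from typing import NamedTuple

from airflow.decorators import task


class TableIndex(NamedTuple):
    index_name: str
    table_name: str


@dataclass
class TableIndexInfo:
    index_name: str
    table_name: str


@task
def index_as_namedtuple() -> TableIndex:
    # Serialized through XCom; in the UI this shows up as a plain list.
    return TableIndex(index_name="image_title_idx", table_name="image")


@task
def index_as_dataclass() -> TableIndexInfo:
    # @dataclass (and @attr.define) return values are supported out of the box.
    return TableIndexInfo(index_name="image_title_idx", table_name="image")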
Update: I made an upstream PR to make that clearer here!
apache/airflow#42045
I ran the steps from the testing instructions. The audio refresh ran first. The numbers in the ES (seen in ElasticVue) increased, but the
The error messages
There are also two warnings in the beginning of this task's logs:
…ting index: The index configuration should be hard-coded here to match the behavior of the ingestion server. This is also important because we require permanent changes to the index configuration to go through the code review process. Other DAGs can be used to rapidly iterate on index configuration, and changes can be persisted here only if wanted.
This isn't possible because dynamically mapping tasks within a dynamically mapped taskgroup is not supported in our version of Airflow.
There was an error in the task dependencies caused by the fact that we were returning the output of transform_index_defs from the remap_indices taskgroup. Normally, the last task in a TaskGroup is the task that is set upstream of any downstream dependencies (in this case, the remap_constraints group). Apparently, if you add a return statement to a TaskGroup (which we were doing in order to make the index_mapping the XCom result for the entire taskgroup), this borks the dependency chain. As a result, transform_index_defs was being set as the upstream task of get_all_existing_constraints. This meant that if there was an error in create_table_indices, it would NOT stop the remap_constraints tasks from running. By updating the create_table_indices task to also return what we want via XCom, we can avoid this issue.
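Roughly the shape of the fix, heavily simplified (the real tasks do much more than this):

from airflow.decorators import task, task_group


@task
def transform_index_defs(index_defs: list[str]) -> list[str]:
    # Point the index definitions at the temp table (placeholder logic).
    return [
        index_def.replace(" ON image ", " ON temp_import_image ")
        for index_def in index_defs
    ]


@task
def create_table_indices(index_defs: list[str]) -> list[str]:
    # ...the CREATE INDEX statements would actually be run here...
    # Returning the index mapping from this task, rather than from
    # transform_index_defs, keeps it as the group's terminal node, so
    # downstream groups like remap_constraints wait for index creation.
    return index_defs


@task_group
def remap_indices(index_defs: list[str]):
    return create_table_indices(transform_index_defs(index_defs))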
Thanks @obulat! Good catch on the record count -- it was getting the counts for the old index rather than the promoted one; there's a quick fix which I'll push shortly! I cannot reproduce your issues with the image data refresh, though I'll keep trying some more things. Are you positive that the provider DAG had succeeded and finished loading additional records into the catalog DB before the refresh started? 🤔 The error/warning you mentioned seemed like it's related to the way I've overridden templated fields in the Sensor, but the sensor is working so that's strange. I'll play around with it to see if I can get the warnings cleared.
Force-pushed from e12db4a to bd57214.
This is wonderful! I'm sorry this last piece took more than expected but it's so nice to finally be there 🥳
I'm not quite done with my review yet, still about 10 more files to edit, but here's what I have so far.
One thing I wanted to note too - we send Slack notifications for nearly every step of the data refresh in its current implementation (that just all happens on the to-be-removed ingestion server). It might make sense to have an on_success_callback for the data refresh which does something similar? That way we don't have to add it to every step, but still get some reporting. Just something to think about; if you like the idea or didn't already have a plan for Slack updates, we can make an issue for it!
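A rough sketch of the idea (everything here is a placeholder rather than the real data refresh DAG):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def notify_slack(message: str) -> None:
    # Stand-in for the project's Slack utility.
    print(message)


def report_data_refresh_success(context) -> None:
    dag_run = context["dag_run"]
    notify_slack(f"Data refresh {dag_run.dag_id} finished successfully (run {dag_run.run_id}).")


with DAG(
    dag_id="data_refresh_reporting_sketch",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    on_success_callback=report_data_refresh_success,
):
    EmptyOperator(task_id="placeholder_step")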
def fetch_all_tuples(cursor):
    try:
        rows = cursor.fetchall()
        return [ConstraintInfo(row[0], row[1], row[2]) for row in rows]
    except Exception as e:
        raise ValueError("Unable to extract expected row data from cursor") from e
NEAT
The constraint to remap may be a constraint directly on the live media table, in
which case it is dropped and then added directly to the temp table. Alternatively,
Just to be clear, does that mean the live tables won't have those constraints for some time, even while they're still live? That's probably fine, but if something fails partway through, could the constraints be missing on the live table the next time we try to move them over?
Correct. This does mirror the current implementation, so it's not a regression and the (minor) improvement here is that we'd have immediate insight into where a failure happened/if those steps need to be manually undone.
I'll make an issue about this, because it's not simple to work around. We could do something similar to the way we remap the indices, where we give the new constraints a temporary name and then map the names back over in the promote step (if that's even necessary 🤔). But the delete_orphans step and remapping foreign key constraints to the temp table can't be easily postponed or undone.
I don't think we should be too worried about this because it's the very last step before promotion, anyway -- but definitely good to call out, and something we'd need to be aware of if for some reason we wanted to start the data refresh over from the beginning after a failure between this step and promotion.
Thanks for the explanation, I think that sounds good! Not blocking in this case because it mirrors the previous implementation.
Based on the medium urgency of this PR, the following reviewers are being gently reminded to review this PR: @krysal. Excluding weekend days, this PR was ready for review 4 day(s) ago. PRs labelled with medium urgency are expected to be reviewed within 4 weekdays. @stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.
I re-ran the testing instructions, and the image refresh ran well and saved 10,000 images to the main image index.
@@ -84,7 +86,7 @@ def response_check_wait_for_completion(response: Response) -> bool:


@task
def get_worker_params(
    estimated_record_count: int,
    id_range: tuple[int, int],
Much clearer variable name!
Okay, here's the rest of my feedback!
    constraint_statement: str, constraint_table: str, temp_table_name: str
):
    """
    Parse a foreign key constraint and generate a `DELETE` statement to
Thanks for this great documentation!
def apply_constraints_to_table(
    postgres_conn_id: str, table_name: str, constraints: list[str]
):
    logger.info(f"Applying constraints for `{table_name}`.")
    for constraint in constraints:
        run_sql.function(postgres_conn_id=postgres_conn_id, sql_template=constraint)
Thinking about failure retrying and idempotency here...should these maybe each be separate tasks that are run serially? They can probably still be mapped (by both table name and constraint order) 🤔 Not something that needs to be done now, but I'm thinking about the case where this fails halfway through for some reason. We could maybe address and fix the issue, then restart it from that line rather than having to restart things further upstream. Just a thought, no action needed!
You can't dynamically map tasks within a dynamically mapped taskgroup, unfortunately (at least yet). I suppose we could have generate_constraints_for_table generate constraints for all tables, and then map over table + constraint together when applying, although I don't like that control flow as much... 🤔 I think we could take this into consideration as part of the same issue I'll be creating for the other constraint concerns, particularly as neither is a regression from existing behavior.
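Roughly the alternative control flow I mean, with placeholder/stub task bodies:

from datetime import datetime

from airflow.decorators import dag, task


@task
def generate_all_constraint_pairs(tables: list[str]) -> list[dict]:
    # Placeholder: the real DAG would derive these ALTER TABLE statements
    # from the existing constraint-generation logic.
    return [
        {
            "table_name": table,
            "constraint": f"ALTER TABLE {table} ADD CONSTRAINT placeholder CHECK (true)",
        }
        for table in tables
    ]


@task
def apply_constraint(table_name: str, constraint: str) -> None:
    # Placeholder for run_sql; each constraint becomes its own retryable task instance.
    print(f"Applying to {table_name}: {constraint}")


@dag(dag_id="constraint_mapping_sketch", start_date=datetime(2024, 1, 1), schedule=None)
def constraint_mapping_sketch():
    pairs = generate_all_constraint_pairs(["image", "audio"])
    apply_constraint.expand_kwargs(pairs)


constraint_mapping_sketch()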
@AetherUnbound I've run out of time today, but I've written myself a TODO to create issues for the constraint error handling concerns you raised, as well as Slack reporting. I'm not sure what Slack reporting we'll want yet, since unlike the ingestion server we'll already have much easier visibility into what's happening through Airflow -- but I think we ought to have reporting at least when the copy data and reindexing steps finish, for sure.
Fantastic work on this, looks great to me! I didn't get a chance to test it but given that other reviewers have, I feel comfortable approving it 😄
Fixes
Fixes #4149 by @stacimc
Description
This PR adds the final steps to the data refresh DAG:

This was much less straightforward than I imagined, and this PR is fairly large as a consequence (although there are tests, and an es_mapping.py configuration file that's just copied from the ingestion server adding to the line count). I will attempt to break it into smaller pieces if reviewers like; however, I'll point out that it's much easier to manually test in its current state, because you can actually run the entire data refresh (even multiple times in a row).

While working on this PR I also noticed several small issues that I fixed at the same time. I have left comments at the relevant code changes to make these easier to spot:

- Updated the create_index steps to create the new index according to a hard-coded index config, rather than basing it off the config of the current index (see comment inline for reasoning)

I've tried to make commit messages and comments in the code as descriptive as possible, but please ping me if clarification is needed anywhere.
Testing Instructions
Add the following to your catalog/.env:

Run ov just recreate to start with fresh sample data.

Run an image and an audio DAG for a few minutes to get some new records. Wait until you see that a couple hundred lines have been written in the pull_data logs, then mark the task as a success. I used Cleveland and Jamendo.
Establish a baseline
Go to https://app.elasticvue.com/cluster/0/indices and verify you see the four initial indices (image-init, image-init-filtered, audio-init, audio-init-filtered), and note the number of records in each. Now run ov just api/pgcli and run describe image; and describe audio;. Save the results somewhere.
Run the new data refreshes and compare
Now enable the staging_audio_data_refresh and staging_image_data_refresh DAGs. They should both complete successfully. Once done, check out elasticvue again. You should see that the -init indices have all been dropped and replaced with new indices named with a uuid (for example audio-f61ab0e757d244ab9abeca0a0613d9e6, audio-f61ab0e757d244ab9abeca0a0613d9e6-filtered, and so on). You should have more records in each of these new indices than in the initial ones. To be extra safe, you can go check the logs of the provider DAGs you ran to see how many records were ingested; that's the number of extra records you should see in the non-filtered indices.

In pgcli, again run describe image; and describe audio;. Compare the results to what you saw earlier; pay particular attention to the indices and constraints. They should be identical, including the index names.
Checklist
My pull request has a descriptive title (not a vague title like Update index.md).
My pull request targets the default branch of the repository (main) or a parent feature branch.
I ran the DAG documentation generator (ov just catalog/generate-docs for catalog PRs) or the media properties generator (ov just catalog/generate-docs media-props for the catalog or ov just api/generate-docs for the API) where applicable.
Developer Certificate of Origin