TIMX 495 - add new reindex-source CLI command #365
Changes from all commits
```diff
@@ -26,7 +26,7 @@
     },
     {
         "name": "Bulk record processing commands",
-        "commands": ["bulk-index", "bulk-delete", "bulk-update"],
+        "commands": ["bulk-update", "reindex-source"],
     },
 ]
 }
```
```diff
@@ -315,3 +315,72 @@ def bulk_update(
     summary_results = {"index": index_results, "delete": delete_results}
     logger.info(f"Bulk update complete: {json.dumps(summary_results)}")
 
+
+@main.command()
+@click.option(
+    "-s",
+    "--source",
+    type=click.Choice(VALID_SOURCES),
+    required=True,
+    help="TIMDEX Source to fully reindex in Opensearch.",
+)
+@click.option(
+    "-a",
+    "--alias",
+    multiple=True,
+    help="Alias to promote the index to in addition to the primary alias. May "
+    "be repeated to promote the index to multiple aliases at once.",
+)
+@click.argument("dataset_path", type=click.Path())
+@click.pass_context
+def reindex_source(
+    ctx: click.Context,
+    source: str,
+    alias: tuple[str],
+    dataset_path: str,
+) -> None:
+    """Perform a full refresh for a source in Opensearch for all current records.
+
+    This CLI command performs the following:
+        1. creates a new index for the source
+        2. promotes this index as the primary for the source alias, and adds it to
+           any other aliases passed (e.g. 'timdex')
+        3. uses the TDA library to yield only current records from the parquet
+           dataset for the source
+        4. bulk indexes these records to the new Opensearch index
+
+    The net effect is a full refresh for a source in Opensearch, ensuring only current,
+    non-deleted versions of records are used from the parquet dataset.
+    """
+    client = ctx.obj["CLIENT"]
+
+    # create new index
+    index = helpers.generate_index_name(source)
+    new_index = tim_os.create_index(ctx.obj["CLIENT"], str(index))
+    logger.info("Index '%s' created.", new_index)
+
+    # promote index
+    aliases = [source, *list(alias)]
+    tim_os.promote_index(client, index, extra_aliases=aliases)
+    logger.info(
+        "Index promoted. Current aliases for index '%s': %s",
+        index,
+        tim_os.get_index_aliases(client, index),
+    )
+
+    # perform bulk indexing of current records from source
+    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
+
+    td = TIMDEXDataset(location=dataset_path)
+    td.load(current_records=True, source=source)
```
Contributor (author) comment: Here is where we use the new […]
```diff
+    # bulk index records
+    records_to_index = td.read_transformed_records_iter(action="index")
```
Contributor (author) comment: And here, we limit to only […]. NOTE: this was actually what revealed the bug where non-current records could be yielded if filtering was applied, as it is here. This has been resolved.
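To make the comment above concrete, here is a minimal, purely illustrative sketch of what yielding only "current" records from the parquet dataset means. The column names (`timdex_record_id`, `run_timestamp`, `action`) and the use of pyarrow are assumptions for the sake of the sketch, not the TDA library's actual schema or implementation.

```python
# Purely illustrative: approximate the "current records" idea with plain pyarrow.
# Column names (timdex_record_id, run_timestamp, action) are assumptions for this
# sketch, not the TDA library's actual parquet schema.
import pyarrow.compute as pc
import pyarrow.dataset as ds


def current_records_sketch(dataset_path: str, source: str) -> list[dict]:
    """Return only the latest, non-deleted version of each record for one source."""
    table = ds.dataset(dataset_path, format="parquet").to_table(
        filter=pc.field("source") == source
    )
    latest: dict[str, dict] = {}
    for row in table.to_pylist():
        record_id = row["timdex_record_id"]
        seen = latest.get(record_id)
        # keep only the most recent version of each record id
        if seen is None or row["run_timestamp"] > seen["run_timestamp"]:
            latest[record_id] = row
    # a record whose latest action is a delete is not "current"
    return [row for row in latest.values() if row["action"] != "delete"]
```

The bug noted in the comment is exactly this concern: the deduplication-to-latest step must still hold when an additional filter (such as `source`) is applied. The remainder of the new command then bulk indexes that iterator and logs a summary: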
```diff
+    try:
+        index_results.update(tim_os.bulk_index(client, index, records_to_index))
+    except BulkIndexingError as exception:
+        logger.info(f"Bulk indexing failed: {exception}")
+
+    summary_results = {"index": index_results}
+    logger.info(f"Reindex source complete: {json.dumps(summary_results)}")
```
Review comment: Perfect overview of the command!
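For orientation, a hedged sketch of how the new command might be exercised with click's test runner. The `tim.cli` import path, the `alma` source value, and the dataset location are illustrative assumptions rather than values from this PR (the `timdex` alias comes from the docstring example), and a configured OpenSearch connection plus a real TIMDEX parquet dataset would be needed for it to do anything useful.

```python
# Hedged usage sketch: assumes the click group is exposed as `main` in `tim.cli`
# and that "alma" is one of VALID_SOURCES; both are assumptions, not facts from
# this PR. Also assumes the usual OpenSearch configuration is present in the
# environment, since the main group sets up ctx.obj["CLIENT"].
from click.testing import CliRunner

from tim.cli import main  # assumed import path for the CLI group

runner = CliRunner()
result = runner.invoke(
    main,
    [
        "reindex-source",
        "--source", "alma",              # hypothetical TIMDEX source
        "--alias", "timdex",             # extra alias, as in the docstring example
        "s3://timdex-dataset/dataset/",  # hypothetical parquet dataset location
    ],
)
print(result.exit_code, result.output)
```

Note the ordering in the command itself: the new index is created and promoted before bulk indexing begins, so the source alias points at the new index as soon as promotion succeeds, matching step 2 of the docstring.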