128 changes: 71 additions & 57 deletions README.md
@@ -18,38 +18,48 @@ TIMDEX! Index Manager (TIM) is a Python CLI application for managing TIMDEX indices

1. Run the following command:

   ``` bash
   docker run -p 9200:9200 -p 9600:9600 -e "discovery.type=single-node" \
     -e "plugins.security.disabled=true" \
     opensearchproject/opensearch:2.11.1
   ```

2. To confirm the instance is up, run `pipenv run tim -u localhost ping` or visit http://localhost:9200/. This should produce a log that looks like the following:
   ```text
   2024-02-08 13:22:16,826 INFO tim.cli.main(): OpenSearch client configured for endpoint 'localhost'

   Name: docker-cluster
   UUID: RVCmwQ_LQEuh1GrtwGnRMw
   OpenSearch version: 2.11.1
   Lucene version: 9.7.0

   2024-02-08 13:22:16,930 INFO tim.cli.log_process_time(): Total time to complete process: 0:00:00.105506
   ```
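
   Because security is disabled in this single-node setup, you can also query the REST API directly with standard OpenSearch endpoints (not TIM-specific), for example:

   ```bash
   # basic cluster health
   curl "http://localhost:9200/_cluster/health?pretty"

   # list all indices with document counts and sizes
   curl "http://localhost:9200/_cat/indices?v"
   ```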

### Running OpenSearch and OpenSearch Dashboards locally with Docker

You can use the included Docker Compose file ([compose.yaml](compose.yaml)) to start an OpenSearch instance along with OpenSearch Dashboards, "[the user interface that lets you visualize your Opensearch data and run and scale your OpenSearch clusters](https://opensearch.org/docs/latest/dashboards/)". Two tools that are useful for exploring indices are [DevTools](https://opensearch.org/docs/latest/dashboards/dev-tools/index-dev/) and [Discover](https://opensearch.org/docs/latest/dashboards/discover/index-discover/).

**Note:** To use Discover, you'll need to create an index pattern. When prompted, enter an index or alias to pull patterns from, and decline the option to set a date field: when a date field is set, Discover detects one in our indices but then crashes trying to use it. The resulting index pattern is configured well enough for initial data exploration.

0. Set the following environment variable:

   ```shell
   OPENSEARCH_INITIAL_ADMIN_PASSWORD=SuperSecret42!
   ```
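
   One way to provide it to Docker Compose (a sketch, assuming [compose.yaml](compose.yaml) passes the variable through to the OpenSearch container; Compose also reads an `.env` file in the project root automatically):

   ```shell
   export OPENSEARCH_INITIAL_ADMIN_PASSWORD='SuperSecret42!'
   ```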

1. Run the following command:

   ```shell
   docker pull opensearchproject/opensearch:latest
   docker pull opensearchproject/opensearch-dashboards:latest
   docker compose up
   ```

2. To confirm the instance is up, run `pipenv run tim ping` or visit http://localhost:9200/.

3. Access OpenSearch Dashboards through <http://localhost:5601>.

@@ -60,25 +70,28 @@ For a more detailed example with test data, please refer to the Confluence documentation
1. Follow the instructions in either [Running Opensearch locally with Docker](#running-opensearch-locally-with-docker) or [Running Opensearch and OpenSearch Dashboards locally with Docker](#running-opensearch-and-opensearch-dashboards-locally-with-docker).

2. Open a new terminal and create a new index. Note the name of the created index printed to the terminal output.

   ```shell
   pipenv run tim create -s <source-name>
   ```

3. Copy the index name from the previous step and promote the index to the source alias.

   ```shell
   pipenv run tim promote -a <source-name> -i <index-name>
   ```

4. Bulk index records from a specified location (a local directory or an S3 path).

   ```shell
   pipenv run tim bulk-index -s <source-name> <filepath-to-records>
   ```

5. After verifying that the bulk-index was successful, clean up your local OpenSearch instance by deleting the index (a concrete end-to-end example follows this list).

   ```shell
   pipenv run tim delete -i <index-name>
   ```
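
As a concrete end-to-end example, the workflow above might look like the following (illustrative values: the generated index name and record path will differ in practice):

```shell
pipenv run tim create -s alma                       # prints the generated index name
pipenv run tim promote -a alma -i alma-2025-01-01t00-00-00
pipenv run tim bulk-index -s alma /path/to/alma-records.json
pipenv run tim delete -i alma-2025-01-01t00-00-00
```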

### Running OpenSearch on AWS

@@ -115,31 +128,32 @@ SENTRY_DSN=### If set to a valid Sentry DSN, enables Sentry exception monitoring
All CLI commands can be run with `pipenv run`.

```
Usage: tim [OPTIONS] COMMAND [ARGS]...

TIM provides commands for interacting with OpenSearch indexes.
For more details on a specific command, run tim COMMAND -h.

╭─ Options ─────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ --url -u TEXT The OpenSearch instance endpoint minus the http scheme, e.g. │
│ 'search-timdex-env-1234567890.us-east-1.es.amazonaws.com'. If not provided, will attempt to get │
│ from the TIMDEX_OPENSEARCH_ENDPOINT environment variable. Defaults to 'localhost'. │
│ --verbose -v Pass to log at debug level instead of info │
│ --help -h Show this message and exit. │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Get cluster-level information ───────────────────────────────────────────────────────────────────────────────────────╮
│ ping Ping OpenSearch and display information about the cluster. │
│ indexes Display summary information about all indexes in the cluster. │
│ aliases List OpenSearch aliases and their associated indexes. │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Index management commands ───────────────────────────────────────────────────────────────────────────────────────────╮
│ create Create a new index in the cluster. │
│ delete Delete an index. │
│ promote Promote index as the primary alias and add it to any additional provided aliases. │
│ demote Demote an index from all its associated aliases. │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Bulk record processing commands ─────────────────────────────────────────────────────────────────────────────────────╮
│ bulk-update Bulk update records for an index. │
│ reindex-source Perform a full refresh for a source in Opensearch for all current records. │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
```
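
For example, to check a deployed cluster and list its indexes and aliases (the endpoint below is the placeholder from the help text):

```shell
pipenv run tim -u search-timdex-env-1234567890.us-east-1.es.amazonaws.com ping
pipenv run tim -u search-timdex-env-1234567890.us-east-1.es.amazonaws.com indexes
pipenv run tim -u search-timdex-env-1234567890.us-east-1.es.amazonaws.com aliases
```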

41 changes: 41 additions & 0 deletions tests/test_cli.py
@@ -278,3 +278,44 @@ def test_bulk_update_with_source_raise_bulk_indexing_error(
f'{{"index": {json.dumps(index_results_default)}, '
f'"delete": {json.dumps(mock_bulk_delete())}}}' in caplog.text
)


@patch("tim.opensearch.create_index")
@patch("tim.opensearch.promote_index")
@patch("tim.opensearch.get_index_aliases")
@patch("timdex_dataset_api.dataset.TIMDEXDataset.load")
@patch("tim.opensearch.bulk_index")
def test_reindex_source_success(
mock_bulk_index,
mock_timdex_dataset,
mock_get_index_aliases,
mock_promote_index,
mock_create_index,
caplog,
monkeypatch,
runner,
):
monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)
mock_get_index_aliases.return_value = ["alma", "all-current", "timdex"]
mock_bulk_index.return_value = {
"created": 1000,
"updated": 0,
"errors": 0,
"total": 1000,
}
mock_timdex_dataset.return_value = MagicMock()

result = runner.invoke(
main,
[
"reindex-source",
"--source",
"alma",
"s3://test-timdex-bucket/dataset",
],
)
assert result.exit_code == EXIT_CODES["success"]
assert (
"Reindex source complete: "
f'{{"index": {json.dumps(mock_bulk_index())}' in caplog.text
)
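
To run just this test locally (assuming pytest is available in the project's pipenv environment):

```shell
pipenv run pytest tests/test_cli.py -k test_reindex_source_success
```
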
71 changes: 70 additions & 1 deletion tim/cli.py
@@ -26,7 +26,7 @@
},
{
"name": "Bulk record processing commands",
"commands": ["bulk-index", "bulk-delete", "bulk-update"],
"commands": ["bulk-update", "reindex-source"],
},
]
}
@@ -315,3 +315,72 @@ def bulk_update(

summary_results = {"index": index_results, "delete": delete_results}
logger.info(f"Bulk update complete: {json.dumps(summary_results)}")


@main.command()
@click.option(
    "-s",
    "--source",
    type=click.Choice(VALID_SOURCES),
    required=True,
    help="TIMDEX Source to fully reindex in Opensearch.",
)
@click.option(
    "-a",
    "--alias",
    multiple=True,
    help="Alias to promote the index to in addition to the primary alias. May "
    "be repeated to promote the index to multiple aliases at once.",
)
@click.argument("dataset_path", type=click.Path())
@click.pass_context
def reindex_source(
    ctx: click.Context,
    source: str,
    alias: tuple[str],
    dataset_path: str,
) -> None:
    """Perform a full refresh for a source in Opensearch for all current records.

    This CLI command performs the following:

    1. creates a new index for the source
    2. promotes this index as the primary for the source alias, and adds it to any
       other aliases passed (e.g. 'timdex')
    3. uses the TDA library to yield only current records from the parquet dataset
       for the source
    4. bulk indexes these records to the new Opensearch index

    The net effect is a full refresh for a source in Opensearch, ensuring only current,
    non-deleted versions of records are used from the parquet dataset.
    """
    client = ctx.obj["CLIENT"]

    # create new index
    index = helpers.generate_index_name(source)
    new_index = tim_os.create_index(ctx.obj["CLIENT"], str(index))
    logger.info("Index '%s' created.", new_index)

    # promote index
    aliases = [source, *list(alias)]
    tim_os.promote_index(client, index, extra_aliases=aliases)
    logger.info(
        "Index promoted. Current aliases for index '%s': %s",
        index,
        tim_os.get_index_aliases(client, index),
    )

    # perform bulk indexing of current records from source
    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}

    td = TIMDEXDataset(location=dataset_path)
    td.load(current_records=True, source=source)
    # (PR author comment) this is where the new load(current_records=True) is used

    # bulk index records
    records_to_index = td.read_transformed_records_iter(action="index")
    # (PR author comment, @ghukill, Jun 4, 2025) limit to action="index" records for
    # bulk indexing; action="delete" records are not needed, because if the current
    # version of a record is a delete, it doesn't/shouldn't exist in Opensearch.
    # NOTE: this filtering is what revealed the bug where non-current records could
    # be yielded when filtering was applied; that has since been resolved.

    try:
        index_results.update(tim_os.bulk_index(client, index, records_to_index))
    except BulkIndexingError as exception:
        logger.info(f"Bulk indexing failed: {exception}")

    summary_results = {"index": index_results}
    logger.info(f"Reindex source complete: {json.dumps(summary_results)}")