Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,059 changes: 552 additions & 507 deletions Pipfile.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,26 @@ interactions:
status:
code: 200
message: OK
- request:
body: null
headers:
Content-Length:
- '0'
content-type:
- application/json
user-agent:
- opensearch-py/2.8.0 (Python 3.12.2)
method: POST
uri: http://localhost:9200/test-index/_refresh
response:
body:
string: '{"_shards":{"total":2,"successful":1,"failed":0}}'
headers:
content-length:
- '49'
content-type:
- application/json; charset=UTF-8
status:
code: 200
message: OK
version: 1
31 changes: 31 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,37 @@ def test_bulk_update_embeddings_exit_bulk_operation_error(
assert "Bulk update with embeddings failed" in caplog.text


@patch("tim.helpers.validate_bulk_cli_options")
@patch("tim.opensearch.bulk_update")
def test_bulk_update_embeddings_source_only_logs_complete(
    mock_bulk_update,
    mock_validate_bulk_cli_options,
    caplog,
    monkeypatch,
    runner,
):
    """Invoking bulk-update-embeddings with only --source (no --run-id)
    exits successfully and logs the bulk-update completion summary."""
    # Make sure a real OpenSearch endpoint never leaks in from the environment.
    monkeypatch.delenv("TIMDEX_OPENSEARCH_ENDPOINT", raising=False)

    expected_results = {"updated": 1, "errors": 0, "total": 1}
    mock_bulk_update.return_value = expected_results
    mock_validate_bulk_cli_options.return_value = "libguides"

    cli_args = [
        "bulk-update-embeddings",
        "--source",
        "libguides",
        "tests/fixtures/dataset",
    ]
    result = runner.invoke(main, cli_args)

    assert result.exit_code == EXIT_CODES["success"]
    expected_message = (
        f"Bulk update with embeddings complete: {json.dumps(expected_results)}"
    )
    assert expected_message in caplog.text


@patch("tim.opensearch.create_index")
@patch("tim.opensearch.promote_index")
@patch("tim.opensearch.get_index_aliases")
Expand Down
14 changes: 10 additions & 4 deletions tests/test_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from tim.config import PRIMARY_ALIAS
from tim.errors import (
AliasNotFoundError,
BulkOperationError,
IndexExistsError,
IndexNotFoundError,
)
Expand Down Expand Up @@ -545,6 +544,7 @@ def test_bulk_update_updates_records(test_opensearch_client):
]
assert tim_os.bulk_update(test_opensearch_client, "test-index", iter(updates)) == {
"updated": 1,
"skipped": 0,
"errors": 0,
"total": 1,
}
Expand All @@ -553,7 +553,8 @@ def test_bulk_update_updates_records(test_opensearch_client):
@my_vcr.use_cassette(
"opensearch/bulk_update_raises_bulk_operation_error_if_record_not_found.yaml"
)
def test_bulk_update_raises_bulk_operation_error_if_record_not_found(
def test_bulk_update_records_error_record_not_found(
caplog,
test_opensearch_client,
):
updates = [
Expand All @@ -562,5 +563,10 @@ def test_bulk_update_raises_bulk_operation_error_if_record_not_found(
"title": "Materials Science & Engineering (UPDATED)",
}
]
with pytest.raises(BulkOperationError):
tim_os.bulk_update(test_opensearch_client, "test-index", iter(updates))
results = tim_os.bulk_update(test_opensearch_client, "test-index", iter(updates))
assert results["errors"] == 1
assert (
"""Error updating record 'i-am-not-found'. """
"""Details: {"type": "document_missing_exception", """
""""reason": "[i-am-not-found]: document missing","""
) in caplog.text
57 changes: 38 additions & 19 deletions tim/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,16 @@ def bulk_update(
"--source",
type=click.Choice(VALID_SOURCES),
help=(
"Source whose primary-aliased index will receive the bulk updated "
"records with embeddings."
"Source whose primary-aliased index will receive the bulk updated records with "
"embeddings. If --run-id is not passed, all current embeddings for this source "
"will be used."
),
)
@click.option("-rid", "--run-id", required=True, help="Run ID.")
@click.option(
"-rid",
"--run-id",
help="Limit to embeddings for a specific TIMDEX ETL run.",
)
@click.argument("dataset_path", type=click.Path())
@click.pass_context
def bulk_update_embeddings(
Expand Down Expand Up @@ -372,31 +377,43 @@ def bulk_update_embeddings(
f"into '{index}'"
)

update_results = {"updated": 0, "errors": 0, "total": 0}

td = TIMDEXDataset(location=dataset_path)

# bulk index embeddings
embeddings = td.embeddings.read_dicts_iter(
table="current_run_embeddings",
columns=[
"timdex_record_id",
"embedding_strategy",
"embedding_object",
],
run_id=run_id,
action="index",
)
# read embeddings for a specific run
if run_id:
embeddings = td.embeddings.read_dicts_iter(
table="current_run_embeddings",
columns=[
"timdex_record_id",
"embedding_strategy",
"embedding_object",
],
run_id=run_id,
action="index",
)
# default: read current embeddings for a source
else:
embeddings = td.embeddings.read_dicts_iter(
table="current_embeddings",
source=source,
columns=[
"timdex_record_id",
"embedding_strategy",
"embedding_object",
],
action="index",
)

embeddings_to_index = helpers.format_embeddings(embeddings)

try:
update_results.update(tim_os.bulk_update(client, index, embeddings_to_index))
update_results = tim_os.bulk_update(client, index, embeddings_to_index)
logger.info(f"Bulk update with embeddings complete: {json.dumps(update_results)}")

except BulkOperationError as exception:
logger.error(f"Bulk update with embeddings failed: {exception}") # noqa: TRY400
ctx.exit(1)

logger.info(f"Bulk update with embeddings complete: {json.dumps(update_results)}")


@main.command()
@click.option(
Expand Down Expand Up @@ -458,6 +475,7 @@ def reindex_source(
td = TIMDEXDataset(location=dataset_path)

# bulk index records
logger.info("Reindexing records.")
index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
records_to_index = td.read_transformed_records_iter(
table="current_records",
Expand All @@ -470,6 +488,7 @@ def reindex_source(
logger.error(f"Bulk indexing failed: {exception}") # noqa: TRY400

# bulk index embeddings
logger.info("Reindexing embeddings.")
update_results = {"updated": 0, "errors": 0, "total": 0}
embeddings = td.embeddings.read_dicts_iter(
table="current_embeddings",
Expand Down
9 changes: 7 additions & 2 deletions tim/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ def bulk_update(
Returns total sums of: records updated, errors, and total records
processed.
"""
result = {"updated": 0, "errors": 0, "total": 0}
result = {"updated": 0, "skipped": 0, "errors": 0, "total": 0}
actions = helpers.generate_bulk_actions(index, records, "update")
responses = streaming_bulk(
client,
Expand All @@ -445,7 +445,10 @@ def bulk_update(
if response[0] is False:
error = response[1]["update"]["error"]
record = response[1]["update"]["_id"]
if error["type"] == "mapper_parsing_exception":
if error["type"] in [
"mapper_parsing_exception",
"document_missing_exception",
]:
logger.error(
"Error updating record '%s'. Details: %s",
record,
Expand All @@ -458,6 +461,8 @@ def bulk_update(
)
elif response[1]["update"].get("result") == "updated":
result["updated"] += 1
elif response[1]["update"].get("result") == "noop":
result["skipped"] += 1
else:
logger.error(
"Something unexpected happened during update. Bulk update response: %s",
Expand Down