Skip to content

Commit

Permalink
bug-1889156: remove crash prefix splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
biancadanforth committed May 22, 2024
1 parent 9704c8f commit 85284aa
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 50 deletions.
91 changes: 43 additions & 48 deletions bin/load_processed_crashes_into_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
from socorro.libclass import build_instance_from_settings

NUM_CRASHIDS_TO_FETCH = "all"
# Number of prefix variations to pass to a check_crashids subprocess
CHUNK_SIZE = 4
# Number of seconds until we decide a worker has stalled
WORKER_TIMEOUT = 15 * 60

Expand Down Expand Up @@ -77,47 +75,58 @@ def check_elasticsearch(supersearch, crash_ids):
return set(crash_ids) - set(crash_ids_in_es)


def check_crashids_for_date(firstchars_chunk, date, only_missing_in_es):
"""Check crash ids for a given subset of all crash id prefixes and date"""
def get_crashids_in_storage(page, only_missing_in_es):
crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])

supersearch = SuperSearch(crash_dest)
in_crash_source_page = []
missing_in_es_page = []
# NOTE(bdanforth): Keys here look like /v1/raw_crash/DATE/CRASHID
crash_ids = [item.split("/")[-1] for item in page]

in_crash_source = []
missing_in_es = []
if not crash_ids:
return []

for firstchars in firstchars_chunk:
# Grab all the crash ids at the given date directory
page_iterator = crash_source.list_objects_paginator(
prefix=f"v1/raw_crash/{date}/{firstchars}",
)

for page in page_iterator:
# NOTE(bdanforth): Keys here look like /v1/raw_crash/DATE/CRASHID
crash_ids = [item.split("/")[-1] for item in page]

if not crash_ids:
continue

# Check crashstorage source first
for crash_id in crash_ids:
if is_in_storage(crash_source, crash_id):
in_crash_source.append(crash_id)
else:
click.echo(
f"Could not find processed crash for raw crash {crash_id}."
)
# Check crashstorage source first
for crash_id in crash_ids:
if is_in_storage(crash_source, crash_id):
in_crash_source_page.append(crash_id)
else:
click.echo(f"Could not find processed crash for raw crash {crash_id}.")

if only_missing_in_es:
supersearch = SuperSearch(crash_dest)

# Check Elasticsearch in batches
for crash_ids_batch in chunked(in_crash_source, 100):
for crash_ids_batch in chunked(in_crash_source_page, 100):
missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
missing_in_es.extend(missing_in_es_batch)
missing_in_es_page.extend(missing_in_es_batch)

return list(set(missing_in_es_page))

return list(set(missing_in_es))
return in_crash_source_page

return in_crash_source

def check_crashids_for_date(date, only_missing_in_es, num_workers):
"""Check crash ids for a given date"""
crash_source = build_instance_from_settings(settings.CRASH_SOURCE)

crash_ids = []

# Grab all the crash ids at the given date directory
page_iterator = crash_source.list_objects_paginator(
prefix=f"v1/raw_crash/{date}",
)

get_crashids = partial(
get_crashids_in_storage, only_missing_in_es=only_missing_in_es
)

with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
for result in executor.map(get_crashids, page_iterator, timeout=WORKER_TIMEOUT):
crash_ids.extend(result)

return crash_ids


def save_crash_to_es(crash_id):
Expand Down Expand Up @@ -181,25 +190,11 @@ def load_crashes(ctx, date, crash_id, num_workers, only_missing_in_es):
f"Checking for missing processed crashes for: {check_date_formatted}"
)

check_crashids = partial(
check_crashids_for_date,
crash_ids = check_crashids_for_date(
date=check_date_formatted,
only_missing_in_es=only_missing_in_es,
num_workers=num_workers,
)

firstchars_chunked = chunked(get_threechars(), CHUNK_SIZE)

if num_workers == 1:
for result in map(check_crashids, firstchars_chunked):
crash_ids.extend(result)
else:
with concurrent.futures.ProcessPoolExecutor(
max_workers=num_workers
) as executor:
for result in executor.map(
check_crashids, firstchars_chunked, timeout=WORKER_TIMEOUT
):
crash_ids.extend(result)
else:
raise click.BadParameter(
"Neither date nor crash_id were provided. At least one must be provided.",
Expand Down
4 changes: 2 additions & 2 deletions socorro/tests/test_load_processed_crashes_into_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_it_runs():


def test_it_loads_processed_crashes_by_date(storage_helper, es_helper):
"""Test whether the module loads processed crashes by date from S3."""
"""Test whether the module loads processed crashes by date."""
date_str = "2024-05-01"
expected_crashes = load_crashes_into_crashstorage_source(storage_helper, date_str)
runner = CliRunner()
Expand All @@ -76,7 +76,7 @@ def test_it_loads_processed_crashes_by_date(storage_helper, es_helper):


def test_it_loads_processed_crashes_by_crashid(storage_helper, es_helper):
"""Test whether the module loads processed crashes by crash id from S3."""
"""Test whether the module loads processed crashes by crash id."""
expected_crashes = load_crashes_into_crashstorage_source(storage_helper)
runner = CliRunner()
expected_crash = expected_crashes[0]
Expand Down

0 comments on commit 85284aa

Please sign in to comment.