2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Fixed

- V2: Fixed memory leaks caused by nested delays [#854](https://github.com/askap-vast/vast-pipeline/pull/854)
- V2: Fixed duplicate source ID error by switching from sources_df.repartition() to sources_df.shuffle() in pipeline.finalise.final_operations [#845](https://github.com/askap-vast/vast-pipeline/pull/845)
- V2: Tentative fix for image upload memory leak via garbage collect [#845](https://github.com/askap-vast/vast-pipeline/pull/845)
- V2: Fixed dd.concat memory blow-up in pipeline.forced_extraction.forced_extraction by persisting both dataframes beforehand (see the sketch below) [#845](https://github.com/askap-vast/vast-pipeline/pull/845)
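
A minimal sketch of the persist-before-concat idea behind the last entry. The dataframes, column names, and sizes here are placeholders rather than the pipeline's real frames:

```python
import pandas as pd
import dask.dataframe as dd

# Stand-ins for the two frames combined in forced_extraction.
left = dd.from_pandas(pd.DataFrame({"flux": range(1_000)}), npartitions=4)
right = dd.from_pandas(pd.DataFrame({"flux": range(1_000, 2_000)}), npartitions=4)

# Persisting both inputs materialises their task graphs once, so dd.concat
# operates on concrete partitions instead of re-expanding two large graphs
# at compute time.
left = left.persist()
right = right.persist()
combined = dd.concat([left, right])
```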
@@ -77,6 +78,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### List of PRs

- [#854](https://github.com/askap-vast/vast-pipeline/pull/854): fix: V2: Fixed memory leaks caused by nested delays
- [#852](https://github.com/askap-vast/vast-pipeline/pull/852): feat: V2: Switch worker throttling to use Dask Semaphore and add Dask cluster memory usage logging (see the sketch after this list)
- [#846](https://github.com/askap-vast/vast-pipeline/pull/846): fix: V2: Compute associations prior to upload and persist srcs_df in final pairs calculation
- [#845](https://github.com/askap-vast/vast-pipeline/pull/845): fix: V2: Fix memory leaks and duplicate source ID issue
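
For the Semaphore-based throttling in #852, here is a minimal sketch assuming a `dask.distributed` cluster is available; the lease count, the `upload_image` helper, and the file names are illustrative, not the pipeline's actual values:

```python
from dask import compute, delayed
from dask.distributed import Client, Semaphore

client = Client()  # assumes a local or existing distributed cluster

# The semaphore lives on the scheduler, so the lease cap applies across
# all workers rather than per process.
io_throttle = Semaphore(max_leases=2, name="io-throttle")

def upload_image(path: str) -> str:
    with io_throttle:  # blocks until one of the leases is free
        ...  # expensive I/O would happen here
    return path

tasks = [delayed(upload_image)(p) for p in ["a.fits", "b.fits", "c.fits"]]
results = compute(*tasks)
```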
42 changes: 26 additions & 16 deletions vast_pipeline/pipeline/forced_extraction.py
@@ -388,8 +388,13 @@ def parallel_extraction(
out = out.drop(["image_rms_min", "detection"], axis=1).rename(
columns={"image": "image_name"}
)
out = out.set_index("image_name", sorted=False).persist()
logger.info("Persisting out df...")
wait(out)
logger.info("Persisted out df")

# get the unique images to extract from
unique_images_to_extract = out["image_name"].unique().compute().tolist()
unique_images_to_extract = out.index.unique().compute().tolist()

# create a list of all the measurements parquet files to extract data from,
# such as prefix and max_id
@@ -403,37 +408,42 @@
)
)

# Get a map of the columns that have a fixed value from the measurements parquets
# in list_meas_parquets. This generates a list of delayed futures that will only
# compute at the next persist.
# Get the columns that have a fixed value from the measurements parquets
# - this will be computed, which should be fine for a moderate number of images
df_cols = ["id", "path", "background_path", "noise_path", "beam_bmaj", "beam_bmin", "beam_bpa", "datetime"]
measurements_parquet_data = (
db.from_sequence(list_meas_parquets, npartitions=len(list_meas_parquets))
.map(get_data_from_parquet, p_run_path, add_mode)
.to_dataframe()
.merge(df_images[df_cols], on="id", how="left")
.to_delayed()
.compute()
.reset_index(drop=True)
)

# Create a list of dataframes containing the relevant data from out per image
# This generates a list of delayed futures that will only compute at the next persist.
generate_df = lambda name, out: out[out["image_name"] == name]
df_per_image=[delayed(generate_df)(n, out) for n in unique_images_to_extract]
@delayed
def generate_df(name):
return out.loc[name].compute().reset_index()
df_per_image = [generate_df(n) for n in unique_images_to_extract]


# Do the forced extraction work by combining the two delayed lists above then
# running extract_from_image on the tuple of delayed futures.
# Persist at this point using the number of io workers.
image_data_list = zip(df_per_image, measurements_parquet_data)
func_d = [
delayed(extract_from_image)(image_df, meas_data, edge_buffer=edge_buffer,
cluster_threshold=cluster_threshold, allow_nan=allow_nan)
for image_df, meas_data in image_data_list
delayed(extract_from_image)(
image_df,
measurements_parquet_data.loc[[idx]],
edge_buffer=edge_buffer,
cluster_threshold=cluster_threshold,
allow_nan=allow_nan
)
for idx, image_df in enumerate(df_per_image)
]

# Persist at this point using the number of io workers.
# df_out will contain the forced extraction measurements per image.
# df_out should be sorted and partitioned by image at this point.
df_out = dd.from_delayed(func_d).persist()
logger.info("Generating df_out from delayeds and persisting...")
wait(df_out)
logger.info("Finished persisting df_out")

del out, func_d, df_per_image, measurements_parquet_data

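Below is a self-contained sketch of the delayed → `from_delayed` → persist/wait pattern that the new `parallel_extraction` code follows; the `measure` helper, image names, and columns are stand-ins for `extract_from_image` and its real inputs, and a `dask.distributed` client is assumed as in the pipeline:

```python
import pandas as pd
import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client, wait

client = Client()  # assumes a local or existing distributed cluster

@delayed
def measure(image_name: str) -> pd.DataFrame:
    # Stand-in for extract_from_image: one partition of results per image.
    return pd.DataFrame({"image_name": [image_name], "flux": [1.0]})

parts = [measure(name) for name in ["img_1.fits", "img_2.fits"]]

# from_delayed stitches the per-image results into one Dask dataframe;
# persist + wait runs the work now so the delayed inputs can be released.
df_out = dd.from_delayed(parts, meta={"image_name": str, "flux": float}).persist()
wait(df_out)
```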
51 changes: 41 additions & 10 deletions vast_pipeline/pipeline/loading.py
@@ -452,7 +452,16 @@ def copy_upload_associations(
associations_df: The associations dataframe to upload.
batch_size: The batch size. Defaults to 10_000.
"""
logger.info("Uploading associations in batches of %d", batch_size)

n_associations = len(associations_df)
n_partitions = associations_df.npartitions
logger.info(
"Uploading %d total associations w/ %d partitions in batches of %d",
n_associations,
n_partitions,
batch_size
)

columns_to_upload = ["source"]
for fld in Association._meta.get_fields():
if getattr(fld, "attname", None) and fld.attname in associations_df.columns:
@@ -467,16 +476,38 @@
"d2d": "d2d",
"dr": "dr"
}

def upload(df, Association, mapping, batch_size):
logger.info(
"Uploading partition of %d associations in batches of %d",
len(df),
batch_size
)

df["db_id"] = df.apply(lambda _: str(uuid4()), axis=1)
copy_upload_model(
df,
Association,
mapping=mapping,
batch_size=batch_size
)

associations_df = associations_df[columns_to_upload].map_partitions(
upload,
Association,
mapping,
batch_size,
enforce_metadata=False,
meta={}
)

timer = StopWatch()
associations_df = associations_df.compute()
logger.debug("Time to compute associations_df: %.1f s", timer.reset())

associations_df["db_id"] = associations_df.apply(lambda _: str(uuid4()), axis=1)
logger.debug("Time to add db_id: %.1f s", timer.reset())

copy_upload_model(associations_df, Association, mapping=mapping, batch_size=batch_size)
logger.debug("Time to upload associations: %.1f s", timer.reset())
logger.info("Associations upload complete")
associations_df.compute()
logger.info(
"Uploaded %d associations in %.1f s",
n_associations,
timer.reset()
)


def make_upload_associations(associations_df: pd.DataFrame) -> None:
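A minimal sketch of the per-partition upload pattern adopted in `copy_upload_associations` above; `upload_partition` is a hypothetical stand-in for the `upload` closure and `copy_upload_model`, and the dataframe contents are made up:

```python
import pandas as pd
import dask.dataframe as dd

def upload_partition(df: pd.DataFrame) -> pd.DataFrame:
    # Each worker handles its own partition, so no single process has to
    # hold the full associations dataframe in memory at once.
    print(f"uploading {len(df)} rows")
    return pd.DataFrame()  # nothing needs to flow back to the caller

associations = dd.from_pandas(
    pd.DataFrame({"source": range(100)}), npartitions=4
)
result = associations.map_partitions(
    upload_partition, enforce_metadata=False, meta={}
)
result.compute()  # triggers the per-partition uploads
```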
18 changes: 9 additions & 9 deletions vast_pipeline/pipeline/new_sources.py
@@ -246,24 +246,24 @@ def parallel_get_new_high_sigma(

cols = ['img_diff_rms_path', 'flux_peak', 'source', 'wavg_ra', 'wavg_dec']

# Generate a delayed dataframe of sources for each image in uniq_img_diff
df_generator = lambda element, df: df[df['img_diff_rms_path'] == element]
df_per_img_rms = [delayed(df_generator)(elem, df[cols]) for elem in uniq_img_diff]
def process_group(df_group):
return get_image_rms_measurements(df_group, edge_buffer=edge_buffer)

# Do the rms calculations per rms image only using the subset of workers for IO
out = [delayed(get_image_rms_measurements)(rms_df, edge_buffer=edge_buffer) for rms_df in df_per_img_rms]
out = dd.from_delayed(out).persist()
out = df[cols].groupby("img_diff_rms_path") \
.apply(
process_group,
meta={'source': str, 'true_sigma': float}) \
.persist()

# Remove duplicate sources and only keep high sigma
out = out.sort_values('true_sigma', ascending=True) \
.drop_duplicates('source', keep='last') \
.rename(columns={'true_sigma': 'new_high_sigma'}) \
.set_index('source') \
.persist()

# Wait for delayed computations to finish.
logger.debug("Setup out df persisting...")
wait(out)
del df_per_img_rms
logger.debug("Finished persisting out df")

return out

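A small sketch of the groupby-apply form used in `parallel_get_new_high_sigma` above; `summarise` stands in for `get_image_rms_measurements`, and the paths and source names are invented:

```python
import pandas as pd
import dask.dataframe as dd

def summarise(group: pd.DataFrame) -> pd.DataFrame:
    # Stand-in for get_image_rms_measurements: one result row per source.
    return pd.DataFrame({"source": group["source"], "true_sigma": 1.0})

pdf = pd.DataFrame({
    "img_diff_rms_path": ["a.fits", "a.fits", "b.fits"],
    "source": ["s1", "s2", "s3"],
})
ddf = dd.from_pandas(pdf, npartitions=2)

# Supplying meta up front lets Dask build the graph without sampling the
# function; persist then materialises one result block per rms image.
out = ddf.groupby("img_diff_rms_path").apply(
    summarise, meta={"source": str, "true_sigma": float}
).persist()
print(out.compute())
```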