26 commits
81086e8  lsst feature draft (franciscoandrades, Oct 1, 2025)
d3afabe  sync (franciscoandrades, Oct 3, 2025)
c70f222  draft (franciscoandrades, Oct 9, 2025)
c68702a  lsst preprocessing (franciscoandrades, Oct 9, 2025)
3624a4a  feature_step with output_parser (franciscoandrades, Oct 20, 2025)
f71369e  cleaning (franciscoandrades, Oct 20, 2025)
8eab43e  implemented produce_to_scribe (franciscoandrades, Oct 23, 2025)
092f6f4  fix sid value (franciscoandrades, Oct 23, 2025)
dc61b1b  do not save jsons (franciscoandrades, Oct 27, 2025)
97ca8b0  fix magnitude diff_flux calculation (franciscoandrades, Oct 27, 2025)
d285d28  fix fid mapping u6 (franciscoandrades, Oct 29, 2025)
3ec3ffb  fix lsst composite phot (franciscoandrades, Oct 29, 2025)
49262eb  refactor (franciscoandrades, Oct 29, 2025)
bd0f4b2  using base.py (franciscoandrades, Oct 29, 2025)
581d954  fix fid_map spm (franciscoandrades, Oct 29, 2025)
b099a4a  fix fluxErr bug (franciscoandrades, Nov 3, 2025)
13ba5fb  remove prints (franciscoandrades, Nov 3, 2025)
a0ced1f  Merge branch 'multisurvey' of github.com:alercebroker/pipeline into f… (franciscoandrades, Nov 5, 2025)
65f541c  sync (franciscoandrades, Nov 12, 2025)
306884c  use schema from config (franciscoandrades, Nov 12, 2025)
8c29659  define schema in init (franciscoandrades, Nov 12, 2025)
a568c78  bring dockerfile from multisurvey (franciscoandrades, Nov 24, 2025)
357dcdb  Merge branch 'multisurvey' of github.com:alercebroker/pipeline into f… (franciscoandrades, Nov 24, 2025)
57e3aa9  database and utils scripts (franciscoandrades, Nov 24, 2025)
612b8d2  updated dockerfile (franciscoandrades, Nov 24, 2025)
927589e  updated scribe for oid keys (franciscoandrades, Nov 24, 2025)
9 changes: 9 additions & 0 deletions feature_step/.gitignore
@@ -0,0 +1,9 @@
jsons/
csvs/
lsst_features_name_mapping.json
config copy.yaml
dfs_staging/
feature_plots/
consolidated_features/
config2.yaml
config.yaml
5 changes: 5 additions & 0 deletions feature_step/Dockerfile
@@ -27,13 +27,18 @@ COPY libs/test_utils /libs/test_utils
COPY schemas/feature_step /schemas/feature_step
COPY schemas/xmatch_step /schemas/xmatch_step
COPY schemas/scribe_step /schemas/scribe_step
COPY schemas/correction_ms_step /schemas/correction_ms_step
COPY schemas/ingestion_step /schemas/ingestion_step


RUN poetry run python -m pip install setuptools wheel Cython==0.29.36 numpy
RUN poetry run python -m pip install ../mhps
RUN poetry run python -m pip install -r ../P4J/requirements.txt
RUN poetry install --without=test --no-root
COPY feature_step/scripts /app/scripts
COPY feature_step/settings.py /app/settings.py
COPY feature_step/features /app/features
#COPY feature_step/config2.yaml /app/config2.yaml

FROM python:3.10-slim
COPY --from=builder /app /app
68 changes: 68 additions & 0 deletions feature_step/features/database.py
@@ -48,3 +48,71 @@ def get_sql_references(
reference = session.execute(stmt).all()
df = parse_sql_reference(reference, keys)
return df


def get_feature_name_lut(db_sql: PSQLConnection, schema: str, logger=None) -> dict:
"""Fetch feature name lookup table from multisurvey schema for LSST survey."""
if db_sql is None:
if logger:
logger.warning("No database connection available for feature name lookup")
return {}

try:
from sqlalchemy import text

with db_sql.session() as session:
# Query the feature_name_lut table from configured schema
query = text(f"SELECT feature_id, feature_name FROM {schema}.feature_name_lut ORDER BY feature_id")
result = session.execute(query)

# Create dictionary with id as key and name as value
feature_lut = {row[0]: row[1] for row in result.fetchall()}

if logger:
logger.info(f"Loaded {len(feature_lut)} feature names from lookup table")
return feature_lut

except Exception as e:
if logger:
logger.error(f"Error fetching feature name lookup table: {e}")
return {}


def get_or_create_version_id(db_sql: PSQLConnection, schema: str, version_name: str, logger=None) -> int | None:
    """Get version_id from the feature_version_lut table, or create it if it doesn't exist.

    Returns None when no database connection is available or the query fails.
    """
if db_sql is None:
if logger:
logger.warning("No database connection available for version lookup")
return None

try:
from sqlalchemy import text

with db_sql.session() as session:
# First, try to get existing version_id
select_query = text(f"SELECT version_id FROM {schema}.feature_version_lut WHERE version_name = :version_name")
result = session.execute(select_query, {"version_name": version_name})
row = result.fetchone()

if row:
version_id = row[0]
if logger:
logger.info(f"Found existing version_id {version_id} for version_name '{version_name}'")
return version_id
else:
# Insert new version_name and get the generated version_id
insert_query = text(
f"INSERT INTO {schema}.feature_version_lut (version_name) VALUES (:version_name) RETURNING version_id"
)
result = session.execute(insert_query, {"version_name": version_name})
version_id = result.fetchone()[0]
session.commit()

if logger:
logger.info(f"Created new version_id {version_id} for version_name '{version_name}'")
return version_id

except Exception as e:
if logger:
logger.error(f"Error handling version_lut table: {e}")
return None
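A minimal sketch of how these two helpers compose at step startup. The schema name and version string are illustrative, and passing `db_sql=None` exercises the graceful-degradation branches shown above:

```python
# Illustrative only; schema name and version string are assumptions.
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("alerce.FeatureStep")

db_sql = None  # stand-in: with no connection both helpers degrade gracefully

# Returns None and logs a warning when db_sql is None; otherwise it selects
# (or inserts) the row in <schema>.feature_version_lut and returns version_id.
version_id = get_or_create_version_id(db_sql, "multisurvey", "feature-step-6.0.0", logger)

# Returns {} without a connection; otherwise {feature_id: feature_name, ...}
# read from <schema>.feature_name_lut.
feature_lut = get_feature_name_lut(db_sql, "multisurvey", logger)
```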
143 changes: 108 additions & 35 deletions feature_step/features/step.py
Expand Up @@ -8,17 +8,25 @@
from apf.consumers import KafkaConsumer

from lc_classifier.features.core.base import AstroObject, discard_bogus_detections
from lc_classifier.features.preprocess.ztf import ZTFLightcurvePreprocessor
from lc_classifier.features.composites.ztf import ZTFFeatureExtractor
from lc_classifier.features.composites.lsst import LSSTFeatureExtractor
from lc_classifier.features.preprocess.lsst import LSSTLightcurvePreprocessor


from .database import (
PSQLConnection,
get_sql_references,
get_feature_name_lut,
get_or_create_version_id,
)

from .utils.metrics import get_sid
from .utils.parsers import parse_output, parse_scribe_payload
from .utils.parsers import detections_to_astro_object
from .utils.parsers import detections_to_astro_object, detections_to_astro_object_lsst
from .utils.parsers import parse_output_lsst, parse_scribe_payload_lsst
from .utils.data_utils import clean_and_flatten_columns, save_astro_objects_to_csvs


from importlib.metadata import version

@@ -44,16 +52,49 @@ def __init__(

super().__init__(config=config, **step_args)
# Bogus detections are dropped in pre_execute

scribe_class = get_class(self.config["SCRIBE_PRODUCER_CONFIG"]["CLASS"])
self.scribe_producer = scribe_class(self.config["SCRIBE_PRODUCER_CONFIG"])

self.db_sql = db_sql
self.logger = logging.getLogger("alerce.FeatureStep")
self.survey = self.config.get("SURVEY")

# Get schema from configuration
self.schema = self.config.get("DB_CONFIG", {}).get("SCHEMA", "multisurvey")

if self.survey == "ztf":
self.id_column = "candid"
self.lightcurve_preprocessor = ZTFLightcurvePreprocessor(drop_bogus=True)
self.feature_extractor = ZTFFeatureExtractor()
self.extractor_group = ZTFFeatureExtractor.__name__
self.detections_to_astro_object_fn = detections_to_astro_object
self.parse_output_fn = parse_output
self.parse_scribe_payload = parse_scribe_payload
self.extractor_version = version("feature-step")
            self.feature_name_lut = None

        elif self.survey == "lsst":
self.id_column = "measurement_id"
self.lightcurve_preprocessor = LSSTLightcurvePreprocessor()
self.feature_extractor = LSSTFeatureExtractor()
self.extractor_group = LSSTFeatureExtractor.__name__
self.detections_to_astro_object_fn = detections_to_astro_object_lsst
self.parse_output_fn = parse_output_lsst
self.parse_scribe_payload = parse_scribe_payload_lsst

# Get version name and resolve version_id from version_lut table
version_name = version("feature-step")
self.extractor_version = get_or_create_version_id(
self.db_sql, self.schema, version_name, self.logger
)

# Fetch feature name lookup table from multisurvey schema
self.feature_name_lut = get_feature_name_lut(
self.db_sql, self.schema, self.logger
)

self.min_detections_features = config.get("MIN_DETECTIONS_FEATURES", None)
if self.min_detections_features is None:
@@ -62,12 +103,14 @@ def __init__(
self.min_detections_features = int(self.min_detections_features)
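For reference, a sketch of the config keys this constructor reads; the key names come from the code above, while the concrete values and the producer class/topic are assumptions:

```python
# Illustrative config fragment; values are assumptions, key names come from __init__.
config = {
    "SURVEY": "lsst",  # selects preprocessor, extractor, and parser functions
    "DB_CONFIG": {"SCHEMA": "multisurvey"},  # schema holding the LUT tables
    "MIN_DETECTIONS_FEATURES": 6,  # minimum non-forced detections per object
    "SCRIBE_PRODUCER_CONFIG": {
        "CLASS": "apf.producers.KafkaProducer",  # hypothetical class path
        "TOPIC": "w_object_lsst",  # hypothetical topic name
    },
}
```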

def produce_to_scribe(self, astro_objects: List[AstroObject]):
        commands = self.parse_scribe_payload(
astro_objects,
self.extractor_version,
self.extractor_group,
self.feature_name_lut
)
        update_object_cmds = commands.get("update_object", [])
update_features_cmds = commands["upserting_features"]

count_objs = 0
@@ -84,73 +127,103 @@ def produce_to_scribe(self, astro_objects: List[AstroObject]):
count_features += 1
if count_features == len(update_features_cmds):
flush = True
            oid = command["payload"]["oid"]
            self.scribe_producer.producer.produce(
                topic=self.scribe_topic_name,  # presumably set in __init__ (not shown in this diff)
value=json.dumps(command).encode("utf-8"),
key=str(oid).encode("utf-8"),
)

if flush:
self.scribe_producer.producer.flush()
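Keying each Kafka message by `oid` routes all commands for one object to the same partition, preserving per-object ordering for the scribe. A hedged sketch of the assumed command shape:

```python
# Assumed shape of one scribe command (illustrative; the real structure is
# defined by parse_scribe_payload_lsst, which is outside this diff).
command = {
    "payload": {
        "oid": 123456789,  # used above as the Kafka message key
        # feature values, resolved through feature_name_lut, would go here
    }
}
# Keying by oid means all commands for one object land in the same Kafka
# partition, so the scribe consumes them in order.
```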


def pre_produce(self, result: Iterable[Dict[str, Any]] | Dict[str, Any]):
self.set_producer_key_field("oid")
return result

def _get_sql_references(self, oids: List[str]) -> Optional[pd.DataFrame]:
db_references = get_sql_references(
oids, self.db_sql, keys=["oid", "rfid", "sharpnr", "chinr"]
)
db_references = db_references[db_references["chinr"] >= 0.0].copy()
return db_references

    def pre_execute(self, messages: List[dict]):
        filtered_messages = []
        for message in messages:
            filtered_message = message.copy()
            if self.survey == "ztf":
                filtered_message["detections"] = discard_bogus_detections(
                    filtered_message.get("detections", [])
                )
                filtered_messages.append(filtered_message)
            elif self.survey == "lsst":
                # LSST messages carry sources/previous_sources instead of detections
                dets = filtered_message.get("sources", []) + filtered_message.get("previous_sources", [])
                dets = [elem for elem in dets if elem.get("band") is not None]
                filtered_message["detections"] = dets
                filtered_messages.append(filtered_message)

        def has_enough_detections(message: dict) -> bool:
            n_dets = len([True for det in message["detections"] if not det.get("forced", False)])
            return n_dets >= self.min_detections_features

        filtered_messages = list(filter(has_enough_detections, filtered_messages))

        if len(filtered_messages) > 0:
            self.logger.info("Filtered batch length is greater than zero")

        return filtered_messages
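A made-up LSST message illustrating what this filter does (field names as used above, values invented):

```python
# Made-up LSST message: only fields touched by pre_execute are shown.
msg = {
    "oid": 1001,
    "sources": [
        {"band": "g", "forced": False},
        {"band": None, "forced": False},  # dropped: band is None
    ],
    "previous_sources": [{"band": "r", "forced": False}],
}
# pre_execute merges sources + previous_sources into msg["detections"],
# keeps the two entries with a band, and the message survives only if
# that count reaches MIN_DETECTIONS_FEATURES.
```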



def execute(self, messages):

candids = {}
astro_objects = []
messages_to_process = []

oids = set()
bands = set()
for msg in messages:
oids.add(msg["oid"])

if self.survey == "ztf":
db_references = get_sql_references(
list(oids), self.db_sql, keys=["oid", "rfid", "sharpnr", "chinr"]
)
references_db = db_references[db_references["chinr"] >= 0.0].copy()
for message in messages:
            if message["oid"] not in candids:
                candids[message["oid"]] = []
            candids[message["oid"]].extend(message[self.id_column])  # collect the measurement ids for each oid
m = map(
                lambda x: {**x, "index_column": str(x[self.id_column]) + "_" + str(x["oid"])},
message.get("detections", []),
)
            if self.survey == "ztf":
                xmatch_data = message["xmatches"]
                ao = self.detections_to_astro_object_fn(list(m), xmatch_data, references_db)
else:
                forced = message.get("forced_sources", None)  # if there are no detections, filter forced photometry
ao = self.detections_to_astro_object_fn(list(m), forced)
astro_objects.append(ao)
messages_to_process.append(message)

self.lightcurve_preprocessor.preprocess_batch(astro_objects)
self.feature_extractor.compute_features_batch(astro_objects, progress_bar=False)

        # Optionally save per-object results to CSVs using the external helper
        # batch_folder = save_astro_objects_to_csvs(astro_objects, messages_to_process, base_folder="csvs")
self.produce_to_scribe(astro_objects)
        output = self.parse_output_fn(astro_objects, messages_to_process, candids)
return output

def post_execute(self, result):

self.metrics["sid"] = get_sid(result)

for message in result:
            if "reference" in message:
                del message["reference"]

return result

54 changes: 54 additions & 0 deletions feature_step/features/utils/data_utils.py
@@ -0,0 +1,54 @@
import pandas as pd
import os
import uuid
from typing import List


def clean_and_flatten_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Aplana columnas con listas y remueve saltos de línea en strings."""
for col in df.columns:
if df[col].apply(lambda x: isinstance(x, list)).any():
df = df.explode(col)
df[col] = df[col].apply(lambda x: str(x).replace('\n', ' ') if isinstance(x, str) else x)
return df
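A tiny illustration of what this helper does to a list-valued column and an embedded newline (made-up input):

```python
import pandas as pd

# Made-up input for clean_and_flatten_columns.
df = pd.DataFrame({"oid": [1], "mags": [[20.1, 20.3]], "note": ["two\nlines"]})
flat = clean_and_flatten_columns(df)
# 'mags' is exploded to one row per element and the newline becomes a space:
#    oid  mags       note
# 0    1  20.1  two lines
# 0    1  20.3  two lines
```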


def save_astro_objects_to_csvs(
astro_objects: List["AstroObject"],
messages_to_process: List[dict],
base_folder: str = "csvs",
) -> str:
"""Guarda detections y features de cada AstroObject en CSVs por OID.

Crea una carpeta base si no existe y un subfolder por batch con UUID.
Retorna la ruta del folder del batch.
"""
if not os.path.exists(base_folder):
os.makedirs(base_folder)

batch_id = str(uuid.uuid4())
batch_folder = os.path.join(base_folder, batch_id)
os.makedirs(batch_folder)

for i, (ao, msg) in enumerate(zip(astro_objects, messages_to_process)):
oid = getattr(ao, "oid", msg.get("oid", f"obj_{i}"))
detections_csv_path = os.path.join(batch_folder, f"{oid}_detections.csv")
features_csv_path = os.path.join(batch_folder, f"{oid}_features.csv")

detections_df = clean_and_flatten_columns(ao.detections)
features_df = clean_and_flatten_columns(ao.features)

if "sid" in detections_df.columns:
detections_df = detections_df.drop(columns=["sid"])
if "sid" in features_df.columns:
features_df = features_df.drop(columns=["sid"])

detections_df.to_csv(detections_csv_path, index=False)
features_df.to_csv(features_csv_path, index=False)

print(f"Saved: {detections_csv_path}")
print(f"Saved: {features_csv_path}")

return batch_folder
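Illustrative usage (the variable names and base folder are assumptions; `astro_objects` and `messages` would come from the feature step's execute):

```python
# Illustrative call; base_folder value is an assumption.
batch_folder = save_astro_objects_to_csvs(astro_objects, messages, base_folder="csvs")
print(f"Batch written to {batch_folder}")
# Layout: csvs/<batch-uuid>/<oid>_detections.csv and csvs/<batch-uuid>/<oid>_features.csv
```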