Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
564240a
correction and tests working
claudiomansillab May 8, 2025
9448e66
formatted all files in correction ms and changes workflow to add inte…
claudiomansillab May 8, 2025
e4fe761
added config yaml
claudiomansillab May 8, 2025
a5982bf
fixed input_path in config yaml
claudiomansillab May 8, 2025
7c41b8e
fixed come variables
claudiomansillab May 8, 2025
082b058
Merge branch 'main' of https://github.com/alercebroker/pipeline into …
claudiomansillab May 9, 2025
bf37630
added initial files for scribe parser
claudiomansillab May 19, 2025
31982fa
done scribe_parser and added test_correction to test the parser
claudiomansillab May 19, 2025
eab5c17
added tests for scribe_parser
claudiomansillab May 19, 2025
6b728ed
removed pre prod, added a parser in utils to fulfill function
kaymedinam May 20, 2025
122b368
new config for scribe parser, added test por the scribe parser
claudiomansillab May 22, 2025
58c9cfa
changed schema_path in scribe producer config to relative
claudiomansillab May 22, 2025
79f7347
fixed scribe_parser calls in test_db_operations and post_execute in step
claudiomansillab May 22, 2025
4c3204d
produce_scribe function fixed
claudiomansillab May 23, 2025
9c633d7
pull from main
ignacioreyes May 23, 2025
c739d26
step writing messages, step and run_step minor changes
claudiomansillab May 23, 2025
03e90cc
Merge branch 'scribe-ms' of https://github.com/alercebroker/pipeline …
claudiomansillab May 23, 2025
6c04d33
changed path on run_step
claudiomansillab May 23, 2025
7f49e52
formatted with black
claudiomansillab May 23, 2025
cd04828
added measurement_id in scribe
claudiomansillab May 23, 2025
34c54bb
Merge branch 'main' of https://github.com/alercebroker/pipeline into …
claudiomansillab May 27, 2025
6ead22c
added 3 new functions to reuse in parser_sql
claudiomansillab May 28, 2025
74c72be
Passed the main functions to parser_utils to have parser_sql the smal…
claudiomansillab May 29, 2025
c4062bd
added a drop_db in test, deleted a print in parser_sql and passed scr…
claudiomansillab May 30, 2025
549004d
deleted drop_db from test_db and added prints to debug in _connection.py
claudiomansillab May 30, 2025
6f7508f
Merge branch 'main' of https://github.com/alercebroker/pipeline into …
claudiomansillab May 30, 2025
7cc2c85
changed name of pk in models ztf_ss and ztf_ps1 for a conflict name
claudiomansillab May 30, 2025
70dce2e
refactored execute function from step (just order) new functions adde…
claudiomansillab Jun 6, 2025
b8e1ee3
pulled changes from main
claudiomansillab Jun 6, 2025
a23ddb6
changed a yaml path
claudiomansillab Jun 6, 2025
8fd5d7a
changed psql_config
claudiomansillab Jun 6, 2025
fcc5721
used black on correction and added a step to create a db on the test_…
claudiomansillab Jun 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions correction_multistream_ztf_step/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
correction_venv
247 changes: 28 additions & 219 deletions correction_multistream_ztf_step/core/parsers/parser_sql.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,16 @@
from db_plugins.db.sql.models import (
ZtfDetection,
ZtfForcedPhotometry,
ForcedPhotometry,
NonDetection,
Detection,
)
import math

# We assign tid and sid to 0; this is subject to change in the future.

GENERIC_FIELDS = [
"tid",
"sid",
"oid",
"pid",
"mjd",
"fid",
"ra",
"dec",
"measurement_id",
"isdiffpos",
"parent_candid",
"has_stamp",
"magpsf",
"sigmapsf",
"mag",
"e_mag",
]

CHANGE_VALUES = [
"tid",
"sid",
]

GENERIC_FIELDS_FP = [
"tid",
"sid",
"oid",
"pid",
"mjd",
"fid",
"ra",
"dec",
"isdiffpos",
"parent_candid",
"has_stamp",
]

CHANGE_NAMES = { # outside extrafields
"magpsf": "mag",
"sigmapsf": "e_mag",
}

CHANGE_NAMES_2 = { # inside extrafields
"sigmapsf_corr": "e_mag_corr",
"sigmapsf_corr_ext": "e_mag_corr_ext",
"magpsf_corr": "mag_corr",
}
ERRORS = {
1: 0.065,
2: 0.085,
3: 0.01,
}


def _e_ra(dec, fid):
try:
return ERRORS[fid] / abs(math.cos(math.radians(dec)))
except ZeroDivisionError:
return float("nan")
# Function imports
from core.parsers.parser_utils import (
ddbb_to_dict,
dicts_through_model,
join_ztf,
calc_ra_dec,
)


def parse_sql_detection(ztf_models: list, models: list, *, oids) -> list:
Expand All @@ -77,192 +19,59 @@ def parse_sql_detection(ztf_models: list, models: list, *, oids) -> list:
and we change some names with the help of the CHANGE_NAMES dict at the top of the code.

"""
parsed_ztf_dets = []
extra_fields_list = []

# Here we take all the info from the DDBB and split it into dictionaries. We also save the fields that are NOT in the
# GENERIC_FIELDS list into a separate dict called extra_fields.
for det in ztf_models:
det: dict = det[0].__dict__
extra_fields = {}
parsed_det = {}
for field, value in det.items():
if field.startswith("_"):
continue
elif not field in GENERIC_FIELDS:
extra_fields[field] = value
else:
if field in CHANGE_VALUES:
parsed_det[field] = 0
else:
parsed_det[field] = value
parsed_ztf_dets.append(parsed_det)
extra_fields_list.append(extra_fields)

parsed_dets = []

# Here we take all the info from the DDBB and split it into dictionaries too.
for d in models:
d: dict = d[0].__dict__
parsed_d = {}
for field, value in d.items():
if field.startswith("_"):
continue
else:
if field in CHANGE_VALUES:
parsed_d[field] = 0
else:
parsed_d[field] = value
parsed_dets.append(parsed_d)
# We take the models from the DDBB and convert them to dict format under certain conditions.
parsed_ztf_detections, extra_fields_list = ddbb_to_dict(ztf_models, ztf=True)
parsed_dets = ddbb_to_dict(models, ztf=False)

parsed_ztf_list = []

# Here we form the parsed_ztf object for every dict in parsed_ztf_dets.
for det in parsed_ztf_dets:
parsed_ztf = ZtfDetection(
**det,
)
parsed_ztf_list.append(parsed_ztf)
# Here we form the parsed_ztf object for every dict in parsed_ztf_detections.
parsed_ztf_list = dicts_through_model(parsed_ztf_detections, ZtfDetection)

# Here we join detections and ztf_detections into one. We hardcode forced and new as False because this data comes from the DDBB
# and is not a new object. On the other hand, since these are detections, they are not forced, so forced is False. We also attach the extra_fields field
# and change some names inside that field.
for detections in parsed_dets:
for key, value in detections.items():
if not key in parsed_ztf_dets[parsed_dets.index(detections)].keys():
setattr(parsed_ztf_list[parsed_dets.index(detections)], key, value)

setattr(parsed_ztf_list[parsed_dets.index(detections)], "forced", False)
setattr(parsed_ztf_list[parsed_dets.index(detections)], "new", False)
setattr(
parsed_ztf_list[parsed_dets.index(detections)],
"extra_fields",
extra_fields_list[parsed_dets.index(detections)],
)
for name in CHANGE_NAMES.keys():
parsed_ztf_list[parsed_dets.index(detections)].__dict__[CHANGE_NAMES[name]] = (
parsed_ztf_list[parsed_dets.index(detections)].__dict__[name]
)
del parsed_ztf_list[parsed_dets.index(detections)].__dict__[name]

for name in CHANGE_NAMES_2.keys():
parsed_ztf_list[parsed_dets.index(detections)].__dict__["extra_fields"][
CHANGE_NAMES_2[name]
] = parsed_ztf_list[parsed_dets.index(detections)].__dict__["extra_fields"][name]
del parsed_ztf_list[parsed_dets.index(detections)].__dict__["extra_fields"][name]
parsed_ztf_list = join_ztf(
parsed_dets, parsed_ztf_list, parsed_ztf_detections, extra_fields_list, True
)

dict_parsed = list(map(vars, parsed_ztf_list))

# Here we calculate e_ra and e_dec and join into every final dict. These variables are used to calculate meanra and meandec in correction.
for d in dict_parsed:
e_ra = _e_ra(d["dec"], d["band"])
e_dec = ERRORS[d["band"]]

d["e_ra"] = e_ra
d["e_dec"] = e_dec

d["sid"] = 0
d["tid"] = 0

for d in dict_parsed:
del d["_sa_instance_state"]
dict_parsed = calc_ra_dec(dict_parsed)

return dict_parsed


def parse_sql_non_detection(ztf_models: list, *, oids) -> list:
non_dets = []

for d in ztf_models:
parsed_non_det = {}
for field, value in d[0].__dict__.items():
if field.startswith("_"):
continue
else:
if field in CHANGE_VALUES:
parsed_non_det[field] = 0
else:
parsed_non_det[field] = value
non_dets.append(parsed_non_det)

non_dets_parsed = []
for non_det in non_dets:
non_detection = NonDetection(**non_det)
non_dets_parsed.append(non_detection)
def parse_sql_non_detection(models: list, *, oids) -> list:

parsed_non_dets = ddbb_to_dict(models, ztf=False)
non_dets_parsed = dicts_through_model(parsed_non_dets, NonDetection)
dict_parsed = list(map(vars, non_dets_parsed))
for d in dict_parsed:
del d["_sa_instance_state"]
d["sid"] = 0
d["tid"] = 0

return dict_parsed


def parse_sql_forced_photometry(ztf_models: list, models: list, *, oids) -> list:
"""
Here we join the ztf_fp and fp in one dict for every oid. Also we add the field extra_fields to the final result.
"""
parsed_ztf_dets = []
extra_fields_list = []
# Here we take the info from the DDBB and make the extra_fields list of dicts.
for d in ztf_models:
parsed_fp_d = {}
extra_fields = {}
for key, value in d[0].__dict__.items():
if key.startswith("_"):
continue
elif not key in GENERIC_FIELDS:
extra_fields[key] = value
else:
if key in CHANGE_VALUES:
parsed_fp_d[key] = 0
else:
parsed_fp_d[key] = value
parsed_ztf_dets.append(parsed_fp_d)
extra_fields_list.append(extra_fields)
parsed_dets = []

# Here we take the info from the DDBB for the generic models and turn each one into a dict (no extra_fields are built in this loop).
for d in models:
d: dict = d[0].__dict__
parsed_d = {}
for field, value in d.items():
if field.startswith("_"):
continue
else:
if field in CHANGE_VALUES:
parsed_d[field] = 0
else:
parsed_d[field] = value
parsed_dets.append(parsed_d)
The helper functions used here are the same as in parse_sql_detection.
"""
parsed_ztf_detections, extra_fields_list = ddbb_to_dict(ztf_models, ztf=True)

parsed_ztf_list = []
parsed_dets = ddbb_to_dict(ztf_models, ztf=False)

for forced in parsed_ztf_dets:
parsed_ztf = (ZtfForcedPhotometry(**forced),)
parsed_ztf_list.append(parsed_ztf[0])
parsed_ztf_list = dicts_through_model(parsed_ztf_detections, ZtfForcedPhotometry)

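# Here we join ztf_fp and fp, setting forced to True and new to False.
# NOTE(review): parsed_dets above is built from ztf_models rather than models, unlike parse_sql_detection — confirm this is intended.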
for detections in parsed_dets:
for key, value in detections.items():
if not key in parsed_ztf_dets[parsed_dets.index(detections)].keys():
setattr(parsed_ztf_list[parsed_dets.index(detections)], key, value)

setattr(parsed_ztf_list[parsed_dets.index(detections)], "forced", True)
setattr(parsed_ztf_list[parsed_dets.index(detections)], "new", False)
setattr(
parsed_ztf_list[parsed_dets.index(detections)],
"extra_fields",
extra_fields_list[parsed_dets.index(detections)],
)
parsed_ztf_list = join_ztf(
parsed_dets, parsed_ztf_list, parsed_ztf_detections, extra_fields_list, False
)

dict_parsed = list(map(vars, parsed_ztf_list))

for d in dict_parsed:
del d["_sa_instance_state"]
d["sid"] = 0
d["tid"] = 0
del d["pid"] # Temporal fix to avoid error (Not all pids have value in the new DDBB)
d["pid"] = 0

return dict_parsed
Loading