Skip to content

Commit

Permalink
Adding impacts.py
Browse files Browse the repository at this point in the history
  • Loading branch information
Kathleen Kiker committed Dec 17, 2024
1 parent 7e83cd6 commit 470681b
Showing 1 changed file with 80 additions and 22 deletions.
102 changes: 80 additions & 22 deletions src/adam_core/dynamics/impacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import pyarrow.compute as pc
import quivr as qv

from adam_core.constants import KM_P_AU
from adam_core.constants import Constants as c
from adam_core.coordinates import CartesianCoordinates
from adam_core.ray_cluster import initialize_use_ray

Expand All @@ -20,6 +22,12 @@

logger = logging.getLogger(__name__)

C = c.C

# Use the Earth's equatorial radius as used in DE4XX ephemerides
# adam_core defines it in au but we need it in km
EARTH_RADIUS_KM = c.R_EARTH_EQUATORIAL * KM_P_AU

RAY_INSTALLED = False
try:
import ray
Expand Down Expand Up @@ -49,6 +57,7 @@ class EarthImpacts(qv.Table):


class ImpactProbabilities(qv.Table):
condition_id = qv.LargeStringColumn()
orbit_id = qv.LargeStringColumn()
impacts = qv.Int64Column()
variants = qv.Int64Column()
Expand All @@ -57,6 +66,24 @@ class ImpactProbabilities(qv.Table):
maximum_impact_time = Timestamp.as_column(nullable=True)


class CollisionConditions(qv.Table):
condition_id = qv.LargeStringColumn()
collision_object_name = qv.LargeStringColumn()
collision_distance = qv.Float64Column()
stopping_condition = qv.BooleanColumn()


class CollisionEvent(qv.Table):
orbit_id = qv.LargeStringColumn()
distance = qv.Float64Column() # Distance from collision object in km
coordinates = CartesianCoordinates.as_column()
variant_id = qv.LargeStringColumn(nullable=True)
condition_id = qv.LargeStringColumn()
collision_object_name = qv.LargeStringColumn()
collision_distance = qv.Float64Column()
stopping_condition = qv.BooleanColumn()


class ImpactMixin:
"""
`~adam_core.propagator.Propagator` mixin with signature for detecting Earth impacts.
Expand All @@ -74,6 +101,16 @@ def _detect_impacts(
"""
pass

def _detect_collisions(
self, orbits: Orbits, num_days: float, conditions: CollisionConditions
) -> Tuple[OrbitType, CollisionEvent]:
"""
Detect collisions for the given orbits.
THIS FUNCTION SHOULD NOT BE OVERRIDDEN BY THE USER.
"""
pass

def detect_impacts(
self,
orbits: OrbitType,
Expand Down Expand Up @@ -158,7 +195,8 @@ def calculate_impacts(
num_samples: int = 1000,
processes: Optional[int] = None,
seed: Optional[int] = None,
) -> Tuple[Orbits, EarthImpacts]:
impact_conditions: Optional[CollisionConditions] = None,
) -> Tuple[OrbitType, CollisionEvent]:
"""
Calculate the impacts for each variant orbit generated from the input orbits.
Expand Down Expand Up @@ -188,18 +226,27 @@ def calculate_impacts(
variants = VariantOrbits.create(
orbits, method="monte-carlo", num_samples=num_samples, seed=seed
)
if impact_conditions is None:
impact_conditions = CollisionConditions.from_kwargs(
condition_id=["Default"],
collision_object_name=["Earth"],
collision_distance=[EARTH_RADIUS_KM],
stopping_condition=[True],
)
logger.info("Detecting impacts...")
results, impacts = propagator.detect_impacts(
results, collisions = propagator._detect_collisions(
variants,
num_days,
max_processes=processes,
impact_conditions,
)

return results, impacts
return results, collisions


def calculate_impact_probabilities(
variants: VariantOrbits, impacts: EarthImpacts
variants: VariantOrbits,
collision_events: CollisionEvent,
conditions: Optional[CollisionConditions] = None,
) -> ImpactProbabilities:
"""
Calculate the impact probabilities for each variant orbit generated from the input orbits.
Expand All @@ -215,34 +262,45 @@ def calculate_impact_probabilities(
Impact probabilities for the variant orbits.
"""

if conditions is None:
conditions = CollisionConditions.from_kwargs(
condition_id=["Default"],
collision_object_name=["Earth"],
collision_distance=[EARTH_RADIUS_KM],
stopping_condition=[True],
)

# Loop through the unique set of orbit_ids within variants using quivr
unique_orbits = pc.unique(variants.orbit_id).to_pylist()

earth_impact_probabilities = None
impact_probabilities = None

for orbit_id in unique_orbits:
# mask = pc.equal(variants.orbit_id, orbit_id)
variant_masked = variants.select("orbit_id", orbit_id)
variant_count = len(variant_masked)

impacts_masked = impacts.select("orbit_id", orbit_id)
impact_count = len(impacts_masked)
impacts_masked = collision_events.select("orbit_id", orbit_id)

ip = ImpactProbabilities.from_kwargs(
orbit_id=[orbit_id],
impacts=[impact_count],
variants=[variant_count],
cumulative_probability=[impact_count / variant_count],
)

if earth_impact_probabilities is None:
earth_impact_probabilities = ip
else:
earth_impact_probabilities = qv.concatenate(
[earth_impact_probabilities, ip]
for unique_condition in conditions:
impacts_per_condition = impacts_masked.select(
"condition_id", unique_condition.condition_id[0]
)
impact_count = len(impacts_per_condition)

ip = ImpactProbabilities.from_kwargs(
condition_id=[unique_condition.condition_id[0]],
orbit_id=[orbit_id],
impacts=[impact_count],
variants=[variant_count],
cumulative_probability=[impact_count / variant_count],
)

return earth_impact_probabilities
if impact_probabilities is None:
impact_probabilities = ip
else:
impact_probabilities = qv.concatenate([impact_probabilities, ip])

return impact_probabilities


def link_impacting_variants(variants, impacts):
Expand Down

0 comments on commit 470681b

Please sign in to comment.