Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions socs/agents/acu/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
'ccat': {
'enabled': False,
'exclusion_radius': 20,
'el_horizon': 10,
'el_horizon': 0,
'el_dodging': True,
'min_sun_time': 1800,
'response_time': 7200,
},
Expand Down Expand Up @@ -2374,8 +2375,10 @@ def monitor_sun(self, session, params):
78.24269024936028,
60.919554369324096
],
"sun_down": false,
"sun_dist": 41.75087242151837,
"sun_safe_time": 71760
"sun_safe_time": 71760,
"platform_down": false
},
"avoidance": {
"safety_unknown": false,
Expand Down Expand Up @@ -2431,6 +2434,8 @@ def lookup(keys, tree):
'sun_el': (('sun_pos', 'sun_azel', 1), float),
'sun_dist': (('sun_pos', 'sun_dist'), float),
'sun_safe_time': (('sun_pos', 'sun_safe_time'), float),
'sun_down': (('sun_pos', 'sun_down'), int),
'platform_down': (('sun_pos', 'platform_down'), int),
}
for k in ['warning_zone', 'danger_zone',
'escape_triggered', 'escape_active']:
Expand Down
207 changes: 153 additions & 54 deletions socs/agents/acu/avoidance.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,33 @@
called the ``exclusion_radius`` or the field-of-view radius.

The Safety of the instrument at any given moment is parametrized
by two numbers:
by two numbers and a boolean:

``sun_dist``
The separation between the Sun and the boresight (az, el), in
degrees.
degrees. This is defined without regards for the horizon --
i.e. even if the Sun and the boresight are both below "the
horizon", the sun distance can be small.

``sun_time``
The minimum time, in seconds, which must elapse before the current
(az, el) pointing of the boresight will lie within the exclusion
radius of the Sun.
radius of the Sun. Although some positions will "never" see the
Sun, the sun_time for such positions is set to about 2 days, as a
place-holder for "the future". When the boresight is well below
the horizon, the sun_time is set to "never", i.e. 2 days. When
the sun is below the horizon, then the sun_time will always be at
least the time until next sun-rise.

While the ``sun_dist`` is an important indicator of whether the
instrument is currently in immediate danger, the ``sun_time`` is
helpful with looking forward and avoiding positions that will soon be
dangerous.
``sun_down``
A boolean indicating whether the sun is below the horizon
elevation (which is a parameter of the instrument).


While the ``sun_dist`` and ``sun_down`` are important indicators of
whether the instrument is currently in immediate danger, the
``sun_time`` is useful for looking forward in order to avoid positions
that will soon be dangerous.

The user-defined policy for Sun Safety is captured in the following
settings:
Expand Down Expand Up @@ -74,6 +86,7 @@
"""

import datetime
import functools
import math
import time

Expand All @@ -91,6 +104,7 @@

HOUR = 3600
DAY = 86400
SIDEREAL_DAY = 86164.0905
NO_TIME = DAY * 2

#: Default policy to apply when evaluating Sun-safety and planning
Expand Down Expand Up @@ -122,7 +136,7 @@ class SunTracker:
If not None, pass an so3g.proj.EarthlySite or compatible.
map_res (float, deg): resolution to use for the Sun Safety Map.
sun_time_shift (float, seconds): For debugging and testing,
compute the Sun's position as though it were this manys
compute the Sun's position as though it were this many
seconds in the future. If None or zero, this feature is
disabled.
fake_now (float, seconds): For debugging and testing, replace
Expand Down Expand Up @@ -174,9 +188,26 @@ def _now(self):

def _sun(self, t):
self._site.date = \
datetime.datetime.utcfromtimestamp(t + self.sun_time_shift)
datetime.datetime.utcfromtimestamp(t)
return ephem.Sun(self._site)

@staticmethod
def _horizon_branch(az=0, el=0):
"""Given telescope boresight (el, az), which could be vectors,
return (az_can, el_can, inverted), where el_can is in the
branch most useful for Sun avoidance assessment, [-90, 90],
and az_can is adjusted such that (az, el) and (az_can, el_can)
correspond to same on-sky point. For such points where the
rebranching also implies an inversion of the focal plane,
inverted is True.

"""
q = quat.rotation_lonlat(-az * coords.DEG, el * coords.DEG)
lon, lat, phi = quat.decompose_lonlat(q)
inverted = np.zeros(lon.shape, bool)
inverted[abs(phi) > .001 * coords.DEG] = True
return -lon / coords.DEG, lat / coords.DEG, inverted

def reset(self, base_time=None):
"""Compute and store the Sun Safety Map for a specific
timestamp.
Expand All @@ -190,53 +221,70 @@ def reset(self, base_time=None):
if base_time is None:
base_time = self._now()

# Identify zenith (ra, dec) at base_time.
Qz = coords.CelestialSightLine.naive_az_el(
base_time, 180. * DEG, 90. * DEG).Q
ra_z, dec_z, _ = quat.decompose_lonlat(Qz)

# Map extends from dec -80 to +80.
shape, wcs = enmap.band_geometry(
dec_cut=80 * DEG, res=self.res, proj='car')

# The map of sun time deltas
sun_times = enmap.zeros(shape, wcs=wcs) - 1
# Map where each pixel is distance to the Sun.
sun_dist = enmap.zeros(shape, wcs=wcs) - 1

# Map of time-to-sun-danger, when sun is above horizon.
sun_times_up = sun_dist.copy()

# Map of time-to-sun-danger, when sun is below horizon.
sun_times_dn = sun_dist.copy()

# Quaternion rotation for each point in the map.
dec, ra = sun_times.posmap()
dec, ra = sun_dist.posmap()
map_q = quat.rotation_lonlat(ra.ravel(), dec.ravel())

v = self._sun(base_time)
# Look up the sun position at our shifted time; but also shift
# explicitly in RA for earth rotation.
v = self._sun(base_time + self.sun_time_shift)
sun_dec = v.dec
sun_ra = v.ra - self.sun_time_shift * (2 * np.pi / SIDEREAL_DAY)
del v

# Get the map of angular distance to the Sun.
qsun = quat.rotation_lonlat(v.ra, v.dec)
sun_dist[:] = (quat.decompose_iso(~qsun * map_q)[0]
.reshape(sun_dist.shape) / coords.DEG)
# Compute the map of angular distance to the Sun, at
# base_time.
qsun = quat.rotation_lonlat(sun_ra, sun_dec)
qoff = ~qsun * map_q
sun_dist[:] = (quat.decompose_iso(~qoff)[0]
.reshape(sun_dist.shape) / DEG)

# Get the map where each pixel says the time delay between
# base_time and when the time when the sky coordinate will be
# in the Sun mask.
# For the sun_times_* maps, each pixel will give the time
# delay between base_time and the time when the sky coordinate
# will be inside the Sun exclusion mask.
dt = -ra[0] * DAY / (2 * np.pi)
qsun = quat.rotation_lonlat(v.ra, v.dec)
qoff = ~qsun * map_q
r = quat.decompose_iso(qoff)[0].reshape(sun_times.shape) / DEG
sun_times[r <= self.policy['exclusion_radius']] = 0.
for g in sun_times:
if (g < 0).all():

# For sun_times_up (sun above horizon), the region around the
# sun is bad right now.
sun_times_up[sun_dist <= self.policy['exclusion_radius']] = 0.

# Loop over rows of the sun_times_* maps.
for g_up, g_dn in zip(sun_times_up, sun_times_dn):
if (g_up < 0).all():
# Sun mask does not touch this declination.
continue
# Identify pixel on the right of the masked region.
flips = ((g == 0) * np.hstack((g[:-1] != g[1:], g[-1] != g[0]))).nonzero()[0]
flips = ((g_up == 0)
* np.hstack((g_up[:-1] != g_up[1:],
g_up[-1] != g_up[0]))).nonzero()[0]
dt0 = dt[flips[0]]
_dt = (dt - dt0) % DAY
g[g < 0] = _dt[g < 0]
# For sun_times_up, fill only pixels outside sun mask.
g_up[g_up < 0] = _dt[g_up < 0]
# For sun_times_dn, fill all pixels.
g_dn[:] = _dt[:]

# Fill in remaining -1 with NO_TIME.
sun_times[sun_times < 0] = NO_TIME
sun_times_up[sun_times_up < 0] = NO_TIME
sun_times_dn[sun_times_dn < 0] = NO_TIME

# Store the sun_times map and stuff.
self.base_time = base_time
self.sun_times = sun_times
self.sun_times_up = sun_times_up
self.sun_times_dn = sun_times_dn
self.sun_dist = sun_dist
self.map_q = map_q

Expand All @@ -261,20 +309,21 @@ def _azel_pix(self, az, el, dt=0, round=True, segments=False):
qt = coords.CelestialSightLine.naive_az_el(
self.base_time + dt, az * DEG, el * DEG).Q
ra, dec, _ = quat.decompose_lonlat(qt)
pix_ji = self.sun_times.sky2pix((dec, ra))
src_map = self.sun_dist # Only used for coordinate ops.
pix_ji = src_map.sky2pix((dec, ra))
if round:
pix_ji = pix_ji.round().astype(int)
# Handle out of bounds as follows:
# - RA indices are mod-ed into range.
# - dec indices are clamped to the map edge.
j, i = pix_ji
j[j < 0] = 0
j[j >= self.sun_times.shape[-2]] = self.sun_times.shape[-2] - 1
i[:] = i % self.sun_times.shape[-1]
j[j >= src_map.shape[-2]] = src_map.shape[-2] - 1
i[:] = i % src_map.shape[-1]

if segments:
jumps = ((abs(np.diff(pix_ji[0])) > self.sun_times.shape[-2] / 2)
+ (abs(np.diff(pix_ji[1])) > self.sun_times.shape[-1] / 2))
jumps = ((abs(np.diff(pix_ji[0])) > src_map.shape[-2] / 2)
+ (abs(np.diff(pix_ji[1])) > src_map.shape[-1] / 2))
jump = jumps.nonzero()[0]
starts = np.hstack((0, jump + 1))
stops = np.hstack((jump + 1, len(pix_ji[0])))
Expand Down Expand Up @@ -304,13 +353,26 @@ def check_trajectory(self, az, el, t=None, raw=False):
"""
if t is None:
t = self._now()
az, el = np.asarray(az), np.asarray(el)
j, i = self._azel_pix(az, el, dt=t - self.base_time)
sun_delta = self.sun_times[j, i]
sun_delta = self.sun_times_up[j, i]
sun_dists = self.sun_dist[j, i]

# If sun is below horizon, rail sun_dist to 180 deg.
if self.get_sun_pos(t=t)['sun_azel'][1] < self.policy['el_horizon']:
sun_dists[:] = 180.
# If the sun is below the horizon, sun times are modified.
az_sun, el_sun = self.get_sun_pos(t=t)['sun_azel']
if el_sun < self.policy['el_horizon']:
if (az_sun % 360.) > 180:
# The setting problem:
sun_delta = self.sun_times_dn[j, i]
else:
# The rising problem:
dt_rise = self._next_rise_time(t) - t
sun_delta[sun_delta < dt_rise] = dt_rise

# Positions below the modified horizon are always safe.
safe_el = self.policy['el_horizon'] - self.policy['exclusion_radius']
_, el_can, _ = self._horizon_branch(az, el)
sun_delta[el_can < safe_el] = NO_TIME

if raw:
return sun_delta, sun_dists
Expand All @@ -332,26 +394,57 @@ def get_sun_pos(self, az=None, el=None, t=None):
"""
if t is None:
t = self._now()
v = self._sun(t)
qsun = quat.rotation_lonlat(v.ra, v.dec)
eff_t = t + self.sun_time_shift

qzen = coords.CelestialSightLine.naive_az_el(t, 0, np.pi / 2).Q
neg_zen_az, zen_el, _ = quat.decompose_lonlat(~qzen * qsun)
v = self._sun(eff_t)
qsun = quat.rotation_lonlat(v.ra, v.dec)
qzen = coords.CelestialSightLine.naive_az_el(eff_t, 0, np.pi / 2).Q
neg_sun_az, sun_el, _ = quat.decompose_lonlat(~qzen * qsun)

results = {
'sun_radec': (v.ra / DEG, v.dec / DEG),
'sun_azel': ((-neg_zen_az / DEG) % 360., zen_el / DEG),
'sun_azel': ((-neg_sun_az / DEG) % 360., sun_el / DEG),
'sun_down': sun_el / DEG < self.policy['el_horizon'],
}
if self.sun_time_shift != 0:
results['WARNING'] = 'Fake Sun Position is in use!'

if az is not None:
qtel = coords.CelestialSightLine.naive_az_el(
t, az * DEG, el * DEG).Q
eff_t, az * DEG, el * DEG).Q
r = quat.decompose_iso(~qtel * qsun)[0]
results['sun_dist'] = r / DEG
_, el_can, _ = self._horizon_branch(az, el)
results['platform_down'] = \
el_can < (self.policy['el_horizon'] - self.policy['exclusion_radius'])
return results

def _next_rise_time(self, t):
"""Compute the smallest time, greater than or equal to t, at
which the Sun will be above the el_horizon. Accurate to 1s or
so.

"""
# Since the helper is cached, include anything in the args
# that could affect the computed value.
return self._next_rise_time_cache_helper(t, self.sun_time_shift)

@functools.lru_cache
def _next_rise_time_cache_helper(self, t, *args):
horizon = self.policy['el_horizon']
el0 = self.get_sun_pos(t=t)['sun_azel'][1]
if el0 > horizon:
return t
step = 3600
while abs(step) > 1.:
t += step
el = self.get_sun_pos(t=t)['sun_azel'][1]
if el > horizon:
step = -abs(step) / 2
else:
step = abs(step)
return t

def show_map(self, axes=None, show=True):
"""Plot the Sun Safety Map and Sun Distance Map on the provided axes
(a list)."""
Expand All @@ -365,7 +458,7 @@ def show_map(self, axes=None, show=True):
for axi, ax in enumerate(axes):
if axi == 0:
# Sun safe time
x = self.sun_times / HOUR
x = self.sun_times_up / HOUR
x[x == NO_TIME] = np.nan
title = 'Sun safe time (hours)'
elif axi == 1:
Expand Down Expand Up @@ -503,9 +596,15 @@ def find_escape_paths(self, az0, el0, t=None,
# Clip el0 into the allowed range.
el0 = np.clip(el0, self.policy['min_el'], self.policy['max_el'])

# Preference is to not change altitude; but allow for lowering.
n_els = math.ceil(el0 - self.policy['min_el']) + 1
els = np.linspace(el0, self.policy['min_el'], n_els)
# Preference is to not change altitude; but allow for lowering
# (in absolute terms) -- there is "more sky" when you move
# away from zenith.
if el0 <= 90:
n_el = math.ceil(el0 - self.policy['min_el']) + 1
els = np.linspace(el0, self.policy['min_el'], n_el)
else:
n_el = math.ceil(self.policy['max_el'] - el0) + 1
els = np.linspace(el0, self.policy['max_el'], n_el)

path = None
for el1 in els:
Expand Down
Loading