Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2021 United States Government as represented by the Administrator of the
# National Aeronautics and Space Administration. All Rights Reserved.

__all__ = ['sim', 'sim_valve', 'sim_pump', 'sim_battery_eol', 'model_gen', 'benchmarking', 'new_model', 'sensitivity', 'noise', 'future_loading', 'param_est', 'derived_params', 'state_limits', 'dynamic_step_size']
__all__ = ['sim', 'sim_valve', 'sim_pump', 'sim_battery_eol', 'model_gen', 'benchmarking', 'new_model', 'sensitivity', 'noise', 'future_loading', 'param_est', 'derived_params', 'state_limits', 'dynamic_step_size', 'vectorized']
36 changes: 36 additions & 0 deletions examples/vectorized.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright © 2021 United States Government as represented by the Administrator of the
# National Aeronautics and Space Administration. All Rights Reserved.

"""
Example using simulate_to_threshold with vectorized states. In this example we are using the thrown_object model to simulate multiple thrown objects
"""

from prog_models.models.thrown_object import ThrownObject
from numpy import array, all

def run_example():
# Step 1: Setup object
m = ThrownObject()
def future_load(t, x=None):
return {} # No load for thrown objects

# Step 2: Setup vectorized initial state
# For this example we are saying there are 4 throwers of various strengths and heights
first_state = {
'x': array([1.75, 1.8, 1.85, 1.9]),
'v': array([35, 39, 22, 47])
}

# Step 3: Simulate to threshold
# Here we are simulating till impact using the first state defined above
(times, inputs, states, outputs, event_states) = m.simulate_to_threshold(future_load, {'x': first_state['x']}, x = first_state, threshold_keys=['impact'], print = True, dt=0.1, save_freq=2)

# Now lets do the same thing but only stop when all hit the ground
def thresholds_met_eqn(thresholds_met):
return all(thresholds_met['impact']) # Stop when all impact ground

(times, inputs, states, outputs, event_states) = m.simulate_to_threshold(future_load, {'x': first_state['x']}, x = first_state, thresholds_met_eqn=thresholds_met_eqn, print = True, dt=0.1, save_freq=2)

# This allows the module to be executed directly
if __name__=='__main__':
run_example()
5 changes: 5 additions & 0 deletions prog_model_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
# 4. Uncomment either dx or next_state function. dx for continuous models, and next_state for discrete
# 5. Implement logic of model in each method

# Note: To preserve vectorization use numpy math function (e.g., maximum, minimum, sign, sqrt, etc.) instead of non-vectorized functions (max, min, etc.)

from prog_models import PrognosticsModel
import math

Expand All @@ -27,6 +29,9 @@ class ProgModelTemplate(PrognosticsModel):
Template for Prognostics Model
"""

# V Uncomment Below if the class is vectorized (i.e., if it can accept input to all functions as arrays) V
# is_vectorized = True

# REPLACE THE FOLLOWING LIST WITH EVENTS BEING PREDICTED
events = [
'Example Event'
Expand Down
5 changes: 3 additions & 2 deletions src/prog_models/models/battery_circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from .. import prognostics_model

from math import exp, inf
from math import inf
from numpy import exp, minimum


class BatteryCircuit(prognostics_model.PrognosticsModel):
Expand Down Expand Up @@ -152,7 +153,7 @@ def event_state(self, x):
voltage_EOD = (z['v'] - self.parameters['VEOD']) / \
self.parameters['VDropoff']
return {
'EOD': min(charge_EOD, voltage_EOD)
'EOD': minimum(charge_EOD, voltage_EOD)
}

def output(self, x):
Expand Down
29 changes: 15 additions & 14 deletions src/prog_models/models/battery_electrochem.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from math import inf
from copy import deepcopy
from numpy import maximum, minimum, sign, sqrt, array, ndarray, arcsinh, log, atleast_1d
from numpy import arcsinh, log

# Constants of nature
R = 8.3144621 # universal gas constant, J/K/mol
Expand Down Expand Up @@ -166,6 +166,7 @@ class BatteryElectroChemEOD(PrognosticsModel):
inputs = ['i']
states = ['tb', 'Vo', 'Vsn', 'Vsp', 'qnB', 'qnS', 'qpB', 'qpS']
outputs = ['t', 'v']
is_vectorized = True

default_parameters = { # Set to defaults
'qMobile': 7600,
Expand Down Expand Up @@ -278,14 +279,14 @@ def dx(self, x, u):
Tbdot = voltage_eta*u['i']/mC + (params['x0']['tb'] - x['tb'])/tau # Newman

return {
'Vo': atleast_1d(Vodot),
'Vsn': atleast_1d(Vsndot),
'Vsp': atleast_1d(Vspdot),
'tb': atleast_1d(Tbdot),
'qnB': atleast_1d(qnBdot),
'qnS': atleast_1d(qnSdot),
'qpB': atleast_1d(qpBdot),
'qpS': atleast_1d(qpSdot)
'Vo': Vodot,
'Vsn': Vsndot,
'Vsp': Vspdot,
'tb': Tbdot,
'qnB': qnBdot,
'qnS': qnSdot,
'qpB': qpBdot,
'qpS': qpSdot
}

def event_state(self, x):
Expand Down Expand Up @@ -348,8 +349,8 @@ def output(self, x):
Vep = params['U0p'] + R*x['tb']/F*log((1-xpS)/xpS) + sum(VepParts)

return {
't': atleast_1d(x['tb'] - 273.15),
'v': atleast_1d(Vep - Ven - x['Vo'] - x['Vsn'] - x['Vsp'])
't': x['tb'] - 273.15,
'v': Vep - Ven - x['Vo'] - x['Vsn'] - x['Vsp']
}

def threshold_met(self, x):
Expand Down Expand Up @@ -425,9 +426,9 @@ def dx(self, x, u):
params = self.parameters

return {
'qMax': atleast_1d(params['wq'] * abs(u['i'])),
'Ro': atleast_1d(params['wr'] * abs(u['i'])),
'D': atleast_1d(params['wd'] * abs(u['i']))
'qMax': params['wq'] * abs(u['i']),
'Ro': params['wr'] * abs(u['i']),
'D': params['wd'] * abs(u['i'])
}

def event_state(self, x):
Expand Down
51 changes: 26 additions & 25 deletions src/prog_models/models/centrifugal_pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import math
from math import inf
from copy import deepcopy
from numpy import maximum, minimum, sign, sqrt, array, ndarray, atleast_1d
from numpy import array, maximum, minimum, ndarray, sqrt, sign


class CentrifugalPumpBase(prognostics_model.PrognosticsModel):
Expand Down Expand Up @@ -84,6 +84,7 @@ class CentrifugalPumpBase(prognostics_model.PrognosticsModel):
inputs = ['Tamb', 'V', 'pdisch', 'psuc', 'wsync']
states = ['w', 'Q', 'Tt', 'Tr', 'To', 'A', 'rRadial', 'rThrust', 'QLeak']
outputs = ['w', 'Qout', 'Tt', 'Tr', 'To']
is_vectorized = True

default_parameters = { # Set to defaults
# Environmental parameters
Expand Down Expand Up @@ -143,14 +144,14 @@ class CentrifugalPumpBase(prognostics_model.PrognosticsModel):

# Initial state
'x0': {
'w': atleast_1d(376.991118431), # 3600 rpm (rad/sec)
'Q': atleast_1d(0),
'Tt': atleast_1d(290),
'Tr': atleast_1d(290),
'To': atleast_1d(290),
'A': atleast_1d(12.7084),
'rThrust': atleast_1d(1.4e-6),
'rRadial': atleast_1d(1.8e-6)
'w': 376.991118431, # 3600 rpm (rad/sec)
'Q': 0,
'Tt': 290,
'Tr': 290,
'To': 290,
'A': 12.7084,
'rThrust': 1.4e-6,
'rRadial': 1.8e-6
}
}

Expand All @@ -165,9 +166,9 @@ class CentrifugalPumpBase(prognostics_model.PrognosticsModel):

def initialize(self, u, z = None):
x0 = self.parameters['x0']
x0['QLeak'] = atleast_1d(math.copysign(\
x0['QLeak'] = math.copysign(\
self.parameters['cLeak']*self.parameters['ALeak']*\
math.sqrt(abs(u['psuc']-u['pdisch'])), u['psuc']-u['pdisch']))
math.sqrt(abs(u['psuc']-u['pdisch'])), u['psuc']-u['pdisch'])
return x0

def next_state(self, x, u, dt):
Expand Down Expand Up @@ -200,26 +201,26 @@ def next_state(self, x, u, dt):
Qdot = 1/params['FluidI']*(Qo-x['Q'])

return {
'w': atleast_1d(x['w'] + wdot * dt),
'Q': atleast_1d(x['Q'] + Qdot * dt),
'Tt': atleast_1d(x['Tt'] + Ttdot * dt),
'Tr': atleast_1d(x['Tr'] + Trdot * dt),
'To': atleast_1d(x['To'] + Todot * dt),
'A': atleast_1d(x['A'] + Adot * dt),
'rRadial': atleast_1d(x['rRadial'] + rRadialdot * dt),
'rThrust': atleast_1d(x['rThrust'] + rThrustdot * dt),
'QLeak': atleast_1d(QLeak)
'w': x['w'] + wdot * dt,
'Q': x['Q'] + Qdot * dt,
'Tt': x['Tt'] + Ttdot * dt,
'Tr': x['Tr'] + Trdot * dt,
'To': x['To'] + Todot * dt,
'A': x['A'] + Adot * dt,
'rRadial': x['rRadial'] + rRadialdot * dt,
'rThrust': x['rThrust'] + rThrustdot * dt,
'QLeak': QLeak
}

def output(self, x):
Qout = maximum(0,x['Q']-x['QLeak'])

return {
'w': atleast_1d(x['w']),
'Qout': atleast_1d(Qout),
'Tt': atleast_1d(x['Tt']),
'Tr': atleast_1d(x['Tr']),
'To': atleast_1d(x['To'])
'w': x['w'],
'Qout': Qout,
'Tt': x['Tt'],
'Tr': x['Tr'],
'To': x['To']
}

def event_state(self, x):
Expand Down
56 changes: 44 additions & 12 deletions src/prog_models/models/pneumatic_valve.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,25 @@
# National Aeronautics and Space Administration. All Rights Reserved.

from .. import prognostics_model
from math import sqrt, copysign, inf
from math import inf
from copy import deepcopy
from numpy import sqrt, sign, maximum, minimum, isscalar, array, any, shape

def calc_x(x, forces, Ls, new_x):
lower_wall = (x==0 and forces<0) or (new_x<0)
upper_wall = (x==Ls and forces>0) or (new_x>Ls)
if lower_wall:
return 0
if upper_wall:
return Ls
return new_x

def calc_v(x, v, dv, forces, Ls, new_x):
lower_wall = (x==0 and forces<0) or (new_x<0)
upper_wall = (x==Ls and forces>0) or (new_x>Ls)
if lower_wall or upper_wall:
return 0
return v + dv


class PneumaticValveBase(prognostics_model.PrognosticsModel):
Expand Down Expand Up @@ -104,6 +121,7 @@ class PneumaticValveBase(prognostics_model.PrognosticsModel):
"pDiff" # pL-pR
]
outputs = ["Q", "iB", "iT", "pB", "pT", "x"]
is_vectorized = True
default_parameters = { # Set to defaults
# Environmental Parameters
'R': 8.314, # Universal Gas Constant
Expand Down Expand Up @@ -182,6 +200,20 @@ def initialize(self, u, z = None):
return x0

def gas_flow(self, pIn, pOut, C, A):
# Step 1: If array- run for each element
# Note: this is so complicated because it is run multiple times with mixtures of scalars and arrays
inputs = array([pIn, pOut, C, A])
if any([not isscalar(i) for i in inputs]):
# Handle case where one or more is array
size = [shape(i) for i in inputs]
size = max([i[0] if i else 0 for i in size]) # Size of array

# Create Iterable Elements for scalars
iter_inputs = [[i] * size if isscalar(i) else i for i in inputs]

# Run each element through function
return array([self.gas_flow(a, b, c, d) for a, b, c, d in zip(*iter_inputs)])

k = self.parameters['gas_gamma']
T = self.parameters['gas_temp']
Z = self.parameters['gas_z']
Expand Down Expand Up @@ -225,16 +257,16 @@ def next_state(self, x, u, dt):
vdot = pistonForces/params['m']

new_x = x['x']+x['v']*dt
if (x['x']==0 and pistonForces<0) or (new_x<0):
vel = 0
pos = 0
elif (x['x']==params['Ls'] and pistonForces>0) or (new_x>params['Ls']):
vel = 0
pos = params['Ls']

pos = minimum(maximum(new_x, 0.0), params['Ls'])

if isscalar(pistonForces):
vel = calc_v(x['x'], x['v'], vdot*dt, pistonForces, params['Ls'], new_x)
pos = calc_x(x['x'], pistonForces, params['Ls'], new_x)
else:
# moving
vel = x['v'] + vdot*dt
pos = new_x
# If array- run for each element
vel = [calc_v(xi, vi, vdot_i*dt, force, params['Ls'], new_x_i) for xi, vi, vdot_i, force, new_x_i in zip(x['x'], x['v'], vdot, pistonForces, new_x)]
pos = [calc_x(xi, force, params['Ls'], new_x_i) for xi, force, new_x_i in zip(x['x'], pistonForces, new_x)]

return {
'x': pos,
Expand All @@ -253,10 +285,10 @@ def output(self, x):
params = self.parameters # Optimization
indicatorTopm = (x['x'] >= params['Ls']-params['indicatorTol'])
indicatorBotm = (x['x'] <= params['indicatorTol'])
maxFlow = params['Cv']*params['Av']*copysign(sqrt(2/params['rhoL']*abs(x['pDiff'])),x['pDiff'])
maxFlow = params['Cv']*params['Av']*sqrt(2/params['rhoL']*abs(x['pDiff'])) * sign(x['pDiff'])
volumeBot = params['Vbot0'] + params['Ap']*x['x']
volumeTop = params['Vtop0'] + params['Ap']*(params['Ls']-x['x'])
trueFlow = maxFlow * max(0,x['x'])/params['Ls']
trueFlow = maxFlow * maximum(0,x['x'])/params['Ls']
pressureTop = x['mTop']*params['R']*params['gas_temp']/params['gas_mass']/volumeTop
pressureBot = x['mBot']*params['R']*params['gas_temp']/params['gas_mass']/volumeBot

Expand Down
13 changes: 9 additions & 4 deletions src/prog_models/models/thrown_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# National Aeronautics and Space Administration. All Rights Reserved.

from .. import PrognosticsModel
from numpy import maximum


class ThrownObject(PrognosticsModel):
Expand All @@ -21,6 +22,7 @@ class ThrownObject(PrognosticsModel):
'falling', # Event- object is falling
'impact' # Event- object has impacted ground
]
is_vectorized = True

# The Default parameters. Overwritten by passing parameters dictionary into constructor
default_parameters = {
Expand All @@ -30,8 +32,11 @@ class ThrownObject(PrognosticsModel):
'process_noise': 0.0 # amount of noise in each step
}

def initialize(self, u, z):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.max_x = 0.0

def initialize(self, u, z):
return {
'x': self.parameters['thrower_height'], # Thrown, so initial altitude is height of thrower
'v': self.parameters['throwing_speed'] # Velocity at which the ball is thrown - this guy is a professional baseball pitcher
Expand All @@ -53,8 +58,8 @@ def threshold_met(self, x):
}

def event_state(self, x):
self.max_x = max(self.max_x, x['x']) # Maximum altitude
self.max_x = maximum(self.max_x, x['x']) # Maximum altitude
return {
'falling': max(x['v']/self.parameters['throwing_speed'],0), # Throwing speed is max speed
'impact': max(x['x']/self.max_x,0) # 1 until falling begins, then it's fraction of height
'falling': maximum(x['v']/self.parameters['throwing_speed'],0), # Throwing speed is max speed
'impact': maximum(x['x']/self.max_x,0) # 1 until falling begins, then it's fraction of height
}
Loading