Skip to content
Merged
2 changes: 1 addition & 1 deletion examples/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2021 United States Government as represented by the Administrator of the
# National Aeronautics and Space Administration. All Rights Reserved.

__all__ = ['sim', 'sim_valve', 'sim_pump', 'sim_battery_eol', 'model_gen', 'benchmarking', 'new_model', 'sensitivity', 'noise', 'future_loading', 'param_est', 'derived_params', 'state_limits', 'dynamic_step_size']
__all__ = ['sim', 'sim_valve', 'sim_pump', 'sim_battery_eol', 'model_gen', 'benchmarking', 'new_model', 'sensitivity', 'noise', 'future_loading', 'param_est', 'derived_params', 'state_limits', 'dynamic_step_size', 'vectorized']
36 changes: 36 additions & 0 deletions examples/vectorized.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright © 2021 United States Government as represented by the Administrator of the
# National Aeronautics and Space Administration. All Rights Reserved.

"""
Example using simulate_to_threshold with vectorized states. In this example we are using the thrown_object model to simulate multiple thrown objects
"""

from prog_models.models.thrown_object import ThrownObject
from numpy import array, all

def run_example():
# Step 1: Setup object
m = ThrownObject()
def future_load(t, x=None):
return {} # No load for thrown objects

# Step 2: Setup vectorized initial state
# For this example we are saying there are 4 throwers of various strengths and heights
first_state = {
'x': array([1.75, 1.8, 1.85, 1.9]),
'v': array([35, 39, 22, 47])
}

# Step 3: Simulate to threshold
# Here we are simulating till impact using the first state defined above
(times, inputs, states, outputs, event_states) = m.simulate_to_threshold(future_load, {'x': first_state['x']}, x = first_state, threshold_keys=['impact'], print = True, dt=0.1, save_freq=2)

# Now lets do the same thing but only stop when all hit the ground
def thresholds_met_eqn(thresholds_met):
return all(thresholds_met['impact']) # Stop when all impact ground

(times, inputs, states, outputs, event_states) = m.simulate_to_threshold(future_load, {'x': first_state['x']}, x = first_state, thresholds_met_eqn=thresholds_met_eqn, print = True, dt=0.1, save_freq=2)

# This allows the module to be executed directly
if __name__=='__main__':
run_example()
5 changes: 5 additions & 0 deletions prog_model_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
# 4. Uncomment either dx or next_state function. dx for continuous models, and next_state for discrete
# 5. Implement logic of model in each method

# Note: To preserve vectorization use numpy math function (e.g., maximum, minimum, sign, sqrt, etc.) instead of non-vectorized functions (max, min, etc.)

from prog_models import PrognosticsModel
import math

Expand All @@ -27,6 +29,9 @@ class ProgModelTemplate(PrognosticsModel):
Template for Prognostics Model
"""

# V Uncomment Below if the class is vectorized (i.e., if it can accept input to all functions as arrays) V
# is_vectorized = True

# REPLACE THE FOLLOWING LIST WITH EVENTS BEING PREDICTED
events = [
'Example Event'
Expand Down
5 changes: 3 additions & 2 deletions src/prog_models/models/battery_circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from .. import prognostics_model

from math import exp, inf
from math import inf
from numpy import exp, minimum


class BatteryCircuit(prognostics_model.PrognosticsModel):
Expand Down Expand Up @@ -152,7 +153,7 @@ def event_state(self, x):
voltage_EOD = (z['v'] - self.parameters['VEOD']) / \
self.parameters['VDropoff']
return {
'EOD': min(charge_EOD, voltage_EOD)
'EOD': minimum(charge_EOD, voltage_EOD)
}

def output(self, x):
Expand Down
14 changes: 9 additions & 5 deletions src/prog_models/models/battery_electrochem.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

from .. import PrognosticsModel

from math import asinh, log, inf
from math import inf
from copy import deepcopy
from numpy import arcsinh, log

# Constants of nature
R = 8.3144621 # universal gas constant, J/K/mol
Expand Down Expand Up @@ -165,6 +166,7 @@ class BatteryElectroChemEOD(PrognosticsModel):
inputs = ['i']
states = ['tb', 'Vo', 'Vsn', 'Vsp', 'qnB', 'qnS', 'qpB', 'qpS']
outputs = ['t', 'v']
is_vectorized = True

default_parameters = { # Set to defaults
'qMobile': 7600,
Expand Down Expand Up @@ -241,13 +243,15 @@ def dx(self, x, u):
xnS = x['qnS']/params['qSMax']

qdotDiffusionBSn = (CnBulk-CnSurface)/params['tDiffusion']
qnBdot = -qdotDiffusionBSn
qnSdot = qdotDiffusionBSn - u["i"]

Jn = u['i']/params['Sn']
Jn0 = params['kn']*((1-xnS)*xnS)**params['alpha']

v_part = R_F*x['tb']/params['alpha']

VsnNominal = v_part*asinh(Jn/(Jn0 + Jn0))
VsnNominal = v_part*arcsinh(Jn/(Jn0 + Jn0))
Vsndot = (VsnNominal-x['Vsn'])/params['tsn']

# Positive Surface
Expand All @@ -262,7 +266,7 @@ def dx(self, x, u):
Jp = u['i']/params['Sp']
Jp0 = params['kp']*((1-xpS)*xpS)**params['alpha']

VspNominal = v_part*asinh(Jp/(Jp0+Jp0))
VspNominal = v_part*arcsinh(Jp/(Jp0+Jp0))
Vspdot = (VspNominal-x['Vsp'])/params['tsp']

# Combined
Expand All @@ -279,8 +283,8 @@ def dx(self, x, u):
'Vsn': Vsndot,
'Vsp': Vspdot,
'tb': Tbdot,
'qnB': -qdotDiffusionBSn,
'qnS': qdotDiffusionBSn - u['i'],
'qnB': qnBdot,
'qnS': qnSdot,
'qpB': qpBdot,
'qpS': qpSdot
}
Expand Down
30 changes: 18 additions & 12 deletions src/prog_models/models/centrifugal_pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import math
from math import inf
from copy import deepcopy
from numpy import array, maximum, minimum, ndarray, sqrt, sign


class CentrifugalPumpBase(prognostics_model.PrognosticsModel):
Expand Down Expand Up @@ -81,8 +82,9 @@ class CentrifugalPumpBase(prognostics_model.PrognosticsModel):
"""
events = ['ImpellerWearFailure', 'PumpOilOverheat', 'RadialBearingOverheat', 'ThrustBearingOverheat']
inputs = ['Tamb', 'V', 'pdisch', 'psuc', 'wsync']
states = ['A', 'Q', 'To', 'Tr', 'Tt', 'rRadial', 'rThrust', 'w', 'QLeak']
outputs = ['Qout', 'To', 'Tr', 'Tt', 'w']
states = ['w', 'Q', 'Tt', 'Tr', 'To', 'A', 'rRadial', 'rThrust', 'QLeak']
outputs = ['w', 'Qout', 'Tt', 'Tr', 'To']
is_vectorized = True

default_parameters = { # Set to defaults
# Environmental parameters
Expand Down Expand Up @@ -179,18 +181,22 @@ def next_state(self, x, u, dt):
rRadialdot = params['wRadial']*x['rRadial']*x['w']*x['w']
rThrustdot = params['wThrust']*x['rThrust']*x['w']*x['w']
friction = (params['r']+x['rThrust']+x['rRadial'])*x['w']
QLeak = math.copysign(params['cLeak']*params['ALeak']*math.sqrt(abs(u['psuc']-u['pdisch'])), \
u['psuc']-u['pdisch'])
if type(x['A']) == ndarray:
QLeak = array([params['cLeak']*params['ALeak'] *
sqrt(abs(u['psuc']-u['pdisch'])) * sign(u['psuc']-u['pdisch'])]*len(x['A']))
else:
QLeak = params['cLeak']*params['ALeak'] * \
sqrt(abs(u['psuc']-u['pdisch'])) * sign(u['psuc']-u['pdisch'])
Trdot = 1/params['mcRadial'] * (x['rRadial']*x['w']*x['w'] - params['HRadial1']*(x['Tr']-u['Tamb']) - params['HRadial2']*(x['Tr']-x['To']))
slipn = (u['wsync']-x['w'])/(u['wsync'])
ppump = x['A']*x['w']*x['w'] + params['b']*x['w']*x['Q']
Qout = max(0,x['Q']-QLeak)
slip = max(-1,(min(1,slipn)))
Qout = maximum(0,x['Q']-x['QLeak'])
slip = maximum(-1,(minimum(1,slipn)))
deltaP = ppump+u['psuc']-u['pdisch']
Te = params['n']*params['p']*params['R2']/(slip*(u['wsync']+0.00001)) * u['V']**2 \
/((params['R1']+params['R2']/slip)**2+(u['wsync']*params['L1'])**2)
backTorque = -params['a2']*Qout**2 + params['a1']*x['w']*Qout + params['a0']*x['w']**2
Qo = math.copysign(params['c']*math.sqrt(abs(deltaP)), deltaP)
Qo = params['c']*sqrt(abs(deltaP)) * sign(deltaP)
wdot = (Te-friction-backTorque)/params['I']
Qdot = 1/params['FluidI']*(Qo-x['Q'])

Expand All @@ -207,14 +213,14 @@ def next_state(self, x, u, dt):
}

def output(self, x):
Qout = max(0,x['Q']-x['QLeak'])
Qout = maximum(0,x['Q']-x['QLeak'])

return {
'w': x['w'],
'Qout': Qout,
'To': x['To'],
'Tr': x['Tr'],
'Tt': x['Tt'],
'w': x['w']
'Tt': x['Tt'],
'Tr': x['Tr'],
'To': x['To']
}

def event_state(self, x):
Expand Down
56 changes: 44 additions & 12 deletions src/prog_models/models/pneumatic_valve.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,25 @@
# National Aeronautics and Space Administration. All Rights Reserved.

from .. import prognostics_model
from math import sqrt, copysign, inf
from math import inf
from copy import deepcopy
from numpy import sqrt, sign, maximum, minimum, isscalar, array, any, shape

def calc_x(x, forces, Ls, new_x):
lower_wall = (x==0 and forces<0) or (new_x<0)
upper_wall = (x==Ls and forces>0) or (new_x>Ls)
if lower_wall:
return 0
if upper_wall:
return Ls
return new_x

def calc_v(x, v, dv, forces, Ls, new_x):
lower_wall = (x==0 and forces<0) or (new_x<0)
upper_wall = (x==Ls and forces>0) or (new_x>Ls)
if lower_wall or upper_wall:
return 0
return v + dv


class PneumaticValveBase(prognostics_model.PrognosticsModel):
Expand Down Expand Up @@ -104,6 +121,7 @@ class PneumaticValveBase(prognostics_model.PrognosticsModel):
"pDiff" # pL-pR
]
outputs = ["Q", "iB", "iT", "pB", "pT", "x"]
is_vectorized = True
default_parameters = { # Set to defaults
# Environmental Parameters
'R': 8.314, # Universal Gas Constant
Expand Down Expand Up @@ -182,6 +200,20 @@ def initialize(self, u, z = None):
return x0

def gas_flow(self, pIn, pOut, C, A):
# Step 1: If array- run for each element
# Note: this is so complicated because it is run multiple times with mixtures of scalars and arrays
inputs = array([pIn, pOut, C, A])
if any([not isscalar(i) for i in inputs]):
# Handle case where one or more is array
size = [shape(i) for i in inputs]
size = max([i[0] if i else 0 for i in size]) # Size of array

# Create Iterable Elements for scalars
iter_inputs = [[i] * size if isscalar(i) else i for i in inputs]

# Run each element through function
return array([self.gas_flow(a, b, c, d) for a, b, c, d in zip(*iter_inputs)])

k = self.parameters['gas_gamma']
T = self.parameters['gas_temp']
Z = self.parameters['gas_z']
Expand Down Expand Up @@ -225,16 +257,16 @@ def next_state(self, x, u, dt):
vdot = pistonForces/params['m']

new_x = x['x']+x['v']*dt
if (x['x']==0 and pistonForces<0) or (new_x<0):
vel = 0
pos = 0
elif (x['x']==params['Ls'] and pistonForces>0) or (new_x>params['Ls']):
vel = 0
pos = params['Ls']

pos = minimum(maximum(new_x, 0.0), params['Ls'])

if isscalar(pistonForces):
vel = calc_v(x['x'], x['v'], vdot*dt, pistonForces, params['Ls'], new_x)
pos = calc_x(x['x'], pistonForces, params['Ls'], new_x)
else:
# moving
vel = x['v'] + vdot*dt
pos = new_x
# If array- run for each element
vel = [calc_v(xi, vi, vdot_i*dt, force, params['Ls'], new_x_i) for xi, vi, vdot_i, force, new_x_i in zip(x['x'], x['v'], vdot, pistonForces, new_x)]
pos = [calc_x(xi, force, params['Ls'], new_x_i) for xi, force, new_x_i in zip(x['x'], pistonForces, new_x)]

return {
'x': pos,
Expand All @@ -253,10 +285,10 @@ def output(self, x):
params = self.parameters # Optimization
indicatorTopm = (x['x'] >= params['Ls']-params['indicatorTol'])
indicatorBotm = (x['x'] <= params['indicatorTol'])
maxFlow = params['Cv']*params['Av']*copysign(sqrt(2/params['rhoL']*abs(x['pDiff'])),x['pDiff'])
maxFlow = params['Cv']*params['Av']*sqrt(2/params['rhoL']*abs(x['pDiff'])) * sign(x['pDiff'])
volumeBot = params['Vbot0'] + params['Ap']*x['x']
volumeTop = params['Vtop0'] + params['Ap']*(params['Ls']-x['x'])
trueFlow = maxFlow * max(0,x['x'])/params['Ls']
trueFlow = maxFlow * maximum(0,x['x'])/params['Ls']
pressureTop = x['mTop']*params['R']*params['gas_temp']/params['gas_mass']/volumeTop
pressureBot = x['mBot']*params['R']*params['gas_temp']/params['gas_mass']/volumeBot

Expand Down
13 changes: 9 additions & 4 deletions src/prog_models/models/thrown_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# National Aeronautics and Space Administration. All Rights Reserved.

from .. import PrognosticsModel
from numpy import maximum


class ThrownObject(PrognosticsModel):
Expand All @@ -21,6 +22,7 @@ class ThrownObject(PrognosticsModel):
'falling', # Event- object is falling
'impact' # Event- object has impacted ground
]
is_vectorized = True

# The Default parameters. Overwritten by passing parameters dictionary into constructor
default_parameters = {
Expand All @@ -30,8 +32,11 @@ class ThrownObject(PrognosticsModel):
'process_noise': 0.0 # amount of noise in each step
}

def initialize(self, u, z):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.max_x = 0.0

def initialize(self, u, z):
return {
'x': self.parameters['thrower_height'], # Thrown, so initial altitude is height of thrower
'v': self.parameters['throwing_speed'] # Velocity at which the ball is thrown - this guy is a professional baseball pitcher
Expand All @@ -53,8 +58,8 @@ def threshold_met(self, x):
}

def event_state(self, x):
self.max_x = max(self.max_x, x['x']) # Maximum altitude
self.max_x = maximum(self.max_x, x['x']) # Maximum altitude
return {
'falling': max(x['v']/self.parameters['throwing_speed'],0), # Throwing speed is max speed
'impact': max(x['x']/self.max_x,0) # 1 until falling begins, then it's fraction of height
'falling': maximum(x['v']/self.parameters['throwing_speed'],0), # Throwing speed is max speed
'impact': maximum(x['x']/self.max_x,0) # 1 until falling begins, then it's fraction of height
}
Loading