diff --git a/code/__init__.py b/code/__init__.py new file mode 100644 index 0000000..f676afb --- /dev/null +++ b/code/__init__.py @@ -0,0 +1,7 @@ +# To make module work + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + +pass \ No newline at end of file diff --git a/code/desktop.ini b/code/desktop.ini new file mode 100644 index 0000000..309dea2 --- /dev/null +++ b/code/desktop.ini @@ -0,0 +1,5 @@ +[.ShellClassInfo] +InfoTip=This folder is shared online. +IconFile=C:\Program Files (x86)\Google\Drive\googledrivesync.exe +IconIndex=16 + \ No newline at end of file diff --git a/code/driver.py b/code/driver.py new file mode 100644 index 0000000..455fc6f --- /dev/null +++ b/code/driver.py @@ -0,0 +1,147 @@ +# driver.py + + +import collections +import numpy as np + +from method import methods +from stepper import Stepper +from problem import Problem +from timers import Timer +from typing import * + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + +def static_stepsize_control_factory(h: float) -> Callable: + def static_stepsize_control(t, y, yd): + return h + return static_stepsize_control + + +valid_methods = {'obr', 'adams'} +valid_problems = {'rtbp', 'shm'} + +class Driver: + def __init__(self, + problem: Problem, + h0: float, + K: int, + L: int = 1, + t0: float = 0., + tf: float = None, + method_name: str = 'obr', + corrector_steps: int = 1, + stepsize_control: Callable = None, + explicit_free_params: Mapping[str, float] = None, + implicit_free_params: Mapping[str, float] = None): + + """ + + :param problem: + :param h0: + :param K: + :param L: + :param method_name: + :param corrector_steps: + :param stepsize_control: + :return: + """ + + + assert isinstance(problem, Problem), repr(problem) + assert h0 > 0, repr(h0) + assert isinstance(K, int) and K >= 1, repr(K) + assert isinstance(L, int) and L >= 1, repr(L) + assert isinstance(method_name, str), repr(method_name) + assert isinstance(corrector_steps, int) and corrector_steps >= 0, repr(corrector_steps) + + + + + self.problem = problem + self.h0 = h0 + self.K = K + self.L = L + self.t0 = t0 + self.tf = tf + self.method_name = method_name.lower().strip() + self.corrector_steps = corrector_steps + self.stepsize_control = stepsize_control if stepsize_control is not None \ + else static_stepsize_control_factory(h0) + + self.explicit_free_params = explicit_free_params or {} + self.implicit_free_params = implicit_free_params or {} + + self.Method = methods()[self.method_name] + self.explicit = self.Method(K, L, 'explicit', self.explicit_free_params) + if corrector_steps > 0: + self.implicit = self.Method(K, L, 'implicit', self.implicit_free_params) + else: + self.implicit = None + + self.stepper = Stepper(self.stepsize_control, self.explicit, self.implicit) + + + def run(self, y0: np.ndarray, *, t0=None, tf=None) -> Tuple[np.ndarray, np.ndarray]: + + if t0 is None: + t0 = self.t0 + if tf is None: + tf = self.tf + assert tf is not None, (tf, self.tf) + + y0 = np.array(y0) + # make sure y0 is an array + if y0.shape == (): + # This allows floats to be entered for 1 D + y0 = y0[np.newaxis] + assert len(y0.shape) == 1 + + y = [y0] + yd = collections.deque([self.problem.vector_derivs(t0, y0)], maxlen=self.K) + + if __verbose__: + print('Running driver...') + + t = t0 + k = 1 + counter = 0 + ts = [t0] + if __verbose__: + total_iters = int((tf - t0)/self.h0) + timer = Timer(5) + + while t < tf: + h1, t, y1 = self.stepper.predict(t, y, yd, k) + for _ in range(self.corrector_steps): + y1d = self.problem.vector_derivs(t, y1) + y2 = self.stepper.correct(yd, y1d, k, h1) + # do anything fancy with y2 and y1 + y1 = y2 + + ts.append(t) + y.append(y1) + yd.append(self.problem.vector_derivs(t, y[-1])) + if k < self.K: # Use maximum number of previous step up to K + k += 1 + counter += 1 + if __verbose__ and timer.check(): + print(f'Time elapsed: {timer()}, Iterations: {counter:,} ' + f'(~{100.*counter/total_iters:.1f}%).') + + if __verbose__: + print('Finished iterating.') + print(f'y has length {len(y):,} and final value') + print(f' yf = [{", ".join(format(x, ".3g") for x in y[-1])}].') + print(f'Total time elapsed: {timer()}.') + print('_'*30, end='\n\n') + + return np.array(ts), np.array(y) + + + def displacement(self, y0: np.array, **kwargs): + + t, y = self.run(y0, **kwargs) + return y[-1] - y[0] \ No newline at end of file diff --git a/code/dsympy.py b/code/dsympy.py new file mode 100644 index 0000000..3a8f780 --- /dev/null +++ b/code/dsympy.py @@ -0,0 +1,195 @@ +# dsympy.py - helper functions for converting differential equations in sympy +# into numeric lambda functions + +import sympy as sp +import collections +from sympy.utilities.lambdify import lambdastr as _lambdastr +from sympy.core.function import AppliedUndef +from typing import * + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + +# needs to be at the top for typing. +class AutoFunc: + """ + Holds the representation of an auto-generated function + """ + + def __init__(self, func, as_str, args, sym_func=None, name=None): + self.func = func + self.as_str = as_str + self.args = self.params = tuple(str(arg) for arg in args) + self.sym_func = sym_func + self.name = name + + def __call__(self, *args, **kwargs): + return self.func(*args, **kwargs) + + def __str__(self): + return self.as_str + + def __repr__(self): + if self.name is None: + return "".format( + 's' if len(self.args) != 1 else '', + ', '.join(self.args) if self.args else () + ) + return f'' + + +def dummify_undefined_functions(expr:sp.Expr, ret_map:bool=False) -> sp.expr: + """ + Used for solving issues with lambdify and differentials. Replaces undefined + functions (eg. f(t), df/dt, dg^2/dxdy) with symbols (named f and df_dt and + df_dxdy for the previous examples respectively). + + By u/PeterE from + http://stackoverflow.com/questions/29920641/lambdify-a-sp-expression-that-contains-a-derivative-of-undefinedfunction + + Issues (u/PeterE): + + * no guard against name-collisions + * perhaps not the best possible name-scheme: df_dxdy for Derivative(f(x,y), x, y) + * it is assumed that all derivatives are of the form: Derivative(s(t), t, ...) + with s(t) being an UndefinedFunction and t a Symbol. I have no idea what will + happen if any argument to Derivative is a more complex expression. I kind of + think/hope that the (automatic) simplification process will reduce any more + complex derivative into an expression consisting of 'basic' derivatives. But + I certainly do not guard against it. + * largely untested (except for my specific use-cases) + """ + + mapping = {} + + # replace all Derivative terms + for der in expr.atoms(sp.Derivative): + f_name = der.expr.func.__name__ + var_names = [var.name for var in der.variables] + name = "d%s_d%s" % (f_name, 'd'.join(var_names)) + mapping[der] = sp.Symbol(name) + + # replace undefined functions + for f in expr.atoms(AppliedUndef): + f_name = f.func.__name__ + mapping[f] = sp.Symbol(f_name) + + new_expr = expr.subs(mapping) + return new_expr if not ret_map else (new_expr, mapping) + + +def dlambdify(params: tuple, + expr: sp.Expr, + *, + show: bool=False, + retstr: bool=False, + **kwargs + ) -> Callable: + """ + See sp.lambdify. Used to create lambdas (or strings of lambda expressions + if `retstr` is True) from sp expressions. Fixes the issues of derivatives + not working in sp's basic lambdify. + """ + + try: + iter(params) + except TypeError: + params = (params,) + + + #dparams = [dummify_undefined_functions(s) for s in params] + dexpr = dummify_undefined_functions(expr) + + if show or retstr: + s = _lambdastr(params, dexpr, dummify=False, **kwargs) + if show: + print(s) + if retstr: + return s + + return sp.lambdify(params, dexpr, dummify=False, **kwargs) + +def dlambdastr(params: tuple, expr: sp.Expr, **kwargs) -> str: + """ + Equivalent to dlambdify(params, expr, retstr=True, **kwargs) + """ + return dlambdify(params, expr, retstr=True, **kwargs) + + +def auto(expr, + consts: dict = None, + params: List[str] = None, + dfuncs: dict = None, + name: str = None, + *, + show: bool = False, + just_func: bool = False, + **kwargs + ) -> AutoFunc: + """ + Similar to dlambdify, but automatically discovers all parameters in + `expr`. + + `consts`, if used, should be dict of {sp.Symbol: float}, and will + replace any constants in `expr` with values. Otherwise, they will be included + in the final lambda. + + If `show` is True, will print the lambda python expression made. + + If `just_func` is True, will only return the function, otherwise a + AutoFunc instance with attributes: func, args, as_str. + """ + + assert hasattr(params, '__iter__'), \ + f'prams must be iterable, currently {type(params).__name__!r}' + + if consts is None: + consts = {} + for const, value in consts.items(): + expr = expr.subs(const, value) + + dexpr = dummify_undefined_functions(expr) + + if dfuncs is None: + dfuncs = {} + for dfunc, value in dfuncs.items(): + dexpr = dexpr.subs(dexpr, value) + + if params is None: + params = sorted(dexpr.atoms(sp.Symbol), + key=lambda s: [len(str(s)), str(s)]) + elif any(isinstance(p, str) for p in params): + # this actually works if params is just a str, not a tuple + params = sp.symbols(params) + + s = _lambdastr(params, dexpr, dummify=False, **kwargs) + if show: + print(s) + + f = sp.lambdify(params, dexpr, dummify=False, **kwargs) + return AutoFunc(f, s, params, sym_func=expr, name=name) if not just_func else f + + + +def test_func() -> NamedTuple: + """ + Returns t, x0, x, f, df + """ + t, x0 = sp.symbols('t x0') + x = sp.Function('x')(t) + + f = 1/sp.sqrt(x - x0) + df = sp.diff(f, t) + + return collections.namedtuple('Syms', 't, x0, x, f, df')(t, x0, x, f, df) + + +def test() -> Tuple[sp.Expr, str]: + """ + Test case to ensure all is working smoothly + """ + + t, x0, x, f, df = test_func() + return df, dlambdastr([x, sp.diff(x, t)], df) + diff --git a/code/main.py b/code/main.py new file mode 100644 index 0000000..df57cc6 --- /dev/null +++ b/code/main.py @@ -0,0 +1,33 @@ +# main.py + +from driver import Driver +from problem import Lagrange +from solver import Newton + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + +def lagrange(x0=-1e-2, v0=1e-3, dx=1e-5): + """ + 4 months of work... for this?!?! + + :param x0: + :param v0: + :param dx: + :return: + """ + + problem = Lagrange(1, legendre_order=3) + driver = Driver(problem, h0=0.001, K=2, L=1, tf=0.5) + solver = Newton(driver.displacement) + ans = solver.solve([x0, 0, 0, 0, v0, 0], dx=dx) + + t, y = driver.run(ans[-1]) + problem.plot(t, y) + return ans + + +if __name__ == '__main__': + ans = lagrange() + print(ans[-1]) \ No newline at end of file diff --git a/code/method.py b/code/method.py new file mode 100644 index 0000000..3af4583 --- /dev/null +++ b/code/method.py @@ -0,0 +1,292 @@ +# method.py + +import numpy as np +import sympy as sp +from collections import deque +from typing import * + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + +def methods(): + return {cls.__name__.lower(): cls for cls in Method.__subclasses__()} + +class Method: + + def __init__(self, maxK, L=1, mode='explicit', free_params=None): + """ + + :param maxK: + :param L: + :param mode: + :param free_params: + """ + self.maxK = maxK + self.L = L + self.mode = mode + self.free_params = free_params if free_params is not None else {} + assert isinstance(L, int) and (0 < L), L + assert isinstance(maxK, int) and (0 < maxK), maxK + + def __repr__(self): + return f'method.{self.__class__.__name__}(maxK={self.maxK}, L={self.L}, mode={self.mode!r})' + + def explicit(self, yd: deque, h: float, k: int) -> np.ndarray: + raise NotImplementedError + + def implicit(self, yd: deque, ynd: np.ndarray, h: float, k: int) -> np.ndarray: + raise NotImplementedError + + + +class Obr(Method): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.alphas = {k: self.coeffs(k) for k in range(1, self.maxK + 1)} + + + def explicit(self, yd: deque, h: float, k: int): + nexty = sum( + h**i * sum(self.alphas[k][i][j] * yd[j][i] for j in range(0, k)) + for i in range(0, self.L + 1)) + + return nexty + + + def implicit(self, yd: deque, ynd: Union[list, np.ndarray], k: int, h: float): + explicit_part = sum( + h**i * sum(self.alphas[k][i][j] * yd[j][i] for j in range(0, k)) + for i in range(0, self.L + 1)) + + implicit_part = sum(h**i * self.alphas[k][i][k] * ynd[i] + for i in range(1, self.L + 1)) + + thisy = explicit_part + implicit_part + + return thisy + + + def coeffs(self, K) -> List[List[float]]: + + L = self.L + + a, b, c = sp.symbols('a, b, c') + + if self.mode == 'explicit': + if K == 1: + if L == 1: + alpha = [[1, -1], [1]] + if L == 2: + alpha = [[1, -1], [1], [1 / 2]] + if L == 3: + alpha = [[1, -1], [1], [1 / 2], [1 / 6]] + if L == 4: + alpha = [[1, -1], [1], [1 / 2], [1 / 6], [1 / 24]] + if K == 2: + if L == 1: + alpha = [[a, 1 - a, -1], [-0.5 * (1 - a), 0.5 * (3 + a)]] + if L == 2: + alpha = [[a, 1 - a, -1], [0.5 * (3 + a), -0.5 * (1 - a)], + [(7 + a) / 12, -(a - 17) / 12]] + if L == 3: + alpha = [[a, 1 - a, -1], [-0.5 * (13 - a), 0.5 * (15 + a)], + [-0.1 * (29 - a), -0.1 * (31 + a)], + [-(49 - a) / 120, (111 + a) / 120]] + if K == 3: + if L == 1: + alpha = [[a, b, 1 - a - b, -1], + [(5 + 4 * a - b) / 12, (-4 + 4 * a + 2 * b) / 3, + (23 + 4 * a + 5 * b) / 12]] + if L == 2: + alpha = [[a, b, 1 - a - b, -1], + [(581 + 112 * a + 11 * b) / 240, + (38 + 16 * a + 8 * b) / 15, + (-949 + 112 * a + 101 * b) / 240], + [(173 + 16 * a + 3 * b) / 240, (27 + b) / 6, + (637 - 16 * a - 13 * b) / 240]] + if K == 4: + if L == 1: + alpha = [[a, b, c, 1 - a - b - c, -1], + [(-9 + 9 * a + c) / 24, + (37 + 27 * a + 8 * b - 5 * c) / 24, + (-59 + 27 * a + 32 * b + 19 * c) / 24, + (55 + 9 * a + 8 * b + 9 * c) / 24] + ] + elif self.mode == 'implicit': + if K == 1: + if L == 1: + alpha = [[1, -1], [1 / 2, 1 / 2]] + if L == 2: + alpha = [[1, -1], [1 / 2, 1 / 2], [1 / 12, -1 / 12]] + if L == 3: + alpha = [[1, -1], [1 / 2, 1 / 2], [1 / 10, -1 / 10], + [1 / 120, 1 / 120]] + if L == 4: + alpha = [[1, -1], [1 / 2, 1 / 2], [3 / 28, -3 / 28], + [1 / 84, 1 / 84], [1 / 1680, -1 / 1680]] + if K == 2: + if L == 1: + alpha = [[a, 1 - a, -1], + [-(1 - 5 * a) / 12, 2 * (1 + a) / 3, (5 - a) / 12]] + if L == 2: + alpha = [[a, 1 - a, -1], + [(11 + 101 * a) / 240, 8 * (1 + a) / 15, + (101 + 11 * a) / 240], + [(3 + 13 * a) / 240, (1 - a) / 6, + -(13 + 3 * a) / 240]] + if L == 3: + alpha = [[a, 1 - a, -1], + [-(421 - 5669 * a) / 13_440, 64 * (1 + a) / 105, + (5669 - 421 * a) / 13_440], + [-(47 - 303 * a) / 4480, (1 - a) / 8, + -(303 - 47 * a) / 4480], + [-(41 - 169 * a) / 40_320, 8 * (1 + a) / 315, + (169 - 41 * a) / 40_320]] + if K == 3: + if L == 1: + alpha = [ + [a, b, 1 - a - b, -1], + [(1 + 8 * a - b) / 24, + (-5 + 32 * a + 13 * b) / 24, + (19 + 8 * a + 13 * b) / 24, + (9 - b) / 24] + ] + if L == 2: + alpha = [ + [a, b, 1 - a - b, -1], + [(397 + 7136 * a + 243 * b) / 18_144, + (89 + 640 * a + 327 * b) / 672, + (313 + 416 * a + 327 * b) / 672, + (6893 + 640 * a + 234 * b) / 18_144], + [(163 + 1376 * a + 93 * b) / 30_240, + (269 - 512 * a + 339 * b) / 3360, + (851 - 608 * a - 339 * b) / 3360, + (-1284 - 256 * a - 93 * b) / 30_240] + ] + if K == 4: + if L == 1: + alpha = [ + [a, b, c, 1 - a - b - c, -1], + [(-19 + 243 * a - 8 * b + 11 * c) / 720, + (53 + 459 * a + 136 * b + 37 * c) / 360, + (-11 + 27 * a + 38 * b + 19 * c) / 30, + (323 + 189 * a + 136 * b + 173 * c) / 360, + (251 - 27 * a - 8 * b - 19 * c) / 720] + ] + + else: + raise ValueError( + f"mode must be 'implicit' or 'explicit', not {self.mode!r}.") + + try: + alpha + except NameError: + msg = f"Have no coefficients for K = {K}, L = {L} and mode = {self.mode!r}" + raise ValueError(msg) + + free_params = self.free_params.copy() + if free_params is not None: + # replace str with sp.symbol + for key, value in free_params.items(): + if isinstance(key, str): + del free_params[key] + free_params[sp.symbols(key)] = value + + # by default set free params to 0 (seems to work) + if a not in free_params: + free_params[a] = 0 + if b not in free_params: + free_params[b] = 0 + if c not in free_params: + free_params[c] = 0 + + # replace free params (a,b,c) with values + for rown, row in enumerate(alpha): + for coln, expr in enumerate(row): + try: + alpha[rown][coln] = float(expr.subs(free_params)) + except AttributeError: + pass + except TypeError: # Should never reach here + msg = 'Missing one of param obr.a, obr.b or obr.c in consts' + raise ValueError(msg) + + return alpha + + +class Adams(Method): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + assert self.L == 1, self.L + + + def nabla(self, yd: deque, n: int, i: int = -1) -> np.ndarray: + """ + Returns the discrete nabla^n function: nabla^n(y[i]) + :param yd: deque[list[np.ndarray[float]]] + :param n: int + :param i: int + + :return: float + """ + # assert isinstance(i, Integral), repr(i) + # assert isinstance(n, Integral), repr(n) + + # Take first element of f[i] (ie. the first derivative of y) + if n == 0: + return yd[i][1] + elif n == 1: + return yd[i][1] - yd[i - 1][1] + elif n > 1: + return self.nabla(yd, n - 1, i) - self.nabla(yd, n - 1, i - 1) + else: + raise ValueError(f'n must be 0 or greater, not {n!r}.') + + + def gamma(self, j: int) -> float: + """ + Return the adams constant gamma for a given integer j. + + :param j: int + + :return: float + """ + # assert isinstance(j, Integral), repr(j) + if j == 0: + return 1 + elif j >= 1: + return 1 - sum(self.gamma(m) / (j + 1 - m) for m in range(j)) + else: + raise ValueError(f'j must be 0 or greater, not {j!r}.') + + + def explicit(self, yd: deque, h: float, k: int) -> np.ndarray: + """ + Return y[n+k] given a list of y, a list of y_derivs (equal to f) and other + parameters. Exact same signature as obr.make_obr(). + + :param yd: deque[list[np.ndarray[float]]] + A deque of the previous k derivatives y, seen as a list of + [y, dy/dt, ...] of length l. Each element of the list is an ndarray of + length len(y) of floats representing the jth derivative of y[i]. + + :param h: float + timestep + + :param k: int + Number of previous steps to use + + + :return: np.array + The next iteration of y, ie. y[len(y) + 1] + """ + # assert isinstance(h, float) and h > 0, repr(h) + # assert isinstance(k, int) and k >= 1, repr(k) + return yd[0][-1] + h * np.sum([self.gamma(j) * self.nabla(yd[1], n=j, i=-1) + for j in range(k)], axis=0) + + # could replace y[-1] with yd[-1][0] and remove the y paramater + # from this and obr.obr \ No newline at end of file diff --git a/code/problem.py b/code/problem.py new file mode 100644 index 0000000..e7beecb --- /dev/null +++ b/code/problem.py @@ -0,0 +1,477 @@ +#problem.py + +import os +import warnings +import numpy as np +import sympy as sp +import matplotlib.pyplot as plt + +import dsympy +from timers import Timer +from typing import * + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + +# sympy symbols +t, zeta, mu = sp.symbols('t, zeta, mu') +x, y, z = (sp.Function(s)(t) for s in 'xyz') +vx, vy, vz = [sp.diff(s, t) for s in (x, y, z)] +rho = x ** 2 + y ** 2 + z ** 2 + +fig_path = r'D:\GoogleDrive\Work\Year 5\MScProject\Fergus Mitchell MSc (Obrechkoff)\Code\figs' + + +def problems(): + return {cls.__name__.lower(): cls for cls in Problem.__subclasses__()} + + +class Problem: + """ + Abstract Base Class. Will not run without further definiton. + """ + + ind_var: sp.symbol = t + dep_vars: List[sp.Symbol] = NotImplemented + + ODEs: List[List[sp.Expr]] = NotImplemented + default_mapping: Mapping[sp.Symbol, float] = {} + + def __init__(self, L: int, mapping: dict = None, *, auto_higher_order: bool = True): + self.name = self.__class__.__name__ + if NotImplemented in (self.dep_vars, self.ODEs): + raise NotImplementedError('Attempting to initialise ABC') + + assert all(isinstance(row, (list, tuple, np.ndarray)) for row in self.ODEs) + assert all(all(isinstance(f, sp.Expr) for f in row) for row in self.ODEs) + assert all(isinstance(x, sp.Function) for x in self.dep_vars), self.dep_vars + assert isinstance(self.ind_var, sp.Symbol), repr(self.ind_var) + + self.norder = len(self.ODEs) # The order of the system of ODEs + self.ncoords = len(self.dep_vars) # The number of coordinates in the ODEs + assert not any(len(row) - self.ncoords for row in self.ODEs) + self._nargs = (L + self.norder) * self.ncoords + # The (private) number of arguments in funcs + + self.L = L + self.mapping = self.default_mapping.copy() + if mapping is None: + mapping = {} + for key, value in mapping.items(): + if isinstance(key, str): + self.mapping[sp.symbols(key)] = value + elif isinstance(key, sp.Symbol): + self.mapping[key] = value + else: + msg = f'mapping key {key!r} str or sympy.Symbol, not {type(key).__name__!r}' + raise TypeError(msg) + + for key, value in self.mapping.items(): + setattr(self, str(key), value) + + if auto_higher_order: + try: + self.funcs + except (NameError, AttributeError): + self.funcs = [] + self.higher_orders() + else: + raise ValueError(f'{self.name}.funcs already implemented yet' + ' auto_higher_order is True') + else: + self.funcs = NotImplemented + + def __repr__(self): + return f'problem.{self.name}(L={self.L})' + + def higher_orders(self): + timer = Timer() + if __verbose__: + print(f'Running {self.name}.higher_orders() to order {self.L + self.norder - 1}.') + + dparams = self.dparams() + mapping = self.mapping.copy() + + sym_funcs = self.ODEs.copy() + for _ in range(self.L-1): + sym_funcs.append([sp.diff(f, t) for f in sym_funcs[-1]]) + + for order, row in enumerate(sym_funcs, 1): + # Get the names of each lambda from the arguments for dsympy.AutoFunc + # zip() will automatically grab correct number + names = dparams[(order*self.ncoords)+1:] + + self.funcs.append([dsympy.auto(f, mapping, params=dparams, name=n) + for f, n in zip(row, names)]) + + if __verbose__: + print(f'Lambdifying r^({order})(t)... (Time elapsed: {timer()}') + + if __verbose__: + print(f'Done lamdifying. Time taken: {timer()}') + + + def vector_derivs(self, t: float, y: np.array) -> List[np.array]: + # possibility to include "optimisation funcs" argument and code? + + args = [None] * self._nargs + + for i, yi in enumerate(y): + args[i] = yi + + for order in range(1, self.L + self.norder - 1): + for coord in range(self.ncoords): + index = ((order+1) * self.ncoords) + coord + ans = self.funcs[order][coord](t, *args) + args[index] = self.funcs[order][coord](t, *args) + + ydiffs = [y] + for order in range(1, self.L + 1): + ydiffs.append(np.array(args[order * self.ncoords:(order * self.ncoords) + len(y)])) + + return ydiffs + + __call__ = vector_derivs + + def dparams(self): + vars = [str(v).split('(')[0] for v in self.dep_vars] + d_by = str(self.ind_var) + params = [d_by] + vars + for i in range(1, self.norder + self.L): + for var in vars: + params.append(f'd{var}_d' + 'd'.join([d_by]*i)) + + return params + + @staticmethod + def plotfig(fig, file=None, show=True, filetype='.png', **kwargs): + if file is not None: + if filetype[0] != '.': + filetype = '.' + filetype + if '.' in file: + filetype = '' + fig.savefig(os.path.join(fig_path, file + filetype)) + if show: + plt.show() + + def plot(self, t, y, points=None, mode='', file=None, **kwargs): + raise NotImplementedError() + + + +class SHM(Problem): + + dep_vars = [x] + default_mapping = {zeta: 0} + omega = 1 + F = 0 + zeta: float + + v = sp.diff(x, t) + ODEs = [[v], [-2*zeta*v - x]] + + + def analytical_factory(self, y0: np.ndarray) -> Callable: + from sympy import sin, cos, exp + omega = self.omega + zeta = self.zeta # does exist + F = self.F + x0, v0 = y0 + + + om0, om1, z, f, t, c1, c2, = sp.symbols( + 'omega0, omega1, zeta, F, t, c1, c2') + + mapping = {z: zeta, om0: omega, f: F, + om1: omega * abs(1 - zeta ** 2) ** 0.5} + + if omega != 1: + raise NotImplementedError('omega must be 1.0') + if F != 0: + raise NotImplementedError('F must be constant 0.0') + + if zeta == 0: + x = c1 * cos(t) + c2 * sin(t) + elif 0 < zeta < 1: + x = exp(-z * t) * (c1 * cos(om1 * t) + c2 * sin(om1 * t)) + elif zeta == 1: + x = exp(-t) * (c1 + c2 * t) + elif zeta > 1: + x = exp(-z * t) * (c1 * exp(t * om1) + c2 * exp(-t * om1)) + else: + raise ValueError( + f'zeta must be greater than or equal to 0, not {zeta}') + + v = sp.diff(x, t) + + xm = (x - x0).subs(mapping).subs(t, 0) + vm = (v - v0).subs(mapping).subs(t, 0) + + if __verbose__: print(f'x: {xm}, v: {vm}') + + foo = sp.linsolve([xm, vm], c1, c2) + c1_val, c2_val = (float(v) for v in list(foo)[0]) + + if __verbose__: print(f"c1: {c1_val:.4f}, c2: {c2_val:.4f}") + + mapping.update({c1: c1_val, c2: c2_val}) + + xm = x.subs(mapping) + vm = v.subs(mapping) + + if __verbose__: print(f'x: {xm}, \nv: {vm}') + + xf = sp.lambdify(t, xm) + vf = sp.lambdify(t, vm) + + return xf, vf + + + def plot(self, t, y, mode='xve', file=None, data=None, **kwargs): + + fig, axs = plt.subplots(len(mode), sharex=True) + nplt = 0 + + maxerr = errdiff = None + if 'e' in mode: + analytic_xf, analytic_vf = self.analytical_factory(y[0, :]) + analytic_x = np.array([analytic_xf(ti) for ti in t]) + analytic_v = np.array([analytic_vf(ti) for ti in t]) + diffx = y[:, 0] - analytic_x + diffv = y[:, 1] - analytic_v + maxerr = np.max(np.abs(diffx)) + errdiff = y[2] - y[-1] + + if len(mode) == 1: + axs = [axs] + + if 'x' in mode: + axs[nplt].plot(t, y[:, 0], color='b', label='$x_{calc}$') + axs[nplt].set_ylabel('$x(t)$') + if 'e' in mode: + axs[nplt].plot(t, analytic_x, color='k', label='$x_{true}$', + linestyle=':') + axs[nplt].legend() + nplt += 1 + + if 'v' in mode: + axs[nplt].plot(t, y[:, 1], color='g', label='$v_{calc}$') + axs[nplt].set_ylabel('$v(t)$') + if 'e' in mode: + axs[nplt].plot(t, analytic_v, color='k', label='$v_{true}$', + linestyle=':') + axs[nplt].legend() + nplt += 1 + + if 'e' in mode: + if 'x' in mode: + axs[nplt].plot(t, diffx, color='c', label=r'$\mathrm{err}(x)$') + if 'v' in mode: + axs[nplt].plot(t, diffv, color='m', label=r'$\mathrm{err}(v)$') + axs[nplt].set_ylabel('$x_{calc}(t) - x_{true}(t)$') + axs[nplt].legend() + nplt += 1 + + axs[-1].set_xlabel('$t$') + axs[-1].set_xlim(0, t[-1]) + + if 'title' in kwargs: + title = kwargs['title'] + else: + title = rf'Spring with $\zeta = {self.zeta:.2f}$' + if 'data' is not None: + for k, v in data.items(): + if isinstance(v, float): + data[k] = format(v, '.2g') + d = ', '.join(fr"${k}={v}$" for k, v in data.items()) + title += f'\n({d})' + axs[0].set_title(title) + + self.plotfig(fig, file, **kwargs) + return fig, axs + + + +class RTBP(Problem): + + dep_vars = [x, y, z] + r1 = sp.sqrt(x**2 + y**2 + z**2) + r2 = sp.sqrt((x-1)**2 + y**2 + z**2) + + ODEs = [[vx, vy, vz], + [2*vy + x - mu - (1-mu)*x*r1**-3 - mu*(x-1)*r2**-3, + -2*vx + y - (1-mu)*y*r1**3 - mu*y*r2**-3, + -(mu-1)*z*r1**-3 - mu*z*r2**-3] + ] + + earth_mass = 5.9721986e24 + moon_mass = 7.3459e22 + default_mapping = {mu: moon_mass/(earth_mass + moon_mass)} + + R = 1 # dimensionless + theta_dot = 1 + mu: float # will be defined in self.__init__ + + default_points = {'2d': [[0,0], [1,0]]} + + + def plot(self, t, y, mode='2d', file=None, points=None, **kwargs): + + mode = mode.lower().strip() + if points is None: + if mode in self.default_points: + points = self.default_points[mode] + else: + points = () + + fig, ax = plt.subplots() + if 'title' in kwargs: + # noinspection PyStatementEffect + ax.set_title(kwargs['title']) + + if mode == '2d': + ax.plot(y[:, 0], y[:, 1], color='b', label='Path') + ax.set_xlabel('$x$') + ax.set_ylabel('$y$') + fig.tight_layout() + if 'title' not in kwargs: + ax.set_title(r'$\mu = {self.mu:.2g}') + else: + raise ValueError(f'mode not supported: {mode!r}') + + + if isinstance(points, dict): + for k, v in points.items(): + ax.scatter([v[0]], [v[1]], color='r', marker='o', label=k) + ax.legend() + else: + ax.scatter([e[0] for e in points], [e[1] for e in points], color='r', + marker='o') + + self.plotfig(fig, file, **kwargs) + return fig, ax + + + def lagrange_points(self): + + mu = self.mu + + xi = sp.symbols('xi', real=True) + + L4 = [1/2 - mu, np.sqrt(3)/2, 0] + L5 = [L4[0], -L4[1], 0] + + if mu == self.default_mapping[sp.symbols('mu')]: + soln = [-1.00512496490921, 0.836181649431693, 1.15625464982094] + else: + if __verbose__: + print('Solving lagrange points') + f = (1-mu)*abs(xi+mu)**-3*(xi+mu) + mu*abs(xi+mu-1)**-3*(xi+mu-1)-xi + try: + soln = list(map(float, sp.solve(f, xi, quick=True))) + except TypeError: # Bug in sympy converts some values to x + 0j + warnings.warn('Error encountered in sympy.solve. Reverting to ' + 'linear approximation') + soln = np.multiply(self.R, + [1-(mu/3)**(1/3), 1+(mu/3)**(1/3), -1-(5/12)*mu]) + + L3 = [soln[0], 0, 0] + L1 = [soln[1], 0, 0] + L2 = [soln[2], 0, 0] + + # noinspection PyTypeChecker + return dict(enumerate(map(np.array, [L1, L2, L3, L4, L5]), 1)) + + + + + + +class Lagrange(RTBP): + # TODO: Largrange.ODEs is broken + + gamma: float + c: List[float] + ODEs = NotImplemented + + default_points = {'2d': [[0,0]]} + + def __init__(self, L: int = 1, mapping: dict = None, lagrange_point: int = 1, + legendre_order: int = 3, **kwargs): + + assert legendre_order >= 2, repr(legendre_order) + assert 1 <= lagrange_point <= 5, repr(lagrange_point) + if lagrange_point in (3,4,5): + raise NotImplementedError('L3, L4 and L5 not implemented') + + self.legendre_order = legendre_order + self.lagrange_point = lagrange_point + + if mapping is None: + mapping = {} + + if 'mu' in mapping: + self.mu = mapping['mu'] + elif mu in mapping: + self.mu = mapping[mu] + else: + self.mu = self.default_mapping[mu] + + self.lagrange_coord = self.lagrange_points()[lagrange_point] + self.gamma = 1 - self.lagrange_coord[0] # TODO: needs checking + self.c = c = self.legendre_coeffs(self.legendre_order) + + summation = sum(c[n] * rho**n * sp.legendre(n, x/rho) + for n in range(3, self.legendre_order+1)) + + ax = 2*vy + (1+2*c[2])*x + sp.diff(summation, x) + ay = -2*vx - (c[2]-1)*y + sp.diff(summation, y) + az = -c[2]*z + sp.diff(summation, z) + + self.ODEs = [[vx, vy, vz], [ax, ay, az]] + + # noinspection PyArgumentList + super().__init__(L, mapping, **kwargs) + + def legendre_coeffs(self, N): + + mu = self.mu + gamma = self.gamma + + return [gamma**-3 * (mu+(-1)**n) * (1-mu) * (gamma/(1-gamma))**(n+1) + for n in range(N+1)] + + def convert_coords(self, r, *, inv=False): + + r = np.array(r) + assert r.shape == (3,) + + if not inv: + newr = r / self.gamma + newr[0] = (r[0] - 1 + self.mu + self.gamma)/self.gamma + else: + newr = r * self.gamma + newr[0] = (r[0] * self.gamma) - self.gamma - self.mu + 1 + + return newr + + def plot(self, t, y, mode='2d', file=None, points=None, **kwargs): + + mode = mode.lower().strip() + + if 'title' not in kwargs: + if mode == '2d': + title = fr'Motion at L{self.lagrange_point} ($\mu = {self.mu:.2g}$)' + + try: + # noinspection PyUnboundLocalVariable + kwargs['title'] = title + except NameError: + pass + + fig, ax = super().plot(t, y, mode, file, points, **kwargs) + + + + return fig, ax \ No newline at end of file diff --git a/code/solver.py b/code/solver.py new file mode 100644 index 0000000..c8341bb --- /dev/null +++ b/code/solver.py @@ -0,0 +1,56 @@ +# solver.py + +import numpy as np +from timers import Timer +from typing import * + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + +class Solver: + # ABC for later? + pass + +class Newton(Solver): + + def __init__(self, func: Callable, *, rtol: float = 1e-5, atol: float = 1e-8, + max_runs: int = 100): + self.func = func + self.rtol = rtol + self.atol = atol + self.max_runs = max_runs + + def solve(self, y0: np.ndarray, dx: float) -> np.ndarray: + y0 = np.array(y0) + assert len(y0.shape) == 1 + + size = len(y0) + zero = np.zeros(size) + J = np.empty((size, size)) + + y = [y0] + + timer = Timer(5) + for run in range(self.max_runs): + y1 = self.func(y[-1]) + + if np.isclose(y1, zero, rtol=self.rtol, atol=self.atol).all(): + if __verbose__: + print(f'Solution reached. Time taken: {timer()}.\n') + return y + + for index in range(size): + dy = np.zeros(size) + dy[index] = dx + J[:, index] = (self.func(y[-1] + dy) - y1)/dx + + if __verbose__: + print(f"Completed run {run}. Time taken: {timer()}.") + print('_'*70) + + y.append(y[-1] - np.linalg.inv(J)@y1) + + if __verbose__: + print(f"max_runs {self.max_runs} reached. Time taken: {timer()}") + return y \ No newline at end of file diff --git a/code/stepper.py b/code/stepper.py new file mode 100644 index 0000000..8b49134 --- /dev/null +++ b/code/stepper.py @@ -0,0 +1,39 @@ +#stepper.py + +import numpy as np +from method import Method +from collections import deque +from typing import * + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + +class Stepper: + + def __init__(self, stepsize_control: Callable, explicit: Method, + implicit: Method = None): + + self.stepsize_control = stepsize_control + self.explicit = explicit + if implicit is not None: + self.implicit = implicit + else: + self.implicit = _raise_no_implict + + def predict(self, t: float, y:np.array, yd: deque, k: int): + + h = self.stepsize_control(t, y, yd) + t1 = t + h + + y1 = self.explicit.explicit(yd, h, k) + + return h, t1, y1 + + def correct(self, yd: deque, ynd: Union[np.ndarray, list], k: int, h: float): + + return self.implicit.implicit(yd, ynd, k, h) + + +def _raise_no_implict(*args, **kwargs): + raise ValueError('Implicit method undefined') \ No newline at end of file diff --git a/code/tests.py b/code/tests.py new file mode 100644 index 0000000..28cfb55 --- /dev/null +++ b/code/tests.py @@ -0,0 +1,81 @@ +# tests.py + +import numpy as np +import sympy as sp +import matplotlib.pyplot as plt +from problem import SHM, RTBP, Lagrange +from driver import Driver +from solver import Newton +from pprint import pprint + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + + +def test_driver(zeta=0): + shm = SHM(1, {'zeta': zeta}) + driver = Driver(shm, 0.1, 3, 1) + t, y = driver.run([1, 0], tf=10) + shm.plot(t, y, data=dict(h0=0.01, L=3, K=1, mode='obr')) + return + +def test_rtbp(xx=None, vy=None): + rtbp = RTBP(1) + moon_sma = 384_400e3 + moon_period = 27 * 24 * 60**2. + if vy is None: + vy = 2.5e3 + v = [0, vy * moon_period/moon_sma, 0] + + leo = 7e6 + L1 = (1 - np.cbrt(rtbp.mu / 3)) * moon_sma + if xx is None: + xx = leo + x = [xx/moon_sma, 0, 0] + print(f"x = {x} \nv = {v}") + driver = Driver(rtbp, h0=0.0001, K=2, L=1, t0=0, tf=3) + t, y = driver.run(x + v) + fig, ax = plt.subplots() + ax.scatter([0, 1], [0, 0], marker='o', color='r') + ax.plot(y[:,0], y[:,1], color='b') + ax.set_xlabel('$x$') + ax.set_ylabel('$y$') + plt.show() + return + +def test_lagrange(mu=None, tf=4): + mapping = {'mu': mu} if mu is not None else {} + lagrange = Lagrange(1, mapping, lagrange_point=1, legendre_order=2) + driver = Driver(lagrange, h0=0.01, K=2, L=1, t0=0, tf=tf) + t, y = driver.run([0, -1e-3, 0, 1e-3, 0, 0]) + assert all(v not in y for v in (np.nan, np.inf, -np.inf)) + + lagrange.plot(t, y) + return + + +def test_solver(): + """ + url = http://fourier.eng.hmc.edu/e176/lectures/NM/node21.html + """ + x0 = np.ones(3) + + def f(x): + x1, x2, x3 = x + + y1 = 3*x1 - np.cos(x2*x3) - 3/2 + y2 = 4*x1**2 - 625*x2**2 + 2*x3 - 1 + y3 = 20*x3 + np.exp(-x1*x2) + 9 + + return np.array([y1, y2, y3]) + + solver = Newton(f, max_runs=10) + + ans = np.array(solver.solve(x0, 0.001)) + print(ans) + + +if __name__ == '__main__': + test_lagrange(tf=10) + pass \ No newline at end of file diff --git a/code/timers.py b/code/timers.py new file mode 100644 index 0000000..773758a --- /dev/null +++ b/code/timers.py @@ -0,0 +1,126 @@ +# it/timers.py + +import time + +__author__ = 'Mitchell, FHT' +__date__ = (2017, 8, 20) +__verbose__ = True + +def timestamp(unix_time = None, show_zone = True): + """Show time (current if None) in the format 'yyyy-mm-dd HH:MM:SS [TZ]'""" + if unix_time is None: + unix_time = time.time() + str_ = '%Y-%m-%d %H:%M:%S' + if show_zone: + str_ += ' %z' + return time.strftime(str_, time.localtime(unix_time)) + +def time_diff_repr(unix_start, unix_end=0, unit=None, sig=1): + unit_dict = { + 'e': lambda t: '{1:.{0}e} seconds'.format(sig, t), + 's': lambda t: '{1:.{0}f} seconds'.format(sig, t), + 'm': lambda t: '{1:.{0}f} minutes'.format(sig, t / 60), + 'h': lambda t: '{1:.{0}f} hours'.format(sig, t / 3600), + 'd': lambda t: '{1:.{0}f} days'.format(sig, t / (3600 * 24)) + } + + diff = float(abs(unix_end - unix_start)) + + if unit is None: + repr_dict = { + 0.1: 'e', + 120: 's', + 120 * 60: 'm', + 48 * 3600: 'h', + } + for key, value in repr_dict.items(): + if diff < key: + return unit_dict[value](diff) + else: + return unit_dict['d'](diff) + else: + try: + return unit_dict[unit[0]](diff) + except KeyError: + print('Valid keys are {}.'.format(list(unit_dict.keys()))) + raise + + + + +class Clock(object): + + time = staticmethod(time.time) + + @staticmethod + def ftime(show_zone=True): + return timestamp(show_zone=show_zone) + + def __repr__(self): + return "<{}: time=`{}`>".format(self.__class__.__name__, + self.ftime) + + + +class Stopwatch(Clock): + """ + A stopwatch, starts counting from first instancing and upon restart(). + Call an instance to find the time in seconds since timer started/restarted. + Call str to print how much time has past in reasonable units. + """ + + + def __init__(self): + self._tic = time.time() + + def restart(self): + self._tic = time.time() + + @property + def tic(self): + return self._tic + + @property + def toc(self): + return time.time() - self.tic + + def __call__(self, f=None, *args, **kwargs): + if f is None: + return self.ftoc(*args, **kwargs) + return f(self.toc, *args, **kwargs) + + def ftoc(self, unit=None, sig=1): + """ + Time since (re)start in a given unit to sig significant places. + If unit is None an appropriate unit is chosen. + """ + return time_diff_repr(time.time(), self.tic, unit, sig) + + def __repr__(self): + return '<{}: tic=`{}`>'.format(self.__class__.__name__, + timestamp(self.tic)) + + def __str__(self): + return self.ftoc() + + + +class Timer(Stopwatch): + + def __init__(self, checktime=5): + super(Timer, self).__init__() + self.checktime = checktime + self.checker = Stopwatch() + + def __repr__(self): + return '<{}: tic=`{}`, checktime={}>'.format(self.__class__.__name__, + timestamp(self.tic), + self.checktime) + + def check(self, checktime=None): + if checktime is None: + checktime = self.checktime + if self.checker.toc > checktime: + self.checker.restart() + return True + return False