Extend LinearStateSpace class #569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 4 commits · Apr 16, 2021
132 changes: 97 additions & 35 deletions quantecon/lss.py
@@ -12,6 +12,7 @@
import numpy as np
from numpy.random import multivariate_normal
from scipy.linalg import solve
+from .matrix_eqn import solve_discrete_lyapunov
from numba import jit
from .util import check_random_state
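Editor's note, not part of the diff: the newly imported routine solves the discrete Lyapunov equation X = A X Aᵀ + B, which is the fixed point of the covariance recursion used below. A minimal sketch of that equation, written against SciPy's solver only because its semantics are well documented; the quantecon routine imported above targets the same fixed point:

```python
# Editor's sketch: what a discrete Lyapunov solver computes.
import numpy as np
from scipy.linalg import solve_discrete_lyapunov

A = np.array([[0.5, 0.3],
              [1.0, 0.0]])   # spectral radius < 1, so a unique fixed point exists
B = np.array([[1.0, 0.0],
              [0.0, 0.0]])   # plays the role of C2 @ C2.T in the code below

X = solve_discrete_lyapunov(A, B)
# X is the stationary covariance of x_{t+1} = A x_t + C w_{t+1} when C @ C.T == B
assert np.allclose(X, A @ X @ A.T + B)
```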

@@ -281,59 +282,61 @@ def moment_sequence(self):
            mu_x = A.dot(mu_x)
            Sigma_x = A.dot(Sigma_x).dot(A.T) + C.dot(C.T)

-    def stationary_distributions(self, max_iter=200, tol=1e-5):
+    def stationary_distributions(self):
        r"""
        Compute the moments of the stationary distributions of :math:`x_t` and
-        :math:`y_t` if possible. Computation is by iteration, starting from
-        the initial conditions self.mu_0 and self.Sigma_0
-
-        Parameters
-        ----------
-        max_iter : scalar(int), optional(default=200)
-            The maximum number of iterations allowed
-        tol : scalar(float), optional(default=1e-5)
-            The tolerance level that one wishes to achieve
+        :math:`y_t` if possible. Computation is by solving the discrete
+        Lyapunov equation.

        Returns
        -------
-        mu_x_star : array_like(float)
+        mu_x : array_like(float)
            An n x 1 array representing the stationary mean of :math:`x_t`
-        mu_y_star : array_like(float)
+        mu_y : array_like(float)
            An k x 1 array representing the stationary mean of :math:`y_t`
-        Sigma_x_star : array_like(float)
+        Sigma_x : array_like(float)
            An n x n array representing the stationary var-cov matrix
            of :math:`x_t`
-        Sigma_y_star : array_like(float)
+        Sigma_y : array_like(float)
            An k x k array representing the stationary var-cov matrix
            of :math:`y_t`
+        Sigma_yx : array_like(float)
+            An k x n array representing the stationary cov matrix
+            between :math:`y_t` and :math:`x_t`.

        """
-        # == Initialize iteration == #
-        m = self.moment_sequence()
-        mu_x, mu_y, Sigma_x, Sigma_y = next(m)
-        i = 0
-        error = tol + 1
+        self.__partition()
+        num_const, sorted_idx = self.num_const, self.sorted_idx
+        A21, A22 = self.A21, self.A22
+        CC2 = self.C2 @ self.C2.T
+        n = self.n

+        if num_const > 0:
+            μ = solve(np.eye(n-num_const) - A22, A21)
+        else:
+            μ = solve(np.eye(n-num_const) - A22, np.zeros((n, 1)))
+        Σ = solve_discrete_lyapunov(A22, CC2, method='bartels-stewart')

-        # == Loop until convergence or failure == #
-        while error > tol:
+        mu_x = np.empty((n, 1))
+        mu_x[:num_const] = self.mu_0[sorted_idx[:num_const]]
+        mu_x[num_const:] = μ

-            if i > max_iter:
-                fail_message = 'Convergence failed after {} iterations'
-                raise ValueError(fail_message.format(max_iter))
+        Sigma_x = np.zeros((n, n))
+        Sigma_x[num_const:, num_const:] = Σ

-            else:
-                i += 1
-                mu_x1, mu_y1, Sigma_x1, Sigma_y1 = next(m)
-                error_mu = np.max(np.abs(mu_x1 - mu_x))
-                error_Sigma = np.max(np.abs(Sigma_x1 - Sigma_x))
-                error = max(error_mu, error_Sigma)
-                mu_x, Sigma_x = mu_x1, Sigma_x1
+        mu_x = self.P.T @ mu_x
+        Sigma_x = self.P.T @ Sigma_x @ self.P

-        # == Prepare return values == #
-        mu_x_star, Sigma_x_star = mu_x, Sigma_x
-        mu_y_star, Sigma_y_star = mu_y1, Sigma_y1
+        mu_y = self.G @ mu_x
+        Sigma_y = self.G @ Sigma_x @ self.G.T
+        if self.H is not None:
+            Sigma_y += self.H @ self.H.T
+        Sigma_yx = self.G @ Sigma_x

+        self.mu_x, self.mu_y = mu_x, mu_y
+        self.Sigma_x, self.Sigma_y, self.Sigma_yx = Sigma_x, Sigma_y, Sigma_yx

-        return mu_x_star, mu_y_star, Sigma_x_star, Sigma_y_star
+        return mu_x, mu_y, Sigma_x, Sigma_y, Sigma_yx
Review comment (Member): This will probably break a significant amount of code using this function. An easy way around this is to use a keyword argument to optionally return Sigma_yx. Otherwise, I'm guessing we'll need a PR to update multiple lectures.
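Editor's illustration of the reviewer's suggestion, not code from this PR; the wrapper name `stationary_distributions_compat` and the keyword `return_cov_yx` are hypothetical:

```python
# Hypothetical compatibility shim sketching the reviewer's keyword-argument idea:
# keep the old 4-tuple contract by default, expose Sigma_yx only on request.
def stationary_distributions_compat(ss, return_cov_yx=False):
    mu_x, mu_y, Sigma_x, Sigma_y, Sigma_yx = ss.stationary_distributions()
    if return_cov_yx:
        return mu_x, mu_y, Sigma_x, Sigma_y, Sigma_yx
    return mu_x, mu_y, Sigma_x, Sigma_y
```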


    def geometric_sums(self, beta, x_t):
        r"""
@@ -406,3 +409,62 @@ def impulse_response(self, j=5):
            Apower = np.dot(Apower, A)

        return xcoef, ycoef

+    def __partition(self):
+        r"""
+        Reorder the states by shifting the constant terms to the
+        top of the state vector. Then partition the linear state
+        space model following Appendix C in RMT Ch2 such that the
Review comment (Member), on the reference above: Is this Appendix C in the latest version of RMT? In an older version, it seems that it was "Appendix B". If so, we might need a more precise reference.

+        A22 matrix associated with non-constant states have eigenvalues
+        all strictly smaller than 1.
+
+        .. math::
+            \left[\begin{array}{c}
+            const\\
+            x_{2,t+1}
+            \end{array}\right]=\left[\begin{array}{cc}
+            I & 0\\
+            A_{21} & A_{22}
+            \end{array}\right]\left[\begin{array}{c}
+            1\\
+            x_{2,t}
+            \end{array}\right]+\left[\begin{array}{c}
+            0\\
+            C_{2}
+            \end{array}\right]w_{t+1}
+
+        Returns
+        -------
+        A22 : array_like(float)
+            Lower right part of the reordered matrix A.
+        A21 : array_like(float)
+            Lower left part of the reordered matrix A.
+        """
+        A, C = self.A, self.C
+        n = self.n
+
+        sorted_idx = []
+        A_diag = np.diag(A)
+        num_const = 0
+        for idx in range(n):
+            if (A_diag[idx] == 1) and (C[idx, :] == 0).all() \
+                    and np.linalg.norm(A[idx, :]) == 1:
+                sorted_idx.insert(0, idx)
+                num_const += 1
+            else:
+                sorted_idx.append(idx)
+        self.num_const = num_const
+        self.sorted_idx = sorted_idx
+
+        P = np.zeros((n, n))
+        P[range(n), sorted_idx] = 1
+
+        sorted_A = P @ A @ P.T
+        sorted_C = P @ C
+        A21 = sorted_A[num_const:, :num_const]
+        A22 = sorted_A[num_const:, num_const:]
+        C2 = sorted_C[num_const:, :]
+
+        self.P, self.A21, self.A22, self.C2 = P, A21, A22, C2
+
+        return A21, A22
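Editor's summary of the computation the new method performs, transcribed from the code above: P, A21, A22 and C2 come from __partition, the constant block of mu_x is copied from mu_0 and P mapped back to the original ordering, and the HHᵀ term enters only when an H matrix is supplied.

```latex
% Stationary moments of the partitioned system, as solved in stationary_distributions
\begin{aligned}
\mu_{2}     &= (I - A_{22})^{-1} A_{21}, \\
\Sigma_{22} &= A_{22}\,\Sigma_{22}\,A_{22}^{\top} + C_{2}C_{2}^{\top}, \\
\Sigma_{x}  &= P^{\top}
               \begin{bmatrix} 0 & 0 \\ 0 & \Sigma_{22} \end{bmatrix} P, \\
\mu_{y}     &= G\,\mu_{x}, \qquad
\Sigma_{y}   = G\,\Sigma_{x}\,G^{\top} + HH^{\top}, \qquad
\Sigma_{yx}  = G\,\Sigma_{x}.
\end{aligned}
```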
43 changes: 33 additions & 10 deletions quantecon/tests/test_lss.py
@@ -13,35 +13,58 @@
class TestLinearStateSpace(unittest.TestCase):

    def setUp(self):
-        # Initial Values
+        # Example 1
        A = .95
        C = .05
        G = 1.
        mu_0 = .75

-        self.ss = LinearStateSpace(A, C, G, mu_0=mu_0)
+        self.ss1 = LinearStateSpace(A, C, G, mu_0=mu_0)
+
+        # Example 2
+        ρ1 = 0.5
+        ρ2 = 0.3
+        α = 0.5
+
+        A = np.array([[ρ1, ρ2, α], [1, 0, 0], [0, 0, 1]])
+        C = np.array([[1], [0], [0]])
+        G = np.array([[1, 0, 0]])
+        mu_0 = [0.5, 0.5, 1]
+
+        self.ss2 = LinearStateSpace(A, C, G, mu_0=mu_0)

    def tearDown(self):
-        del self.ss
+        del self.ss1
+        del self.ss2

    def test_stationarity(self):
-        vals = self.ss.stationary_distributions(max_iter=1000, tol=1e-9)
-        ssmux, ssmuy, sssigx, sssigy = vals
+        vals = self.ss1.stationary_distributions()
+        ssmux, ssmuy, sssigx, sssigy, sssigyx = vals

        self.assertTrue(abs(ssmux - ssmuy) < 2e-8)
        self.assertTrue(abs(sssigx - sssigy) < 2e-8)
        self.assertTrue(abs(ssmux) < 2e-8)
-        self.assertTrue(abs(sssigx - self.ss.C**2/(1 - self.ss.A**2)) < 2e-8)
+        self.assertTrue(abs(sssigx - self.ss1.C**2/(1 - self.ss1.A**2)) < 2e-8)
+        self.assertTrue(abs(sssigyx - self.ss1.G @ sssigx) < 2e-8)
+
+        vals = self.ss2.stationary_distributions()
+        ssmux, ssmuy, sssigx, sssigy, sssigyx = vals
+
+        assert_allclose(ssmux.flatten(), np.array([2.5, 2.5, 1]))
+        assert_allclose(ssmuy.flatten(), np.array([2.5]))
+        assert_allclose(sssigx, self.ss2.A @ sssigx @ self.ss2.A.T + self.ss2.C @ self.ss2.C.T)
+        assert_allclose(sssigy, self.ss2.G @ sssigx @ self.ss2.G.T)
+        assert_allclose(sssigyx, self.ss2.G @ sssigx)

    def test_simulate(self):
-        ss = self.ss
+        ss = self.ss1

        sim = ss.simulate(ts_length=250)
        for arr in sim:
            self.assertTrue(len(arr[0])==250)

    def test_simulate_with_seed(self):
-        ss = self.ss
+        ss = self.ss1

        xval, yval = ss.simulate(ts_length=5, random_state=5)
        expected_output = np.array([0.75 , 0.73456137, 0.6812898, 0.76876387,
@@ -51,14 +74,14 @@ def test_simulate_with_seed(self):
        assert_allclose(yval[0], expected_output)

    def test_replicate(self):
-        xval, yval = self.ss.replicate(T=100, num_reps=5000)
+        xval, yval = self.ss1.replicate(T=100, num_reps=5000)

        assert_allclose(xval, yval)
        self.assertEqual(xval.size, 5000)
        self.assertLessEqual(abs(np.mean(xval)), .05)

    def test_replicate_with_seed(self):
-        xval, yval = self.ss.replicate(T=100, num_reps=5, random_state=5)
+        xval, yval = self.ss1.replicate(T=100, num_reps=5, random_state=5)
        expected_output = np.array([0.06871204, 0.06937119, -0.1478022,
                                    0.23841252, -0.06823762])

Expand Down