Coverage for starry/_core/math.py : 91%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2from .. import config
3from .utils import *
4from .._constants import *
5import theano
6import theano.tensor as tt
7import numpy as np
8from scipy.linalg import block_diag as scipy_block_diag
9import theano.tensor.slinalg as sla
10import scipy
12__all__ = ["math", "linalg"]
15class MathType(type):
16 """Wrapper for theano/numpy functions."""
18 def cholesky(cls, *args, **kwargs):
19 if config.lazy:
20 return sla.cholesky(*args, **kwargs)
21 else:
22 return scipy.linalg.cholesky(*args, **kwargs, lower=True)
24 def atleast_2d(cls, arg):
25 if config.lazy:
26 return arg * tt.ones((1, 1))
27 else:
28 return np.atleast_2d(arg)
30 def vectorize(cls, *args):
31 """
32 Vectorize all ``args`` so that they have the same length
33 along the first axis.
35 TODO: Add error catching if the dimensions don't agree.
36 """
37 if config.lazy:
38 args = [arg * tt.ones(1) for arg in args]
39 size = tt.max([arg.shape[0] for arg in args])
40 args = [tt.repeat(arg, size // arg.shape[0], 0) for arg in args]
41 else:
42 args = [np.atleast_1d(arg) for arg in args]
43 size = np.max([arg.shape[0] for arg in args])
44 args = tuple(
45 [
46 arg
47 * np.ones(
48 (size,) + tuple(np.ones(len(arg.shape) - 1, dtype=int))
49 )
50 for arg in args
51 ]
52 )
53 if len(args) == 1:
54 return args[0]
55 else:
56 return args
58 def cross(x, y):
59 """Cross product of two 3-vectors.
61 Based on ``https://github.com/Theano/Theano/pull/3008``
62 """
63 if config.lazy:
64 eijk = np.zeros((3, 3, 3))
65 eijk[0, 1, 2] = eijk[1, 2, 0] = eijk[2, 0, 1] = 1
66 eijk[0, 2, 1] = eijk[2, 1, 0] = eijk[1, 0, 2] = -1
67 return tt.as_tensor_variable(tt.dot(tt.dot(eijk, y), x))
68 else:
69 return np.cross(x, y)
71 def cast(cls, *args):
72 if config.lazy:
73 return cls.to_tensor(*args)
74 else:
75 if len(args) == 1:
76 return np.array(args[0], dtype=tt.config.floatX)
77 else:
78 return [np.array(arg, dtype=tt.config.floatX) for arg in args]
80 def to_array_or_tensor(cls, x):
81 if config.lazy:
82 return tt.as_tensor_variable(x)
83 else:
84 return np.array(x)
86 def block_diag(self, *mats):
87 if config.lazy:
88 N = [mat.shape[0] for mat in mats]
89 Nsum = tt.sum(N)
90 res = tt.zeros((Nsum, Nsum), dtype=theano.config.floatX)
91 n = 0
92 for mat in mats:
93 inds = slice(n, n + mat.shape[0])
94 res = tt.set_subtensor(res[tuple((inds, inds))], mat)
95 n += mat.shape[0]
96 return res
97 else:
98 return scipy_block_diag(*mats)
100 def to_tensor(cls, *args):
101 """Convert all ``args`` to Theano tensor variables.
103 Converts to tensor regardless of whether `config.lazy` is True or False.
104 """
105 if len(args) == 1:
106 return tt.as_tensor_variable(args[0]).astype(tt.config.floatX)
107 else:
108 return [
109 tt.as_tensor_variable(arg).astype(tt.config.floatX)
110 for arg in args
111 ]
113 def __getattr__(cls, attr):
114 if config.lazy:
115 return getattr(tt, attr)
116 else:
117 return getattr(np, attr)
120class math(metaclass=MathType):
121 """Alias for ``numpy`` or ``theano.tensor``, depending on `config.lazy`."""
123 pass
126# Cholesky solve
127_solve_lower = sla.Solve(A_structure="lower_triangular", lower=True)
128_solve_upper = sla.Solve(A_structure="upper_triangular", lower=False)
131def _cho_solve(cho_A, b):
132 return _solve_upper(tt.transpose(cho_A), _solve_lower(cho_A, b))
135class LinAlgType(type):
136 """Linear algebra operations."""
138 @autocompile
139 def cho_solve(self, cho_A, b):
140 return _cho_solve(cho_A, b)
142 @autocompile
143 def solve(self, X, flux, cho_C, mu, LInv):
144 """
145 Compute the maximum a posteriori (MAP) prediction for the
146 spherical harmonic coefficients of a map given a flux timeseries.
148 Args:
149 X (matrix): The flux design matrix.
150 flux (array): The flux timeseries.
151 cho_C (scalar/vector/matrix): The lower cholesky factorization
152 of the data covariance.
153 mu (array): The prior mean of the spherical harmonic coefficients.
154 LInv (scalar/vector/matrix): The inverse prior covariance of the
155 spherical harmonic coefficients.
157 Returns:
158 The vector of spherical harmonic coefficients corresponding to the
159 MAP solution and the Cholesky factorization of the corresponding
160 covariance matrix.
162 """
163 # Compute C^-1 . X
164 if cho_C.ndim == 0:
165 CInvX = X / cho_C ** 2
166 elif cho_C.ndim == 1:
167 CInvX = tt.dot(tt.diag(1 / cho_C ** 2), X)
168 else:
169 CInvX = _cho_solve(cho_C, X)
171 # Compute W = X^T . C^-1 . X + L^-1
172 W = tt.dot(tt.transpose(X), CInvX)
173 if LInv.ndim == 0:
174 W = tt.inc_subtensor(
175 W[tuple((tt.arange(W.shape[0]), tt.arange(W.shape[0])))], LInv
176 )
177 LInvmu = mu * LInv
178 elif LInv.ndim == 1:
179 W = tt.inc_subtensor(
180 W[tuple((tt.arange(W.shape[0]), tt.arange(W.shape[0])))], LInv
181 )
182 LInvmu = mu * LInv
183 else:
184 W += LInv
185 LInvmu = tt.dot(LInv, mu)
187 # Compute the max like y and its covariance matrix
188 cho_W = sla.cholesky(W)
189 M = _cho_solve(cho_W, tt.transpose(CInvX))
190 yhat = tt.dot(M, flux) + _cho_solve(cho_W, LInvmu)
191 ycov = _cho_solve(cho_W, tt.eye(cho_W.shape[0]))
192 cho_ycov = sla.cholesky(ycov)
194 return yhat, cho_ycov
196 @autocompile
197 def lnlike(cls, X, flux, C, mu, L):
198 """
199 Compute the log marginal likelihood of the data given a design matrix.
201 Args:
202 X (matrix): The flux design matrix.
203 flux (array): The flux timeseries.
204 C (scalar/vector/matrix): The data covariance matrix.
205 mu (array): The prior mean of the spherical harmonic coefficients.
206 L (scalar/vector/matrix): The prior covariance of the spherical
207 harmonic coefficients.
209 Returns:
210 The log marginal likelihood of the `flux` vector conditioned on
211 the design matrix `X`. This is the likelihood marginalized over
212 all possible spherical harmonic vectors, which is analytically
213 computable for the linear `starry` model.
215 """
216 # Compute the GP mean
217 gp_mu = tt.dot(X, mu)
219 # Compute the GP covariance
220 if L.ndim == 0:
221 XLX = tt.dot(X, tt.transpose(X)) * L
222 elif L.ndim == 1:
223 XLX = tt.dot(tt.dot(X, tt.diag(L)), tt.transpose(X))
224 else:
225 XLX = tt.dot(tt.dot(X, L), tt.transpose(X))
227 if C.ndim == 0 or C.ndim == 1:
228 gp_cov = tt.inc_subtensor(
229 XLX[tuple((tt.arange(XLX.shape[0]), tt.arange(XLX.shape[0])))],
230 C,
231 )
232 else:
233 gp_cov = C + XLX
235 cho_gp_cov = sla.cholesky(gp_cov)
237 # Compute the marginal likelihood
238 N = X.shape[0]
239 r = tt.reshape(flux - gp_mu, (-1, 1))
240 lnlike = -0.5 * tt.dot(tt.transpose(r), _cho_solve(cho_gp_cov, r))
241 lnlike -= tt.sum(tt.log(tt.diag(cho_gp_cov)))
242 lnlike -= 0.5 * N * tt.log(2 * np.pi)
244 return lnlike[0, 0]
246 @autocompile
247 def lnlike_woodbury(cls, X, flux, CInv, mu, LInv, lndetC, lndetL):
248 """
249 Compute the log marginal likelihood of the data given a design matrix
250 using the Woodbury identity.
252 Args:
253 X (matrix): The flux design matrix.
254 flux (array): The flux timeseries.
255 CInv (scalar/vector/matrix): The inverse data covariance matrix.
256 mu (array): The prior mean of the spherical harmonic coefficients.
257 L (scalar/vector/matrix): The inverse prior covariance of the
258 spherical harmonic coefficients.
260 Returns:
261 The log marginal likelihood of the `flux` vector conditioned on
262 the design matrix `X`. This is the likelihood marginalized over
263 all possible spherical harmonic vectors, which is analytically
264 computable for the linear `starry` model.
266 """
267 # Compute the GP mean
268 gp_mu = tt.dot(X, mu)
270 # Residual vector
271 r = tt.reshape(flux - gp_mu, (-1, 1))
273 # Inverse of GP covariance via Woodbury identity
274 if CInv.ndim == 0:
275 U = X * CInv
276 elif CInv.ndim == 1:
277 U = tt.dot(tt.diag(CInv), X)
278 else:
279 U = tt.dot(CInv, X)
281 if LInv.ndim == 0:
282 W = tt.dot(tt.transpose(X), U) + LInv * tt.eye(U.shape[1])
283 elif LInv.ndim == 1:
284 W = tt.dot(tt.transpose(X), U) + tt.diag(LInv)
285 else:
286 W = tt.dot(tt.transpose(X), U) + LInv
287 cho_W = sla.cholesky(W)
289 if CInv.ndim == 0:
290 SInv = CInv * tt.eye(U.shape[0]) - tt.dot(
291 U, _cho_solve(cho_W, tt.transpose(U))
292 )
293 elif CInv.ndim == 1:
294 SInv = tt.diag(CInv) - tt.dot(
295 U, _cho_solve(cho_W, tt.transpose(U))
296 )
297 else:
298 SInv = CInv - tt.dot(U, _cho_solve(cho_W, tt.transpose(U)))
300 # Determinant of GP covariance
301 lndetW = 2 * tt.sum(tt.log(tt.diag(cho_W)))
302 lndetS = lndetW + lndetC + lndetL
304 # Compute the marginal likelihood
305 N = X.shape[0]
306 lnlike = -0.5 * tt.dot(tt.transpose(r), tt.dot(SInv, r))
307 lnlike -= 0.5 * lndetS
308 lnlike -= 0.5 * N * tt.log(2 * np.pi)
310 return lnlike[0, 0]
313class linalg(metaclass=LinAlgType):
314 """Miscellaneous linear algebra operations."""
316 class Covariance(object):
317 """A container for covariance matrices.
319 Args:
320 C (scalar, vector, or matrix, optional): The covariance.
321 Defaults to None.
322 cho_C (matrix, optional): The lower Cholesky factorization of
323 the covariance. Defaults to None.
324 N (int, optional): The number of rows/columns in the covariance
325 matrix, required if ``C`` is a scalar. Defaults to None.
326 """
328 def __init__(self, C=None, cho_C=None, N=None):
330 # User provided the Cholesky factorization
331 if cho_C is not None:
333 self.cholesky = math.cast(cho_C)
334 self.value = math.dot(
335 self.cholesky, math.transpose(self.cholesky)
336 )
337 self.inverse = self._cho_solve(self.cholesky)
338 self.lndet = 2 * math.sum(math.log(math.diag(self.cholesky)))
339 self.kind = "cholesky"
340 self.N = cho_C.shape[0]
342 # User provided the covariance as a scalar, vector, or matrix
343 elif C is not None:
345 C = math.cast(C)
347 if hasattr(C, "ndim"):
349 if C.ndim == 0:
351 assert (
352 N is not None
353 ), "Please provide a matrix size `N`."
354 self.cholesky = math.sqrt(C)
355 self.inverse = math.cast(1.0 / C)
356 self.lndet = math.cast(N * math.log(C))
357 self.value = C
358 self.kind = "scalar"
359 self.N = N
361 elif C.ndim == 1:
363 self.cholesky = math.sqrt(C)
364 self.inverse = 1.0 / C
365 self.lndet = math.sum(math.log(C))
366 self.value = C
367 self.kind = "vector"
368 self.N = C.shape[0]
370 else:
372 self.cholesky = math.cholesky(C)
373 self.inverse = self._cho_solve(self.cholesky)
374 self.lndet = 2 * math.sum(
375 math.log(math.diag(self.cholesky))
376 )
377 self.value = C
378 self.kind = "matrix"
379 self.N = C.shape[0]
381 # Assume it's a scalar
382 else:
384 assert N is not None, "Please provide a matrix size `N`."
385 self.cholesky = math.sqrt(C)
386 self.inverse = math.cast(1.0 / C)
387 self.lndet = math.cast(N * math.log(C))
388 self.value = C
389 self.kind = "scalar"
390 self.N = N
392 # ?!
393 else:
394 raise ValueError(
395 "Either the covariance or its Cholesky factorization must be provided."
396 )
398 def _cho_solve(self, cho_A, b=None):
399 """Apply the cholesky factorization to a vector or matrix."""
400 if config.lazy:
401 if b is None:
402 b = tt.eye(cho_A.shape[0])
403 return _cho_solve(cho_A, b)
404 else:
405 if b is None:
406 b = np.eye(cho_A.shape[0])
407 return scipy.linalg.cho_solve((cho_A, True), b)