Source code for pydl.pydlutils.bspline

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""This module corresponds to the bspline directory in idlutils.
"""
import numpy as np


[docs]class bspline(object): """Bspline class. Functions in the bspline library are implemented as methods on this class. Parameters ---------- x : :class:`numpy.ndarray` The data. nord : :class:`int`, optional To be documented. npoly : :class:`int`, optional To be documented. bkpt : :class:`numpy.ndarray`, optional To be documented. bkspread : :class:`float`, optional To be documented. verbose : :class:`bool`, optional. If ``True`` print extra information. Attributes ---------- breakpoints To be documented. nord To be documented. npoly To be documented. mask To be documented. coeff To be documented. icoeff To be documented. xmin To be documented. xmax To be documented. funcname To be documented. """ def __init__(self, x, nord=4, npoly=1, bkpt=None, bkspread=1.0, verbose=False, **kwargs): """Init creates an object whose attributes are similar to the structure returned by the create_bspline function. """ # # Set the breakpoints. # if bkpt is None: startx = x.min() rangex = x.max() - startx if 'placed' in kwargs: w = ((kwargs['placed'] >= startx) & (kwargs['placed'] <= startx+rangex)) if w.sum() < 2: bkpt = np.arange(2, dtype='f') * rangex + startx else: bkpt = kwargs['placed'][w] elif 'bkspace' in kwargs: nbkpts = int(rangex/kwargs['bkspace']) + 1 if nbkpts < 2: nbkpts = 2 tempbkspace = rangex/float(nbkpts-1) bkpt = np.arange(nbkpts, dtype='f')*tempbkspace + startx elif 'nbkpts' in kwargs: nbkpts = kwargs['nbkpts'] if nbkpts < 2: nbkpts = 2 tempbkspace = rangex/float(nbkpts-1) bkpt = np.arange(nbkpts, dtype='f') * tempbkspace + startx elif 'everyn' in kwargs: npkpts = max(nx/kwargs['everyn'], 1) if nbkpts == 1: xspot = [0] else: xspot = int(nx/(nbkpts-1)) * np.arange(nbkpts, dtype='i4') bkpt = x[xspot].astype('f') else: raise ValueError('No information for bkpts.') imin = bkpt.argmin() imax = bkpt.argmax() if x.min() < bkpt[imin]: if verbose: print('Lowest breakpoint does not cover lowest x value: changing.') bkpt[imin] = x.min() if x.max() > bkpt[imax]: if verbose: print('Highest breakpoint does not cover highest x value: changing.') bkpt[imax] = x.max() nshortbkpt = bkpt.size fullbkpt = bkpt.copy() if nshortbkpt == 1: bkspace = np.float32(bkspread) else: bkspace = (bkpt[1] - bkpt[0]) * np.float32(bkspread) for i in np.arange(1, nord, dtype=np.float32): fullbkpt = np.insert(fullbkpt, 0, bkpt[0]-bkspace*i) fullbkpt = np.insert(fullbkpt, fullbkpt.shape[0], bkpt[nshortbkpt-1] + bkspace*i) # # Set the attributes # nc = fullbkpt.size - nord self.breakpoints = fullbkpt self.nord = nord self.npoly = npoly self.mask = np.ones((fullbkpt.size,), dtype='bool') if npoly > 1: self.coeff = np.zeros((npoly, nc), dtype='d') self.icoeff = np.zeros((npoly, nc), dtype='d') else: self.coeff = np.zeros((nc,), dtype='d') self.icoeff = np.zeros((nc,), dtype='d') self.xmin = 0.0 self.xmax = 1.0 self.funcname = 'legendre' return
[docs] def fit(self, xdata, ydata, invvar, x2=None): """Calculate a B-spline in the least-squares sense. Fit is based on two variables: x which is sorted and spans a large range where bkpts are required y which can be described with a low order polynomial. Parameters ---------- xdata : :class:`numpy.ndarray` Independent variable. ydata : :class:`numpy.ndarray` Dependent variable. invvar : :class:`numpy.ndarray` Inverse variance of `ydata`. x2 : :class:`numpy.ndarray`, optional Orthogonal dependent variable for 2d fits. Returns ------- :func:`tuple` A tuple containing an integer error code, and the evaluation of the b-spline at the input values. An error code of -2 is a failure, -1 indicates dropped breakpoints, 0 is success, and positive integers indicate ill-conditioned breakpoints. """ goodbk = self.mask[self.nord:] nn = goodbk.sum() if nn < self.nord: yfit = np.zeros(ydata.shape, dtype='f') return (-2, yfit) nfull = nn * self.npoly bw = self.npoly * self.nord a1, lower, upper = self.action(xdata, x2=x2) foo = np.tile(invvar, bw).reshape(bw, invvar.size).transpose() a2 = a1 * foo alpha = np.zeros((bw, nfull+bw), dtype='d') beta = np.zeros((nfull+bw,), dtype='d') bi = np.arange(bw, dtype='i4') bo = np.arange(bw, dtype='i4') for k in range(1, bw): bi = np.append(bi, np.arange(bw-k, dtype='i4')+(bw+1)*k) bo = np.append(bo, np.arange(bw-k, dtype='i4')+bw*k) for k in range(nn-self.nord+1): itop = k*self.npoly ibottom = min(itop, nfull) + bw - 1 ict = upper[k] - lower[k] + 1 if ict > 0: work = np.dot(a1[lower[k]:upper[k]+1, :].T, a2[lower[k]:upper[k]+1, :]) wb = np.dot(ydata[lower[k]:upper[k]+1], a2[lower[k]:upper[k]+1, :]) alpha.T.flat[bo+itop*bw] += work.flat[bi] beta[itop:ibottom+1] += wb min_influence = 1.0e-10 * invvar.sum() / nfull errb = cholesky_band(alpha, mininf=min_influence) # ,verbose=True) if isinstance(errb[0], int) and errb[0] == -1: a = errb[1] else: yfit, foo = self.value(xdata, x2=x2, action=a1, upper=upper, lower=lower) return (self.maskpoints(errb[0]), yfit) errs = cholesky_solve(a, beta) if isinstance(errs[0], int) and errs[0] == -1: sol = errs[1] else: # # It is not possible for this to get called, because cholesky_solve # has only one return statement, & that statement guarantees that # errs[0] == -1 # yfit, foo = self.value(xdata, x2=x2, action=a1, upper=upper, lower=lower) return (self.maskpoints(errs[0]), yfit) if self.npoly > 1: self.icoeff[:, goodbk] = np.array(a[0, 0:nfull].reshape(self.npoly, nn), dtype=a.dtype) self.coeff[:, goodbk] = np.array(sol[0:nfull].reshape(self.npoly, nn), dtype=sol.dtype) else: self.icoeff[goodbk] = np.array(a[0, 0:nfull], dtype=a.dtype) self.coeff[goodbk] = np.array(sol[0:nfull], dtype=sol.dtype) yfit, foo = self.value(xdata, x2=x2, action=a1, upper=upper, lower=lower) return (0, yfit)
[docs] def action(self, x, x2=None): """Construct banded bspline matrix, with dimensions [ndata, bandwidth]. Parameters ---------- x : :class:`numpy.ndarray` Independent variable. x2 : :class:`numpy.ndarray`, optional Orthogonal dependent variable for 2d fits. Returns ------- :func:`tuple` A tuple containing the b-spline action matrix; the 'lower' parameter, a list of pixel positions, each corresponding to the first occurence of position greater than breakpoint indx; and 'upper', Same as lower, but denotes the upper pixel positions. """ from .. import uniq from ..goddard.math import flegendre from .trace import fchebyshev nx = x.size nbkpt = self.mask.sum() if nbkpt < 2*self.nord: return (-2, 0, 0) n = nbkpt - self.nord gb = self.breakpoints[self.mask] bw = self.npoly*self.nord lower = np.zeros((n - self.nord + 1,), dtype='i4') upper = np.zeros((n - self.nord + 1,), dtype='i4') - 1 indx = self.intrv(x) bf1 = self.bsplvn(x, indx) action = bf1 aa = uniq(indx, np.arange(indx.size, dtype='i4')) upper[indx[aa]-self.nord+1] = aa rindx = indx[::-1] bb = uniq(rindx, np.arange(rindx.size, dtype='i4')) lower[rindx[bb]-self.nord+1] = nx - bb - 1 if x2 is not None: if x2.size != nx: raise ValueError('Dimensions of x and x2 do not match.') x2norm = 2.0 * (x2 - self.xmin) / (self.xmax - self.xmin) - 1.0 if self.funcname == 'poly': temppoly = np.ones((nx, self.npoly), dtype='f') for i in range(1, self.npoly): temppoly[:, i] = temppoly[:, i-1] * x2norm elif self.funcname == 'poly1': temppoly = np.tile(x2norm, self.npoly).reshape(nx, self.npoly) for i in range(1, self.npoly): temppoly[:, i] = temppoly[:, i-1] * x2norm elif self.funcname == 'chebyshev': temppoly = fchebyshev(x2norm, self.npoly) elif self.funcname == 'legendre': temppoly = flegendre(x2norm, self.npoly) else: raise ValueError('Unknown value of funcname.') action = np.zeros((nx, bw), dtype='d') counter = -1 for ii in range(self.nord): for jj in range(self.npoly): counter += 1 action[:, counter] = bf1[:, ii]*temppoly[:, jj] return (action, lower, upper)
[docs] def intrv(self, x): """Find the segment between breakpoints which contain each value in the array x. The minimum breakpoint is nbkptord -1, and the maximum is nbkpt - nbkptord - 1. Parameters ---------- x : :class:`numpy.ndarray` Data values, assumed to be monotonically increasing. Returns ------- :class:`numpy.ndarray` Position of array elements with respect to breakpoints. """ gb = self.breakpoints[self.mask] n = gb.size - self.nord indx = np.zeros((x.size,), dtype='i4') ileft = self.nord - 1 for i in range(x.size): while x[i] > gb[ileft+1] and ileft < n - 1: ileft += 1 indx[i] = ileft return indx
[docs] def bsplvn(self, x, ileft): """To be documented. Parameters ---------- x : :class:`numpy.ndarray` To be documented. ileft : :class:`int` To be documented Returns ------- :class:`numpy.ndarray` To be documented. """ bkpt = self.breakpoints[self.mask] vnikx = np.zeros((x.size, self.nord), dtype=x.dtype) deltap = vnikx.copy() deltam = vnikx.copy() j = 0 vnikx[:, 0] = 1.0 while j < self.nord - 1: ipj = ileft+j+1 deltap[:, j] = bkpt[ipj] - x imj = ileft-j deltam[:, j] = x - bkpt[imj] vmprev = 0.0 for l in range(j+1): vm = vnikx[:, l]/(deltap[:, l] + deltam[:, j-l]) vnikx[:, l] = vm*deltap[:, l] + vmprev vmprev = vm*deltam[:, j-l] j += 1 vnikx[:, j] = vmprev return vnikx
[docs] def value(self, x, x2=None, action=None, lower=None, upper=None): """Evaluate a bspline at specified values. Parameters ---------- x : :class:`numpy.ndarray` Independent variable. x2 : :class:`numpy.ndarray`, optional Orthogonal dependent variable for 2d fits. action : :class:`numpy.ndarray`, optional Action matrix to use. If not supplied it is calculated. lower : :class:`numpy.ndarray`, optional If the action parameter is supplied, this parameter must also be supplied. upper : :class:`numpy.ndarray`, optional If the action parameter is supplied, this parameter must also be supplied. Returns ------- :func:`tuple` A tuple containing the results of the bspline evaluation and a mask indicating where the evaluation was good. """ xsort = x.argsort() xwork = x[xsort] if x2 is not None: x2work = x2[xsort] else: x2work = None if action is not None: if lower is None or upper is None: raise ValueError('Must specify lower and upper if action is set.') else: action, lower, upper = self.action(xwork, x2=x2work) yfit = np.zeros(x.shape, dtype=x.dtype) bw = self.npoly * self.nord spot = np.arange(bw, dtype='i4') goodbk = self.mask.nonzero()[0] coeffbk = self.mask[self.nord:].nonzero()[0] n = self.mask.sum() - self.nord if self.npoly > 1: goodcoeff = self.coeff[:, coeffbk] else: goodcoeff = self.coeff[coeffbk] # maskthis = np.zeros(xwork.shape,dtype=xwork.dtype) for i in range(n-self.nord+1): ict = upper[i] - lower[i] + 1 if ict > 0: yfit[lower[i]:upper[i]+1] = np.dot( action[lower[i]:upper[i]+1, :], goodcoeff[i*self.npoly+spot]) yy = yfit.copy() yy[xsort] = yfit mask = np.ones(x.shape, dtype='bool') gb = self.breakpoints[goodbk] outside = ((x < gb[self.nord-1]) | (x > gb[n])) if outside.any(): mask[outside] = False hmm = ((np.diff(goodbk) > 2).nonzero())[0] for jj in range(hmm.size): inside = ((x >= self.breakpoints[goodbk[hmm[jj]]]) & (x <= self.breakpoints[goodbk[hmm[jj]+1]-1])) if inside.any(): mask[inside] = False return (yy, mask)
[docs] def maskpoints(self, err): """Perform simple logic of which breakpoints to mask. Parameters ---------- err : :class:`numpy.ndarray` The list of indexes returned by the cholesky routines. Returns ------- :class:`int` An integer indicating the results of the masking. -1 indicates that the error points were successfully masked. -2 indicates failure; the calculation should be aborted. Notes ----- The mask attribute is modified, assuming it is possible to create the mask. """ nbkpt = self.mask.sum() if nbkpt <= 2*self.nord: return -2 hmm = err[np.unique(err/self.npoly)]/self.npoly n = nbkpt - self.nord if np.any(hmm >= n): return -2 test = np.zeros(nbkpt, dtype='bool') for jj in range(-np.ceil(nord/2.0), nord/2.0): foo = np.where((hmm+jj) > 0, hmm+jj, np.zeros(hmm.shape, dtype=hmm.dtype)) inside = np.where((foo+nord) < n-1, foo+nord, np.zeros(hmm.shape, dtype=hmm.dtype)+n-1) test[inside] = True if test.any(): reality = self.mask[test] if self.mask[reality].any(): self.mask[reality] = False return -1 else: return -2 else: return -2
[docs]def cholesky_band(l, mininf=0.0, verbose=False): """Compute Cholesky decomposition of banded matrix. Parameters ---------- l : :class:`numpy.ndarray` A matrix on which to perform the Cholesky decomposition. mininf : :class:`float`, optional Entries in the `l` matrix are considered negative if they are less than this value (default 0.0). verbose : :class:`bool`, optional If set to ``True``, print some debugging information. Returns ------- :func:`tuple` If problems were detected, the first item will be the index or indexes where the problem was detected, and the second item will simply be the input matrix. If no problems were detected, the first item will be -1, and the second item will be the Cholesky decomposition. """ from warnings import warn from . import PydlutilsUserWarning lower = l.copy() bw, nn = lower.shape n = nn - bw negative = lower[0, 0:n] <= mininf if negative.any() or not np.all(np.isfinite(lower)): warn('Bad entries: ' + str(negative.nonzero()[0]), PydlutilsUserWarning) return (negative.nonzero()[0], l) kn = bw - 1 spot = np.arange(kn, dtype='i4') + 1 bi = np.arange(kn, dtype='i4') for i in range(1, kn): bi = np.append(bi, np.arange(kn-i, dtype='i4') + (kn+1)*i) for j in range(n): lower[0, j] = np.sqrt(lower[0, j]) lower[spot, j] /= lower[0, j] x = lower[spot, j] if not np.all(np.isfinite(x)): warn('NaN found in cholesky_band.', PydlutilsUserWarning) return (j, l) hmm = np.outer(x, x) here = bi+(j+1)*bw lower.T.flat[here] -= hmm.flat[bi] return (-1, lower)
[docs]def cholesky_solve(a, bb): """Solve the equation Ax=b where A is a Cholesky-banded matrix. Parameters ---------- a : :class:`numpy.ndarray` :math:`A` in :math:`A x = b`. bb : :class:`numpy.ndarray` :math:`b` in :math:`A x = b`. Returns ------- :func:`tuple` A tuple containing the status and the result of the solution. The status is always -1. """ b = bb.copy() bw = a.shape[0] n = b.shape[0] - bw kn = bw - 1 spot = np.arange(kn, dtype='i4') + 1 for j in range(n): b[j] /= a[0, j] b[j+spot] -= b[j]*a[spot, j] spot = kn - np.arange(kn, dtype='i4') for j in range(n-1, -1, -1): b[j] = (b[j] - np.sum(a[spot, j] * b[j+spot]))/a[0, j] return (-1, b)
[docs]def iterfit(xdata, ydata, invvar=None, upper=5, lower=5, x2=None, maxiter=10, **kwargs): """Iteratively fit a b-spline set to data, with rejection. Parameters ---------- xdata : :class:`numpy.ndarray` Independent variable. ydata : :class:`numpy.ndarray` Dependent variable. invvar : :class:`numpy.ndarray` Inverse variance of `ydata`. If not set, it will be calculated based on the standard deviation. upper : :class:`int` or :class:`float` Upper rejection threshold in units of sigma, defaults to 5 sigma. lower : :class:`int` or :class:`float` Lower rejection threshold in units of sigma, defaults to 5 sigma. x2 : :class:`numpy.ndarray`, optional Orthogonal dependent variable for 2d fits. maxiter : :class:`int`, optional Maximum number of rejection iterations, default 10. Set this to zero to disable rejection. Returns ------- :func:`tuple` A tuple containing the fitted bspline object and an output mask. """ from .math import djs_reject nx = xdata.size if ydata.size != nx: raise ValueError('Dimensions of xdata and ydata do not agree.') if invvar is not None: if invvar.size != nx: raise ValueError('Dimensions of xdata and invvar do not agree.') else: # # This correction to the variance makes it the same # as IDL's variance() # var = ydata.var()*(float(nx)/float(nx-1)) if var == 0: var = 1.0 invvar = np.ones(ydata.shape, dtype=ydata.dtype)/var if x2 is not None: if x2.size != nx: raise ValueError('Dimensions of xdata and x2 do not agree.') yfit = np.zeros(ydata.shape) if invvar.size == 1: outmask = True else: outmask = np.ones(invvar.shape, dtype='bool') xsort = xdata.argsort() maskwork = (outmask & (invvar > 0))[xsort] if 'oldset' in kwargs: sset = kwargs['oldset'] sset.mask = True sset.coeff = 0 else: if not maskwork.any(): raise ValueError('No valid data points.') # return (None,None) if 'fullbkpt' in kwargs: fullbkpt = kwargs['fullbkpt'] else: sset = bspline(xdata[xsort[maskwork]], **kwargs) if maskwork.sum() < sset.nord: print('Number of good data points fewer than nord.') return (sset, outmask) if x2 is not None: if 'xmin' in kwargs: xmin = kwargs['xmin'] else: xmin = x2.min() if 'xmax' in kwargs: xmax = kwargs['xmax'] else: xmax = x2.max() if xmin == xmax: xmax = xmin + 1 sset.xmin = xmin sset.xmax = xmax if 'funcname' in kwargs: sset.funcname = kwargs['funcname'] xwork = xdata[xsort] ywork = ydata[xsort] invwork = invvar[xsort] if x2 is not None: x2work = x2[xsort] else: x2work = None iiter = 0 error = 0 qdone = -1 while (error != 0 or qdone == -1) and iiter <= maxiter: goodbk = sset.mask.nonzero()[0] if maskwork.sum() <= 1 or not sset.mask.any(): sset.coeff = 0 iiter = maxiter + 1 else: if 'requiren' in kwargs: i = 0 while xwork[i] < sset.breakpoints[goodbk[sset.nord]] and i < nx-1: i += 1 ct = 0 for ileft in range(sset.nord, sset.mask.sum()-sset.nord+1): while (xwork[i] >= sset.breakpoints[goodbk[ileft]] and xwork[i] < sset.breakpoints[goodbk[ileft+1]] and i < nx-1): ct += invwork[i]*maskwork[i] > 0 i += 1 if ct >= kwargs['requiren']: ct = 0 else: sset.mask[goodbk[ileft]] = False error, yfit = sset.fit(xwork, ywork, invwork*maskwork, x2=x2work) iiter += 1 inmask = maskwork if error == -2: return (sset, outmask) elif error == 0: maskwork, qdone = djs_reject(ywork, yfit, invvar=invwork, inmask=inmask, outmask=maskwork, upper=upper, lower=lower) else: pass outmask[xsort] = maskwork temp = yfit yfit[xsort] = temp return (sset, outmask)