#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
The kernel density estimators.
"""
from __future__ import (division, print_function, absolute_import, unicode_literals)
import numpy as np
import numpy.ma as ma
from scipy.special import logsumexp
from scipy import linalg as la
from scipy.cluster.vq import kmeans, vq
# Avoid log(0) warnings when weights go to 0
np.seterr(divide='ignore')
def optimized_kde(data, pool=None, kde=None, max_samples=None, **kwargs):
    """
    Iteratively run a k-means clustering algorithm, estimating the distribution of each
    identified cluster with an independent kernel density estimate. Starting with ``k = 1``,
    the distribution is estimated and the Bayes information criterion (BIC) is calculated.
    `k` is increased until the BIC stops increasing.
:param data:
An `(N, ndim)`-shaped array, containing `N` samples from the target distribution.
:param pool: (optional)
A pool of processes with a :func:`map` function to use.
:param kde: (optional)
An old KDE to inherit samples from.
:param max_samples: (optional)
The maximum number of samples to use for constructing or updating the KDE. If a KDE is
supplied and adding the samples from it will go over this, old samples are thinned by
factors of two until under the limit.
:param kwargs: (optional)
Keyword arguments to pass to :class:`ClusteredKDE`.
    :returns: The :class:`ClusteredKDE` that maximizes the BIC, or `None` if there are no samples to use.
"""
# Trim data if too many samples were given
n_new = len(data)
if kde is None and n_new == 0:
return None
if max_samples is not None and max_samples <= n_new:
data = data[:max_samples]
else:
# Combine data, thinning old data if we need room
if kde is not None:
old_data = kde.data
if max_samples is not None:
nsamps = len(old_data) + n_new
while nsamps > max_samples:
old_data = old_data[::2]
nsamps = len(old_data) + n_new
if n_new == 0:
# If there's no new data, just use the old
data = old_data
else:
# Otherwise combine the old and the new
data = np.concatenate((old_data, data))
best_bic = -np.inf
best_kde = None
k = 1
while True:
try:
kde = ClusteredKDE(data, k, **kwargs)
bic = kde.bic(pool=pool)
except la.LinAlgError:
bic = -np.inf
if bic > best_bic:
best_kde = kde
best_bic = bic
else:
break
k += 1
return best_kde
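# Illustrative usage sketch (not part of the original module): fit an optimized
# KDE to synthetic two-cluster data and use the result. The sample counts and
# the cluster offset below are arbitrary choices for demonstration.
def _example_optimized_kde():
    """Build an optimized KDE from two well-separated Gaussian blobs."""
    samples = np.concatenate((np.random.randn(500, 2),
                              np.random.randn(500, 2) + 5.0))
    kde = optimized_kde(samples)
    draws = kde.draw(100)          # sample from the fitted density
    logp = kde(samples[:10])       # __call__ aliases logpdf
    return draws, logp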
class ClusteredKDE(object):
    """
    Run a k-means clustering algorithm, estimating the distribution of each identified cluster
    with an independent kernel density estimate. The full distribution is then estimated by
    combining the individual KDEs, weighted by the fraction of samples assigned to each cluster.
:param data:
An `(N, ndim)`-shaped array, containing `N` samples from the target distribution.
:param k:
The number of clusters for k-means clustering.
"""
def __init__(self, data, k=1):
self._data = data
self._nclusters = k
self._mean = np.mean(data, axis=0)
self._std = np.std(data, axis=0)
# Cluster data that's mean 0 and scaled to unit width in each parameter independently
white_data = self._whiten(data)
self._centroids, _ = kmeans(white_data, k)
self._assignments, _ = vq(white_data, self.centroids)
self._kdes = [KDE(self.data[self.assignments == c]) for c in range(k)]
self._logweights = np.log([np.count_nonzero(self.assignments == c)/self.size
for c in range(k)])
    def draw(self, size=1):
"""Draw `size` samples from the KDE."""
# Pick clusters randomly with the assigned weights
cumulative_weights = np.cumsum(np.exp(self._logweights))
clusters = np.searchsorted(cumulative_weights, np.random.rand(size))
draws = np.empty((size, self.ndim))
for cluster in range(self.nclusters):
sel = clusters == cluster
draws[sel] = self._kdes[cluster].draw(np.count_nonzero(sel))
return draws
    def logpdf(self, pts, pool=None):
"""Evaluate the logpdf of the KDE at `pts`."""
logpdfs = [logweight + kde(pts, pool=pool)
for logweight, kde in zip(self._logweights, self._kdes)]
if len(pts.shape) == 1:
return logsumexp(logpdfs)
else:
return logsumexp(logpdfs, axis=0)
    def _whiten(self, data):
        """Whiten `data` (zero mean, unit width in each dimension), typically before k-means."""
        return (data - self._mean)/self._std
def _color(self, data):
"""Recolor `data`, reversing :meth:`_whiten`."""
return data * self._std + self._mean
    def bic(self, pool=None):
        r"""
        Evaluate the Bayes information criterion for the KDE's estimate of the distribution

        .. math::

            \mathrm{BIC} = \ln \mathcal{L}_\mathrm{max} - \frac{d_m}{2} \ln N,

        where :math:`d_m` is the number of parameters of the KDE model
        (:math:`n_\mathrm{clusters} d` centroid location parameters,
        :math:`n_\mathrm{clusters} - 1` normalized weights, and
        :math:`n_\mathrm{clusters} (d+1) d/2` kernel covariance parameters, one matrix for
        each of :math:`n_\mathrm{clusters}` clusters), and :math:`N` is the number of samples
        used to build the KDE.
        """
log_l = np.sum(self.logpdf(self.data, pool=pool))
# Determine the total number of parameters in clustered-KDE
# Account for centroid locations
nparams = self.nclusters * self.ndim
# One for each cluster, minus one for constraint that all sum to unity
nparams += self.nclusters - 1
# Separate kernel covariances for each cluster
nparams += self.nclusters * (self.ndim + 1) * self.ndim/2
return log_l - nparams/2 * np.log(self.size)
@property
def data(self):
"""Samples used to build the KDE."""
return self._data
@property
def nclusters(self):
"""The number of clusters used for k-means."""
return self._nclusters
@property
def assignments(self):
"""Cluster assignments from k-means."""
return self._assignments
@property
def centroids(self):
"""Cluster centroids from k-means."""
return self._centroids
@property
def ndim(self):
"""The number of dimensions of the KDE."""
return self.data.shape[1]
@property
def size(self):
"""The number of samples used to build the KDE."""
return self.data.shape[0]
    __call__ = logpdf

    def __len__(self):
        return self.size
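# Illustrative sketch (not part of the original module): the BIC parameter
# count above, worked for k = 3 clusters in d = 2 dimensions, gives
# 3*2 = 6 centroid coordinates, 3 - 1 = 2 free weights, and 3*(2+1)*2/2 = 9
# covariance entries: 17 parameters in total. The data below are arbitrary.
def _example_clustered_kde():
    """Fit clustered KDEs to two-mode data and compare BICs for a few ``k``."""
    data = np.concatenate((np.random.randn(300, 2),
                           np.random.randn(300, 2) + 4.0))
    for k in (1, 2, 3):
        print(k, ClusteredKDE(data, k).bic())   # k = 2 should win here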
class KDE(object):
    """
    A Gaussian kernel density estimator that provides a means for evaluating the estimated
    probability density function, and for drawing additional samples from the estimated
    distribution. Cholesky decomposition of the covariance matrix makes this class a bit more
    stable than :mod:`scipy`'s Gaussian KDE.
:param data:
An `(N, ndim)`-shaped array, containing `N` samples from the target distribution.
"""
def __init__(self, data):
self._data = np.atleast_2d(data)
self._mean = np.mean(data, axis=0)
self._cov = None
if self.data.shape[0] > 1:
try:
self._cov = np.cov(data.T)
# Try factoring now to see if regularization is needed
la.cho_factor(self._cov)
except la.LinAlgError:
self._cov = oas_cov(data)
        self._set_bandwidth()
        if self._cov is not None:
            # Store transformation variables for drawing random values
            alphas = np.std(data, axis=0)
            ms = 1./alphas
            m_i, m_j = np.meshgrid(ms, ms)
            ms = m_i * m_j
            self._draw_cov = ms * self._kernel_cov
            self._scale_fac = alphas
def __enter__(self):
return self
    def _set_bandwidth(self):
        r"""
        Use Scott's rule to set the kernel bandwidth:

        .. math::

            \mathcal{K} = n^{-1/(d+4)} \Sigma^{1/2}

        Also store the Cholesky decomposition for later use.
        """
if self.size > 0 and self._cov is not None:
self._kernel_cov = self._cov * self.size ** (-2/(self.ndim + 4))
# Used to evaluate PDF with cho_solve()
self._cho_factor = la.cho_factor(self._kernel_cov)
# Make sure the estimated PDF integrates to 1.0
self._lognorm = self.ndim/2 * np.log(2*np.pi) + np.log(self.size) +\
np.sum(np.log(np.diag(self._cho_factor[0])))
else:
self._lognorm = -np.inf
    def draw(self, size=1):
"""
Draw samples from the estimated distribution.
"""
# Return nothing if this is an empty KDE
if self.size == 0:
return []
# Draw vanilla samples from a zero-mean multivariate Gaussian
draws = self._scale_fac * np.random.multivariate_normal(np.zeros(self.ndim),
self._draw_cov, size=size)
# Pick N random kernels as means
kernels = np.random.randint(0, self.size, size)
# Shift vanilla draws to be about chosen kernels
return self.data[kernels] + draws
    def logpdf(self, pts, pool=None):
"""Evaluate the logpdf at `pts` as estimated by the KDE."""
pts = np.atleast_2d(pts)
npts, ndim = pts.shape
assert ndim == self.ndim
# Apply across the pool if it exists
if pool:
this_map = pool.map
else:
this_map = map
        # Return -inf if this is an empty KDE
        if np.isinf(self._lognorm):
            results = np.full(npts, -np.inf)
else:
args = [(pt, self.data, self._cho_factor) for pt in pts]
results = list(this_map(_evaluate_point_logpdf, args))
# Normalize and return
return np.array(results) - self._lognorm
@property
def data(self):
"""Samples used to build the KDE."""
return self._data
@property
def ndim(self):
"""The number of dimensions of the KDE."""
return self.data.shape[1]
@property
def size(self):
"""The number of samples used to build the KDE."""
return self.data.shape[0]
    def __len__(self):
        return self.size

    __call__ = logpdf
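# Illustrative usage sketch (not part of the original module): fit a single
# Gaussian KDE, draw new points, and evaluate the log-density. The sample
# size and dimension are arbitrary choices.
def _example_kde():
    """Round-trip a :class:`KDE`: fit, draw, and evaluate."""
    data = np.random.randn(1000, 3)
    kde = KDE(data)
    draws = kde.draw(50)               # 50 fresh samples from the estimate
    logp = kde(np.zeros((1, 3)))       # log-density at the origin
    return draws, logp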
def unique_spaces(mask):
"""
Determine the unique sets of dimensions based on a mask. Inverted 1D masks are returned to use
as selectors.
"""
ncols = mask.shape[1]
# Do some magic with views so `np.unique` can be used to find the unique sets of dimensions.
dtype = mask.dtype.descr * ncols
struct = mask.view(dtype)
uniq = np.unique(struct)
uniq = uniq.view(mask.dtype).reshape(-1, ncols)
return ~uniq
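# Illustrative sketch (not part of the original module): on a toy mask with
# two distinct patterns, :func:`unique_spaces` returns one inverted-mask
# selector per pattern, usable for boolean indexing.
def _example_unique_spaces():
    mask = np.array([[False, False, False],
                     [False, False, True],
                     [False, False, False]])
    # Two unique spaces come back: (True, True, True) and (True, True, False)
    return unique_spaces(mask)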
class TransdimensionalKDE(object):
    """
    A generalized Gaussian kernel density estimator that reads masked arrays, constructs a
    :class:`ClusteredKDE` using :func:`optimized_kde` for each unique parameter space, then
    weights the KDEs by the number of samples in each parameter space.
    :param data:
        An `(N, max_dim)`-shaped masked array, containing `N` samples from the target
        distribution.
    :param kde: (optional)
        An old trans-dimensional KDE to inherit samples from.
    :param max_samples: (optional)
        The maximum number of samples to use for constructing or updating the KDE in each
        unique parameter space. If a KDE is supplied and adding the samples from `data` will
        go over this, old samples are thinned by factors of two until under the limit in each
        parameter space.
    :param pool: (optional)
        A pool of processes with a :func:`map` function to use.
    """
def __init__(self, data, kde=None, max_samples=None, pool=None):
npts_new, max_ndim = data.shape
self._max_ndim = max_ndim
if kde is None:
# Save an (inverted) mask for each unique set of dimensions
self._spaces = unique_spaces(data.mask)
else:
# Inherit old space definitions, in case the new sample has no points in a subspace
self._spaces = kde.spaces
# Construct a separate clustered-KDE for each parameter space
weights = []
self._kdes = []
for space_id, space in enumerate(self.spaces):
# Construct a selector for the samples from this space
subspace = np.all(~data.mask == space, axis=1)
# Determine weights from only the new samples
npts_subspace = np.count_nonzero(subspace)
weight = npts_subspace/npts_new
weights.append(weight)
fixd_data = data[subspace]
if npts_subspace > 0:
fixd_data = np.asarray(fixd_data[~fixd_data.mask].reshape((npts_subspace, -1)))
old_kde = None
if kde is not None:
old_kde = kde.kdes[space_id]
self._kdes.append(optimized_kde(fixd_data, pool, old_kde, max_samples))
self._logweights = np.log(np.array(weights))
    def draw(self, size=1, spaces=None):
"""
Draw samples from the transdimensional distribution.
"""
if spaces is not None:
if len(spaces) != size:
raise ValueError('Sample size inconsistent with number of spaces saved')
space_inds = np.empty(size)
for space_id, space in enumerate(self.spaces):
subspace = np.all(spaces == space, axis=1)
space_inds[subspace] = space_id
else:
            # Draw spaces randomly with the assigned weights
cumulative_weights = np.cumsum(np.exp(self._logweights))
space_inds = np.searchsorted(cumulative_weights, np.random.rand(size))
draws = ma.masked_all((size, self._max_ndim))
for space_id in range(len(self.spaces)):
sel = space_inds == space_id
n_fixedd = np.count_nonzero(sel)
if n_fixedd > 0:
# Populate only the valid entries for this parameter space
draws[np.ix_(sel, self._spaces[space_id])] = self.kdes[space_id].draw(n_fixedd)
return draws
    def logpdf(self, pts, pool=None):
"""Evaluate the log-transdimensional-pdf at `pts` as estimated by the KDE."""
logpdfs = []
for logweight, space, kde in zip(self._logweights,
self.spaces,
self.kdes):
# Calculate the probability for each parameter space individually
if np.all(space == ~pts.mask) and np.isfinite(logweight):
logpdfs.append(logweight + kde(pts[space], pool=pool))
return logsumexp(logpdfs, axis=0)
@property
def kdes(self):
"""List of fixed-dimension :meth:`ClusteredKDE` s"""
return self._kdes
@property
def spaces(self):
"""Unique sets of dimensions, usable as selectors."""
return self._spaces
__call__ = logpdf
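# Illustrative usage sketch (not part of the original module): pack samples
# from a 2D and a 3D parameter space into one masked array, where a masked
# entry flags a parameter that is absent from that sample's space.
def _example_transdimensional_kde():
    data = ma.masked_all((200, 3))
    data[:100, :2] = np.random.randn(100, 2)       # 2D space: last dim masked
    data[100:] = np.random.randn(100, 3) + 3.0     # 3D space: all dims present
    kde = TransdimensionalKDE(data)
    # Draws come back as a masked array; dimensions absent from a draw's
    # parameter space stay masked
    return kde.draw(10)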
def _evaluate_point_logpdf(args):
    """
    Evaluate the Gaussian KDE at a given point `point`. This lives outside the KDE method to
    allow for parallelization using :mod:`multiprocessing`. Since :func:`map` only allows
    single-argument functions, the following arguments need to be packed into the single
    tuple `args`:
    :param point:
        The point to evaluate the KDE at.
    :param data:
        The `(N, ndim)`-shaped array of data used to construct the KDE.
    :param cho_factor:
        A Cholesky decomposition of the kernel covariance matrix.
    """
point, data, cho_factor = args
# Use Cholesky decomposition to avoid direct inversion of covariance matrix
diff = data - point
tdiff = la.cho_solve(cho_factor, diff.T, check_finite=False).T
diff *= tdiff
# Work in the log to avoid large numbers
return logsumexp(-np.sum(diff, axis=1)/2)
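# Illustrative sketch (not part of the original module): the Cholesky-based
# evaluation above matches the direct quadratic form computed with an explicit
# matrix inverse; the toy covariance below is an arbitrary SPD choice.
def _example_point_logpdf():
    cov = np.array([[2.0, 0.3],
                    [0.3, 1.0]])
    data = np.random.randn(5, 2)
    point = np.zeros(2)
    via_cholesky = _evaluate_point_logpdf((point, data, la.cho_factor(cov)))
    diff = data - point
    direct = logsumexp(-np.einsum('ij,jk,ik->i', diff, la.inv(cov), diff)/2)
    assert np.isclose(via_cholesky, direct)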
def oas_cov(pts):
    r"""
    Estimate the covariance matrix using the Oracle Approximating Shrinkage (OAS) algorithm

    .. math::

        (1 - s) \Sigma + s \mu \mathcal{I}_d,

    where :math:`\mu = \mathrm{tr}(\Sigma) / d` and :math:`s` is the shrinkage parameter.
    This ensures the covariance matrix estimate is well behaved for small sample sizes.
    :param pts:
        An `(N, ndim)`-shaped array, containing `N` samples from the target distribution.
    This follows the implementation in `scikit-learn
    <https://github.com/scikit-learn/scikit-learn/blob/31c5497/
    sklearn/covariance/shrunk_covariance_.py>`_.
    """
pts = np.atleast_2d(pts)
npts, ndim = pts.shape
    empirical_cov = np.cov(pts.T)
    mean = np.trace(empirical_cov) / ndim
    alpha = np.mean(empirical_cov * empirical_cov)
    num = alpha + mean * mean
    den = (npts + 1) * (alpha - (mean * mean) / ndim)
    shrinkage = min(num / den, 1)
    shrunk_cov = (1 - shrinkage) * empirical_cov
shrunk_cov[np.diag_indices(ndim)] += shrinkage * mean
return shrunk_cov
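# Illustrative sketch (not part of the original module): with fewer samples
# than dimensions the raw sample covariance is singular, while the OAS
# estimate stays positive definite and can be Cholesky-factored.
def _example_oas_cov():
    pts = np.random.randn(4, 6)    # 4 samples in 6 dimensions
    cov = oas_cov(pts)
    la.cho_factor(cov)             # succeeds thanks to shrinkage toward mu*I
    return cov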