Source code for pyselfi.likelihood

#!/usr/bin/env python
#-------------------------------------------------------------------------------------
# pySELFI v2.0 -- src/pyselfi/likelihood.py
# Copyright (C) 2019-2023 Florent Leclercq.
#
# This file is part of the pySELFI distribution
# (https://github.com/florent-leclercq/pyselfi/)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# The text of the license is located in the root directory of the source package.
#-------------------------------------------------------------------------------------

"""Routines related to the SELFI effective likelihood.

.. _Leclercqetal2019: https://arxiv.org/abs/1902.10149

.. |Leclercqetal2019| replace:: Leclercq *et al*. (2019)
"""

__author__  = "Florent Leclercq"
__version__ = "2.0"
__date__    = "2018-2023"
__license__ = "GPLv3"

[docs]class likelihood(object): """This class represents the SELFI likelihood. See equations (11)-(15) and (17)-(18) in |Leclercqetal2019|_ for expressions. Attributes ---------- blackbox : :obj:`blackbox` the blackbox simulator, defined as explained in the `SELFI documentation <../usage/new_blackbox.html>`__ theta_0 : array, double, dimension=S expansion point in parameter space Ne : int number of simulations at the expansion point, to compute the covariance matrix. Ns : int number of simulations per expansion direction in parameter space. (Ne may be larger than Ns, in which case only the first Ns simulations at the expansion point are used to compute the gradient) Delta_theta : double step size for finite differencing to compute the blackbox gradient in parameter space """ # Initialization def __init__(self,blackbox,theta_0,Ne,Ns,Delta_theta): """Initializes a likelihood object. """ self.blackbox=blackbox self.theta_0=theta_0 self.S=theta_0.shape[0] self.P=blackbox.P self.Ne=Ne self.Ns=Ns self.Delta_theta=Delta_theta self.EPS_Sigma=1e-7 self.EPS_residual=1e-3
[docs] def _get_perturb_theta(self,d,h): """Computes the perturbed theta = theta_0 + Delta_theta. See equation (18) in |Leclercqetal2019|_. Parameters ---------- d : int index giving the direction in parameter space, from 0 to S h : double step size in parameter space Returns ------- theta : double theta_0 if d=0, or theta_0 + Delta_theta if d is from 1 to S """ import numpy as np theta_0=self.theta_0 if d==0: return theta_0 else: Delta_theta = np.zeros_like(theta_0) Delta_theta[d-1] = h return theta_0 + Delta_theta
[docs] def _compute_Phi_d(self,pool_fname,d,N=None,h=None): """Computes a pool of blackbox simulations at one of the required points in parameter space. Parameters ---------- pool_fname : :obj:`str` filename of the pool d : int direction in parameter space, from 0 to S N : int, optional, default=class value for Ne if d=0, or Ns otherwise number of simulations per expansion direction in parameter space h : double, optional, default=class value step size for finite differencing to compute the blackbox gradient in parameter space """ N=N or (self.Ne if d == 0 else self.Ns) h=h or self.Delta_theta theta_d = self._get_perturb_theta(d,h) Phi_d = self.blackbox.compute_pool(theta_d, d, pool_fname, N) setattr(self, 'Phi_'+str(d), Phi_d)
[docs] @classmethod def _covariance(cls,Phi): """Returns the estimated covariance of a set of summaries, with the prefactor for a virtual signal field. See equations (13)-(14) in |Leclercqetal2019|_. Parameters ---------- Phi : array, double, dimension=(N,P) a set of N simulations Returns ------- Sigma^hat_theta' : array, double, dimension=(P,P) (N+1)/N * Sigma^hat_theta where Sigma^hat_theta is the unbiased covariance estimator """ import numpy as np from ctypes import c_double # Setup the correction factor for the evaluated covariance matrix N=Phi.shape[0] covariance_prefactor=(N+1)/N # Return the evaluated covariance (using the unbiased estimator) return covariance_prefactor * np.cov(Phi.T,ddof=1).astype(c_double)
[docs] def compute_C_0(self): """Computes the estimated covariance of a set of summaries at the expansion point, C_0, and log|2piC_0|. Assumes that Phi_0 has already been computed. """ import numpy as np self.C_0 = self._covariance(self.Phi_0.Phi) self.logdet2piC_0 = np.linalg.slogdet(2*np.pi*self.C_0)[1]
[docs] @classmethod def _inverse_covariance(cls,Phi,covariance,EPS_Sigma=1e-7,EPS_residual=1e-3): """Computes the unbiased inverse of an estimated covariance matrix, using the `Hartlap, Simon & Schneider (2007) <https://arxiv.org/abs/astro-ph/0608064>`__ correction factor. See equations (13) and (15) in |Leclercqetal2019|_. Parameters ---------- Phi : array, double, dimension=(N,P) a set of N simulations covariance : array, double, dimension=(P,P) estimated covariance matrix EPS_Sigma : double, optional, default=1e-7 epsilon to be added to the diagonal of the covariance matrix, if necessary EPS_residual : double, optional, default=1e-3 maximum residual in covariance*covariance^{-1} before attempting regularization Returns ------- Sigma^hat_theta^{-1}' : array, double, dimension=(P,P) alpha*(Sigma^hat_theta')^{-1} where (Sigma^hat_theta')^{-1} is the inverse sample covariance """ from pyselfi.utils import regular_inv # Setup the Hartlap, Simon & Schneider (2007) correction factor N=Phi.shape[0] P=Phi.shape[1] alpha = (N-P-2.)/(N-1.) if P<N-2 else 1. # Compute the inverse sample covariance inv_covariance = regular_inv(covariance, EPS_Sigma, EPS_residual) return alpha*inv_covariance
[docs] def compute_inv_C_0(self): """Computes the estimated inverse covariance of a set of summaries at the expansion point, inv_C_0. Assumes that Phi_0 has already been computed. """ self.inv_C_0 = self._inverse_covariance(self.Phi_0.Phi, self.C_0, self.EPS_Sigma, self.EPS_residual)
[docs] @classmethod def _average(cls,Phi,N): """Returns the average of the first N of a set of summaries. Parameters ---------- Phi : array, double, dimension=(NN,P) a set of NN simulations N : int take the average of the first N simulations """ import numpy as np return np.mean(Phi[:N],axis=0)
[docs] def _compute_f_d(self,d,Ns): """Computes the average blackbox at one of the required points in parameter space. Assumes Phi_d has already been computed. Parameters ---------- d : int direction in parameter space, from 0 to S Ns : int number of simulations to be used """ f_d = self._average(getattr(self, 'Phi_'+str(d)).Phi, Ns) setattr(self, 'f_'+str(d), f_d)
[docs] def _compute_grad_d(self,d,h): """Computes the gradient in one direction of parameter space, see equation (18) in |Leclercqetal2019|_. Assumes that f_0 and f_d have already been computed. Parameters ---------- d : int direction in parameter space, from 1 to S h : double step size for finite differencing """ f_0 = self.f_0 f_d = getattr(self, 'f_'+str(d)) return (f_d-f_0)/h
[docs] def _gradient(self,Ns=None,h=None): """Returns the gradient of the linearised blackbox. Parameters ---------- Ns : int, optional, default=class value number of simulations per expansion direction in parameter space h : double, optional, default=class value step size for finite differencing to compute the blackbox gradient in parameter space Returns ------- grad_f : array, double, dimension=(P,S) gradient of the linearised blackbox """ import numpy as np from ctypes import c_double Ns=Ns or self.Ns h=h or self.Delta_theta grad_f = np.zeros((self.S, self.P), dtype=c_double) # Compute the average blackbox along all directions for d in range(self.S+1): self._compute_f_d(d,Ns) # Evaluate the gradients along all directions for d in range(1,self.S+1): grad_f[d-1] = self._compute_grad_d(d,h) grad_f = grad_f.T return grad_f
[docs] def compute_gradient(self,Ns=None,h=None): """Computes the gradient of the linearised blackbox. Parameters ---------- Ns : int, optional, default=class value number of simulations per expansion direction in parameter space h : double, optional, default=class value step size for finite differencing to compute the blackbox gradient in parameter space """ self.grad_f = self._gradient(Ns,h)
[docs] def run_simulations(self, pool_prefix, pool_suffix, d=None, Ne=None, Ns=None, h=None): """Runs some or all the necessary simulations to compute the likelihood. All the necessary simulations amount to Ne + Ns*S blackbox evaluations. Parameters ---------- pool_prefix : :obj:`str` address and prefix of the filenames for simulation pools pool_suffix : :obj:`str` suffix of the filenames for simulation pools d : int or array of int, optional, default=None directions in parameter space, from 0 to S. If set to None, all simulations are run or loaded Ne : int, optional, default=class value number of simulations at the expansion point, to compute the covariance matrix Ns : int, optional, default=class value number of simulations per expansion direction in parameter space. (Ne may be larger than Ns, in which case only the first Ns simulations at the expansion point are used to compute the gradient) h : double, optional, default=class value step size for finite differencing to compute the blackbox gradient in parameter space """ # define or get the required directions to be treated import numpy as np if d is None: d=list(np.arange(0,self.S+1)) else: d=list(np.atleast_1d(d)) # run simulations at the expansion point if 0 in d: pool_fname=pool_prefix+"0"+pool_suffix self._compute_Phi_d(pool_fname, 0, Ne, h) d.remove(0) # run simulations along all directions for this_d in d: pool_fname=pool_prefix+str(this_d)+pool_suffix self._compute_Phi_d(pool_fname, this_d, Ns, h)
[docs] def compute(self,Ns=None,h=None): """Computes all the necessary quantities to characterize the likelihood (estimated covariance matrix, its inverse, and the gradient). Parameters ---------- Ns : int, optional, default=class value number of simulations per expansion direction in parameter space h : double, optional, default=class value step size for finite differencing to compute the blackbox gradient in parameter space """ self.compute_C_0() self.compute_inv_C_0() self.compute_gradient(Ns,h)
[docs] def data_model(self,theta): """Evaluates the linearised data model at a given point in parameter space, see equation (17) in |Leclercqetal2019|_. Parameters ---------- theta : array, double, dimension=S evaluation point in parameter space """ f_0 = self.f_0 grad_f = self.grad_f theta_0 = self.theta_0 f_theta = f_0 + grad_f.dot(theta-theta_0) return f_theta
[docs] def logpdf(self,theta,phi_obs): """Returns the log likelihood probability at a given point in parameter space, see equation (19) in |Leclercqetal2019|_. Parameters ---------- theta : array, double, dimension=S evaluation point in parameter space phi_obs : array, double, dimension=P observed summaries vector Returns ------- logpdf : double log likelihood probability """ phi_pred = self.data_model(theta) diff = phi_obs-phi_pred inv_C_0 = self.inv_C_0 logdet2piC_0 = self.logdet2piC_0 return -diff.dot(inv_C_0).dot(diff)/2. - logdet2piC_0/2.
[docs] def save(self,fname): """Saves the likelihood to an output file. Parameters ---------- fname : :obj:`str` output filename """ import h5py as h5 from ctypes import c_double, c_double from pyselfi.utils import PrintMessage, save_replace_dataset, save_replace_attr PrintMessage(3, "Writing likelihood in data file '{}'...".format(fname)) with h5.File(fname, 'r+') as hf: for d in range(self.S+1): save_replace_dataset(hf, '/likelihood/f_'+str(d), data=getattr(self,'f_'+str(d)).astype(c_double), maxshape=(None,), dtype=c_double) save_replace_dataset(hf, '/likelihood/C_0', data=self.C_0.astype(c_double), maxshape=(None, None), dtype=c_double) save_replace_attr(hf, '/likelihood/logdet2piC_0', self.logdet2piC_0, dtype=c_double) save_replace_dataset(hf, '/likelihood/inv_C_0', data=self.inv_C_0.astype(c_double), maxshape=(None, None), dtype=c_double) save_replace_dataset(hf, '/likelihood/grad_f', data=self.grad_f.astype(c_double), maxshape=(None, None), dtype=c_double) PrintMessage(3, "Writing likelihood in data file '{}' done.".format(fname))
[docs] def load(self,fname): """Loads the likelihood from an input file. Parameters ---------- fname : :obj:`str` input filename """ import numpy as np import h5py as h5 from ctypes import c_double from pyselfi.utils import PrintMessage PrintMessage(3, "Reading likelihood in data file '{}'...".format(fname)) with h5.File(fname, 'r') as hf: for d in range(self.S+1): setattr(self, 'f_'+str(d), np.array(hf.get('/likelihood/f_'+str(d)), dtype=c_double).astype(c_double)) self.C_0 = np.array(hf.get('/likelihood/C_0'), dtype=c_double).astype(c_double) self.logdet2piC_0 = hf.attrs['/likelihood/logdet2piC_0'] self.inv_C_0 = np.array(hf.get('/likelihood/inv_C_0'), dtype=c_double).astype(c_double) self.grad_f = np.array(hf.get('/likelihood/grad_f'), dtype=c_double).astype(c_double) PrintMessage(3, "Reading likelihood from in file '{}' done.".format(fname))
#end class(likelihood)