#!/usr/bin/env python
#-------------------------------------------------------------------------------------
# pySELFI v2.0 -- src/pyselfi_examples/simbelmyne/model/blackbox_SBMY.py
# Copyright (C) 2019-2023 Florent Leclercq.
#
# This file is part of the pySELFI distribution
# (https://github.com/florent-leclercq/pyselfi/)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# The text of the license is located in the root directory of the source package.
#-------------------------------------------------------------------------------------
"""A blackbox to generate synthetic galaxy observations and evaluate their power spectrum, using the Simbelynë code.
"""
__author__ = "Florent Leclercq"
__version__ = "2.0"
__date__ = "2018-2023"
__license__ = "GPLv3"
[docs]class blackbox(object):
"""This class represents a SELFI blackbox.
Attributes
----------
P : int
number of output summary statistics. This is the only mandatory argument
theta2P : func
function to go from input parameters theta to cosmological power spectrum
k_s : array, double, dimension=S
vector of support wavenumbers
G_sim : :obj:`FourierGrid`
Fourier grid of the simulation
G_ss : :obj:`FourierGrid`
Fourier grid on which to compute the output summaries of the blackbox
P_ss : :obj:`PowerSpectrum`
fiducial summaries at the expansion point, to normalize the output of the blackbox
corner0 : double
x-position of the corner of the box with respect to the observer (which is at (0,0,0))
corner1 : double
y-position of the corner of the box with respect to the observer (which is at (0,0,0))
corner2 : double
z-position of the corner of the box with respect to the observer (which is at (0,0,0))
Np0 : int
particle grid size x
Npm0 : int
simulation particle-mesh grid size x
fdir : :obj:`str`
directory for inference outputs
fsimdir : :obj:`str`
directory for simulation outputs
fname_inputsurveygeometry : :obj:`str`
input survey geometry filename
b_cut : double
cut in galactic latitude to generate the mock survey geometry
bright_apparent_magnitude_cut : double
bright cut in absolute magnitude for Schechter completeness
faint_apparent_magnitude_cut : double
faint cut in apparent magnitude for Schechter completeness
bright_absolute_magnitude_cut : double
bright cut in absolute magnitude for Schechter completeness
faint_absolute_magnitude_cut : double
faint cut in absolute magnitude for Schechter completeness
Mstar : double
Schechter completeness parameter
alpha : double
Schechter completeness parameter
save_frequency : int
saves the output on the blackbox on disk each save_frequency evaluations
"""
# Initialization
def __init__(self, P, **kwargs):
"""Initializes the blackbox object.
"""
self.P=P
for key, value in kwargs.items():
setattr(self,key,value)
[docs] def _save_cosmo(self, cosmo, fname_cosmo, force_cosmo=False):
"""Saves cosmological parameters in json format.
Parameters
----------
cosmo : dictionary
cosmological parameters (and some infrastructure parameters) to be saved
fname_cosmo : :obj:`str`
name of the output json file
force_cosmo : bool, optional, default=False
overwrite if the file already exists?
"""
import json
from os.path import exists
if not exists(fname_cosmo) or force_cosmo:
with open(fname_cosmo, 'w') as fp:
json.dump(cosmo, fp)
[docs] def _get_powerspectrum_from_cosmo(self, cosmo, fname_powerspectrum, force_powerspectrum=False):
"""Loads or computes the power spectrum from input cosmological parameters.
Parameters
----------
cosmo : dictionary
cosmological parameters (and some infrastructure parameters)
fname_powerspectrum : :obj:`str`
name of input/output power spectrum file
force : bool, optional, default=False
force recomputation?
"""
from os.path import exists
from pysbmy.power import PowerSpectrum
if exists(fname_powerspectrum) and not force_powerspectrum:
P=PowerSpectrum.read(fname_powerspectrum)
else:
G_sim=self.G_sim
L0=G_sim.L0; L1=G_sim.L1; L2=G_sim.L2; N0=G_sim.N0; N1=G_sim.N2; N2=G_sim.N2
P=PowerSpectrum(L0,L1,L2,N0,N1,N2,cosmo)
P.write(fname_powerspectrum)
[docs] def _get_powerspectrum_from_theta(self, theta, fname_powerspectrum, force_powerspectrum=False):
"""Returns a power spectrum from its value at support wavenumbers, by performing a Spline interpolation.
Parameters
----------
theta : array, double, dimension=S
vector of power spectrum values at the support wavenumbers
fname_powerspectrum : :obj:`str`
name of input/output power spectrum file
force_powerspectrum : bool, optional, default=False
force recomputation?
"""
from os.path import exists
from scipy.interpolate import InterpolatedUnivariateSpline
from pysbmy.power import PowerSpectrum
if exists(fname_powerspectrum) and not force_powerspectrum:
P=PowerSpectrum.read(fname_powerspectrum)
else:
G_sim=self.G_sim
P=self.theta2P(theta)
Spline=InterpolatedUnivariateSpline(self.k_s, P, k=5)
powerspectrum=Spline(G_sim.k_modes)
powerspectrum[0]=0. # fix zero-mode by hand
P=PowerSpectrum.from_FourierGrid(G_sim,powerspectrum=powerspectrum)
P.write(fname_powerspectrum)
[docs] def _setup_parfiles(self, d, fname_simparfile, fname_mockparfile, fname_powerspectrum, fname_RngStateLPT, fname_whitenoise, fname_outputinitialdensity, fname_outputrealspacedensity, fname_outputdensity, fname_RngStateMock, fname_outputmock, fname_outputss, force_parfiles=False):
"""Sets up Simbelynë parameter file given the necessary inputs (see the Simbelynë documentation for details).
Parameters
----------
d : int
index giving the direction in parameter space: -1 for mock data, 0 for the expansion point, or from 1 to S
fname_simparfile : :obj:`str`
name of output simulation parameter file
fname_mockparfile : :obj:`str`
name of output mock parameter file
fname_powerspectrum : :obj:`str`
name of input power spectrum file
fname_RngStateLPT : :obj:`str`
name of output random number generator state file for LPT module
fname_whitenoise : :obj:`str`
name of output white noise file
fname_outputinitialdensity : :obj:`str`
name of output initial density field file
fname_outputrealspacedensity : :obj:`str`
name of output real-space density field file
fname_outputdensity : :obj:`str`
name of output redshift-space density field file
fname_RngStateMock : :obj:`str`
name of output random number generator state file for Mock module
fname_outputmock : :obj:`str`
name of output mock field file
fname_outputss : :obj:`str`
name of output summary statistics file
force_parfiles : bool, optional, default=False
overwrite if files already exists?
"""
from os.path import exists
from pysbmy import param_file
Particles=self.Np0
Mesh=self.G_sim.N0
BoxSize=self.G_sim.L0
corner0=self.corner0
corner1=self.corner1
corner2=self.corner2
InputRngStateLPT=self.fsimdir+"/RngStateLPT.rng"
OutputRngStateLPT=self.fsimdir+"/RngStateLPT.rng"
WriteInitialConditions=0
RedshiftLPT=19.
ModulePMCOLA=1
EvolutionMode=2
ParticleMesh=self.Npm0
NumberOfTimeSteps=20
RedshiftFCs=0
WriteFinalSnapshot=0
WriteFinalDensity=0
InputRngStateMocks=self.fsimdir+"/RngStateMocks.rng"
OutputRngStateMocks=self.fsimdir+"/RngStateMocks.rng"
NoiseModel=0
NumberOfNoiseRealizations=1
InputSurveyGeometry=self.fsimdir+"/"+self.fname_inputsurveygeometry
InputSummaryStatskGrid=self.fdir+"G_ss.h5"
WriteMocks=0
if not exists(fname_simparfile) or force_parfiles:
if d==-1: # mock data
S=param_file(ModuleLPT=1,
Particles=Particles,
Mesh=Mesh,
BoxSize=BoxSize,
corner0=corner0,
corner1=corner1,
corner2=corner2,
InputRngStateLPT=InputRngStateLPT,
OutputRngStateLPT=OutputRngStateLPT,
ICsMode=0,
WriteICsRngState=1,
OutputICsRngState=fname_RngStateLPT,
WriteWhiteNoise=1,
OutputWhiteNoise=fname_whitenoise,
WriteInitialConditions=1,
OutputInitialConditions=fname_outputinitialdensity,
InputPowerSpectrum=fname_powerspectrum,
RedshiftLPT=RedshiftLPT,
WriteLPTSnapshot=0,
WriteLPTDensity=0,
ModulePMCOLA=ModulePMCOLA,
EvolutionMode=EvolutionMode,
ParticleMesh=ParticleMesh,
NumberOfTimeSteps=NumberOfTimeSteps,
RedshiftFCs=RedshiftFCs,
WriteFinalSnapshot=WriteFinalSnapshot,
WriteFinalDensity=1,
OutputFinalDensity=fname_outputrealspacedensity,
ModuleRSD=1,
DoNonLinearMapping=1,
WriteRSSnapshot=0,
WriteRSDensity=1,
OutputRSDensity=fname_outputdensity
)
elif d==0: # expansion point
S=param_file(ModuleLPT=1,
Particles=Particles,
Mesh=Mesh,
BoxSize=BoxSize,
corner0=corner0,
corner1=corner1,
corner2=corner2,
InputRngStateLPT=InputRngStateLPT,
OutputRngStateLPT=OutputRngStateLPT,
ICsMode=0,
WriteICsRngState=1,
OutputICsRngState=fname_RngStateLPT,
WriteWhiteNoise=1,
OutputWhiteNoise=fname_whitenoise,
WriteInitialConditions=WriteInitialConditions,
OutputInitialConditions=fname_outputinitialdensity,
InputPowerSpectrum=fname_powerspectrum,
RedshiftLPT=RedshiftLPT,
WriteLPTSnapshot=0,
WriteLPTDensity=0,
ModulePMCOLA=ModulePMCOLA,
EvolutionMode=EvolutionMode,
ParticleMesh=ParticleMesh,
NumberOfTimeSteps=NumberOfTimeSteps,
RedshiftFCs=RedshiftFCs,
WriteFinalSnapshot=WriteFinalSnapshot,
WriteFinalDensity=WriteFinalDensity,
OutputFinalDensity=fname_outputrealspacedensity,
ModuleRSD=1,
DoNonLinearMapping=1,
WriteRSSnapshot=0,
WriteRSDensity=1,
OutputRSDensity=fname_outputdensity
)
else: # points needed for gradients
S=param_file(ModuleLPT=1,
Particles=Particles,
Mesh=Mesh,
BoxSize=BoxSize,
corner0=corner0,
corner1=corner1,
corner2=corner2,
InputRngStateLPT=InputRngStateLPT,
OutputRngStateLPT=OutputRngStateLPT,
ICsMode=1,
WriteICsRngState=0,
InputWhiteNoise=fname_whitenoise,
WriteInitialConditions=0,
InputPowerSpectrum=fname_powerspectrum,
RedshiftLPT=RedshiftLPT,
WriteLPTSnapshot=0,
WriteLPTDensity=0,
ModulePMCOLA=ModulePMCOLA,
EvolutionMode=EvolutionMode,
ParticleMesh=ParticleMesh,
NumberOfTimeSteps=NumberOfTimeSteps,
RedshiftFCs=RedshiftFCs,
WriteFinalSnapshot=WriteFinalSnapshot,
WriteFinalDensity=WriteFinalDensity,
ModuleRSD=1,
DoNonLinearMapping=1,
WriteRSSnapshot=0,
WriteRSDensity=1,
OutputRSDensity=fname_outputdensity
)
S.write(fname_simparfile)
if not exists(fname_mockparfile) or force_parfiles:
if d<=0: # mock data or expansion point
S=param_file(ModuleLPT=0,
ModulePMCOLA=0,
ModuleRSD=0,
ModuleMocks=1,
InputRngStateMocks=InputRngStateMocks,
OutputRngStateMocks=OutputRngStateMocks,
WriteMocksRngState=1,
OutputMocksRngState=fname_RngStateMock,
NoiseModel=NoiseModel,
NumberOfNoiseRealizations=NumberOfNoiseRealizations,
InputDensityMocks=fname_outputdensity,
InputSurveyGeometry=InputSurveyGeometry,
InputSummaryStatskGrid=InputSummaryStatskGrid,
WriteMocks=WriteMocks,
OutputMockBase=fname_outputmock,
WriteSummaryStats=1,
OutputSummaryStats=fname_outputss
)
else: # points needed for gradients
S=param_file(ModuleLPT=0,
ModulePMCOLA=0,
ModuleRSD=0,
ModuleMocks=1,
InputRngStateMocks=fname_RngStateMock,
WriteMocksRngState=0,
NoiseModel=NoiseModel,
NumberOfNoiseRealizations=NumberOfNoiseRealizations,
InputDensityMocks=fname_outputdensity,
InputSurveyGeometry=InputSurveyGeometry,
InputSummaryStatskGrid=InputSummaryStatskGrid,
WriteMocks=WriteMocks,
OutputMockBase=fname_outputmock,
WriteSummaryStats=1,
OutputSummaryStats=fname_outputss
)
S.write(fname_mockparfile)
[docs] def _run_sim(self, fname_simparfile, fname_simlogs, fname_outputdensity, force_sim=False):
"""Runs a simulation with Simbelynë.
Parameters
----------
fname_simparfile : :obj:`str`
name of the input parameter file
fname_simlogs : :obj:`str`
name of the output Simbelynë logs
fname_outputdensity : :obj:`str`
name of the output density field to be written
force_sim : bool, optional, default=False
force recomputation if output density already exists?
"""
from os.path import exists
if not exists(fname_outputdensity) or force_sim:
from pysbmy import pySbmy
pySbmy(fname_simparfile, fname_simlogs)
[docs] def _run_mock(self, fname_mockparfile, fname_mocklogs, fname_outputss, force_mock=False):
"""Runs a simulation with Simbelynë for mock data.
Parameters
----------
fname_mockparfile : :obj:`str`
name of the input parameter file
fname_mocklogs : :obj:`str`
name of the output Simbelynë logs
fname_outputss : :obj:`str`
name of the output summary statistics file to be written
force_mock : bool, optional, default=False
force recomputation if output summary statistics file already exists?
"""
from os.path import exists
if not exists(fname_outputss) or force_mock:
from pysbmy import pySbmy
pySbmy(fname_mockparfile, fname_mocklogs)
[docs] def _compute_Phi(self, fname_outputss):
"""Computes summary statistics (Phi) from Simbelynë output files.
Parameters
----------
fname_outputss : :obj:`str`
name of Simbelynë output summary statistics file
Returns
-------
Phi : array, double, dimension=P
vector of summary statistics
"""
import h5py as h5
from pysbmy import c_float
# Read the Simbelynë output
with h5.File(fname_outputss) as f:
# Normalize the output
Phi = f['scalars']['Pk'][0][0]/self.P_ss.powerspectrum
# Convert to c_float
Phi = Phi.astype(c_float)
return Phi
[docs] def _compute_Phi_DM(self, fname_outputdensity):
"""Compute summary statistics (Phi) from Simbelynë output files. Alternative routine to _compute_Phi using the dark matter field instead of galaxies, for testing purposes.
Parameters
----------
fname_outputdensity : :obj:`str`
name of Simbelynë output density field file
Returns
-------
Phi : array, double, dimension=P
vector of summary statistics
"""
from pysbmy.field import read_field
from pysbmy.correlations import get_autocorrelation
A=read_field(fname_outputdensity)
Pk,Vk=get_autocorrelation(A, self.G_ss)
return Pk/self.P_ss.powerspectrum
[docs] def _clean_sbmy_outputs(self, fname_outputdensity, fname_outputmock, fname_outputss):
"""Removes Simbelynë output files to save disk space.
Parameters
----------
fname_outputdensity : :obj:`str`
name of output redshift-space density field file
fname_outputmock : :obj:`str`
name of output mock field file
fname_outputss : :obj:`str`
name of output summary statistics file
"""
from os.path import exists
from os import remove
# Remove the Simbelynë outputs
if exists(fname_outputdensity): remove(fname_outputdensity)
if exists(fname_outputmock): remove(fname_outputmock)
if exists(fname_outputss): remove(fname_outputss)
[docs] def _aux_blackbox(self, d, fname_powerspectrum, fname_simparfile, fname_mockparfile, fname_RngStateLPT, fname_whitenoise, fname_outputinitialdensity, fname_outputrealspacedensity, fname_outputdensity, fname_RngStateMock, fname_outputmock, fname_outputss, fname_simlogs, fname_mocklogs, force_parfiles=False, force_sim=False, force_mock=False):
"""Auxiliary routine for the Simbelynë blackbox: generates a noisy realization from an input power spectrum object, and returns its normalized estimated power spectrum.
Parameters
----------
d : int
index giving the direction in parameter space: -1 for mock data, 0 for the expansion point, or from 1 to S
fname_powerspectrum : :obj:`str`
name of input power spectrum file
fname_simparfile : :obj:`str`
name of output simulation parameter file
fname_mockparfile : :obj:`str`
name of output mock parameter file
fname_RngStateLPT : :obj:`str`
name of output random number generator state file for LPT module
fname_whitenoise : :obj:`str`
name of output white noise file
fname_outputinitialdensity : :obj:`str`
name of output initial density field file
fname_outputrealspacedensity : :obj:`str`
name of output real-space density field file
fname_outputdensity : :obj:`str`
name of output redshift-space density field file
fname_RngStateMock : :obj:`str`
name of output random number generator state file for Mock module
fname_outputmock : :obj:`str`
name of output mock field file
fname_outputss : :obj:`str`
name of output summary statistics file
fname_simlogs : :obj:`str`
name of the output Simbelynë logs
fname_mocklogs : :obj:`str`
name of the output Simbelynë logs
force_parfiles : bool, optional, default=False
overwrite if parameter files already exists?
force_sim : bool, optional, default=False
force recomputation if output density already exists?
force_mock : bool, optional, default=False
force recomputation if output mock summary statistics file already exists?
Returns
-------
Phi : array, double, dimension=P
vector of summary statistics
"""
# Prepare parameter files
self._setup_parfiles(d, fname_simparfile, fname_mockparfile, fname_powerspectrum, fname_RngStateLPT, fname_whitenoise, fname_outputinitialdensity, fname_outputrealspacedensity, fname_outputdensity, fname_RngStateMock, fname_outputmock, fname_outputss, force_parfiles)
# Run simulations
self._run_sim(fname_simparfile, fname_simlogs, fname_outputdensity, force_sim)
self._run_mock(fname_mockparfile, fname_mocklogs, fname_outputss, force_mock)
Phi = self._compute_Phi(fname_outputss)
#Phi = self._compute_Phi_DM(fname_outputdensity) # TEST: Alternatively, the dark matter field can be used instead of galaxies
return Phi
[docs] def make_data(self, cosmo, i=0, force_powerspectrum=False, force_parfiles=False, force_sim=False, force_mock=False, force_cosmo=False):
"""Evaluates the Simbelynë blackbox to make mock data, from input cosmological parameters.
Parameters
----------
cosmo : dictionary
cosmological parameters (and some infrastructure parameters)
i : int, optional, default=0
current evaluation index of the blackbox
force_powerspectrum : bool, optional, default=False
force recomputation of the power spectrum?
force_parfiles : bool, optional, default=False
overwrite if parameter files already exists?
force_sim : bool, optional, default=False
force recomputation if output density already exists?
force_mock : bool, optional, default=False
force recomputation if output mock summary statistics file already exists?
Returns
-------
Phi : array, double, dimension=P
vector of summary statistics
"""
from pyselfi.utils import PrintMessage, INDENT, UNINDENT
PrintMessage(3, "Making mock data...")
INDENT()
# Define filenames
fname_cosmo = self.fsimdir+"/data/input_cosmo_"+str(i)+".json"
fname_powerspectrum = self.fsimdir+"/data/input_power_"+str(i)+".h5"
fname_simparfile = self.fsimdir+"/data/sim_"+str(i)+".sbmy"
fname_mockparfile = self.fsimdir+"/data/mock_"+str(i)+".sbmy"
fname_RngStateLPT = self.fsimdir+"/data/RngStateLPT_"+str(i)+".rng"
fname_whitenoise = self.fsimdir+"/data/initial_density_white_"+str(i)+".h5"
fname_outputinitialdensity = self.fsimdir+"/data/initial_density_"+str(i)+".h5"
fname_outputrealspacedensity = self.fsimdir+"/data/output_realspace_density_"+str(i)+".h5"
fname_outputdensity = self.fsimdir+"/data/output_density_"+str(i)+".h5"
fname_RngStateMock = self.fsimdir+"/data/RngStateMocks_"+str(i)+".rng"
fname_outputmock = self.fsimdir+"/data/output_mock_"+str(i)+"_"
fname_outputss = self.fsimdir+"/data/output_ss_"+str(i)+".h5"
fname_simlogs = self.fsimdir+"/data/logs_sim_"+str(i)+".txt"
fname_mocklogs = self.fsimdir+"/data/logs_mock_"+str(i)+".txt"
# Save cosmological parameters
self._save_cosmo(cosmo, fname_cosmo, force_cosmo)
# Generate input initial power spectrum from cosmological parameters
self._get_powerspectrum_from_cosmo(cosmo, fname_powerspectrum, force_powerspectrum)
# Call auxiliary blackbox method
Phi = self._aux_blackbox(-1, fname_powerspectrum, fname_simparfile, fname_mockparfile, fname_RngStateLPT, fname_whitenoise, fname_outputinitialdensity, fname_outputrealspacedensity, fname_outputdensity, fname_RngStateMock, fname_outputmock, fname_outputss, fname_simlogs, fname_mocklogs, force_parfiles, force_sim, force_mock)
UNINDENT()
PrintMessage(3, "Making mock data done.")
return Phi
[docs] def evaluate(self, theta, d, i=0, N=0, force_powerspectrum=False, force_parfiles=False, force_sim=False, force_mock=False, remove_sbmy=True):
"""Evaluates the Simbelynë blackbox from an input vector of power spectrum coefficients at the support wavenumbers.
Parameters
----------
theta : array, double, dimension=S
vector of power spectrum values at the support wavenumbers
d : int
index giving the direction in parameter space: -1 for mock data, 0 for the expansion point, or from 1 to S
i : int, optional, default=0
current evaluation index of the blackbox
N : int, optional, default=0
total number of evaluations of the blackbox
force_powerspectrum : bool, optional, default=False
force recomputation of the power spectrum?
force_parfiles : bool, optional, default=False
overwrite if parameter files already exists?
force_sim : bool, optional, default=False
force recomputation if output density already exists?
force_mock : bool, optional, default=False
force recomputation if output mock summary statistics file already exists?
remove_sbmy : bool, optional, default=True
remove Simbelynë output files from disk?
Returns
-------
Phi : array, double, dimension=P
vector of summary statistics
"""
from pyselfi.utils import PrintMessage, PrintValue, INDENT, UNINDENT
PrintMessage(3, "Evaluating blackbox ({}/{})...".format(i,N))
INDENT()
# Define filenames
fname_powerspectrum = self.fsimdir+"/d"+str(d)+"/input_power_d"+str(d)+".h5"
fname_simparfile = self.fsimdir+"/d"+str(d)+"/sim_d"+str(d)+"_p"+str(i-1)+".sbmy"
fname_mockparfile = self.fsimdir+"/d"+str(d)+"/mock_d"+str(d)+"_p"+str(i-1)+".sbmy"
fname_RngStateLPT = self.fsimdir+"/RngStateLPT_p"+str(i-1)+".rng"
fname_whitenoise = self.fsimdir+"/initial_density_white_p"+str(i-1)+".h5"
fname_outputinitialdensity = self.fsimdir+"/initial_density_p"+str(i-1)+".h5"
fname_outputrealspacedensity = self.fsimdir+"/d"+str(d)+"/output_realspace_density_d"+str(d)+"_p"+str(i-1)+".h5"
fname_outputdensity = self.fsimdir+"/d"+str(d)+"/output_density_d"+str(d)+"_p"+str(i-1)+".h5"
fname_RngStateMock = self.fsimdir+"/RngStateMocks_p"+str(i-1)+".rng"
fname_outputmock = self.fsimdir+"/d"+str(d)+"/output_mock_d"+str(d)+"_p"+str(i-1)+"_"
fname_outputss = self.fsimdir+"/d"+str(d)+"/output_ss_d"+str(d)+"_p"+str(i-1)+".h5"
fname_simlogs = self.fsimdir+"/d"+str(d)+"/logs_sim_d"+str(d)+"_p"+str(i-1)+".txt"
fname_mocklogs = self.fsimdir+"/d"+str(d)+"/logs_mock_d"+str(d)+"_p"+str(i-1)+".txt"
# Interpolate P to get P_in
self._get_powerspectrum_from_theta(theta, fname_powerspectrum, force_powerspectrum)
# Call auxiliary blackbox method
Phi = self._aux_blackbox(d, fname_powerspectrum, fname_simparfile, fname_mockparfile, fname_RngStateLPT, fname_whitenoise, fname_outputinitialdensity, fname_outputrealspacedensity, fname_outputdensity, fname_RngStateMock, fname_outputmock, fname_outputss, fname_simlogs, fname_mocklogs, force_parfiles, force_sim, force_mock)
# Clean Simbelynë outputs
if remove_sbmy:
self._clean_sbmy_outputs(fname_outputdensity, fname_outputmock, fname_outputss)
UNINDENT()
PrintMessage(3, "Evaluating blackbox ({}/{}) done.".format(i,N))
return Phi
[docs] def compute_pool(self, theta, d, pool_fname, N, force_powerspectrum=False, force_parfiles=False, force_sim=False, force_mock=False):
"""Computes a pool of realizations of the Simbelynë blackbox. A method compute_pool with this prototype is the only mandatory method.
Parameters
----------
theta : array, double, dimension=S
vector of power spectrum values at the support wavenumbers
d : int
direction in parameter space, from 0 to S
pool_fname : :obj:`str`
pool file name
N : int
number of realizations required
force_powerspectrum : bool, optional, default=False
force recomputation of the power spectrum?
force_parfiles : bool, optional, default=False
overwrite if parameter files already exists?
force_sim : bool, optional, default=False
force recomputation if output density already exists?
force_mock : bool, optional, default=False
force recomputation if output mock summary statistics file already exists?
Returns
-------
p : :obj:`pool`
simulation pool
"""
from pyselfi.pool import pool
p=pool(pool_fname,N)
# Run N evaluations of the blackbox at the desired point
while not p.finished:
i = p.N_sims+1
Phi = self.evaluate(theta,d,i,N,force_powerspectrum,force_parfiles,force_sim,force_mock)
p.add_sim(Phi)
if i%self.save_frequency==0:
p.save()
p.save()
return p
[docs] def make_survey_geometry(self, N_CAT, cosmo, force=False):
"""Produces a mock survey geometry file.
Parameters
----------
N_CAT : int
number of subcatalogs
cosmo : dictionary
cosmological parameters (and some infrastructure parameters)
force : bool, optional, default=False
force recomputation if file already exists?
"""
from os.path import exists
fname = self.fsimdir+"/"+self.fname_inputsurveygeometry
if not exists(fname) or force:
import numpy as np
from pysbmy.survey_geometry import SurveyGeometry
from pysbmy.cosmology import redshift_from_comoving_distance, luminosity_distance
# basic setup
G_sim=self.G_sim
L0=G_sim.L0; L1=G_sim.L1; L2=G_sim.L2; N0=G_sim.N0; N1=G_sim.N2; N2=G_sim.N2
corner0=self.corner0; corner1=self.corner1; corner2=self.corner2
# observational setup
bright_cut=np.array(((0.),))
faint_cut=np.array(((0.),))
rmin=np.array(((0.),))
rmax=np.array(((L0/2*np.sqrt(3)),))
zmin=redshift_from_comoving_distance(rmin, **cosmo)
zmax=redshift_from_comoving_distance(rmax, **cosmo)
N_BIAS=3
nbar=2.0e-3
Nmean=nbar*(L0*L1*L2)/(N0*N1*N2)
galaxy_bias_mean=np.array(((1.2,0.,0.),))
galaxy_bias_std=np.array(((0.,0.,0.),))
galaxy_nmean_mean=np.array(((Nmean),))
galaxy_nmean_std=np.array(((0.),))
galaxy_sel_window=np.ones((N_CAT,N0,N1,N2))
# setup coordinates
x=np.arange(corner0,corner0+L0,L0/N0)
x=np.tile(x,(N2,N1,1))+1/2.*L0/N0
y=np.arange(corner1,corner1+L1,L1/N1)
y=np.tile(y,(N2,N0,1))
y=np.swapaxes(y,1,2)+1/2.*L1/N1
z=np.arange(corner2,corner2+L2,L2/N2)
z=np.tile(z,(N1,N0,1))
z=np.swapaxes(z,0,2)+1/2.*L2/N2
r=np.sqrt(x**2+y**2+z**2)
b=np.arcsin(z/r)*180/np.pi
l=np.arctan2(y,x)*180/np.pi
l[np.where(l<0)]+=360.
# setup basic mask
mask=np.ones((N0,N1,N2))
mask[np.where((b>-self.b_cut)*(b<self.b_cut))]=0.
# Schechter selection function
def integrand_luminosity(x, alpha):
from math import exp
return x**alpha * exp(-x)
def integral_luminosity(alpha, x_min, x_max):
from scipy.integrate import quad
return quad(integrand_luminosity, x_min, x_max, args=(alpha))[0]
def computeSchechterPhi1(faint_absolute_magnitude_cut, bright_absolute_magnitude_cut, faint_apparent_magnitude_cut, bright_apparent_magnitude_cut, Mstar, alpha):
from math import log10, pow
xl1 = pow(10.0, 0.4*(Mstar - faint_absolute_magnitude_cut))
xu1 = pow(10.0, 0.4*(Mstar - bright_absolute_magnitude_cut))
Phi1 = integral_luminosity(alpha, xl1, xu1)
return Phi1
def computeSchechterCompleteness(d_lum, cosmo, faint_absolute_magnitude_cut, bright_absolute_magnitude_cut, faint_apparent_magnitude_cut, bright_apparent_magnitude_cut, Mstar, alpha, Phi1, corr=0.):
from math import log10, pow
#corr = zcorrection(redshift)
corr = 0.
absolute_mu0 = faint_apparent_magnitude_cut - 5 * log10(d_lum) - 25 - corr;
absolute_ml0 = bright_apparent_magnitude_cut - 5 * log10(d_lum) - 25 - corr;
abmu = min(absolute_mu0, faint_absolute_magnitude_cut)
abml = max(absolute_ml0, bright_absolute_magnitude_cut)
abmu = max(abmu,abml)
xl0 = pow(10.0, 0.4*(Mstar- abmu))
xu0 = pow(10.0, 0.4*(Mstar- abml))
Phi0 = integral_luminosity(alpha, xl0, xu0)
return max(0.0, Phi0/Phi1)
selection=np.vectorize(computeSchechterCompleteness, excluded=['**cosmo', 'faint_absolute_magnitude_cut', 'bright_absolute_magnitude_cut', 'faint_apparent_magnitude_cut', 'bright_apparent_magnitude_cut','Mstar','alpha','Phi1','corr'])
# setup galaxy selection window
from scipy.interpolate import UnivariateSpline
N = 1000
r_arr = np.linspace(rmin+1e-7,rmax,N)
z_arr = redshift_from_comoving_distance(r_arr, **cosmo)
d_lum_arr = luminosity_distance(z_arr, **cosmo)
bright_apparent_magnitude_cut = self.bright_apparent_magnitude_cut
faint_apparent_magnitude_cut = self.faint_apparent_magnitude_cut
bright_absolute_magnitude_cut = self.bright_absolute_magnitude_cut
faint_absolute_magnitude_cut = self.faint_absolute_magnitude_cut
Mstar = self.Mstar
alpha = self.alpha
Phi1 = computeSchechterPhi1(faint_absolute_magnitude_cut, bright_absolute_magnitude_cut, faint_apparent_magnitude_cut, bright_apparent_magnitude_cut, Mstar, alpha)
selection_arr = selection(d_lum_arr, cosmo,
faint_absolute_magnitude_cut,
bright_absolute_magnitude_cut,
faint_apparent_magnitude_cut,
bright_apparent_magnitude_cut,
Mstar, alpha, Phi1)
spl = UnivariateSpline(r_arr, selection_arr)
galaxy_sel_window[0]*=spl(r)*mask
galaxy_sel_window[0]/=galaxy_sel_window[0].max()
S=SurveyGeometry(L0,L1,L2,corner0,corner1,corner2,N0,N1,N2,N_CAT,cosmo,bright_cut,faint_cut,rmin,rmax,zmin,zmax,N_BIAS,galaxy_bias_mean,galaxy_bias_std,galaxy_nmean_mean,galaxy_nmean_std,galaxy_sel_window)
S.write(fname)
#end class(blackbox)