feat: improve initial cost calculation to allow multiprocessing, adds ability to specify a number of levels to do with direct mc instead of subset simulation

This commit is contained in:
Deepak Mallubhotla 2024-05-19 22:11:50 -05:00
parent 24ac65bf9c
commit 09fad2e102
Signed by: deepak
GPG Key ID: BEBAEBF28083E022

View File

@ -52,9 +52,11 @@ class SubsetSimulation:
default_r_step=0.01, default_r_step=0.01,
default_w_log_step=0.01, default_w_log_step=0.01,
default_upper_w_log_step=4, default_upper_w_log_step=4,
num_initial_dmc_gens=1,
keep_probs_list=True, keep_probs_list=True,
dump_last_generation_to_file=False, dump_last_generation_to_file=False,
initial_cost_chunk_size=100, initial_cost_chunk_size=100,
initial_cost_multiprocess=True,
cap_core_count: int = 0, # 0 means cap at num cores - 1 cap_core_count: int = 0, # 0 means cap at num cores - 1
): ):
name, model = model_name_pair name, model = model_name_pair
@ -100,6 +102,7 @@ class SubsetSimulation:
_logger.info(f"\tn_c: {self.n_c}") _logger.info(f"\tn_c: {self.n_c}")
_logger.info(f"\tn_s: {self.n_s}") _logger.info(f"\tn_s: {self.n_s}")
_logger.info(f"\tm: {self.m_max}") _logger.info(f"\tm: {self.m_max}")
_logger.info(f"\t{num_initial_dmc_gens=}")
_logger.info(f"\t{mcmc_seed=}") _logger.info(f"\t{mcmc_seed=}")
_logger.info(f"\t{level_0_seed=}") _logger.info(f"\t{level_0_seed=}")
_logger.info("let's do level 0...") _logger.info("let's do level 0...")
@ -111,9 +114,12 @@ class SubsetSimulation:
self.dump_last_generations = dump_last_generation_to_file self.dump_last_generations = dump_last_generation_to_file
self.initial_cost_chunk_size = initial_cost_chunk_size self.initial_cost_chunk_size = initial_cost_chunk_size
self.initial_cost_multiprocess = initial_cost_multiprocess
self.cap_core_count = cap_core_count self.cap_core_count = cap_core_count
self.num_dmc_gens = num_initial_dmc_gens
def _single_chain_gen(self, args: Tuple): def _single_chain_gen(self, args: Tuple):
threshold_cost, stdevs, rng_seed, (c, s) = args threshold_cost, stdevs, rng_seed, (c, s) = args
rng = numpy.random.default_rng(rng_seed) rng = numpy.random.default_rng(rng_seed)
@ -133,50 +139,28 @@ class SubsetSimulation:
output_messages = [] output_messages = []
# If we have n_s = 10 and n_c = 100, then our big N = 1000 and p = 1/10
# The DMC stage would normally generate 1000, then pick the best 100 and start counting prob = p/10.
# Let's say we want our DMC stage to go down to level 2.
# Then we need to filter out p^2, so our initial has to be N_0 = N / p = n_c * n_s^2
initial_dmc_n = self.n_c * (self.n_s**self.num_dmc_gens)
initial_level = (
self.num_dmc_gens - 1
) # This is perfunctory but let's label it here really explicitly
_logger.info(f"Generating {initial_dmc_n} for DMC stage")
sample_dipoles = self.model.get_monte_carlo_dipole_inputs( sample_dipoles = self.model.get_monte_carlo_dipole_inputs(
self.n_c * self.n_s, initial_dmc_n,
-1, -1,
rng_to_use=numpy.random.default_rng(self.level_0_seed), rng_to_use=numpy.random.default_rng(self.level_0_seed),
) )
# _logger.debug(sample_dipoles) # _logger.debug(sample_dipoles)
# _logger.debug(sample_dipoles.shape) # _logger.debug(sample_dipoles.shape)
raw_costs = [] _logger.debug("Finished dipole generation")
_logger.debug( _logger.debug(
f"Using iterated cost function thing with chunk size {self.initial_cost_chunk_size}" f"Using iterated multiprocessing cost function thing with chunk size {self.initial_cost_chunk_size}"
) )
for x in range(0, len(sample_dipoles), self.initial_cost_chunk_size):
# _logger.debug(f"doing chunk {x}")
raw_costs.append(
self.cost_function_to_use(
sample_dipoles[x : x + self.initial_cost_chunk_size]
)
)
costs = numpy.concatenate(raw_costs)
# _logger.debug(f"costs: {costs}")
sorted_indexes = costs.argsort()[::-1]
# _logger.debug(costs[sorted_indexes])
# _logger.debug(sample_dipoles[sorted_indexes])
sorted_costs = costs[sorted_indexes]
sorted_dipoles = sample_dipoles[sorted_indexes]
threshold_cost = sorted_costs[-self.n_c]
all_dipoles = numpy.array(
[
pdme.subspace_simulation.sort_array_of_dipoles_by_frequency(samp)
for samp in sorted_dipoles
]
)
all_chains = list(zip(sorted_costs, all_dipoles))
long_mcmc_rng = numpy.random.default_rng(self.mcmc_seed)
mcmc_rng_seed_sequence = numpy.random.SeedSequence(self.mcmc_seed)
# core count etc. logic here # core count etc. logic here
core_count = multiprocessing.cpu_count() - 1 or 1 core_count = multiprocessing.cpu_count() - 1 or 1
if (self.cap_core_count >= 1) and (self.cap_core_count < core_count): if (self.cap_core_count >= 1) and (self.cap_core_count < core_count):
@ -184,7 +168,74 @@ class SubsetSimulation:
_logger.info(f"Using {core_count} cores") _logger.info(f"Using {core_count} cores")
with multiprocessing.Pool(core_count) as pool: with multiprocessing.Pool(core_count) as pool:
for i in range(self.m_max):
# Do the initial DMC calculation in a multiprocessing
chunks = numpy.array_split(
sample_dipoles,
range(
self.initial_cost_chunk_size,
len(sample_dipoles),
self.initial_cost_chunk_size,
),
)
if self.initial_cost_multiprocess:
_logger.debug("Multiprocessing initial costs")
raw_costs = pool.map(self.cost_function_to_use, chunks)
else:
_logger.debug("Single process initial costs")
raw_costs = []
for chunk_idx, chunk in enumerate(chunks):
_logger.debug(f"doing chunk #{chunk_idx}")
raw_costs.append(self.cost_function_to_use(chunk))
costs = numpy.concatenate(raw_costs)
_logger.debug("finished initial dmc cost calculation")
# _logger.debug(f"costs: {costs}")
sorted_indexes = costs.argsort()[::-1]
# _logger.debug(costs[sorted_indexes])
# _logger.debug(sample_dipoles[sorted_indexes])
sorted_costs = costs[sorted_indexes]
sorted_dipoles = sample_dipoles[sorted_indexes]
all_dipoles = numpy.array(
[
pdme.subspace_simulation.sort_array_of_dipoles_by_frequency(samp)
for samp in sorted_dipoles
]
)
all_chains = list(zip(sorted_costs, all_dipoles))
for dmc_level in range(initial_level):
# if initial level is 1, we want to print out what the level 0 threshold would have been?
_logger.debug(f"Get the pseudo statistics for level {dmc_level}")
_logger.debug(f"Whole chain has length {len(all_chains)}")
pseudo_threshold_index = -(
self.n_c * (self.n_s ** (self.num_dmc_gens - dmc_level - 1))
)
_logger.debug(
f"Have a pseudo_threshold_index of {pseudo_threshold_index}, or {len(all_chains) + pseudo_threshold_index}"
)
pseudo_threshold_cost = all_chains[-pseudo_threshold_index][0]
_logger.info(
f"Pseudo-level {dmc_level} threshold cost {pseudo_threshold_cost}, at P = (1 / {self.n_s})^{dmc_level + 1}"
)
all_chains = all_chains[pseudo_threshold_index:]
long_mcmc_rng = numpy.random.default_rng(self.mcmc_seed)
mcmc_rng_seed_sequence = numpy.random.SeedSequence(self.mcmc_seed)
threshold_cost = all_chains[-self.n_c][0]
_logger.info(
f"Finishing DMC threshold cost {threshold_cost} at level {initial_level}, at P = (1 / {self.n_s})^{initial_level + 1}"
)
_logger.debug(f"Executing the MCMC with chains of length {len(all_chains)}")
# Now we move on to the MCMC part of the algorithm
# This is important, we want to allow some extra initial levels so we need to account for that here!
for i in range(self.num_dmc_gens, self.m_max):
_logger.info(f"Starting level {i}")
next_seeds = all_chains[-self.n_c :] next_seeds = all_chains[-self.n_c :]
if self.dump_last_generations: if self.dump_last_generations:
@ -443,6 +494,7 @@ class MultiSubsetSimulations:
n_s: int, n_s: int,
m_max: int, m_max: int,
target_cost: float, target_cost: float,
num_initial_dmc_gens: int = 1,
level_0_seed_seed: int = 200, level_0_seed_seed: int = 200,
mcmc_seed_seed: int = 20, mcmc_seed_seed: int = 20,
use_adaptive_steps=True, use_adaptive_steps=True,
@ -462,6 +514,8 @@ class MultiSubsetSimulations:
self.m_max = m_max self.m_max = m_max
self.target_cost = target_cost # This is not optional here! self.target_cost = target_cost # This is not optional here!
self.num_dmc_gens = num_initial_dmc_gens
self.level_0_seed_seed = level_0_seed_seed self.level_0_seed_seed = level_0_seed_seed
self.mcmc_seed_seed = mcmc_seed_seed self.mcmc_seed_seed = mcmc_seed_seed