diff --git a/deepdog/subset_simulation/subset_simulation_impl.py b/deepdog/subset_simulation/subset_simulation_impl.py index 4edb7cd..82bf904 100644 --- a/deepdog/subset_simulation/subset_simulation_impl.py +++ b/deepdog/subset_simulation/subset_simulation_impl.py @@ -52,9 +52,11 @@ class SubsetSimulation: default_r_step=0.01, default_w_log_step=0.01, default_upper_w_log_step=4, + num_initial_dmc_gens=1, keep_probs_list=True, dump_last_generation_to_file=False, initial_cost_chunk_size=100, + initial_cost_multiprocess=True, cap_core_count: int = 0, # 0 means cap at num cores - 1 ): name, model = model_name_pair @@ -100,6 +102,7 @@ class SubsetSimulation: _logger.info(f"\tn_c: {self.n_c}") _logger.info(f"\tn_s: {self.n_s}") _logger.info(f"\tm: {self.m_max}") + _logger.info(f"\t{num_initial_dmc_gens=}") _logger.info(f"\t{mcmc_seed=}") _logger.info(f"\t{level_0_seed=}") _logger.info("let's do level 0...") @@ -111,9 +114,12 @@ class SubsetSimulation: self.dump_last_generations = dump_last_generation_to_file self.initial_cost_chunk_size = initial_cost_chunk_size + self.initial_cost_multiprocess = initial_cost_multiprocess self.cap_core_count = cap_core_count + self.num_dmc_gens = num_initial_dmc_gens + def _single_chain_gen(self, args: Tuple): threshold_cost, stdevs, rng_seed, (c, s) = args rng = numpy.random.default_rng(rng_seed) @@ -133,50 +139,28 @@ class SubsetSimulation: output_messages = [] + # If we have n_s = 10 and n_c = 100, then our big N = 1000 and p = 1/10 + # The DMC stage would normally generate 1000, then pick the best 100 and start counting prob = p/10. + # Let's say we want our DMC stage to go down to level 2. + # Then we need to filter out p^2, so our initial has to be N_0 = N / p = n_c * n_s^2 + initial_dmc_n = self.n_c * (self.n_s**self.num_dmc_gens) + initial_level = ( + self.num_dmc_gens - 1 + ) # This is perfunctory but let's label it here really explicitly + _logger.info(f"Generating {initial_dmc_n} for DMC stage") sample_dipoles = self.model.get_monte_carlo_dipole_inputs( - self.n_c * self.n_s, + initial_dmc_n, -1, rng_to_use=numpy.random.default_rng(self.level_0_seed), ) # _logger.debug(sample_dipoles) # _logger.debug(sample_dipoles.shape) - raw_costs = [] + _logger.debug("Finished dipole generation") _logger.debug( - f"Using iterated cost function thing with chunk size {self.initial_cost_chunk_size}" + f"Using iterated multiprocessing cost function thing with chunk size {self.initial_cost_chunk_size}" ) - for x in range(0, len(sample_dipoles), self.initial_cost_chunk_size): - # _logger.debug(f"doing chunk {x}") - raw_costs.append( - self.cost_function_to_use( - sample_dipoles[x : x + self.initial_cost_chunk_size] - ) - ) - costs = numpy.concatenate(raw_costs) - - # _logger.debug(f"costs: {costs}") - sorted_indexes = costs.argsort()[::-1] - - # _logger.debug(costs[sorted_indexes]) - # _logger.debug(sample_dipoles[sorted_indexes]) - - sorted_costs = costs[sorted_indexes] - sorted_dipoles = sample_dipoles[sorted_indexes] - - threshold_cost = sorted_costs[-self.n_c] - - all_dipoles = numpy.array( - [ - pdme.subspace_simulation.sort_array_of_dipoles_by_frequency(samp) - for samp in sorted_dipoles - ] - ) - all_chains = list(zip(sorted_costs, all_dipoles)) - - long_mcmc_rng = numpy.random.default_rng(self.mcmc_seed) - mcmc_rng_seed_sequence = numpy.random.SeedSequence(self.mcmc_seed) - # core count etc. logic here core_count = multiprocessing.cpu_count() - 1 or 1 if (self.cap_core_count >= 1) and (self.cap_core_count < core_count): @@ -184,7 +168,74 @@ class SubsetSimulation: _logger.info(f"Using {core_count} cores") with multiprocessing.Pool(core_count) as pool: - for i in range(self.m_max): + + # Do the initial DMC calculation in a multiprocessing + + chunks = numpy.array_split( + sample_dipoles, + range( + self.initial_cost_chunk_size, + len(sample_dipoles), + self.initial_cost_chunk_size, + ), + ) + if self.initial_cost_multiprocess: + _logger.debug("Multiprocessing initial costs") + raw_costs = pool.map(self.cost_function_to_use, chunks) + else: + _logger.debug("Single process initial costs") + raw_costs = [] + for chunk_idx, chunk in enumerate(chunks): + _logger.debug(f"doing chunk #{chunk_idx}") + raw_costs.append(self.cost_function_to_use(chunk)) + costs = numpy.concatenate(raw_costs) + _logger.debug("finished initial dmc cost calculation") + # _logger.debug(f"costs: {costs}") + sorted_indexes = costs.argsort()[::-1] + + # _logger.debug(costs[sorted_indexes]) + # _logger.debug(sample_dipoles[sorted_indexes]) + + sorted_costs = costs[sorted_indexes] + sorted_dipoles = sample_dipoles[sorted_indexes] + + all_dipoles = numpy.array( + [ + pdme.subspace_simulation.sort_array_of_dipoles_by_frequency(samp) + for samp in sorted_dipoles + ] + ) + all_chains = list(zip(sorted_costs, all_dipoles)) + for dmc_level in range(initial_level): + # if initial level is 1, we want to print out what the level 0 threshold would have been? + _logger.debug(f"Get the pseudo statistics for level {dmc_level}") + _logger.debug(f"Whole chain has length {len(all_chains)}") + pseudo_threshold_index = -( + self.n_c * (self.n_s ** (self.num_dmc_gens - dmc_level - 1)) + ) + _logger.debug( + f"Have a pseudo_threshold_index of {pseudo_threshold_index}, or {len(all_chains) + pseudo_threshold_index}" + ) + pseudo_threshold_cost = all_chains[-pseudo_threshold_index][0] + _logger.info( + f"Pseudo-level {dmc_level} threshold cost {pseudo_threshold_cost}, at P = (1 / {self.n_s})^{dmc_level + 1}" + ) + all_chains = all_chains[pseudo_threshold_index:] + + long_mcmc_rng = numpy.random.default_rng(self.mcmc_seed) + mcmc_rng_seed_sequence = numpy.random.SeedSequence(self.mcmc_seed) + + threshold_cost = all_chains[-self.n_c][0] + _logger.info( + f"Finishing DMC threshold cost {threshold_cost} at level {initial_level}, at P = (1 / {self.n_s})^{initial_level + 1}" + ) + _logger.debug(f"Executing the MCMC with chains of length {len(all_chains)}") + + # Now we move on to the MCMC part of the algorithm + + # This is important, we want to allow some extra initial levels so we need to account for that here! + for i in range(self.num_dmc_gens, self.m_max): + _logger.info(f"Starting level {i}") next_seeds = all_chains[-self.n_c :] if self.dump_last_generations: @@ -443,6 +494,7 @@ class MultiSubsetSimulations: n_s: int, m_max: int, target_cost: float, + num_initial_dmc_gens: int = 1, level_0_seed_seed: int = 200, mcmc_seed_seed: int = 20, use_adaptive_steps=True, @@ -462,6 +514,8 @@ class MultiSubsetSimulations: self.m_max = m_max self.target_cost = target_cost # This is not optional here! + self.num_dmc_gens = num_initial_dmc_gens + self.level_0_seed_seed = level_0_seed_seed self.mcmc_seed_seed = mcmc_seed_seed