diff --git a/pathfinder/model/oscillating/oscillating_dipole.py b/pathfinder/model/oscillating/oscillating_dipole.py index 1f3fba1..ca067ed 100644 --- a/pathfinder/model/oscillating/oscillating_dipole.py +++ b/pathfinder/model/oscillating/oscillating_dipole.py @@ -62,8 +62,9 @@ class OscillatingDipoleArrangement(): -------- dipoles : Sequence[OscillatingDipole] ''' - def __init__(self, dipoles: Sequence[OscillatingDipole]): + def __init__(self, dipoles: Sequence[OscillatingDipole], seed=12345): self.dipoles = dipoles + self.rng = numpy.random.default_rng(seed) def get_dot_measurement(self, dot_input: DotInput) -> DotMeasurement: r = numpy.array(dot_input[0]) @@ -75,3 +76,18 @@ class OscillatingDipoleArrangement(): For a series of points, each with three coordinates and a frequency, return a list of the corresponding DotMeasurements. ''' return [self.get_dot_measurement(dot_input) for dot_input in dot_inputs] + + def get_dot_measurements_with_random(self, dot_inputs: Sequence[DotInput]) -> List[DotMeasurement]: + ''' + For a series of points, each with three coordinates and a frequency, return a list of the corresponding DotMeasurements. + ''' + def apply_random(input: DotInput, error_size: float) -> DotMeasurement: + actual_measurement = self.get_dot_measurement(input) + errored = DotMeasurement(actual_measurement.v * error_size, actual_measurement.r, actual_measurement.f) + errored.error = error_size + errored.original_v = actual_measurement.v + return errored + + random = (self.rng.random(len(dot_inputs)) * .3) + .85 + + return [apply_random(*inputs) for inputs in zip(dot_inputs, random)] diff --git a/scripts/random_test1-noise-plot.py b/scripts/random_test1-noise-plot.py new file mode 100644 index 0000000..bf64e01 --- /dev/null +++ b/scripts/random_test1-noise-plot.py @@ -0,0 +1,45 @@ +import itertools +import logging +import multiprocessing +import numpy +import pathfinder.model.oscillating + + +def print_result(msg, result): + logging.info(msg) + logging.info(f"\tResult: {result.pathfinder_x}") + logging.info(f"\tSuccess: {result.success}. {result.message}") + try: + logging.info(f"\tFunc evals: {result.nfev}") + except AttributeError: + pass + try: + logging.info(f"\tJacb evals: {result.njev}") + except AttributeError: + pass + + +def try_initial_position(model, expected_result, initial_pos): + res = model.sol_basinhopping(initial_position=initial_pos) + logging.info(res) + return res.pathfinder_x + + +def main(): + logging.info("Running script...") + dot_inputs = list( + (([.5, -1, 0], numpy.power(10, f))) for f in numpy.arange(1, 3, .1) + ) + logging.info(dot_inputs) + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [.51, -1.2, 0], 8) + expected_result = numpy.array([1, 2, 3, .51, -1.2, 0, 8]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs) + + logging.info([(m.f, m.v) for m in dot_measurements]) + logging.info([(m.f, m.v) for m in dot_measurements_with_error]) + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() \ No newline at end of file diff --git a/scripts/random_test1.py b/scripts/random_test1.py new file mode 100644 index 0000000..969b568 --- /dev/null +++ b/scripts/random_test1.py @@ -0,0 +1,60 @@ +import itertools +import logging +import multiprocessing +import numpy +import pathfinder.model.oscillating + + +def print_result(msg, result): + logging.info(msg) + logging.info(f"\tResult: {result.pathfinder_x}") + logging.info(f"\tSuccess: {result.success}. {result.message}") + try: + logging.info(f"\tFunc evals: {result.nfev}") + except AttributeError: + pass + try: + logging.info(f"\tJacb evals: {result.njev}") + except AttributeError: + pass + + +def try_initial_position(model, expected_result, initial_pos): + res = model.sol_basinhopping(initial_position=initial_pos) + logging.info(res) + return res.pathfinder_x + + +def main(): + logging.info("Running script...") + dot_inputs = list(itertools.chain.from_iterable( + (([-.8, -5.1, 0], f), ([.5, -1, 0], f), ([.3, 5, 0], f), ([7.3, 6.5, 0], f)) for f in numpy.arange(1, 10, 2) + )) + + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [.51, -1.2, 0], 8) + expected_result = numpy.array([1, 2, 3, .51, -1.2, 0, 8]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs) + + # model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1) + model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1) + logging.info("Finished setting up model") + + results = [] + rb = -2 + ru = 3 + interval = 3 + points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)] + logging.info(f"Will have {len(points_to_try)} points to try") + logging.info("creating pool...") + with multiprocessing.Pool() as pool: + results = pool.starmap(try_initial_position, points_to_try) + logging.info(results) + final_values = [r for r in results if r is not None] + logging.info(final_values) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() \ No newline at end of file diff --git a/scripts/random_test2.py b/scripts/random_test2.py new file mode 100644 index 0000000..2c246ed --- /dev/null +++ b/scripts/random_test2.py @@ -0,0 +1,75 @@ +import itertools +import logging +import multiprocessing +import numpy +import pathfinder.model.oscillating + + +def print_result(msg, result): + logging.info(msg) + logging.info(f"\tResult: {result.pathfinder_x}") + logging.info(f"\tSuccess: {result.success}. {result.message}") + try: + logging.info(f"\tFunc evals: {result.nfev}") + except AttributeError: + pass + try: + logging.info(f"\tJacb evals: {result.njev}") + except AttributeError: + pass + + +def try_initial_position(model, expected_result, initial_pos): + res = model.sol_basinhopping(initial_position=initial_pos) + logging.info(res) + return res.pathfinder_x + + +def main(): + logging.info("Running script...") + dot_inputs = list(itertools.chain.from_iterable( + (([-.8, -5.1, 0], f), ([.5, -1, 0], f), ([.3, 5, 0], f), ([7.3, 6.5, 0], f)) for f in numpy.arange(1, 10, 2) + )) + + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [7.4, 6.3, 0.02], 7) + expected_result = numpy.array([1, 2, 3, 7.4, 6.3, 0.02, 7]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs) + + # model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1) + model1 = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1) + model2 = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1) + logging.info("Finished setting up model") + + results = [] + rb = -2 + ru = 3 + interval = 3 + points_to_try = [(model1, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)] + logging.info(f"Will have {len(points_to_try)} points to try") + logging.info("creating pool...") + with multiprocessing.Pool() as pool: + results = pool.starmap(try_initial_position, points_to_try) + #logging.info(results) + final_values = [r for r in results if r is not None] + logging.info("Results without errors") + logging.info(final_values) + + results2 = [] + points_to_try = [(model2, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)] + logging.info(f"Will have {len(points_to_try)} points to try") + logging.info("creating pool...") + with multiprocessing.Pool() as pool: + results2 = pool.starmap(try_initial_position, points_to_try) + #logging.info(results) + final_values2 = [r for r in results2 if r is not None] + logging.info("Results without errors") + logging.info(final_values) + logging.info("Results with errors") + logging.info(final_values2) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() \ No newline at end of file