Adds a bunch for random noise stuff
Some checks failed
gitea-physics/pathfinder/pipeline/pr-master There was a failure building this commit

This commit is contained in:
Deepak Mallubhotla 2021-11-29 14:44:14 -06:00
parent fbc37158bc
commit 260c1b0a4e
4 changed files with 197 additions and 1 deletions

View File

@ -62,8 +62,9 @@ class OscillatingDipoleArrangement():
--------
dipoles : Sequence[OscillatingDipole]
'''
def __init__(self, dipoles: Sequence[OscillatingDipole]):
def __init__(self, dipoles: Sequence[OscillatingDipole], seed=12345):
self.dipoles = dipoles
self.rng = numpy.random.default_rng(seed)
def get_dot_measurement(self, dot_input: DotInput) -> DotMeasurement:
r = numpy.array(dot_input[0])
@ -75,3 +76,18 @@ class OscillatingDipoleArrangement():
For a series of points, each with three coordinates and a frequency, return a list of the corresponding DotMeasurements.
'''
return [self.get_dot_measurement(dot_input) for dot_input in dot_inputs]
def get_dot_measurements_with_random(self, dot_inputs: Sequence[DotInput]) -> List[DotMeasurement]:
'''
For a series of points, each with three coordinates and a frequency, return a list of the corresponding DotMeasurements.
'''
def apply_random(input: DotInput, error_size: float) -> DotMeasurement:
actual_measurement = self.get_dot_measurement(input)
errored = DotMeasurement(actual_measurement.v * error_size, actual_measurement.r, actual_measurement.f)
errored.error = error_size
errored.original_v = actual_measurement.v
return errored
random = (self.rng.random(len(dot_inputs)) * .3) + .85
return [apply_random(*inputs) for inputs in zip(dot_inputs, random)]

View File

@ -0,0 +1,45 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol_basinhopping(initial_position=initial_pos)
logging.info(res)
return res.pathfinder_x
def main():
logging.info("Running script...")
dot_inputs = list(
(([.5, -1, 0], numpy.power(10, f))) for f in numpy.arange(1, 3, .1)
)
logging.info(dot_inputs)
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [.51, -1.2, 0], 8)
expected_result = numpy.array([1, 2, 3, .51, -1.2, 0, 8])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs)
logging.info([(m.f, m.v) for m in dot_measurements])
logging.info([(m.f, m.v) for m in dot_measurements_with_error])
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

60
scripts/random_test1.py Normal file
View File

@ -0,0 +1,60 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol_basinhopping(initial_position=initial_pos)
logging.info(res)
return res.pathfinder_x
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([-.8, -5.1, 0], f), ([.5, -1, 0], f), ([.3, 5, 0], f), ([7.3, 6.5, 0], f)) for f in numpy.arange(1, 10, 2)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [.51, -1.2, 0], 8)
expected_result = numpy.array([1, 2, 3, .51, -1.2, 0, 8])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs)
# model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
logging.info("Finished setting up model")
results = []
rb = -2
ru = 3
interval = 3
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
logging.info(results)
final_values = [r for r in results if r is not None]
logging.info(final_values)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

75
scripts/random_test2.py Normal file
View File

@ -0,0 +1,75 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol_basinhopping(initial_position=initial_pos)
logging.info(res)
return res.pathfinder_x
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([-.8, -5.1, 0], f), ([.5, -1, 0], f), ([.3, 5, 0], f), ([7.3, 6.5, 0], f)) for f in numpy.arange(1, 10, 2)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [7.4, 6.3, 0.02], 7)
expected_result = numpy.array([1, 2, 3, 7.4, 6.3, 0.02, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs)
# model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1)
model1 = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
model2 = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1)
logging.info("Finished setting up model")
results = []
rb = -2
ru = 3
interval = 3
points_to_try = [(model1, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
#logging.info(results)
final_values = [r for r in results if r is not None]
logging.info("Results without errors")
logging.info(final_values)
results2 = []
points_to_try = [(model2, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results2 = pool.starmap(try_initial_position, points_to_try)
#logging.info(results)
final_values2 = [r for r in results2 if r is not None]
logging.info("Results without errors")
logging.info(final_values)
logging.info("Results with errors")
logging.info(final_values2)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()