1 Commits

Author SHA1 Message Date
a7fe2455a5 Adds some tests whatever
Some checks failed
gitea-physics/pathfinder/pipeline/head There was a failure building this commit
2021-11-01 15:50:57 -05:00
20 changed files with 133 additions and 790 deletions

2
do.sh
View File

@@ -11,7 +11,7 @@ build() {
test() {
echo "I am ${FUNCNAME[0]}ing"
poetry run flake8 pathfinder tests
poetry run flake8
poetry run mypy pathfinder
poetry run pytest
}

View File

@@ -3,7 +3,6 @@ import numpy
import scipy.optimize
from pathfinder.model.oscillating.dot import DotMeasurement
import pathfinder.model.oscillating.util
import logging
class DotOscillatingDipoleModel():
@@ -69,40 +68,6 @@ class DotOscillatingDipoleModel():
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
return result
def sol_minim(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1, use_root=True):
initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n)
cached_costs = self.costs()
cached_jacobian = self.jac()
def summed(pt):
current_cost = cached_costs(pt)
curr_jac = cached_jacobian(pt)
curr_gradient = .5 * numpy.matmul(numpy.transpose(curr_jac), current_cost)
return (numpy.sum(current_cost**2), curr_gradient)
result = scipy.optimize.minimize(summed, initial, jac=True, tol=1e-16, options={"gtol": 1e-20})
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
return result
def sol_basinhopping(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1, use_root=True):
initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n)
cached_costs = self.costs()
cached_jacobian = self.jac()
def summed(pt):
current_cost = cached_costs(pt)
curr_jac = cached_jacobian(pt)
curr_gradient = .5 * numpy.matmul(numpy.transpose(curr_jac), current_cost)
return (numpy.sum(current_cost**2), curr_gradient)
minimizer_kwargs = {"method": "L-BFGS-B", "jac": True, "tol": 1e-16, "options": {"gtol": 1e-16}}
result = scipy.optimize.basinhopping(summed, initial, minimizer_kwargs=minimizer_kwargs, niter=100)
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
return result
def sol2(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1, use_root=True):
initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n)
@@ -116,3 +81,46 @@ class DotOscillatingDipoleModel():
result = scipy.optimize.least_squares(self.simple_costs(), initial, jac=self.simple_jac(), ftol=1e-15, gtol=3e-16)
result.pathfinder_x = result.x
return result
def sol_simple(self, costs, initial, **kwargs):
initial = numpy.tile(numpy.concatenate(initial, axis=None), self.n)
result = scipy.optimize.least_squares(self.costs(), initial, jac=kwargs["jac"], ftol=kwargs["ftol"], gtol=kwargs["gtol"])
result.old_fun = result.fun
result.fun = numpy.sum(result.fun**2, axis=None)
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
return result
def sol_basinhopping(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1):
initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n)
def summed_costs(pt):
curr_cost = self.costs()(pt)
squared_cost = numpy.sum(curr_cost**2, axis=None)
gradient = .5 * numpy.matmul(numpy.transpose(self.jac()(pt)), curr_cost)
return (squared_cost, gradient)
minimizer_kwargs = {"method": "BFGS", "jac": True}
result = scipy.optimize.basinhopping(summed_costs, initial, niter=1000, minimizer_kwargs=minimizer_kwargs)
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
return result
def sol_basinhopping_big(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1):
initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n)
minimizer_kwargs = {
"method": self.sol_simple,
"jac": self.jac(),
"options": {
"ftol": 1e-15,
"gtol": 3e-1,
}
}
result = scipy.optimize.basinhopping(self.costs(), initial, niter=1000, minimizer_kwargs=minimizer_kwargs)
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
return result

View File

@@ -62,9 +62,8 @@ class OscillatingDipoleArrangement():
--------
dipoles : Sequence[OscillatingDipole]
'''
def __init__(self, dipoles: Sequence[OscillatingDipole], seed=12345):
def __init__(self, dipoles: Sequence[OscillatingDipole]):
self.dipoles = dipoles
self.rng = numpy.random.default_rng(seed)
def get_dot_measurement(self, dot_input: DotInput) -> DotMeasurement:
r = numpy.array(dot_input[0])
@@ -76,18 +75,3 @@ class OscillatingDipoleArrangement():
For a series of points, each with three coordinates and a frequency, return a list of the corresponding DotMeasurements.
'''
return [self.get_dot_measurement(dot_input) for dot_input in dot_inputs]
def get_dot_measurements_with_random(self, dot_inputs: Sequence[DotInput]) -> List[DotMeasurement]:
'''
For a series of points, each with three coordinates and a frequency, return a list of the corresponding DotMeasurements.
'''
def apply_random(input: DotInput, error_size: float) -> DotMeasurement:
actual_measurement = self.get_dot_measurement(input)
errored = DotMeasurement(actual_measurement.v * error_size, actual_measurement.r, actual_measurement.f)
errored.error = error_size
errored.original_v = actual_measurement.v
return errored
random = (self.rng.random(len(dot_inputs)) * .3) + .85
return [apply_random(*inputs) for inputs in zip(dot_inputs, random)]

View File

@@ -16,4 +16,4 @@ def flip_chunk_to_positive_px(pt: numpy.ndarray) -> numpy.ndarray:
def normalize_oscillating_dipole_list(pts: numpy.ndarray) -> numpy.ndarray:
pt_length = 7
chunked_pts = [flip_chunk_to_positive_px(pts[i: i + pt_length]) for i in range(0, len(pts), pt_length)]
return numpy.concatenate(sorted(chunked_pts, key=lambda x: tuple(round(val, 3) for val in operator.itemgetter(6, 0, 1, 2, 3, 4, 5)(x))), axis=None)
return numpy.concatenate(sorted(chunked_pts, key=lambda x: tuple(round(val, 3) for val in operator.itemgetter(0, 1, 2, 3, 4, 5, 6)(x))), axis=None)

View File

@@ -1,56 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol_basinhopping(initial_position=initial_pos)
return res.pathfinder_x
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .5) for o in (0, 0.5)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
logging.info("Finished setting up model")
results = []
rb = -4
ru = 5
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, 2) for dy in numpy.arange(rb, ru, 2) for dz in range(rb, ru, 2)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
logging.info(results)
final_values = [r for r in results if r is not None]
logging.info(final_values)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,50 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def main():
logging.info("Running script...")
dot_inputs = [
([0, 0, .01], 5), ([-1, 0, -.01], 5), ([-2, 0, -.01], 5), ([0, -1, .01], 5), ([-1, -1, 0], 5), ([-2, -1, 0], 5),
([0, 0, .01], 1), ([-1, 0, -.01], 1), ([-2, 0, -.01], 1), ([0, -1, .01], 1), ([-1, -1, 0], 1), ([-2, -1, 0], 1),
]
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
res = model.sol_minim()
logging.info("Result:")
logging.info(res)
logging.info(model.costs()(res.pathfinder_x))
logging.info(model.jac()(res.pathfinder_x))
current_cost = model.costs()(res.pathfinder_x)
curr_jac = model.jac()(res.pathfinder_x)
curr_gradient = .5 * numpy.matmul(numpy.transpose(curr_jac), current_cost)
logging.info((numpy.sum(current_cost**2), curr_gradient))
logging.info(numpy.sum(model.costs()(res.pathfinder_x + curr_gradient)**2))
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,42 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def main():
logging.info("Running script...")
dot_inputs = [
([0, 0, .01], 5), ([-1, 0, -.01], 5), ([-2, 0, -.01], 5), ([0, -1, .01], 5), ([-1, -1, 0], 5), ([-2, -1, 0], 5),
([0, 0, .01], 1), ([-1, 0, -.01], 1), ([-2, 0, -.01], 1), ([0, -1, .01], 1), ([-1, -1, 0], 1), ([-2, -1, 0], 1),
]
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
res = model.sol_basinhopping()
logging.info("Result:")
logging.info(res)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -21,13 +21,14 @@ def print_result(msg, result):
def try_initial_position(model, expected_result, initial_pos):
res = model.sol_basinhopping(initial_position=initial_pos)
return res.pathfinder_x
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .5) for o in (0, 0.5)
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
@@ -41,7 +42,7 @@ def main():
results = []
rb = -4
ru = 5
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, 2) for dy in numpy.arange(rb, ru, 2) for dz in range(rb, ru, 2)]
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 3) for dy in range(rb, ru, 3) for dz in range(rb, ru, 2)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:

View File

@@ -1,59 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol2(initial_position=initial_pos)
if res.success:
return res.pathfinder_x
else:
return None
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
logging.info("Finished setting up model")
results = []
rb = -3
ru = 4
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 2) for dy in range(rb, ru, 2) for dz in range(rb, ru, 2)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
logging.info(results)
final_values = [r for r in results if r is not None]
logging.info(final_values)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,59 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol(initial_position=initial_pos)
if res.success:
return res.pathfinder_x
else:
return None
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
logging.info("Finished setting up model")
results = []
rb = -4
ru = 5
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 2) for dy in range(rb, ru, 2) for dz in range(rb, ru, 2)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
logging.info(results)
final_values = [r for r in results if r is not None]
logging.info(final_values)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,66 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol(initial_position=initial_pos)
if res.success:
return res.pathfinder_x
else:
return None
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
logging.info("Finished setting up model")
vals = [
[1.61794, 1.96704, -4.00528, -0.0784033, 0.316984, 1.0257, 7],
[8.18934, 2.86237, 15.2491, -0.695465, 1.11861, -0.306364, 7],
[0.7485, -2.17157, 35.6347, 0.353949, 0.487858, 0.0554771, 7],
[2.11867, -0.227147, -48.9911, -0.180317, 0.521937, 0.00989036, 7],
[0.0930268, 3.32614, 1.69929, 0.121074, -0.230094, -0.872652, 7],
[1, 2, 3, 0, 0, -1, 7],
[1, 2, 3, 0, 0, -1, 8],
[2, 2, 3, 0, 0, -1, 7]
]
jac = model.jac()
costs = model.costs()
for val in vals:
j = jac(numpy.array(val))
c = costs(numpy.array(val))
logging.info(f"looking at {val}")
# logging.info(f"jac: [{j}]")
# logging.info(f"cost: [{c}]")
logging.info(f"gradient: {numpy.matmul(numpy.transpose(j), c)}")
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,60 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol(initial_position=initial_pos)
# print_result(f"{initial_pos}", res)
return (initial_pos, all(numpy.isclose(res.pathfinder_x, expected_result, rtol=1e-6, atol=1e-6)))
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
logging.info("Finished setting up model")
work_good = []
no_work_good = []
rb = -4
ru = 5
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 2) for dy in range(rb, ru, 2) for dz in range(rb, ru, 2)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
work_good = [x for x,good in results if good]
no_work_good = [x for x,good in results if not good]
logging.info(f"work good: [{work_good}]")
logging.info(f"no work good: [{no_work_good}]")
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,59 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol(initial_position=initial_pos)
return (initial_pos, all(numpy.isclose(res.pathfinder_x, expected_result)))
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .1) for o in (0, 0.01)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
logging.info("Finished setting up model")
work_good = []
no_work_good = []
rb = -4
ru = 5
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru) for dy in range(rb, ru) for dz in range(rb, ru)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
work_good = [x for x,good in results if good]
no_work_good = [x for x,good in results if not good]
logging.info(f"work good: [{work_good}]")
logging.info(f"no work good: [{no_work_good}]")
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,59 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol(initial_position=initial_pos)
return (initial_pos, all(numpy.isclose(res.pathfinder_x, expected_result)))
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .5) for o in (0, 0.01)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
logging.info("Finished setting up model")
work_good = []
no_work_good = []
rb = -2
ru = 3
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 2) for dy in range(rb, ru, 2) for dz in range(rb, ru, 2)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
work_good = [x for x,good in results if good]
no_work_good = [x for x,good in results if not good]
logging.info(f"work good: [{work_good}]")
logging.info(f"no work good: [{no_work_good}]")
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,45 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol_basinhopping(initial_position=initial_pos)
logging.info(res)
return res.pathfinder_x
def main():
logging.info("Running script...")
dot_inputs = list(
(([.5, -1, 0], numpy.power(10, f))) for f in numpy.arange(1, 3, .1)
)
logging.info(dot_inputs)
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [.51, -1.2, 0], 8)
expected_result = numpy.array([1, 2, 3, .51, -1.2, 0, 8])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs)
logging.info([(m.f, m.v) for m in dot_measurements])
logging.info([(m.f, m.v) for m in dot_measurements_with_error])
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,60 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol_basinhopping(initial_position=initial_pos)
logging.info(res)
return res.pathfinder_x
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([-.8, -5.1, 0], f), ([.5, -1, 0], f), ([.3, 5, 0], f), ([7.3, 6.5, 0], f)) for f in numpy.arange(1, 10, 2)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [.51, -1.2, 0], 8)
expected_result = numpy.array([1, 2, 3, .51, -1.2, 0, 8])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs)
# model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
logging.info("Finished setting up model")
results = []
rb = -2
ru = 3
interval = 3
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
logging.info(results)
final_values = [r for r in results if r is not None]
logging.info(final_values)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -1,75 +0,0 @@
import itertools
import logging
import multiprocessing
import numpy
import pathfinder.model.oscillating
def print_result(msg, result):
logging.info(msg)
logging.info(f"\tResult: {result.pathfinder_x}")
logging.info(f"\tSuccess: {result.success}. {result.message}")
try:
logging.info(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
logging.info(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def try_initial_position(model, expected_result, initial_pos):
res = model.sol_basinhopping(initial_position=initial_pos)
logging.info(res)
return res.pathfinder_x
def main():
logging.info("Running script...")
dot_inputs = list(itertools.chain.from_iterable(
(([-.8, -5.1, 0], f), ([.5, -1, 0], f), ([.3, 5, 0], f), ([7.3, 6.5, 0], f)) for f in numpy.arange(1, 10, 2)
))
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [7.4, 6.3, 0.02], 7)
expected_result = numpy.array([1, 2, 3, 7.4, 6.3, 0.02, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs)
# model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1)
model1 = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
model2 = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1)
logging.info("Finished setting up model")
results = []
rb = -2
ru = 3
interval = 3
points_to_try = [(model1, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results = pool.starmap(try_initial_position, points_to_try)
#logging.info(results)
final_values = [r for r in results if r is not None]
logging.info("Results without errors")
logging.info(final_values)
results2 = []
points_to_try = [(model2, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)]
logging.info(f"Will have {len(points_to_try)} points to try")
logging.info("creating pool...")
with multiprocessing.Pool() as pool:
results2 = pool.starmap(try_initial_position, points_to_try)
#logging.info(results)
final_values2 = [r for r in results2 if r is not None]
logging.info("Results without errors")
logging.info(final_values)
logging.info("Results with errors")
logging.info(final_values2)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
main()

View File

@@ -0,0 +1,83 @@
import numpy
import pathfinder.model.oscillating
import itertools
def chunk_n_sort(pts):
pt_length = 7
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
return chunked_pts
def print_result(msg, result):
print(msg)
print(f"\tResult: {result.pathfinder_x}")
# print(f"\tSuccess: {result.success}. {result.message}")
try:
print(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
print(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def test_one_dipole_six_dot_two_frequencies_bh():
# setup
dot_inputs = [
([0, 0, .01], 5), ([-1, 0, -.01], 5), ([-2, 0, -.01], 5), ([0, -1, .01], 5), ([-1, -1, 0], 5), ([-2, -1, 0], 5),
([0, 0, .01], 1), ([-1, 0, -.01], 1), ([-2, 0, -.01], 1), ([0, -1, .01], 1), ([-1, -1, 0], 1), ([-2, -1, 0], 1),
]
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
res = model.sol_basinhopping()
print_result("one oscillating dipole six dots", res)
print(res.lowest_optimization_result)
# assert res.lowest_optimization_result.success, "The solution for a single dipole and six dots should have succeeded."
numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)
def test_one_dipole_six_dot_two_frequencies_bhbig():
# setup
dot_inputs = [
([0, 0, .01], 5), ([-1, 0, -.01], 5), ([-2, 0, -.01], 5), ([0, -1, .01], 5), ([-1, -1, 0], 5), ([-2, -1, 0], 5),
([0, 0, .01], 1), ([-1, 0, -.01], 1), ([-2, 0, -.01], 1), ([0, -1, .01], 1), ([-1, -1, 0], 1), ([-2, -1, 0], 1),
]
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
res = model.sol_basinhopping_big()
print_result("one oscillating dipole six dots", res)
print(res.lowest_optimization_result)
# assert res.lowest_optimization_result.success, "The solution for a single dipole and six dots should have succeeded."
numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)
def test_one_dipole_four_dot_ten_frequencies_bhbig():
# setup
dot_inputs = itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f), ([-2 + o, 0, -.01], f), ([-2 + o, -1, .01], f)) for f in numpy.arange(1, 10, .1) for o in (0, 0.2)
)
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
res = model.sol_basinhopping_big()
print_result("one oscillating dipole four dots", res)
print(res)
print(res.lowest_optimization_result)
numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)

View File

@@ -114,7 +114,7 @@ def test_two_dipole_eighteen_dot_two_frequencies_morerealistic():
def test_one_dipole_four_dot_ten_frequencies():
# setup
dot_inputs = itertools.chain.from_iterable(
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .1) for o in (0, 0.01)
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f), ([-2 + o, 0, -.01], f), ([-2 + o, -1, .01], f)) for f in numpy.arange(1, 10, .1) for o in (0, 0.01)
)
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])

View File

@@ -1,43 +0,0 @@
import numpy
import pathfinder.model.oscillating
def chunk_n_sort(pts):
pt_length = 7
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
return chunked_pts
def print_result(msg, result):
print(msg)
print(f"\tResult: {result.pathfinder_x}")
print(f"\tSuccess: {result.success}. {result.message}")
try:
print(f"\tFunc evals: {result.nfev}")
except AttributeError:
pass
try:
print(f"\tJacb evals: {result.njev}")
except AttributeError:
pass
def test_simple_three_dipole_one_dot_frequencies():
# setup
dot_inputs = [
([0, 0, .01], f) for f in numpy.arange(1, 10, .5)
]
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
dipole2 = pathfinder.model.oscillating.OscillatingDipole([2, 5, 0], [1, 6, 1], 2)
dipole3 = pathfinder.model.oscillating.OscillatingDipole([-1, 2, 0], [-1, 2, 1], 4)
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole, dipole2, dipole3])
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 3)
res = model.simple_sol()
print_result("six oscillating dipole one dot", res)
assert res.success, "The solution for a single dipole and six dots should have succeeded."
# numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)