diff --git a/pathfinder/model/oscillating/dot.py b/pathfinder/model/oscillating/dot.py index 9578b1b..0bea7a8 100644 --- a/pathfinder/model/oscillating/dot.py +++ b/pathfinder/model/oscillating/dot.py @@ -42,6 +42,15 @@ class DotMeasurement(): return (self._alpha(p, s))**2 * self._b(w) + def mod_factor_for_point(self, pt: numpy.ndarray) -> float: + ''' + modification factor for cost function. + ''' + s = pt[3:6] # are we'll only ever work in 3d. + + diff = self.r - s + return numpy.linalg.norm(diff)**2 + def _alpha(self, p: numpy.ndarray, s: numpy.ndarray) -> float: diff = self.r - s return p.dot(diff) / (numpy.linalg.norm(diff)**3) @@ -57,6 +66,23 @@ class DotMeasurement(): chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)] return sum(self.v_for_point(pt) for pt in chunked_pts) - self.v + def cost2(self, pts: numpy.ndarray) -> float: + # 7 because dipole in 3d has 7 degrees of freedom. + pt_length = 7 + # creates numpy.ndarrays in groups of pt_length. + # Will throw problems for irregular points, but that's okay for now. + chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)] + + mod_factor = numpy.prod([self.mod_factor_for_point(pt) for pt in chunked_pts]) + return (sum(self.v_for_point(pt) for pt in chunked_pts) - self.v) * mod_factor + + def simple_cost(self, pts: numpy.ndarray) -> float: + # for reduced case, a is constant + pt_length = 2 + + chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)] + return sum(pt[0] * self._b(pt[1]) for pt in chunked_pts) - self.v + def jac_pt(self, pt: numpy.ndarray) -> numpy.ndarray: p = pt[0:3] # hardcoded here because chances s = pt[3:6] # are we'll only ever work in 3d. @@ -77,6 +103,39 @@ class DotMeasurement(): return numpy.concatenate((p_divs, r_divs, w_div), axis=None) + def jac_pt2(self, pt: numpy.ndarray, cost, mod_factor: float) -> numpy.ndarray: + p = pt[0:3] # hardcoded here because chances + s = pt[3:6] # are we'll only ever work in 3d. + w = pt[6] + + diff = self.r - s + alpha = self._alpha(p, s) + b = self._b(w) + + p_divs = (2 * alpha * diff / (numpy.linalg.norm(diff)**3) * b) * mod_factor + + s_divs = ((-p / (numpy.linalg.norm(diff)**3) + 3 * p.dot(diff) * diff / (numpy.linalg.norm(diff)**5)) * 2 * alpha * b) * mod_factor + s_divs_mod = -2 * cost * mod_factor * diff / numpy.linalg.norm(diff)**2 + + f2 = self.f**2 + w2 = w**2 + + w_div = (alpha**2 * (1 / numpy.pi) * ((f2 - w2) / ((f2 + w2)**2))) * mod_factor + + return numpy.concatenate((p_divs, s_divs + s_divs_mod, w_div), axis=None) + + def simple_jac_pt(self, pt: numpy.ndarray) -> numpy.ndarray: + a = pt[0] + w = pt[1] + + f2 = self.f**2 + w2 = w**2 + + b = self._b(w) + w_div = a * (1 / numpy.pi) * ((f2 - w2) / ((f2 + w2)**2)) + + return numpy.concatenate((b, w_div), axis=None) + def jac(self, pts: numpy.ndarray) -> numpy.ndarray: # 7 because oscillating dipole in 3d has 7 degrees of freedom. pt_length = 7 @@ -85,3 +144,20 @@ class DotMeasurement(): chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)] return numpy.append([], [self.jac_pt(pt) for pt in chunked_pts]) + + def jac2(self, pts: numpy.ndarray) -> numpy.ndarray: + # 7 because oscillating dipole in 3d has 7 degrees of freedom. + pt_length = 7 + # creates numpy.ndarrays in groups of pt_length. + # Will throw problems for irregular points, but that's okay for now. + chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)] + + cost = self.cost(pts) + mod_factor = numpy.prod([self.mod_factor_for_point(pt) for pt in chunked_pts]) + return numpy.append([], [self.jac_pt2(pt, cost, mod_factor) for pt in chunked_pts]) + + def simple_jac(self, pts: numpy.ndarray) -> numpy.ndarray: + pt_length = 2 + chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)] + + return numpy.append([], [self.simple_jac_pt(pt) for pt in chunked_pts]) diff --git a/pathfinder/model/oscillating/model.py b/pathfinder/model/oscillating/model.py index c06bfa1..42941ae 100644 --- a/pathfinder/model/oscillating/model.py +++ b/pathfinder/model/oscillating/model.py @@ -31,15 +31,96 @@ class DotOscillatingDipoleModel(): return costs_to_return + def costs2(self) -> Callable[[numpy.ndarray], numpy.ndarray]: + def costs_to_return(pt: numpy.ndarray) -> numpy.ndarray: + return numpy.array([dot.cost2(pt) for dot in self.dots]) + + return costs_to_return + + def simple_costs(self) -> Callable[[numpy.ndarray], numpy.ndarray]: + def costs_to_return(pt: numpy.ndarray) -> numpy.ndarray: + return numpy.array([dot.simple_cost(pt) for dot in self.dots]) + + return costs_to_return + def jac(self) -> Callable[[numpy.ndarray], numpy.ndarray]: def jac_to_return(pts: numpy.ndarray) -> numpy.ndarray: return numpy.array([dot.jac(pts) for dot in self.dots]) return jac_to_return + def jac2(self) -> Callable[[numpy.ndarray], numpy.ndarray]: + def jac_to_return(pts: numpy.ndarray) -> numpy.ndarray: + return numpy.array([dot.jac2(pts) for dot in self.dots]) + + return jac_to_return + + def simple_jac(self) -> Callable[[numpy.ndarray], numpy.ndarray]: + def jac_to_return(pts: numpy.ndarray) -> numpy.ndarray: + return numpy.array([dot.simple_jac(pts) for dot in self.dots]) + + return jac_to_return + def sol(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1, use_root=True): initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n) result = scipy.optimize.least_squares(self.costs(), initial, jac=self.jac(), ftol=1e-15, gtol=3e-16) result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x) return result + + def sol2(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1, use_root=True): + initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n) + + result = scipy.optimize.least_squares(self.costs2(), initial, jac=self.jac2(), ftol=1e-15, gtol=3e-16) + result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x) + return result + + def simple_sol(self, initial_a=0.1, initial_frequency=1): + initial = numpy.tile(numpy.concatenate((initial_a, initial_frequency), axis=None), self.n) + + result = scipy.optimize.least_squares(self.simple_costs(), initial, jac=self.simple_jac(), ftol=1e-15, gtol=3e-16) + result.pathfinder_x = result.x + return result + + def sol_simple(self, costs, initial, **kwargs): + initial = numpy.tile(numpy.concatenate(initial, axis=None), self.n) + + result = scipy.optimize.least_squares(self.costs(), initial, jac=kwargs["jac"], ftol=kwargs["ftol"], gtol=kwargs["gtol"]) + result.old_fun = result.fun + result.fun = numpy.sum(result.fun**2, axis=None) + result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x) + return result + + def sol_basinhopping(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1): + initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n) + + def summed_costs(pt): + curr_cost = self.costs()(pt) + + squared_cost = numpy.sum(curr_cost**2, axis=None) + gradient = .5 * numpy.matmul(numpy.transpose(self.jac()(pt)), curr_cost) + + return (squared_cost, gradient) + + minimizer_kwargs = {"method": "BFGS", "jac": True} + + result = scipy.optimize.basinhopping(summed_costs, initial, niter=1000, minimizer_kwargs=minimizer_kwargs) + + result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x) + return result + + def sol_basinhopping_big(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1): + initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n) + + minimizer_kwargs = { + "method": self.sol_simple, + "jac": self.jac(), + "options": { + "ftol": 1e-15, + "gtol": 3e-1, + } + } + + result = scipy.optimize.basinhopping(self.costs(), initial, niter=1000, minimizer_kwargs=minimizer_kwargs) + result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x) + return result diff --git a/scripts/initial_point_map-middledip-o-0.5-clusters-basinhopping.py b/scripts/initial_point_map-middledip-o-0.5-clusters-basinhopping.py new file mode 100644 index 0000000..40bf2af --- /dev/null +++ b/scripts/initial_point_map-middledip-o-0.5-clusters-basinhopping.py @@ -0,0 +1,57 @@ +import itertools +import logging +import multiprocessing +import numpy +import pathfinder.model.oscillating + + +def print_result(msg, result): + logging.info(msg) + logging.info(f"\tResult: {result.pathfinder_x}") + logging.info(f"\tSuccess: {result.success}. {result.message}") + try: + logging.info(f"\tFunc evals: {result.nfev}") + except AttributeError: + pass + try: + logging.info(f"\tJacb evals: {result.njev}") + except AttributeError: + pass + + +def try_initial_position(model, expected_result, initial_pos): + res = model.sol_basinhopping(initial_position=initial_pos) + + return res.pathfinder_x + + +def main(): + logging.info("Running script...") + dot_inputs = list(itertools.chain.from_iterable( + (([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5) + )) + + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7) + expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + + model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1) + logging.info("Finished setting up model") + + results = [] + rb = -4 + ru = 5 + points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 3) for dy in range(rb, ru, 3) for dz in range(rb, ru, 2)] + logging.info(f"Will have {len(points_to_try)} points to try") + logging.info("creating pool...") + with multiprocessing.Pool() as pool: + results = pool.starmap(try_initial_position, points_to_try) + logging.info(results) + final_values = [r for r in results if r is not None] + logging.info(final_values) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() diff --git a/tests/model/oscillating/test_basinhopping.py b/tests/model/oscillating/test_basinhopping.py new file mode 100644 index 0000000..aabf4ed --- /dev/null +++ b/tests/model/oscillating/test_basinhopping.py @@ -0,0 +1,83 @@ +import numpy +import pathfinder.model.oscillating +import itertools + + +def chunk_n_sort(pts): + pt_length = 7 + chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)] + + return chunked_pts + + +def print_result(msg, result): + print(msg) + print(f"\tResult: {result.pathfinder_x}") + # print(f"\tSuccess: {result.success}. {result.message}") + try: + print(f"\tFunc evals: {result.nfev}") + except AttributeError: + pass + try: + print(f"\tJacb evals: {result.njev}") + except AttributeError: + pass + + +def test_one_dipole_six_dot_two_frequencies_bh(): + # setup + dot_inputs = [ + ([0, 0, .01], 5), ([-1, 0, -.01], 5), ([-2, 0, -.01], 5), ([0, -1, .01], 5), ([-1, -1, 0], 5), ([-2, -1, 0], 5), + ([0, 0, .01], 1), ([-1, 0, -.01], 1), ([-2, 0, -.01], 1), ([0, -1, .01], 1), ([-1, -1, 0], 1), ([-2, -1, 0], 1), + ] + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7) + expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + + model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1) + res = model.sol_basinhopping() + + print_result("one oscillating dipole six dots", res) + print(res.lowest_optimization_result) + # assert res.lowest_optimization_result.success, "The solution for a single dipole and six dots should have succeeded." + numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6) + + +def test_one_dipole_six_dot_two_frequencies_bhbig(): + # setup + dot_inputs = [ + ([0, 0, .01], 5), ([-1, 0, -.01], 5), ([-2, 0, -.01], 5), ([0, -1, .01], 5), ([-1, -1, 0], 5), ([-2, -1, 0], 5), + ([0, 0, .01], 1), ([-1, 0, -.01], 1), ([-2, 0, -.01], 1), ([0, -1, .01], 1), ([-1, -1, 0], 1), ([-2, -1, 0], 1), + ] + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7) + expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + + model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1) + res = model.sol_basinhopping_big() + + print_result("one oscillating dipole six dots", res) + print(res.lowest_optimization_result) + # assert res.lowest_optimization_result.success, "The solution for a single dipole and six dots should have succeeded." + numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6) + + +def test_one_dipole_four_dot_ten_frequencies_bhbig(): + # setup + dot_inputs = itertools.chain.from_iterable( + (([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f), ([-2 + o, 0, -.01], f), ([-2 + o, -1, .01], f)) for f in numpy.arange(1, 10, .1) for o in (0, 0.2) + ) + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7) + expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + + model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1) + res = model.sol_basinhopping_big() + + print_result("one oscillating dipole four dots", res) + print(res) + print(res.lowest_optimization_result) + numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6) diff --git a/tests/model/oscillating/test_osc_model_sol.py b/tests/model/oscillating/test_osc_model_sol.py index 8e1d6dc..964ba95 100644 --- a/tests/model/oscillating/test_osc_model_sol.py +++ b/tests/model/oscillating/test_osc_model_sol.py @@ -1,5 +1,7 @@ import numpy import pathfinder.model.oscillating +import itertools +import pytest def chunk_n_sort(pts): @@ -107,3 +109,83 @@ def test_two_dipole_eighteen_dot_two_frequencies_morerealistic(): print_result("two oscillating dipole six dots", res) assert res.success, "The solution for two dipole and six dots should have succeeded." numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6) + + +def test_one_dipole_four_dot_ten_frequencies(): + # setup + dot_inputs = itertools.chain.from_iterable( + (([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f), ([-2 + o, 0, -.01], f), ([-2 + o, -1, .01], f)) for f in numpy.arange(1, 10, .1) for o in (0, 0.01) + ) + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7) + expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + + model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1) + res = model.sol(initial_position=(.1, .1, .1)) + + print_result("one oscillating dipole four dots", res) + print(model.jac()(res.x)) + assert res.success, "The solution for one dipole and four dots should have succeeded." + numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6) + + +def test_two_dipole_four_dot_ten_frequencies(): + # setup + dot_inputs = itertools.chain.from_iterable( + (([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in range(1, 10) for o in (0, .5) + ) + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7) + dipole2 = pathfinder.model.oscillating.OscillatingDipole([-1, 2, 0], [-1, 2, 1], 4) + expected_result = numpy.array([1, -2, 0, -1, 2, 1, 4, 1, 2, 3, 0, 4, -1, 7]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole, dipole2]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + + model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 2) + res = model.sol() + + print_result("two oscillating dipole four dots", res) + assert res.success, "The solution for two dipole and two dots should have succeeded." + numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6) + + +@pytest.mark.skip(reason="Never actually works.") +def test_three_dipole_four_dot_ten_frequencies_wrongcount(): + # setup + dot_inputs = itertools.chain.from_iterable( + (([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in range(1, 10) for o in (0, .5) + ) + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7) + dipole2 = pathfinder.model.oscillating.OscillatingDipole([2, 5, 0], [1, 6, 1], 2) + dipole3 = pathfinder.model.oscillating.OscillatingDipole([-1, 2, 0], [-1, 2, 1], 4) + expected_result = numpy.array([2, 5, 0, 1, 6, 1, 2, 1, -2, 0, -1, 2, 1, 4, 1, 2, 3, 0, 4, -1, 7]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole, dipole2, dipole3]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + + model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 2) + res = model.sol() + + print_result("three but thinks two oscillating dipole four dots", res) + # assert res.success, "The solution for two dipole and two dots should have succeeded." + numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6) + + +@pytest.mark.skip(reason="Never actually works.") +def test_three_dipole_four_dot_twenty_frequencies_rightcount(): + # setup + dot_inputs = itertools.chain.from_iterable( + (([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(0.2, 10, .2) for o in (0, .5) + ) + dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7) + dipole2 = pathfinder.model.oscillating.OscillatingDipole([2, 5, 0], [1, 6, 1], 2) + dipole3 = pathfinder.model.oscillating.OscillatingDipole([-1, 2, 0], [-1, 2, 1], 4) + expected_result = numpy.array([2, 5, 0, 1, 6, 1, 2, 1, -2, 0, -1, 2, 1, 4, 1, 2, 3, 0, 4, -1, 7]) + dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole, dipole2, dipole3]) + dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs) + + model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 3) + res = model.sol(initial_position=(.1, 3, .1)) + + print_result("three oscillating dipole four dots", res) + # assert res.success, "The solution for two dipole and two dots should have succeeded." + numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)