trash #1
2
do.sh
2
do.sh
@ -11,7 +11,7 @@ build() {
|
||||
|
||||
test() {
|
||||
echo "I am ${FUNCNAME[0]}ing"
|
||||
poetry run flake8
|
||||
poetry run flake8 pathfinder tests
|
||||
poetry run mypy pathfinder
|
||||
poetry run pytest
|
||||
}
|
||||
|
@ -42,6 +42,15 @@ class DotMeasurement():
|
||||
|
||||
return (self._alpha(p, s))**2 * self._b(w)
|
||||
|
||||
def mod_factor_for_point(self, pt: numpy.ndarray) -> float:
|
||||
'''
|
||||
modification factor for cost function.
|
||||
'''
|
||||
s = pt[3:6] # are we'll only ever work in 3d.
|
||||
|
||||
diff = self.r - s
|
||||
return numpy.linalg.norm(diff)**2
|
||||
|
||||
def _alpha(self, p: numpy.ndarray, s: numpy.ndarray) -> float:
|
||||
diff = self.r - s
|
||||
return p.dot(diff) / (numpy.linalg.norm(diff)**3)
|
||||
@ -57,6 +66,23 @@ class DotMeasurement():
|
||||
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
|
||||
return sum(self.v_for_point(pt) for pt in chunked_pts) - self.v
|
||||
|
||||
def cost2(self, pts: numpy.ndarray) -> float:
|
||||
# 7 because dipole in 3d has 7 degrees of freedom.
|
||||
pt_length = 7
|
||||
# creates numpy.ndarrays in groups of pt_length.
|
||||
# Will throw problems for irregular points, but that's okay for now.
|
||||
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
|
||||
|
||||
mod_factor = numpy.prod([self.mod_factor_for_point(pt) for pt in chunked_pts])
|
||||
return (sum(self.v_for_point(pt) for pt in chunked_pts) - self.v) * mod_factor
|
||||
|
||||
def simple_cost(self, pts: numpy.ndarray) -> float:
|
||||
# for reduced case, a is constant
|
||||
pt_length = 2
|
||||
|
||||
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
|
||||
return sum(pt[0] * self._b(pt[1]) for pt in chunked_pts) - self.v
|
||||
|
||||
def jac_pt(self, pt: numpy.ndarray) -> numpy.ndarray:
|
||||
p = pt[0:3] # hardcoded here because chances
|
||||
s = pt[3:6] # are we'll only ever work in 3d.
|
||||
@ -77,6 +103,39 @@ class DotMeasurement():
|
||||
|
||||
return numpy.concatenate((p_divs, r_divs, w_div), axis=None)
|
||||
|
||||
def jac_pt2(self, pt: numpy.ndarray, cost, mod_factor: float) -> numpy.ndarray:
|
||||
p = pt[0:3] # hardcoded here because chances
|
||||
s = pt[3:6] # are we'll only ever work in 3d.
|
||||
w = pt[6]
|
||||
|
||||
diff = self.r - s
|
||||
alpha = self._alpha(p, s)
|
||||
b = self._b(w)
|
||||
|
||||
p_divs = (2 * alpha * diff / (numpy.linalg.norm(diff)**3) * b) * mod_factor
|
||||
|
||||
s_divs = ((-p / (numpy.linalg.norm(diff)**3) + 3 * p.dot(diff) * diff / (numpy.linalg.norm(diff)**5)) * 2 * alpha * b) * mod_factor
|
||||
s_divs_mod = -2 * cost * mod_factor * diff / numpy.linalg.norm(diff)**2
|
||||
|
||||
f2 = self.f**2
|
||||
w2 = w**2
|
||||
|
||||
w_div = (alpha**2 * (1 / numpy.pi) * ((f2 - w2) / ((f2 + w2)**2))) * mod_factor
|
||||
|
||||
return numpy.concatenate((p_divs, s_divs + s_divs_mod, w_div), axis=None)
|
||||
|
||||
def simple_jac_pt(self, pt: numpy.ndarray) -> numpy.ndarray:
|
||||
a = pt[0]
|
||||
w = pt[1]
|
||||
|
||||
f2 = self.f**2
|
||||
w2 = w**2
|
||||
|
||||
b = self._b(w)
|
||||
w_div = a * (1 / numpy.pi) * ((f2 - w2) / ((f2 + w2)**2))
|
||||
|
||||
return numpy.concatenate((b, w_div), axis=None)
|
||||
|
||||
def jac(self, pts: numpy.ndarray) -> numpy.ndarray:
|
||||
# 7 because oscillating dipole in 3d has 7 degrees of freedom.
|
||||
pt_length = 7
|
||||
@ -85,3 +144,20 @@ class DotMeasurement():
|
||||
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
|
||||
|
||||
return numpy.append([], [self.jac_pt(pt) for pt in chunked_pts])
|
||||
|
||||
def jac2(self, pts: numpy.ndarray) -> numpy.ndarray:
|
||||
# 7 because oscillating dipole in 3d has 7 degrees of freedom.
|
||||
pt_length = 7
|
||||
# creates numpy.ndarrays in groups of pt_length.
|
||||
# Will throw problems for irregular points, but that's okay for now.
|
||||
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
|
||||
|
||||
cost = self.cost(pts)
|
||||
mod_factor = numpy.prod([self.mod_factor_for_point(pt) for pt in chunked_pts])
|
||||
return numpy.append([], [self.jac_pt2(pt, cost, mod_factor) for pt in chunked_pts])
|
||||
|
||||
def simple_jac(self, pts: numpy.ndarray) -> numpy.ndarray:
|
||||
pt_length = 2
|
||||
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
|
||||
|
||||
return numpy.append([], [self.simple_jac_pt(pt) for pt in chunked_pts])
|
||||
|
@ -3,6 +3,7 @@ import numpy
|
||||
import scipy.optimize
|
||||
from pathfinder.model.oscillating.dot import DotMeasurement
|
||||
import pathfinder.model.oscillating.util
|
||||
import logging
|
||||
|
||||
|
||||
class DotOscillatingDipoleModel():
|
||||
@ -31,15 +32,87 @@ class DotOscillatingDipoleModel():
|
||||
|
||||
return costs_to_return
|
||||
|
||||
def costs2(self) -> Callable[[numpy.ndarray], numpy.ndarray]:
|
||||
def costs_to_return(pt: numpy.ndarray) -> numpy.ndarray:
|
||||
return numpy.array([dot.cost2(pt) for dot in self.dots])
|
||||
|
||||
return costs_to_return
|
||||
|
||||
def simple_costs(self) -> Callable[[numpy.ndarray], numpy.ndarray]:
|
||||
def costs_to_return(pt: numpy.ndarray) -> numpy.ndarray:
|
||||
return numpy.array([dot.simple_cost(pt) for dot in self.dots])
|
||||
|
||||
return costs_to_return
|
||||
|
||||
def jac(self) -> Callable[[numpy.ndarray], numpy.ndarray]:
|
||||
def jac_to_return(pts: numpy.ndarray) -> numpy.ndarray:
|
||||
return numpy.array([dot.jac(pts) for dot in self.dots])
|
||||
|
||||
return jac_to_return
|
||||
|
||||
def jac2(self) -> Callable[[numpy.ndarray], numpy.ndarray]:
|
||||
def jac_to_return(pts: numpy.ndarray) -> numpy.ndarray:
|
||||
return numpy.array([dot.jac2(pts) for dot in self.dots])
|
||||
|
||||
return jac_to_return
|
||||
|
||||
def simple_jac(self) -> Callable[[numpy.ndarray], numpy.ndarray]:
|
||||
def jac_to_return(pts: numpy.ndarray) -> numpy.ndarray:
|
||||
return numpy.array([dot.simple_jac(pts) for dot in self.dots])
|
||||
|
||||
return jac_to_return
|
||||
|
||||
def sol(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1, use_root=True):
|
||||
initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n)
|
||||
|
||||
result = scipy.optimize.least_squares(self.costs(), initial, jac=self.jac(), ftol=1e-15, gtol=3e-16)
|
||||
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
|
||||
return result
|
||||
|
||||
def sol_minim(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1, use_root=True):
|
||||
initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n)
|
||||
|
||||
cached_costs = self.costs()
|
||||
cached_jacobian = self.jac()
|
||||
|
||||
def summed(pt):
|
||||
current_cost = cached_costs(pt)
|
||||
curr_jac = cached_jacobian(pt)
|
||||
curr_gradient = .5 * numpy.matmul(numpy.transpose(curr_jac), current_cost)
|
||||
return (numpy.sum(current_cost**2), curr_gradient)
|
||||
|
||||
result = scipy.optimize.minimize(summed, initial, jac=True, tol=1e-16, options={"gtol": 1e-20})
|
||||
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
|
||||
return result
|
||||
|
||||
def sol_basinhopping(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1, use_root=True):
|
||||
initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n)
|
||||
|
||||
cached_costs = self.costs()
|
||||
cached_jacobian = self.jac()
|
||||
|
||||
def summed(pt):
|
||||
current_cost = cached_costs(pt)
|
||||
curr_jac = cached_jacobian(pt)
|
||||
curr_gradient = .5 * numpy.matmul(numpy.transpose(curr_jac), current_cost)
|
||||
return (numpy.sum(current_cost**2), curr_gradient)
|
||||
|
||||
minimizer_kwargs = {"method": "L-BFGS-B", "jac": True, "tol": 1e-16, "options": {"gtol": 1e-16}}
|
||||
|
||||
result = scipy.optimize.basinhopping(summed, initial, minimizer_kwargs=minimizer_kwargs, niter=100)
|
||||
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
|
||||
return result
|
||||
|
||||
def sol2(self, initial_dipole=(0.1, 0.1, 0.1), initial_position=(.1, .1, .1), initial_frequency=1, use_root=True):
|
||||
initial = numpy.tile(numpy.concatenate((initial_dipole, initial_position, initial_frequency), axis=None), self.n)
|
||||
|
||||
result = scipy.optimize.least_squares(self.costs2(), initial, jac=self.jac2(), ftol=1e-15, gtol=3e-16)
|
||||
result.pathfinder_x = pathfinder.model.oscillating.util.normalize_oscillating_dipole_list(result.x)
|
||||
return result
|
||||
|
||||
def simple_sol(self, initial_a=0.1, initial_frequency=1):
|
||||
initial = numpy.tile(numpy.concatenate((initial_a, initial_frequency), axis=None), self.n)
|
||||
|
||||
result = scipy.optimize.least_squares(self.simple_costs(), initial, jac=self.simple_jac(), ftol=1e-15, gtol=3e-16)
|
||||
result.pathfinder_x = result.x
|
||||
return result
|
||||
|
@ -62,8 +62,9 @@ class OscillatingDipoleArrangement():
|
||||
--------
|
||||
dipoles : Sequence[OscillatingDipole]
|
||||
'''
|
||||
def __init__(self, dipoles: Sequence[OscillatingDipole]):
|
||||
def __init__(self, dipoles: Sequence[OscillatingDipole], seed=12345):
|
||||
self.dipoles = dipoles
|
||||
self.rng = numpy.random.default_rng(seed)
|
||||
|
||||
def get_dot_measurement(self, dot_input: DotInput) -> DotMeasurement:
|
||||
r = numpy.array(dot_input[0])
|
||||
@ -75,3 +76,18 @@ class OscillatingDipoleArrangement():
|
||||
For a series of points, each with three coordinates and a frequency, return a list of the corresponding DotMeasurements.
|
||||
'''
|
||||
return [self.get_dot_measurement(dot_input) for dot_input in dot_inputs]
|
||||
|
||||
def get_dot_measurements_with_random(self, dot_inputs: Sequence[DotInput]) -> List[DotMeasurement]:
|
||||
'''
|
||||
For a series of points, each with three coordinates and a frequency, return a list of the corresponding DotMeasurements.
|
||||
'''
|
||||
def apply_random(input: DotInput, error_size: float) -> DotMeasurement:
|
||||
actual_measurement = self.get_dot_measurement(input)
|
||||
errored = DotMeasurement(actual_measurement.v * error_size, actual_measurement.r, actual_measurement.f)
|
||||
errored.error = error_size
|
||||
errored.original_v = actual_measurement.v
|
||||
return errored
|
||||
|
||||
random = (self.rng.random(len(dot_inputs)) * .3) + .85
|
||||
|
||||
return [apply_random(*inputs) for inputs in zip(dot_inputs, random)]
|
||||
|
@ -16,4 +16,4 @@ def flip_chunk_to_positive_px(pt: numpy.ndarray) -> numpy.ndarray:
|
||||
def normalize_oscillating_dipole_list(pts: numpy.ndarray) -> numpy.ndarray:
|
||||
pt_length = 7
|
||||
chunked_pts = [flip_chunk_to_positive_px(pts[i: i + pt_length]) for i in range(0, len(pts), pt_length)]
|
||||
return numpy.concatenate(sorted(chunked_pts, key=lambda x: tuple(round(val, 3) for val in operator.itemgetter(0, 1, 2, 3, 4, 5, 6)(x))), axis=None)
|
||||
return numpy.concatenate(sorted(chunked_pts, key=lambda x: tuple(round(val, 3) for val in operator.itemgetter(6, 0, 1, 2, 3, 4, 5)(x))), axis=None)
|
||||
|
56
scripts/bh-clusters-1.py
Normal file
56
scripts/bh-clusters-1.py
Normal file
@ -0,0 +1,56 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol_basinhopping(initial_position=initial_pos)
|
||||
return res.pathfinder_x
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .5) for o in (0, 0.5)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
results = []
|
||||
rb = -4
|
||||
ru = 5
|
||||
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, 2) for dy in numpy.arange(rb, ru, 2) for dz in range(rb, ru, 2)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results = pool.starmap(try_initial_position, points_to_try)
|
||||
logging.info(results)
|
||||
final_values = [r for r in results if r is not None]
|
||||
logging.info(final_values)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
56
scripts/bh-clusters-skewedl.py
Normal file
56
scripts/bh-clusters-skewedl.py
Normal file
@ -0,0 +1,56 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol_basinhopping(initial_position=initial_pos)
|
||||
return res.pathfinder_x
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .5) for o in (0, 0.5)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
results = []
|
||||
rb = -4
|
||||
ru = 5
|
||||
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, 2) for dy in numpy.arange(rb, ru, 2) for dz in range(rb, ru, 2)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results = pool.starmap(try_initial_position, points_to_try)
|
||||
logging.info(results)
|
||||
final_values = [r for r in results if r is not None]
|
||||
logging.info(final_values)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
50
scripts/bh1-minim.py
Normal file
50
scripts/bh1-minim.py
Normal file
@ -0,0 +1,50 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
|
||||
dot_inputs = [
|
||||
([0, 0, .01], 5), ([-1, 0, -.01], 5), ([-2, 0, -.01], 5), ([0, -1, .01], 5), ([-1, -1, 0], 5), ([-2, -1, 0], 5),
|
||||
([0, 0, .01], 1), ([-1, 0, -.01], 1), ([-2, 0, -.01], 1), ([0, -1, .01], 1), ([-1, -1, 0], 1), ([-2, -1, 0], 1),
|
||||
]
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
res = model.sol_minim()
|
||||
|
||||
logging.info("Result:")
|
||||
logging.info(res)
|
||||
logging.info(model.costs()(res.pathfinder_x))
|
||||
logging.info(model.jac()(res.pathfinder_x))
|
||||
|
||||
current_cost = model.costs()(res.pathfinder_x)
|
||||
curr_jac = model.jac()(res.pathfinder_x)
|
||||
curr_gradient = .5 * numpy.matmul(numpy.transpose(curr_jac), current_cost)
|
||||
logging.info((numpy.sum(current_cost**2), curr_gradient))
|
||||
logging.info(numpy.sum(model.costs()(res.pathfinder_x + curr_gradient)**2))
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
42
scripts/bh1.py
Normal file
42
scripts/bh1.py
Normal file
@ -0,0 +1,42 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
|
||||
dot_inputs = [
|
||||
([0, 0, .01], 5), ([-1, 0, -.01], 5), ([-2, 0, -.01], 5), ([0, -1, .01], 5), ([-1, -1, 0], 5), ([-2, -1, 0], 5),
|
||||
([0, 0, .01], 1), ([-1, 0, -.01], 1), ([-2, 0, -.01], 1), ([0, -1, .01], 1), ([-1, -1, 0], 1), ([-2, -1, 0], 1),
|
||||
]
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
res = model.sol_basinhopping()
|
||||
|
||||
logging.info("Result:")
|
||||
logging.info(res)
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
@ -0,0 +1,59 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol2(initial_position=initial_pos)
|
||||
if res.success:
|
||||
return res.pathfinder_x
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
results = []
|
||||
rb = -3
|
||||
ru = 4
|
||||
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 2) for dy in range(rb, ru, 2) for dz in range(rb, ru, 2)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results = pool.starmap(try_initial_position, points_to_try)
|
||||
logging.info(results)
|
||||
final_values = [r for r in results if r is not None]
|
||||
logging.info(final_values)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
59
scripts/initial_point_map-middledip-o-0.5-clusters.py
Normal file
59
scripts/initial_point_map-middledip-o-0.5-clusters.py
Normal file
@ -0,0 +1,59 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol(initial_position=initial_pos)
|
||||
if res.success:
|
||||
return res.pathfinder_x
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
results = []
|
||||
rb = -4
|
||||
ru = 5
|
||||
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 2) for dy in range(rb, ru, 2) for dz in range(rb, ru, 2)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results = pool.starmap(try_initial_position, points_to_try)
|
||||
logging.info(results)
|
||||
final_values = [r for r in results if r is not None]
|
||||
logging.info(final_values)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
66
scripts/initial_point_map-middledip-o-0.5-jac.py
Normal file
66
scripts/initial_point_map-middledip-o-0.5-jac.py
Normal file
@ -0,0 +1,66 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol(initial_position=initial_pos)
|
||||
if res.success:
|
||||
return res.pathfinder_x
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
vals = [
|
||||
[1.61794, 1.96704, -4.00528, -0.0784033, 0.316984, 1.0257, 7],
|
||||
[8.18934, 2.86237, 15.2491, -0.695465, 1.11861, -0.306364, 7],
|
||||
[0.7485, -2.17157, 35.6347, 0.353949, 0.487858, 0.0554771, 7],
|
||||
[2.11867, -0.227147, -48.9911, -0.180317, 0.521937, 0.00989036, 7],
|
||||
[0.0930268, 3.32614, 1.69929, 0.121074, -0.230094, -0.872652, 7],
|
||||
[1, 2, 3, 0, 0, -1, 7],
|
||||
[1, 2, 3, 0, 0, -1, 8],
|
||||
[2, 2, 3, 0, 0, -1, 7]
|
||||
]
|
||||
jac = model.jac()
|
||||
costs = model.costs()
|
||||
for val in vals:
|
||||
j = jac(numpy.array(val))
|
||||
c = costs(numpy.array(val))
|
||||
logging.info(f"looking at {val}")
|
||||
# logging.info(f"jac: [{j}]")
|
||||
# logging.info(f"cost: [{c}]")
|
||||
logging.info(f"gradient: {numpy.matmul(numpy.transpose(j), c)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
60
scripts/initial_point_map-middledip-o-0.5.py
Normal file
60
scripts/initial_point_map-middledip-o-0.5.py
Normal file
@ -0,0 +1,60 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol(initial_position=initial_pos)
|
||||
# print_result(f"{initial_pos}", res)
|
||||
return (initial_pos, all(numpy.isclose(res.pathfinder_x, expected_result, rtol=1e-6, atol=1e-6)))
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .05) for o in (0, 0.5)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 0, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
work_good = []
|
||||
no_work_good = []
|
||||
rb = -4
|
||||
ru = 5
|
||||
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 2) for dy in range(rb, ru, 2) for dz in range(rb, ru, 2)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results = pool.starmap(try_initial_position, points_to_try)
|
||||
|
||||
work_good = [x for x,good in results if good]
|
||||
no_work_good = [x for x,good in results if not good]
|
||||
|
||||
logging.info(f"work good: [{work_good}]")
|
||||
logging.info(f"no work good: [{no_work_good}]")
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
59
scripts/initial_point_map_1.py
Normal file
59
scripts/initial_point_map_1.py
Normal file
@ -0,0 +1,59 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol(initial_position=initial_pos)
|
||||
return (initial_pos, all(numpy.isclose(res.pathfinder_x, expected_result)))
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .1) for o in (0, 0.01)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
work_good = []
|
||||
no_work_good = []
|
||||
rb = -4
|
||||
ru = 5
|
||||
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru) for dy in range(rb, ru) for dz in range(rb, ru)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results = pool.starmap(try_initial_position, points_to_try)
|
||||
|
||||
work_good = [x for x,good in results if good]
|
||||
no_work_good = [x for x,good in results if not good]
|
||||
|
||||
logging.info(f"work good: [{work_good}]")
|
||||
logging.info(f"no work good: [{no_work_good}]")
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
59
scripts/initial_point_map_10.py
Normal file
59
scripts/initial_point_map_10.py
Normal file
@ -0,0 +1,59 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol(initial_position=initial_pos)
|
||||
return (initial_pos, all(numpy.isclose(res.pathfinder_x, expected_result)))
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .5) for o in (0, 0.01)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 0, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
work_good = []
|
||||
no_work_good = []
|
||||
rb = -2
|
||||
ru = 3
|
||||
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in range(rb, ru, 2) for dy in range(rb, ru, 2) for dz in range(rb, ru, 2)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results = pool.starmap(try_initial_position, points_to_try)
|
||||
|
||||
work_good = [x for x,good in results if good]
|
||||
no_work_good = [x for x,good in results if not good]
|
||||
|
||||
logging.info(f"work good: [{work_good}]")
|
||||
logging.info(f"no work good: [{no_work_good}]")
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
45
scripts/random_test1-noise-plot.py
Normal file
45
scripts/random_test1-noise-plot.py
Normal file
@ -0,0 +1,45 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol_basinhopping(initial_position=initial_pos)
|
||||
logging.info(res)
|
||||
return res.pathfinder_x
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(
|
||||
(([.5, -1, 0], numpy.power(10, f))) for f in numpy.arange(1, 3, .1)
|
||||
)
|
||||
logging.info(dot_inputs)
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [.51, -1.2, 0], 8)
|
||||
expected_result = numpy.array([1, 2, 3, .51, -1.2, 0, 8])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs)
|
||||
|
||||
logging.info([(m.f, m.v) for m in dot_measurements])
|
||||
logging.info([(m.f, m.v) for m in dot_measurements_with_error])
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
60
scripts/random_test1.py
Normal file
60
scripts/random_test1.py
Normal file
@ -0,0 +1,60 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol_basinhopping(initial_position=initial_pos)
|
||||
logging.info(res)
|
||||
return res.pathfinder_x
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([-.8, -5.1, 0], f), ([.5, -1, 0], f), ([.3, 5, 0], f), ([7.3, 6.5, 0], f)) for f in numpy.arange(1, 10, 2)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [.51, -1.2, 0], 8)
|
||||
expected_result = numpy.array([1, 2, 3, .51, -1.2, 0, 8])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs)
|
||||
|
||||
# model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1)
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
results = []
|
||||
rb = -2
|
||||
ru = 3
|
||||
interval = 3
|
||||
points_to_try = [(model, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results = pool.starmap(try_initial_position, points_to_try)
|
||||
logging.info(results)
|
||||
final_values = [r for r in results if r is not None]
|
||||
logging.info(final_values)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
75
scripts/random_test2.py
Normal file
75
scripts/random_test2.py
Normal file
@ -0,0 +1,75 @@
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
logging.info(msg)
|
||||
logging.info(f"\tResult: {result.pathfinder_x}")
|
||||
logging.info(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
logging.info(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
logging.info(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def try_initial_position(model, expected_result, initial_pos):
|
||||
res = model.sol_basinhopping(initial_position=initial_pos)
|
||||
logging.info(res)
|
||||
return res.pathfinder_x
|
||||
|
||||
|
||||
def main():
|
||||
logging.info("Running script...")
|
||||
dot_inputs = list(itertools.chain.from_iterable(
|
||||
(([-.8, -5.1, 0], f), ([.5, -1, 0], f), ([.3, 5, 0], f), ([7.3, 6.5, 0], f)) for f in numpy.arange(1, 10, 2)
|
||||
))
|
||||
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [7.4, 6.3, 0.02], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 7.4, 6.3, 0.02, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
dot_measurements_with_error = dipole_arrangement.get_dot_measurements_with_random(dot_inputs)
|
||||
|
||||
# model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1)
|
||||
model1 = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
model2 = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements_with_error, 1)
|
||||
logging.info("Finished setting up model")
|
||||
|
||||
results = []
|
||||
rb = -2
|
||||
ru = 3
|
||||
interval = 3
|
||||
points_to_try = [(model1, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results = pool.starmap(try_initial_position, points_to_try)
|
||||
#logging.info(results)
|
||||
final_values = [r for r in results if r is not None]
|
||||
logging.info("Results without errors")
|
||||
logging.info(final_values)
|
||||
|
||||
results2 = []
|
||||
points_to_try = [(model2, expected_result, (0.01 + dx, 0.01 + dy, 0.01 + dz)) for dx in numpy.arange(rb, ru, interval) for dy in numpy.arange(rb, ru, interval) for dz in range(rb, ru, interval)]
|
||||
logging.info(f"Will have {len(points_to_try)} points to try")
|
||||
logging.info("creating pool...")
|
||||
with multiprocessing.Pool() as pool:
|
||||
results2 = pool.starmap(try_initial_position, points_to_try)
|
||||
#logging.info(results)
|
||||
final_values2 = [r for r in results2 if r is not None]
|
||||
logging.info("Results without errors")
|
||||
logging.info(final_values)
|
||||
logging.info("Results with errors")
|
||||
logging.info(final_values2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
main()
|
@ -1,5 +1,7 @@
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
import itertools
|
||||
import pytest
|
||||
|
||||
|
||||
def chunk_n_sort(pts):
|
||||
@ -107,3 +109,83 @@ def test_two_dipole_eighteen_dot_two_frequencies_morerealistic():
|
||||
print_result("two oscillating dipole six dots", res)
|
||||
assert res.success, "The solution for two dipole and six dots should have succeeded."
|
||||
numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)
|
||||
|
||||
|
||||
def test_one_dipole_four_dot_ten_frequencies():
|
||||
# setup
|
||||
dot_inputs = itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(1, 10, .1) for o in (0, 0.01)
|
||||
)
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
|
||||
expected_result = numpy.array([1, 2, 3, 0, 4, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 1)
|
||||
res = model.sol(initial_position=(.1, .1, .1))
|
||||
|
||||
print_result("one oscillating dipole four dots", res)
|
||||
print(model.jac()(res.x))
|
||||
assert res.success, "The solution for one dipole and four dots should have succeeded."
|
||||
numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)
|
||||
|
||||
|
||||
def test_two_dipole_four_dot_ten_frequencies():
|
||||
# setup
|
||||
dot_inputs = itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in range(1, 10) for o in (0, .5)
|
||||
)
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
|
||||
dipole2 = pathfinder.model.oscillating.OscillatingDipole([-1, 2, 0], [-1, 2, 1], 4)
|
||||
expected_result = numpy.array([1, -2, 0, -1, 2, 1, 4, 1, 2, 3, 0, 4, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole, dipole2])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 2)
|
||||
res = model.sol()
|
||||
|
||||
print_result("two oscillating dipole four dots", res)
|
||||
assert res.success, "The solution for two dipole and two dots should have succeeded."
|
||||
numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Never actually works.")
|
||||
def test_three_dipole_four_dot_ten_frequencies_wrongcount():
|
||||
# setup
|
||||
dot_inputs = itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in range(1, 10) for o in (0, .5)
|
||||
)
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
|
||||
dipole2 = pathfinder.model.oscillating.OscillatingDipole([2, 5, 0], [1, 6, 1], 2)
|
||||
dipole3 = pathfinder.model.oscillating.OscillatingDipole([-1, 2, 0], [-1, 2, 1], 4)
|
||||
expected_result = numpy.array([2, 5, 0, 1, 6, 1, 2, 1, -2, 0, -1, 2, 1, 4, 1, 2, 3, 0, 4, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole, dipole2, dipole3])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 2)
|
||||
res = model.sol()
|
||||
|
||||
print_result("three but thinks two oscillating dipole four dots", res)
|
||||
# assert res.success, "The solution for two dipole and two dots should have succeeded."
|
||||
numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Never actually works.")
|
||||
def test_three_dipole_four_dot_twenty_frequencies_rightcount():
|
||||
# setup
|
||||
dot_inputs = itertools.chain.from_iterable(
|
||||
(([0 + o, 0, .01], f), ([0 + o, -1, 0], f), ([-1 + o, 0, -.01], f), ([-1 + o, -1, .01], f)) for f in numpy.arange(0.2, 10, .2) for o in (0, .5)
|
||||
)
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
|
||||
dipole2 = pathfinder.model.oscillating.OscillatingDipole([2, 5, 0], [1, 6, 1], 2)
|
||||
dipole3 = pathfinder.model.oscillating.OscillatingDipole([-1, 2, 0], [-1, 2, 1], 4)
|
||||
expected_result = numpy.array([2, 5, 0, 1, 6, 1, 2, 1, -2, 0, -1, 2, 1, 4, 1, 2, 3, 0, 4, -1, 7])
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole, dipole2, dipole3])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 3)
|
||||
res = model.sol(initial_position=(.1, 3, .1))
|
||||
|
||||
print_result("three oscillating dipole four dots", res)
|
||||
# assert res.success, "The solution for two dipole and two dots should have succeeded."
|
||||
numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)
|
||||
|
43
tests/model/oscillating/test_simpledot.py
Normal file
43
tests/model/oscillating/test_simpledot.py
Normal file
@ -0,0 +1,43 @@
|
||||
import numpy
|
||||
import pathfinder.model.oscillating
|
||||
|
||||
|
||||
def chunk_n_sort(pts):
|
||||
pt_length = 7
|
||||
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
|
||||
|
||||
return chunked_pts
|
||||
|
||||
|
||||
def print_result(msg, result):
|
||||
print(msg)
|
||||
print(f"\tResult: {result.pathfinder_x}")
|
||||
print(f"\tSuccess: {result.success}. {result.message}")
|
||||
try:
|
||||
print(f"\tFunc evals: {result.nfev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
print(f"\tJacb evals: {result.njev}")
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
def test_simple_three_dipole_one_dot_frequencies():
|
||||
# setup
|
||||
dot_inputs = [
|
||||
([0, 0, .01], f) for f in numpy.arange(1, 10, .5)
|
||||
]
|
||||
dipole = pathfinder.model.oscillating.OscillatingDipole([1, 2, 3], [0, 4, -1], 7)
|
||||
dipole2 = pathfinder.model.oscillating.OscillatingDipole([2, 5, 0], [1, 6, 1], 2)
|
||||
dipole3 = pathfinder.model.oscillating.OscillatingDipole([-1, 2, 0], [-1, 2, 1], 4)
|
||||
|
||||
dipole_arrangement = pathfinder.model.oscillating.OscillatingDipoleArrangement([dipole, dipole2, dipole3])
|
||||
dot_measurements = dipole_arrangement.get_dot_measurements(dot_inputs)
|
||||
|
||||
model = pathfinder.model.oscillating.DotOscillatingDipoleModel(dot_measurements, 3)
|
||||
res = model.simple_sol()
|
||||
|
||||
print_result("six oscillating dipole one dot", res)
|
||||
assert res.success, "The solution for a single dipole and six dots should have succeeded."
|
||||
# numpy.testing.assert_allclose(res.pathfinder_x, expected_result, err_msg="Dipole wasn't as expected.", rtol=1e-6, atol=1e-6)
|
Reference in New Issue
Block a user