Adds test for solver

This commit is contained in:
2022-01-02 17:25:44 -06:00
parent 8fee0a27d2
commit ef69660c35
6 changed files with 108 additions and 76 deletions

View File

@@ -1,74 +1,4 @@
import numpy
from typing import Callable, Sequence
from pdme.measurement import DotMeasurement
import logging
from pdme.model.model import Model
from pdme.model.fixed_z_plane_model import FixedZPlaneModel
class Model():
"""
Interface for models.
"""
def point_length(self) -> int:
raise NotImplementedError
def n(self) -> int:
raise NotImplementedError
def v_for_point_at_dot(self, dot: DotMeasurement, pt: numpy.ndarray) -> float:
raise NotImplementedError
def cost_for_dot(self, dot: DotMeasurement, pts: numpy.ndarray) -> float:
# creates numpy.ndarrays in groups of self.point_length().
# Will throw problems for irregular points, but that's okay for now.
pt_length = self.point_length()
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
return sum(self.v_for_point_at_dot(dot, pt) for pt in chunked_pts) - dot.v
def costs(self, dots: Sequence[DotMeasurement]) -> Callable[[numpy.ndarray], numpy.ndarray]:
'''
Returns a function that returns the cost for the given list of DotMeasurements for a particular model-dependent phase space point.
Default implementation assumes a single dot cost from which to build the list.
Parameters
----------
dots: A list of dot measurements to use to find the cost functions.
Returns
----------
Returns the model's cost function.
'''
logging.debug(f"Constructing costs for dots: {dots}")
def costs_to_return(pts: numpy.ndarray) -> numpy.ndarray:
return numpy.array([self.cost_for_dot(dot, pts) for dot in dots])
return costs_to_return
def jac_for_point_at_dot(self, dot: DotMeasurement, pt: numpy.ndarray) -> numpy.ndarray:
raise NotImplementedError
def jac_for_dot(self, dot: DotMeasurement, pts: numpy.ndarray) -> numpy.ndarray:
# creates numpy.ndarrays in groups of self.point_length().
# Will throw problems for irregular points, but that's okay for now.
pt_length = self.point_length()
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
return numpy.append([], [self.jac_for_point_at_dot(dot, pt) for pt in chunked_pts])
def jac(self, dots: Sequence[DotMeasurement]) -> Callable[[numpy.ndarray], numpy.ndarray]:
'''
Returns a function that returns the cost function's Jacobian for the given list of DotMeasurements for a particular model-dependent phase space point.
Default implementation assumes a single dot jacobian from which to build the list.
Parameters
----------
dots: A list of dot measurements to use to find the cost functions and their Jacobian.
Returns
----------
Returns the model's cost function's Jacobian.
'''
def jac_to_return(pts: numpy.ndarray) -> numpy.ndarray:
return numpy.array([self.jac_for_dot(dot, pts) for dot in dots])
return jac_to_return
__all__ = ["Model", "FixedZPlaneModel"]

View File

@@ -1,5 +1,5 @@
import numpy
from pdme.model import Model
from pdme.model.model import Model
from pdme.measurement import DotMeasurement

74
pdme/model/model.py Normal file
View File

@@ -0,0 +1,74 @@
import numpy
from typing import Callable, Sequence
from pdme.measurement import DotMeasurement
import logging
class Model():
"""
Interface for models.
"""
def point_length(self) -> int:
raise NotImplementedError
def n(self) -> int:
raise NotImplementedError
def v_for_point_at_dot(self, dot: DotMeasurement, pt: numpy.ndarray) -> float:
raise NotImplementedError
def cost_for_dot(self, dot: DotMeasurement, pts: numpy.ndarray) -> float:
# creates numpy.ndarrays in groups of self.point_length().
# Will throw problems for irregular points, but that's okay for now.
pt_length = self.point_length()
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
return sum(self.v_for_point_at_dot(dot, pt) for pt in chunked_pts) - dot.v
def costs(self, dots: Sequence[DotMeasurement]) -> Callable[[numpy.ndarray], numpy.ndarray]:
'''
Returns a function that returns the cost for the given list of DotMeasurements for a particular model-dependent phase space point.
Default implementation assumes a single dot cost from which to build the list.
Parameters
----------
dots: A list of dot measurements to use to find the cost functions.
Returns
----------
Returns the model's cost function.
'''
logging.debug(f"Constructing costs for dots: {dots}")
def costs_to_return(pts: numpy.ndarray) -> numpy.ndarray:
return numpy.array([self.cost_for_dot(dot, pts) for dot in dots])
return costs_to_return
def jac_for_point_at_dot(self, dot: DotMeasurement, pt: numpy.ndarray) -> numpy.ndarray:
raise NotImplementedError
def jac_for_dot(self, dot: DotMeasurement, pts: numpy.ndarray) -> numpy.ndarray:
# creates numpy.ndarrays in groups of self.point_length().
# Will throw problems for irregular points, but that's okay for now.
pt_length = self.point_length()
chunked_pts = [pts[i: i + pt_length] for i in range(0, len(pts), pt_length)]
return numpy.append([], [self.jac_for_point_at_dot(dot, pt) for pt in chunked_pts])
def jac(self, dots: Sequence[DotMeasurement]) -> Callable[[numpy.ndarray], numpy.ndarray]:
'''
Returns a function that returns the cost function's Jacobian for the given list of DotMeasurements for a particular model-dependent phase space point.
Default implementation assumes a single dot jacobian from which to build the list.
Parameters
----------
dots: A list of dot measurements to use to find the cost functions and their Jacobian.
Returns
----------
Returns the model's cost function's Jacobian.
'''
def jac_to_return(pts: numpy.ndarray) -> numpy.ndarray:
return numpy.array([self.jac_for_dot(dot, pts) for dot in dots])
return jac_to_return

3
pdme/solver/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from pdme.solver.solver import sol
__all__ = ["sol"]

View File

@@ -5,7 +5,7 @@ from pdme.measurement import DotMeasurement
from typing import Sequence
def sol(model: Model, dots: Sequence[DotMeasurement], initial_pt=None):
def sol(model: Model, dots: Sequence[DotMeasurement], initial_pt=None, bounds=(-numpy.inf, numpy.inf)):
if initial_pt is None:
initial = numpy.tile(.1, model.n() * model.point_length())
else:
@@ -13,5 +13,5 @@ def sol(model: Model, dots: Sequence[DotMeasurement], initial_pt=None):
raise ValueError(f"The initial point {initial_pt} does not have the model's expected length: {model.point_length()}")
initial = numpy.tile(initial_pt, model.n())
result = scipy.optimize.least_squares(model.costs(dots), initial, jac=model.jac(dots), ftol=1e-15, gtol=3e-16)
result = scipy.optimize.least_squares(model.costs(dots), initial, jac=model.jac(dots), ftol=1e-15, gtol=3e-16, bounds=bounds)
return result