# ---- file: tantri/binning/__init__.py ----
"""
Binning data.
"""

# ---- file: tantri/binning/binning.py ----
import logging
import typing
from dataclasses import dataclass

import numpy


_logger = logging.getLogger(__name__)


@dataclass
class BinConfig:
    """Configuration describing how x values are grouped into bins.

    Raises:
        ValueError: from ``__post_init__`` if ``min_points_required < 2`` or
            if ``bin_width`` is not a positive number.
    """

    # Intended to request bins that are evenly spaced in log(x) rather than x
    # (log(min_x), log(min_x) + bin_width, log(min_x) + 2 * bin_width, ...),
    # so that the units of bin_width depend on this flag.
    # NOTE(review): nothing in this module reads log_scale yet; bin edges are
    # always linear. _construct_bins logs a warning when it is set.
    log_scale: bool

    # Spacing between consecutive bin edges. In linear scale the edges are
    # min_x, min_x + bin_width, min_x + 2 * bin_width, ... continuing until
    # the largest x is covered, so the last edge may lie beyond max_x.
    bin_width: float

    # Optional explicit lower edge of the first bin. Always given in linear
    # (never log) units; the original note says it "will be logarithmed if
    # needed". None means "start at the smallest x".
    bin_min: typing.Optional[float] = None

    # Bins holding fewer points than this are dropped by bin_lists.
    # Must be >= 2 because the sample standard deviation (ddof=1) of a single
    # point is undefined.
    min_points_required: int = 2

    def __post_init__(self):
        if self.min_points_required < 2:
            raise ValueError(
                f"Can't compute summary statistics with bins of size < 2, so {self.min_points_required} is invalid"
            )
        # Written as "not > 0" so that NaN is rejected along with zero and
        # negative widths; any of these would otherwise surface later as a
        # divide-by-zero, an infinite arange, or an empty set of bin edges.
        if not self.bin_width > 0:
            raise ValueError(
                f"bin_width must be a positive number, so {self.bin_width} is invalid"
            )


@dataclass
class BinSummaryValue:
    """Mean and sample standard deviation of one y-series within one bin."""

    mean_y: float
    stdev_y: float


def _summarise_values(ys: numpy.ndarray) -> BinSummaryValue:
    """Reduce the y values of one bin to their mean and ddof=1 standard deviation."""
    mean_y = ys.mean(axis=0).item()
    stdev_y = ys.std(axis=0, ddof=1).item()
    return BinSummaryValue(mean_y, stdev_y)


@dataclass
class BinSummary:
    """Summary of one bin: mean x plus a BinSummaryValue per y-series name."""

    mean_x: float
    summary_values: typing.Dict[str, BinSummaryValue]


@dataclass
class Bin:
    """The points that fell into a single bin.

    The points are stored as an array of x values plus a dict of equally
    indexed y arrays (original author's note: this layout "conforms well to
    APSD result").
    """

    # Index of this bin's lower edge within the edge array it was built from;
    # only meaningful relative to that particular binning.
    bindex: int
    # Lower edge of the bin.
    x_min: float
    point_xs: numpy.ndarray
    point_y_dict: typing.Dict[str, numpy.ndarray]

    def mean_point(self) -> typing.Tuple[float, typing.Dict[str, float]]:
        """Return ``(mean of xs, {name: mean of that y-series})`` for this bin."""
        mean_x = self.point_xs.mean(axis=0).item()
        mean_y_dict = {
            name: values.mean(axis=0).item()
            for name, values in self.point_y_dict.items()
        }
        return (mean_x, mean_y_dict)

    def summary_point(self) -> BinSummary:
        """Return the mean x together with mean/stdev of every y-series."""
        mean_x = self.point_xs.mean(axis=0).item()
        summary_dict = {
            name: _summarise_values(values)
            for name, values in self.point_y_dict.items()
        }
        return BinSummary(mean_x, summary_dict)

    def stdev_ys(self) -> typing.Dict[str, float]:
        """Return the ddof=1 standard deviation of every y-series in this bin."""
        return {
            name: values.std(axis=0, ddof=1).item()
            for name, values in self.point_y_dict.items()
        }


def _construct_bins(xs: numpy.ndarray, bin_config: BinConfig) -> numpy.ndarray:
    """Build the array of evenly spaced bin edges covering ``xs``.

    The first edge is ``bin_config.bin_min`` when given, otherwise ``min(xs)``.
    Edges are then ``start + k * bin_width`` for ``k = 0 .. n - 1`` with
    ``n = ceil(1 + (max(xs) - start) / bin_width)``, so the last edge equals
    ``max(xs)`` when the range is an exact multiple of the width and lies
    beyond it otherwise.

    Raises:
        ValueError: if ``bin_config.bin_min`` is larger than the smallest x
            (points would fall below the first bin). numpy also raises
            ValueError for an empty ``xs``.
    """
    if bin_config.log_scale:
        # The flag is documented on BinConfig but was silently ignored here;
        # make that visible instead of quietly returning linear edges.
        _logger.warning(
            "log_scale=True was requested but logarithmic binning is not implemented; using linear bin edges"
        )

    min_x = numpy.min(xs)

    # A bin_min of None means "no request": start at the smallest x.
    if bin_config.bin_min is not None:
        _logger.debug(f"Received a desired bin_min={bin_config.bin_min}")
        if bin_config.bin_min > min_x:
            raise ValueError(
                f"The lowest x value of {xs=} was {min_x=}, which is lower than the requested bin_min={bin_config.bin_min}"
            )
        _logger.debug(f"Setting minimum to {bin_config.bin_min}")
        min_x = bin_config.bin_min

    max_x = numpy.max(xs)
    num_points = numpy.ceil(1 + (max_x - min_x) / bin_config.bin_width)
    return min_x + (numpy.arange(0, num_points) * bin_config.bin_width)


def _populate_bins(
    xs: numpy.ndarray, ys: typing.Dict[str, numpy.ndarray], bins: numpy.ndarray
) -> typing.Sequence[Bin]:
    """Distribute the points over the bin edges ``bins``.

    A point with ``bins[i] <= x < bins[i + 1]`` lands in bin ``i`` (this is
    ``numpy.digitize`` with its default ``right=False``, shifted to be
    zero-based). Only non-empty bins are returned, ordered by the first
    appearance of each bin while walking ``xs``.
    """
    indexes = numpy.digitize(xs, bins) - 1

    output_bins = []
    # dict.fromkeys de-duplicates while preserving first-appearance order, and
    # tolist() yields plain Python ints, matching Bin.bindex's annotation.
    for bindex in dict.fromkeys(indexes.tolist()):
        # Compute the membership mask once and reuse it for x and every y-series.
        in_bin = indexes == bindex
        output_bins.append(
            Bin(
                bindex,
                x_min=bins[bindex].item(),
                point_xs=xs[in_bin],
                point_y_dict={name: values[in_bin] for name, values in ys.items()},
            )
        )

    return output_bins


def bin_lists(
    xs: numpy.ndarray, ys: typing.Dict[str, numpy.ndarray], bin_config: BinConfig
) -> typing.Sequence[Bin]:
    """Bin ``xs`` (and the matching entries of every array in ``ys``).

    Returns the non-empty bins that contain at least
    ``bin_config.min_points_required`` points.

    Raises:
        ValueError: propagated from ``_construct_bins`` (see there).
    """
    bins = _construct_bins(xs, bin_config)
    raw_bins = _populate_bins(xs, ys, bins)
    return [
        candidate
        for candidate in raw_bins
        if len(candidate.point_xs) >= bin_config.min_points_required
    ]


# ---- file: tests/binning/__snapshots__/test_binning.ambr ----
# Snapshot data from the same patch, kept verbatim as a comment (whitespace
# inside the array reprs is as it appeared in the collapsed patch).
# NOTE(review): the first and third snapshots contain a single-point bin
# (x = 33), which bin_lists drops when min_points_required is 2 -- confirm
# the snapshot is current.
#
# # serializer version: 1
# # name: test_group_x_bins
#   list([
#     Bin(bindex=0, x_min=1.0, point_xs=array([1. , 2.8, 8. ]), point_y_dict={'identity_plus_one': array([ 3. , 4.8, 10. ])}),
#     Bin(bindex=1, x_min=9.0, point_xs=array([12.2, 13.6]), point_y_dict={'identity_plus_one': array([14.2, 15.6])}),
#     Bin(bindex=2, x_min=17.0, point_xs=array([17. , 19.71, 20. , 24. ]), point_y_dict={'identity_plus_one': array([19. , 21.71, 22. , 26. ])}),
#     Bin(bindex=4, x_min=33.0, point_xs=array([33.]), point_y_dict={'identity_plus_one': array([35.])}),
#   ])
# # ---
# # name: test_group_x_bins_mean
#   list([
#     tuple(
#       3.9333333333333336,
#       dict({
#         'identity_plus_one': 5.933333333333334,
#       }),
#     ),
#     tuple(
#       12.899999999999999,
#       dict({
#         'identity_plus_one': 14.899999999999999,
#       }),
#     ),
#     tuple(
#       20.177500000000002,
#       dict({
#         'identity_plus_one': 22.177500000000002,
#       }),
#     ),
#     tuple(
#       33.0,
#       dict({
#         'identity_plus_one': 35.0,
#       }),
#     ),
#   ])
# # ---
# # name: test_group_x_bins_summary
#   list([
#     BinSummary(mean_x=3.9333333333333336, summary_values={'identity_plus_one': BinSummaryValue(mean_y=5.933333333333334, stdev_y=3.635014901390823)}),
#     BinSummary(mean_x=12.899999999999999, summary_values={'identity_plus_one': BinSummaryValue(mean_y=14.899999999999999, stdev_y=0.9899494936611668)}),
#     BinSummary(mean_x=20.177500000000002, summary_values={'identity_plus_one': BinSummaryValue(mean_y=22.177500000000002, stdev_y=2.884329789280923)}),
#     BinSummary(mean_x=33.0, summary_values={'identity_plus_one': BinSummaryValue(mean_y=35.0, stdev_y=nan)}),
#   ])
# # ---
# ---- file: tests/binning/test_binning.py ----
import pytest
import tantri.binning.binning as binning
import numpy


def _sample_xs() -> numpy.ndarray:
    """Return a fresh copy of the ten-point x fixture shared by most tests."""
    return numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33])


def _sample_ys() -> dict:
    """Return the single y-series used by the grouping tests.

    NOTE(review): the key says "plus one" but the offset applied is 2. The key
    is part of the stored snapshots, so it is left untouched here.
    """
    return {"identity_plus_one": (_sample_xs() + 2)}


def _check_edges(actual, expected):
    """Assert that the constructed bin edges match the expected ones."""
    numpy.testing.assert_allclose(
        actual, expected, err_msg="The bins were not as expected"
    )


def _bin_sample_data():
    """Bin the shared fixture with a linear bin width of 8.

    _construct_bins yields the edges 1, 9, 17, 25, 33 for this data (see
    test_bin_construction_even).
    """
    config = binning.BinConfig(log_scale=False, bin_width=8)
    return binning.bin_lists(_sample_xs(), _sample_ys(), config)


def test_bin_construction_faulty_min():
    # A requested bin_min above the smallest x must be rejected.
    config = binning.BinConfig(log_scale=False, bin_width=0.8, bin_min=5.5)

    with pytest.raises(ValueError):
        binning._construct_bins(numpy.array([5, 6, 7, 8]), config)


def test_bin_construction_force_min():
    # A bin_min below the data moves the first edge down to bin_min.
    config = binning.BinConfig(log_scale=False, bin_width=1, bin_min=2)
    edges = binning._construct_bins(
        numpy.array([4.5, 5.5, 6.5, 7.5, 8.5]), bin_config=config
    )

    _check_edges(edges, numpy.array([2, 3, 4, 5, 6, 7, 8, 9]))


def test_bin_construction_even():
    # The data range (32) is an exact multiple of the width: last edge == max x.
    config = binning.BinConfig(log_scale=False, bin_width=8)
    edges = binning._construct_bins(_sample_xs(), bin_config=config)

    _check_edges(edges, numpy.array([1, 9, 17, 25, 33]))


def test_bin_construction_uneven():
    # The range is not a multiple of the width: the last edge overshoots max x.
    config = binning.BinConfig(log_scale=False, bin_width=7)
    edges = binning._construct_bins(_sample_xs(), bin_config=config)

    _check_edges(edges, numpy.array([1, 8, 15, 22, 29, 36]))


def test_bin_construction_uneven_non_integer():
    # Same as above with a fractional width.
    config = binning.BinConfig(log_scale=False, bin_width=7.5)
    edges = binning._construct_bins(_sample_xs(), bin_config=config)

    _check_edges(edges, numpy.array([1, 8.5, 16, 23.5, 31, 38.5]))


def test_group_x_bins(snapshot):
    assert _bin_sample_data() == snapshot


def test_group_x_bins_mean(snapshot):
    means = [group.mean_point() for group in _bin_sample_data()]

    assert means == snapshot


def test_group_x_bins_summary(snapshot):
    summaries = [group.summary_point() for group in _bin_sample_data()]

    assert summaries == snapshot