binning #1

Merged
deepak merged 7 commits from binning into master 2024-09-04 19:26:14 +00:00
5 changed files with 730 additions and 0 deletions

View File

@ -0,0 +1,7 @@
"""
Binning data.
"""
from tantri.binning.binning import bin_lists, BinConfig, BinSummary, BinSummaryValue
__all__ = ["bin_lists", "BinConfig", "BinSummary", "BinSummaryValue"]

135
tantri/binning/binning.py Normal file
View File

@ -0,0 +1,135 @@
import typing
import numpy
import logging
from dataclasses import dataclass
_logger = logging.getLogger(__name__)
@dataclass
class BinConfig:
    """Configuration describing how x values should be binned.

    If log_scale is False the bin edges are min_x, min_x + bin_width,
    min_x + 2 * bin_width, ... (the last bin may not be evenly spaced against
    max_x). If log_scale is True the edges are uniform in log10(x):
    log(min_x), log(min_x) + bin_width, log(min_x) + 2 * bin_width, ...
    so the units of bin_width depend on log_scale (decades when log-scaled).

    Raises ValueError on construction if min_points_required < 2 or
    bin_width <= 0.
    """

    log_scale: bool  # if True, bin edges are uniformly spaced in log10(x)
    bin_width: float  # edge spacing, in x units (or decades when log_scale)
    # never log-scaled itself; will be logarithmed if needed
    bin_min: typing.Optional[float] = None
    # must be >= 2 so that a sample standard deviation is well defined
    min_points_required: int = 2

    def __post_init__(self):
        if self.min_points_required < 2:
            raise ValueError(
                f"Can't compute summary statistics with bins of size < 2, so {self.min_points_required} is invalid"
            )
        # a zero or negative width cannot produce a meaningful set of bin edges
        if self.bin_width <= 0:
            raise ValueError(f"bin_width must be positive, got {self.bin_width}")
@dataclass
class BinSummaryValue:
    """Mean and sample standard deviation of one named y-series within a bin."""

    mean_y: float  # mean of the y values that fell into the bin
    stdev_y: float  # sample (ddof=1) standard deviation of those y values
def _summarise_values(ys: numpy.ndarray) -> BinSummaryValue:
    """Summarise one bin's y values as a mean plus a sample standard deviation."""
    average = numpy.mean(ys, axis=0).item()
    spread = numpy.std(ys, axis=0, ddof=1).item()
    return BinSummaryValue(mean_y=average, stdev_y=spread)
@dataclass
class BinSummary:
    """Per-bin summary: the mean x plus summary statistics for each y-series."""

    mean_x: float  # mean of the x values in the bin
    summary_values: typing.Dict[str, BinSummaryValue]  # keyed by y-series name
@dataclass
class Bin:
    """One bin of grouped points, tied to the specific binning that produced it."""

    bindex: int  # index of this bin within its particular binning
    x_min: float  # left edge of the bin
    # point_xs / point_y_dict mirror the APSD result shape:
    # (freqs, {series name: values})
    point_xs: numpy.ndarray
    point_y_dict: typing.Dict[str, numpy.ndarray]

    def mean_point(self) -> typing.Tuple[float, typing.Dict[str, float]]:
        """Collapse the bin to (mean x, {series name: mean y})."""
        averaged_ys = {}
        for name, values in self.point_y_dict.items():
            averaged_ys[name] = values.mean(axis=0).item()
        return (self.point_xs.mean(axis=0).item(), averaged_ys)

    def summary_point(self) -> BinSummary:
        """Collapse the bin to a BinSummary (mean x plus per-series statistics)."""
        x_average = self.point_xs.mean(axis=0).item()
        stats = {
            name: _summarise_values(values)
            for name, values in self.point_y_dict.items()
        }
        return BinSummary(x_average, stats)
def _construct_bins(xs: numpy.ndarray, bin_config: BinConfig) -> numpy.ndarray:
min_x_raw = numpy.min(xs)
# if the bin config requested bin_min is None, then we can ignore it.
if bin_config.bin_min is not None:
_logger.debug(f"Received a desired bin_min={bin_config.bin_min}")
if bin_config.bin_min > min_x_raw:
raise ValueError(
f"The lowest x value of {xs=} was {min_x_raw=}, which is lower than the requested bin_min={bin_config.bin_min}"
)
else:
_logger.debug(f"Setting minimum to {bin_config.bin_min}")
min_x_raw = bin_config.bin_min
max_x_raw = numpy.max(xs)
if bin_config.log_scale:
min_x = numpy.log10(min_x_raw)
max_x = numpy.log10(max_x_raw)
else:
min_x = min_x_raw
max_x = max_x_raw
num_points = numpy.ceil(1 + (max_x - min_x) / bin_config.bin_width)
bins = min_x + (numpy.arange(0, num_points) * bin_config.bin_width)
if bin_config.log_scale:
return 10**bins
else:
return bins
def _populate_bins(
    xs: numpy.ndarray, ys: typing.Dict[str, numpy.ndarray], bins: numpy.ndarray
) -> typing.Sequence[Bin]:
    """Assign each point to a bin and group the points bin by bin.

    Bins are returned in first-encountered order and empty bins are omitted.
    """
    # digitize gives 1-based edge indexes; shift to 0-based bin indexes.
    # NOTE(review): an x below bins[0] would yield bindex -1 and silently pick
    # the *last* edge as x_min — bin_lists never produces such an x, but this
    # helper does not guard against it.
    bindexes = numpy.digitize(xs, bins) - 1

    grouped = []
    # dict.fromkeys keeps first-seen order while dropping duplicates
    for bindex in dict.fromkeys(bindexes):
        selector = bindexes == bindex
        grouped.append(
            Bin(
                bindex,
                x_min=bins[bindex].item(),
                point_xs=xs[selector],
                point_y_dict={name: series[selector] for name, series in ys.items()},
            )
        )
    return grouped
def bin_lists(
    xs: numpy.ndarray, ys: typing.Dict[str, numpy.ndarray], bin_config: BinConfig
) -> typing.Sequence[Bin]:
    """Bin xs (and the parallel y-series in ys) according to bin_config.

    Bins holding fewer than bin_config.min_points_required points are dropped,
    so that summary statistics remain well defined.
    """
    edges = _construct_bins(xs, bin_config)
    populated = _populate_bins(xs, ys, edges)
    threshold = bin_config.min_points_required
    return [candidate for candidate in populated if len(candidate.point_xs) >= threshold]

View File

@ -4,6 +4,7 @@ import tantri
import numpy
import tantri.cli.input_files.write_dipoles
import tantri.cli.file_importer
import tantri.binning
import json
import tantri.dipoles
import tantri.dipoles.event_time_series
@ -201,6 +202,193 @@ def _write_time_series(
out.write(f"{i * delta_t}, {transition_values}\n")
@cli.command()
@click.option(
    "--dipoles-file",
    default="dipoles.json",
    show_default=True,
    type=click.Path(exists=True, path_type=pathlib.Path),
    help="File with json array of dipoles",
)
@click.option(
    "--dots-file",
    default="dots.json",
    show_default=True,
    type=click.Path(exists=True, path_type=pathlib.Path),
    help="File with json array of dots",
)
@click.option(
    "--measurement-type",
    type=click.Choice([POTENTIAL, X_ELECTRIC_FIELD]),
    default=POTENTIAL,
    help="The type of measurement to simulate",
    show_default=True,
)
@click.option(
    "--delta-t",
    "-t",
    type=float,
    default=1,
    help="The delta t between time series iterations.",
    show_default=True,
)
@click.option(
    "--num-iterations",
    # Note we're keeping this name to match write-time-series
    "-n",
    type=int,
    default=10,
    help="The number of time steps per time series, total time is num_iterations * delta_t.",
    show_default=True,
)
@click.option(
    "--num-time-series",
    type=int,
    default=20,
    help="The number of simulated time series, which will be averaged over",
    show_default=True,
)
@click.option(
    "--time-series-rng-seed",
    "-s",
    type=int,
    default=None,
    help="A seed to use to create an override default rng. You should set this.",
)
@click.option(
    "--output-file",
    "-o",
    type=click.Path(path_type=pathlib.Path),
    help="The output file to write, in csv format",
    required=True,
)
@click.option(
    "--binned-output-file",
    "-b",
    type=click.Path(path_type=pathlib.Path),
    help="Optional binned output file",
)
@click.option(
    "--bin-widths",
    type=float,
    default=1,
    show_default=True,
    help="The default log(!) bin width, 1 means widths of a decade",
)
@click.option("--header-row/--no-header-row", default=False, help="Write a header row")
def write_apsd(
    dipoles_file,
    dots_file,
    measurement_type,
    delta_t,
    num_iterations,
    num_time_series,
    time_series_rng_seed,
    output_file,
    binned_output_file,
    bin_widths,
    header_row,
):
    """
    Generate an APSD for the passed in parameters, averaging over multiple (num_time_series) iterations.

    This command only parses CLI arguments; all of the actual work (reading
    the JSON inputs, simulating, averaging and writing the csv output) is
    delegated to _write_apsd with the same arguments in the same order.
    """
    _write_apsd(
        dipoles_file,
        dots_file,
        measurement_type,
        delta_t,
        num_iterations,
        num_time_series,
        time_series_rng_seed,
        output_file,
        binned_output_file,
        bin_widths,
        header_row,
    )
def _write_apsd(
    dipoles_file,
    dots_file,
    measurement_type,
    delta_t,
    num_iterations,
    num_time_series,
    time_series_rng_seed,
    output_file,
    binned_output_file,
    bin_widths,
    header_row,
):
    """Simulate dipole time series, average their PSDs, and write csv output.

    Reads dipole and dot definitions from the given JSON files, generates
    num_time_series series of num_iterations points spaced delta_t apart,
    averages their APSDs, and writes one frequency/value row per frequency to
    output_file. When binned_output_file is given, additionally writes
    log-binned per-label mean/stdev columns using bin widths of bin_widths
    decades.

    Raises ValueError if measurement_type is not a recognized type.
    """
    _logger.debug(
        f"Received parameters [dipoles_file: {dipoles_file}] and [dots_file: {dots_file}]"
    )
    dipoles = tantri.cli.file_importer.read_dipoles_json_file(dipoles_file)
    dots = tantri.cli.file_importer.read_dots_json_file(dots_file)

    if measurement_type == POTENTIAL:
        measurement_enum = tantri.dipoles.DipoleMeasurementType.ELECTRIC_POTENTIAL
        value_name = "APSD_V"
    elif measurement_type == X_ELECTRIC_FIELD:
        measurement_enum = tantri.dipoles.DipoleMeasurementType.X_ELECTRIC_FIELD
        value_name = "APSD_Ex"
    else:
        # Previously an unrecognized type fell through and triggered a
        # confusing NameError on measurement_enum below; fail fast instead.
        raise ValueError(f"Unknown measurement type: {measurement_type}")
    _logger.debug(f"Using measurement {measurement_enum.name}")

    labels = [dot.label for dot in dots]

    with output_file.open("w") as out:
        if header_row:
            value_labels = ", ".join([f"{value_name}_{label}" for label in labels])
            out.write(f"f (Hz), {value_labels}\n")
        _logger.debug(
            f"Going to simulate {num_iterations} iterations with a delta t of {delta_t}"
        )
        _logger.debug(f"Got seed {time_series_rng_seed}")
        # only override the default rng when a seed was explicitly provided
        if time_series_rng_seed is None:
            time_series = tantri.dipoles.DipoleTimeSeries(
                dipoles, dots, measurement_enum, delta_t
            )
        else:
            rng = numpy.random.default_rng(time_series_rng_seed)
            time_series = tantri.dipoles.DipoleTimeSeries(
                dipoles, dots, measurement_enum, delta_t, rng
            )
        apsd = time_series.generate_average_apsd(
            num_series=num_time_series, num_time_series_points=num_iterations
        )
        # transpose {label: series} into per-frequency value tuples, in label order
        values_list = zip(*[apsd.psd_dict[label] for label in labels])
        for freq, values in zip(apsd.freqs, values_list):
            value_string = ", ".join(str(v) for v in values)
            out.write(f"{freq}, {value_string}\n")

    if binned_output_file is not None:
        with binned_output_file.open("w") as out:
            if header_row:
                value_labels = ["mean bin f (Hz)"]
                for label in labels:
                    value_labels.append(f"{value_name}_{label}_mean")
                    value_labels.append(f"{value_name}_{label}_stdev")
                value_labels_text = ", ".join(value_labels)
                out.write(value_labels_text + "\n")
            # log-scale binning; bin_min=1e-6 pins the bin edges to a fixed
            # reference well below any simulated frequency
            binned = tantri.binning.bin_lists(
                apsd.freqs,
                apsd.psd_dict,
                tantri.binning.BinConfig(
                    True, bin_width=bin_widths, bin_min=1e-6, min_points_required=2
                ),
            )
            for bin_result in binned:
                summary = bin_result.summary_point()
                out_list = [str(summary.mean_x)]
                for label in labels:
                    out_list.append(str(summary.summary_values[label].mean_y))
                    out_list.append(str(summary.summary_values[label].stdev_y))
                out_string = ", ".join(out_list) + "\n"
                out.write(out_string)
@cli.command()
@click.argument(
    "generation_config",

View File

@ -0,0 +1,86 @@
# serializer version: 1
# name: test_group_x_bins
list([
Bin(bindex=0, x_min=1.0, point_xs=array([1. , 2.8, 8. ]), point_y_dict={'identity_plus_one': array([ 3. , 4.8, 10. ])}),
Bin(bindex=1, x_min=9.0, point_xs=array([12.2, 13.6]), point_y_dict={'identity_plus_one': array([14.2, 15.6])}),
Bin(bindex=2, x_min=17.0, point_xs=array([17. , 19.71, 20. , 24. ]), point_y_dict={'identity_plus_one': array([19. , 21.71, 22. , 26. ])}),
])
# ---
# name: test_group_x_bins_log
list([
Bin(bindex=0, x_min=0.0015848899999999994, point_xs=array([0.00158489, 0.00363078, 0.0398107 ]), point_y_dict={'basic_lorentzian': array([0.159154, 0.15915 , 0.158535])}),
Bin(bindex=1, x_min=0.15848899999999994, point_xs=array([ 0.275423, 0.524807, 2.51189 , 8.74984 , 10. ]), point_y_dict={'basic_lorentzian': array([0.134062 , 0.0947588 , 0.00960602, 0.00083808, 0.00064243])}),
])
# ---
# name: test_group_x_bins_mean
list([
tuple(
3.9333333333333336,
dict({
'identity_plus_one': 5.933333333333334,
}),
),
tuple(
12.899999999999999,
dict({
'identity_plus_one': 14.899999999999999,
}),
),
tuple(
20.177500000000002,
dict({
'identity_plus_one': 22.177500000000002,
}),
),
])
# ---
# name: test_group_x_bins_mean_log
list([
tuple(
0.0423015,
dict({
'basic_lorentzian': 0.15817799999999999,
}),
),
tuple(
0.593058,
dict({
'basic_lorentzian': 0.09491108333333335,
}),
),
tuple(
4.0870750000000005,
dict({
'basic_lorentzian': 0.004363105,
}),
),
tuple(
24.196866666666665,
dict({
'basic_lorentzian': 0.0001410066333333333,
}),
),
tuple(
394.723,
dict({
'basic_lorentzian': 1.364947e-06,
}),
),
])
# ---
# name: test_group_x_bins_summary
list([
BinSummary(mean_x=3.9333333333333336, summary_values={'identity_plus_one': BinSummaryValue(mean_y=5.933333333333334, stdev_y=3.635014901390823)}),
BinSummary(mean_x=12.899999999999999, summary_values={'identity_plus_one': BinSummaryValue(mean_y=14.899999999999999, stdev_y=0.9899494936611668)}),
BinSummary(mean_x=20.177500000000002, summary_values={'identity_plus_one': BinSummaryValue(mean_y=22.177500000000002, stdev_y=2.884329789280923)}),
])
# ---
# name: test_group_x_bins_summary_log
list([
BinSummary(mean_x=0.0423015, summary_values={'basic_lorentzian': BinSummaryValue(mean_y=0.15817799999999999, stdev_y=0.001275436787927965)}),
BinSummary(mean_x=0.593058, summary_values={'basic_lorentzian': BinSummaryValue(mean_y=0.09491108333333335, stdev_y=0.05205159393153745)}),
BinSummary(mean_x=4.0870750000000005, summary_values={'basic_lorentzian': BinSummaryValue(mean_y=0.004363105, stdev_y=0.0025964466030423193)}),
BinSummary(mean_x=24.196866666666665, summary_values={'basic_lorentzian': BinSummaryValue(mean_y=0.0001410066333333333, stdev_y=0.00010167601686387665)}),
BinSummary(mean_x=394.723, summary_values={'basic_lorentzian': BinSummaryValue(mean_y=1.364947e-06, stdev_y=1.7011900210905307e-06)}),
])
# ---

View File

@ -0,0 +1,314 @@
import pytest
import tantri.binning.binning as binning
import numpy
def test_bin_config_validation():
    """BinConfig rejects min_points_required < 2 (sample stdev needs >= 2 points)."""
    with pytest.raises(ValueError):
        binning.BinConfig(log_scale=False, bin_width=1, min_points_required=1)
def test_bin_construction_faulty_min():
    """A bin_min above the smallest x is rejected: points would fall below all bins."""
    x_list = numpy.array([5, 6, 7, 8])
    bin_config = binning.BinConfig(log_scale=False, bin_width=0.8, bin_min=5.5)
    with pytest.raises(ValueError):
        binning._construct_bins(x_list, bin_config)
def test_bin_construction_force_min():
    """An explicit bin_min below the data minimum becomes the first bin edge."""
    samples = numpy.array([4.5, 5.5, 6.5, 7.5, 8.5])
    config = binning.BinConfig(log_scale=False, bin_width=1, bin_min=2)
    produced = binning._construct_bins(samples, bin_config=config)
    numpy.testing.assert_allclose(
        produced,
        numpy.array([2, 3, 4, 5, 6, 7, 8, 9]),
        err_msg="The bins were not as expected",
    )
def test_bin_construction_even():
    """Linear bins whose width evenly divides the data range."""
    samples = numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33])
    config = binning.BinConfig(log_scale=False, bin_width=8)
    produced = binning._construct_bins(samples, bin_config=config)
    numpy.testing.assert_allclose(
        produced,
        numpy.array([1, 9, 17, 25, 33]),
        err_msg="The bins were not as expected",
    )
def test_bin_construction_uneven():
    """Linear bins whose width does not evenly divide the range overshoot max(x)."""
    samples = numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33])
    config = binning.BinConfig(log_scale=False, bin_width=7)
    produced = binning._construct_bins(samples, bin_config=config)
    numpy.testing.assert_allclose(
        produced,
        numpy.array([1, 8, 15, 22, 29, 36]),
        err_msg="The bins were not as expected",
    )
def test_bin_construction_uneven_non_integer():
    """Non-integer widths produce fractional edges, still overshooting max(x)."""
    samples = numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33])
    config = binning.BinConfig(log_scale=False, bin_width=7.5)
    produced = binning._construct_bins(samples, bin_config=config)
    numpy.testing.assert_allclose(
        produced,
        numpy.array([1, 8.5, 16, 23.5, 31, 38.5]),
        err_msg="The bins were not as expected",
    )
def test_group_x_bins(snapshot):
    """Snapshot the raw Bin objects produced by a simple linear binning."""
    x_list = numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33])
    y_dict = {
        # NOTE(review): despite the name, y = x + 2 here, not x + 1
        "identity_plus_one": (
            numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33]) + 2
        )
    }
    bin_config = binning.BinConfig(log_scale=False, bin_width=8)
    # expected_bins = numpy.array([1, 9, 17, 25, 33])
    # the singleton bin at 33 is dropped by the min_points_required default of 2
    binned = binning.bin_lists(x_list, y_dict, bin_config)
    assert binned == snapshot
def test_group_x_bins_mean(snapshot):
    """Snapshot the per-bin (mean x, {name: mean y}) tuples for linear binning."""
    x_list = numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33])
    y_dict = {
        # NOTE(review): despite the name, y = x + 2 here, not x + 1
        "identity_plus_one": (
            numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33]) + 2
        )
    }
    bin_config = binning.BinConfig(log_scale=False, bin_width=8)
    # expected_bins = numpy.array([1, 9, 17, 25, 33])
    binned = binning.bin_lists(x_list, y_dict, bin_config)
    mean_binned = [bin.mean_point() for bin in binned]
    assert mean_binned == snapshot
def test_group_x_bins_summary(snapshot):
    """Snapshot the per-bin BinSummary (mean x plus mean/stdev per series)."""
    x_list = numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33])
    y_dict = {
        # NOTE(review): despite the name, y = x + 2 here, not x + 1
        "identity_plus_one": (
            numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33]) + 2
        )
    }
    bin_config = binning.BinConfig(log_scale=False, bin_width=8)
    # expected_bins = numpy.array([1, 9, 17, 25, 33])
    binned = binning.bin_lists(x_list, y_dict, bin_config)
    summary = [bin.summary_point() for bin in binned]
    assert summary == snapshot
def test_bin_construction_faulty_min_log_scale():
    """The bin_min-above-minimum check also applies when log_scale is enabled."""
    x_list = numpy.array([5, 6, 7, 8])
    bin_config = binning.BinConfig(log_scale=True, bin_width=0.8, bin_min=5.5)
    with pytest.raises(ValueError):
        binning._construct_bins(x_list, bin_config)
def test_bin_construction_force_min_log():
    """
    This test shows the main use of the bin_min parameter: if we want our bins
    to line up nicely with decades, we can force that by overriding the data's
    own minimum x.
    """
    samples = numpy.array([1500, 5000, 10000, 33253, 400000])
    config = binning.BinConfig(log_scale=True, bin_width=1, bin_min=10)
    produced = binning._construct_bins(samples, bin_config=config)
    numpy.testing.assert_allclose(
        produced,
        numpy.array([10, 100, 1000, 10000, 100000, 1000000]),
        err_msg="The bins were not as expected",
    )
def test_bin_construction_even_log_scale():
    """Log-scale edges: a width of 0.3 decades (10^0.3 ~= 2) roughly doubles each edge."""
    samples = numpy.array([1, 2.8, 8, 12.2, 13.6, 17, 19.71, 20, 24, 33])
    config = binning.BinConfig(log_scale=True, bin_width=0.3)
    doubling_edges = numpy.array(
        [
            1.00000000000,
            1.99526231497,
            3.98107170553,
            7.94328234724,
            15.8489319246,
            31.6227766017,
            63.0957344480,
        ]
    )
    produced = binning._construct_bins(samples, bin_config=config)
    numpy.testing.assert_allclose(
        produced, doubling_edges, err_msg="The bins were not as expected"
    )
def test_group_x_bins_log(snapshot):
    """Snapshot log-scale binning (2 decades per bin) of a Lorentzian-like series."""
    x_list = numpy.array(
        [
            0.00158489,
            0.00363078,
            0.0398107,
            0.275423,
            0.524807,
            2.51189,
            8.74984,
            10.0,
            63.0957,
            3981.07,
        ]
    )
    y_dict = {
        "basic_lorentzian": numpy.array(
            [
                0.159154,
                0.15915,
                0.158535,
                0.134062,
                0.0947588,
                0.00960602,
                0.000838084,
                0.000642427,
                0.0000162008,
                4.06987e-9,
            ]
        )
    }
    bin_config = binning.BinConfig(log_scale=True, bin_width=2)
    # bins with fewer than 2 points (the default min_points_required) are dropped
    binned = binning.bin_lists(x_list, y_dict, bin_config)
    assert binned == snapshot
def test_group_x_bins_mean_log(snapshot):
    """Snapshot per-bin mean points for decade-wide log bins anchored at 1e-2."""
    x_list = numpy.array(
        [
            0.0158489,
            0.0316228,
            0.0794328,
            0.158489,
            0.17378,
            0.316228,
            0.944061,
            0.977237,
            0.988553,
            3.16228,
            5.01187,
            15.8489,
            25.1189,
            31.6228,
            158.489,
            630.957,
        ]
    )
    y_dict = {
        "basic_lorentzian": (
            numpy.array(
                [
                    0.159056,
                    0.158763,
                    0.156715,
                    0.149866,
                    0.148118,
                    0.127657,
                    0.0497503,
                    0.0474191,
                    0.0466561,
                    0.00619907,
                    0.00252714,
                    0.000256378,
                    0.000102165,
                    0.0000644769,
                    2.56787e-6,
                    1.62024e-7,
                ]
            )
        )
    }
    # bin_min=1e-2 anchors the decade edges at 0.01, 0.1, 1, 10, ...
    bin_config = binning.BinConfig(log_scale=True, bin_width=1, bin_min=1e-2)
    binned = binning.bin_lists(x_list, y_dict, bin_config)
    mean_binned = [bin.mean_point() for bin in binned]
    assert mean_binned == snapshot
def test_group_x_bins_summary_log(snapshot):
    """Snapshot per-bin BinSummary values for decade-wide log bins anchored at 1e-2."""
    x_list = numpy.array(
        [
            0.0158489,
            0.0316228,
            0.0794328,
            0.158489,
            0.17378,
            0.316228,
            0.944061,
            0.977237,
            0.988553,
            3.16228,
            5.01187,
            15.8489,
            25.1189,
            31.6228,
            158.489,
            630.957,
        ]
    )
    y_dict = {
        "basic_lorentzian": (
            numpy.array(
                [
                    0.159056,
                    0.158763,
                    0.156715,
                    0.149866,
                    0.148118,
                    0.127657,
                    0.0497503,
                    0.0474191,
                    0.0466561,
                    0.00619907,
                    0.00252714,
                    0.000256378,
                    0.000102165,
                    0.0000644769,
                    2.56787e-6,
                    1.62024e-7,
                ]
            )
        )
    }
    # bin_min=1e-2 anchors the decade edges at 0.01, 0.1, 1, 10, ...
    bin_config = binning.BinConfig(log_scale=True, bin_width=1, bin_min=1e-2)
    binned = binning.bin_lists(x_list, y_dict, bin_config)
    summary = [bin.summary_point() for bin in binned]
    assert summary == snapshot