feat: add subset sim probs command for bayes for subset simulation results
Some checks failed
gitea-physics/deepdog/pipeline/head There was a failure building this commit

This commit is contained in:
Deepak Mallubhotla 2024-05-21 15:54:08 -05:00
parent 1a1ecc01ea
commit c881da2837
Signed by: deepak
GPG Key ID: BEBAEBF28083E022
11 changed files with 565 additions and 121 deletions

View File

@ -0,0 +1,5 @@
from deepdog.cli.subset_sim_probs.main import wrapped_main
__all__ = [
"wrapped_main",
]

View File

@ -0,0 +1,52 @@
import argparse
import os
def parse_args() -> argparse.Namespace:
def dir_path(path):
if os.path.isdir(path):
return path
else:
raise argparse.ArgumentTypeError(f"readable_dir:{path} is not a valid path")
parser = argparse.ArgumentParser(
"subset_sim_probs",
description="Calculating probability from finished subset sim run",
)
parser.add_argument(
"--log_file",
type=str,
help="A filename for logging to, if not provided will only log to stderr",
default=None,
)
parser.add_argument(
"--results-directory",
"-d",
type=dir_path,
help="The directory to search for bayesrun files, defaulting to cwd if not passed",
default=".",
)
parser.add_argument(
"--indexify-json",
help="A json file with the indexify config for parsing job indexes. Will skip if not present",
default="",
)
parser.add_argument(
"--outfile",
"-o",
type=str,
help="output filename for coalesced data. If not provided, will not be written",
default=None,
)
confirm_outfile_overwrite_group = parser.add_mutually_exclusive_group()
confirm_outfile_overwrite_group.add_argument(
"--never-overwrite-outfile",
action="store_true",
help="If a duplicate outfile is detected, skip confirmation and automatically exit early"
)
confirm_outfile_overwrite_group.add_argument(
"--force-overwrite-outfile",
action="store_true",
help="Skips checking for duplicate outfiles and overwrites"
)
return parser.parse_args()

View File

@ -0,0 +1,132 @@
import typing
from deepdog.results import GeneralOutput
import logging
import csv
import tqdm
_logger = logging.getLogger(__name__)
def build_model_dict(
general_outputs: typing.Sequence[GeneralOutput],
) -> typing.Dict[
typing.Tuple, typing.Dict[typing.Tuple, typing.Dict["str", typing.Any]]
]:
"""
Maybe someday do something smarter with the coalescing and stuff but don't want to so i won't
"""
# assume that everything is well formatted and the keys are the same across entire list and initialise list of keys.
# model dict will contain a model_key: {calculation_dict} where each calculation_dict represents a single calculation for that model,
# the uncoalesced version, keyed by the specific file keys
model_dict: typing.Dict[
typing.Tuple, typing.Dict[typing.Tuple, typing.Dict["str", typing.Any]]
] = {}
_logger.info("building model dict")
for out in tqdm.tqdm(general_outputs, desc="reading outputs", leave=False):
for model_result in out.results:
model_key = tuple(v for v in model_result.parsed_model_keys.values())
if model_key not in model_dict:
model_dict[model_key] = {}
calculation_dict = model_dict[model_key]
calculation_key = tuple(v for v in out.data.values())
if calculation_key not in calculation_dict:
calculation_dict[calculation_key] = {
"_model_key_dict": model_result.parsed_model_keys,
"_calculation_key_dict": out.data,
"num_finished_runs": int(model_result.result_dict["num_finished_runs"]),
"num_runs": int(model_result.result_dict["num_runs"]),
"estimated_likelihood": float(model_result.result_dict["estimated_likelihood"]),
}
else:
raise ValueError(
f"Got {calculation_key} twice for model_key {model_key}"
)
return model_dict
def coalesced_dict(
uncoalesced_model_dict: typing.Dict[
typing.Tuple, typing.Dict[typing.Tuple, typing.Dict["str", typing.Any]]
],
):
"""
pass in uncoalesced dict
the minimum_count field is what we use to make sure our probs are never zero
"""
coalesced_dict = {}
# we are already iterating so for no reason because performance really doesn't matter let's count the keys ourselves
num_keys = 0
# first pass coalesce
for model_key, model_dict in uncoalesced_model_dict.items():
num_keys += 1
for calculation in model_dict.values():
if model_key not in coalesced_dict:
coalesced_dict[model_key] = {
"_model_key_dict": calculation["_model_key_dict"].copy(),
"calculations_coalesced": 1,
"num_finished_runs": calculation["num_finished_runs"],
"num_runs": calculation["num_runs"],
"estimated_likelihood": calculation["estimated_likelihood"],
}
else:
_logger.error(f"We shouldn't be here! Double key for {model_key=}")
raise ValueError()
# second pass do probability calculation
prior = 1 / num_keys
_logger.info(f"Got {num_keys} model keys, so our prior will be {prior}")
total_weight = 0
for coalesced_model_dict in coalesced_dict.values():
model_weight = coalesced_model_dict["estimated_likelihood"] * prior
total_weight += model_weight
total_prob = 0
for coalesced_model_dict in coalesced_dict.values():
likelihood = coalesced_model_dict["estimated_likelihood"]
prob = likelihood * prior / total_weight
coalesced_model_dict["prob"] = prob
total_prob += prob
_logger.debug(
f"Got a total probability of {total_prob}, which should be close to 1 up to float/rounding error"
)
return coalesced_dict
def write_coalesced_dict(
coalesced_output_filename: typing.Optional[str],
coalesced_model_dict: typing.Dict[typing.Tuple, typing.Dict["str", typing.Any]],
):
if coalesced_output_filename is None or coalesced_output_filename == "":
_logger.warning("Not provided a uncoalesced filename, not going to try")
return
first_value = next(iter(coalesced_model_dict.values()))
model_field_names = set(first_value["_model_key_dict"].keys())
_logger.info(f"Detected model field names {model_field_names}")
collected_fieldnames = list(model_field_names)
collected_fieldnames.extend(["calculations_coalesced", "num_finished_runs", "num_runs", "prob"])
with open(coalesced_output_filename, "w", newline="") as coalesced_output_file:
writer = csv.DictWriter(coalesced_output_file, fieldnames=collected_fieldnames)
writer.writeheader()
for model_dict in coalesced_model_dict.values():
row = model_dict["_model_key_dict"].copy()
row.update(
{
"calculations_coalesced": model_dict["calculations_coalesced"],
"num_finished_runs": model_dict["num_finished_runs"],
"num_runs": model_dict["num_runs"],
"prob": model_dict["prob"],
}
)
writer.writerow(row)

View File

@ -0,0 +1,106 @@
import logging
import argparse
import json
import deepdog.cli.subset_sim_probs.args
import deepdog.cli.subset_sim_probs.dicts
import deepdog.cli.util
import deepdog.results
import deepdog.indexify
import pathlib
import tqdm
import os
import tqdm.contrib.logging
_logger = logging.getLogger(__name__)
def set_up_logging(log_file: str):
log_pattern = "%(asctime)s | %(levelname)-7s | %(name)s:%(lineno)d | %(message)s"
if log_file is None:
handlers = [
logging.StreamHandler(),
]
else:
handlers = [logging.StreamHandler(), logging.FileHandler(log_file)]
logging.basicConfig(
level=logging.DEBUG,
format=log_pattern,
# it's okay to ignore this mypy error because who cares about logger handler types
handlers=handlers, # type: ignore
)
logging.captureWarnings(True)
def main(args: argparse.Namespace):
"""
Main function with passed in arguments and no additional logging setup in case we want to extract out later
"""
with tqdm.contrib.logging.logging_redirect_tqdm():
_logger.info(f"args: {args}")
if "outfile" in args and args.outfile:
if os.path.exists(args.outfile):
if args.never_overwrite_outfile:
_logger.warning(f"Filename {args.outfile} already exists, and never want overwrite, so aborting.")
return
elif args.force_overwrite_outfile:
_logger.warning(f"Forcing overwrite of {args.outfile}")
else:
# need to confirm
confirm_overwrite = deepdog.cli.util.confirm_prompt(f"Filename {args.outfile} exists, overwrite?")
if not confirm_overwrite:
_logger.warning(f"Filename {args.outfile} already exists and do not want overwrite, aborting.")
return
else:
_logger.warning(f"Overwriting file {args.outfile}")
indexifier = None
if args.indexify_json:
with open(args.indexify_json, "r") as indexify_json_file:
indexify_spec = json.load(indexify_json_file)
indexify_data = indexify_spec["indexes"]
if "seed_spec" in indexify_spec:
seed_spec = indexify_spec["seed_spec"]
indexify_data[seed_spec["field_name"]] = list(
range(seed_spec["num_seeds"])
)
# _logger.debug(f"Indexifier data looks like {indexify_data}")
indexifier = deepdog.indexify.Indexifier(indexify_data)
results_dir = pathlib.Path(args.results_directory)
out_files = [f for f in results_dir.iterdir() if f.name.endswith("subsetsim.csv")]
_logger.info(
f"Reading {len(out_files)} subsetsim.csv files in directory {args.results_directory}"
)
# _logger.info(out_files)
parsed_output_files = [
deepdog.results.read_subset_sim_file(f, indexifier)
for f in tqdm.tqdm(out_files, desc="reading files", leave=False)
]
# Refactor here to allow for arbitrary likelihood file sources
_logger.info("building uncoalesced dict")
uncoalesced_dict = deepdog.cli.subset_sim_probs.dicts.build_model_dict(
parsed_output_files
)
_logger.info("building coalesced dict")
coalesced = deepdog.cli.subset_sim_probs.dicts.coalesced_dict(uncoalesced_dict)
if "outfile" in args and args.outfile:
deepdog.cli.subset_sim_probs.dicts.write_coalesced_dict(
args.outfile, coalesced
)
else:
_logger.info("Skipping writing coalesced")
def wrapped_main():
args = deepdog.cli.subset_sim_probs.args.parse_args()
set_up_logging(args.log_file)
main(args)

View File

@ -0,0 +1,3 @@
from deepdog.cli.util.confirm import confirm_prompt
__all__ = ["confirm_prompt"]

View File

@ -0,0 +1,25 @@
import sys
_RESPONSE_MAP = {
"yes": True,
"ye": True,
"y": True,
"no": False,
"n": False,
"nope": False,
"true": True,
"false": False
}
def confirm_prompt(question: str) -> bool:
""" Prompt with the question and returns yes or no based on response.
"""
prompt = question + " [y/n]: "
while True:
choice = input(prompt).lower()
if choice in _RESPONSE_MAP:
return _RESPONSE_MAP[choice]
else:
print(f"Respond with \"yes\" or \"no\"")

View File

@ -5,6 +5,13 @@ import logging
import deepdog.indexify import deepdog.indexify
import pathlib import pathlib
import csv import csv
from deepdog.results.read_csv import (
parse_bayesrun_row,
BayesrunModelResult,
parse_general_row,
GeneralModelResult,
)
from deepdog.results.filename import parse_file_slug
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
@ -12,67 +19,19 @@ FILENAME_REGEX = re.compile(
r"(?P<timestamp>\d{8}-\d{6})-(?P<filename_slug>.*)\.realdata\.fast_filter\.bayesrun\.csv" r"(?P<timestamp>\d{8}-\d{6})-(?P<filename_slug>.*)\.realdata\.fast_filter\.bayesrun\.csv"
) )
MODEL_REGEXES = [
re.compile(pattern)
for pattern in [
r"geom_(?P<xmin>-?\d+)_(?P<xmax>-?\d+)_(?P<ymin>-?\d+)_(?P<ymax>-?\d+)_(?P<zmin>-?\d+)_(?P<zmax>-?\d+)-orientation_(?P<orientation>free|fixedxy|fixedz)-dipole_count_(?P<avg_filled>\d+)_(?P<field_name>\w*)",
r"geom_(?P<xmin>-?\d+)_(?P<xmax>-?\d+)_(?P<ymin>-?\d+)_(?P<ymax>-?\d+)_(?P<zmin>-?\d+)_(?P<zmax>-?\d+)-magnitude_(?P<log_magnitude>\d*\.?\d+)-orientation_(?P<orientation>free|fixedxy|fixedz)-dipole_count_(?P<avg_filled>\d+)_(?P<field_name>\w*)",
r"geom_(?P<xmin>-?\d*\.?\d+)_(?P<xmax>-?\d*\.?\d+)_(?P<ymin>-?\d*\.?\d+)_(?P<ymax>-?\d*\.?\d+)_(?P<zmin>-?\d*\.?\d+)_(?P<zmax>-?\d*\.?\d+)-magnitude_(?P<log_magnitude>\d*\.?\d+)-orientation_(?P<orientation>free|fixedxy|fixedz)-dipole_count_(?P<avg_filled>\d+)_(?P<field_name>\w*)",
]
]
FILE_SLUG_REGEXES = [ SUBSET_SIM_FILENAME_REGEX = re.compile(
re.compile(pattern) r"(?P<filename_slug>.*)-(?:no_adaptive_steps_)?(?P<num_ss_runs>\d+)-nc_(?P<n_c>\d+)-ns_(?P<n_s>\d+)-mmax_(?P<mmax>\d+)\.multi\.subsetsim\.csv"
for pattern in [ )
r"(?P<tag>\w+)-(?P<job_index>\d+)",
r"mock_tarucha-(?P<job_index>\d+)",
r"(?:(?P<mock>mock)_)?tarucha(?:_(?P<tarucha_run_id>\d+))?-(?P<job_index>\d+)",
]
]
SIMPLE_TAG_REGEX = re.compile(r"\w+-\d+")
@dataclasses.dataclass @dataclasses.dataclass
class BayesrunOutputFilename: class BayesrunOutputFilename:
timestamp: str timestamp: typing.Optional[str]
filename_slug: str filename_slug: str
path: pathlib.Path path: pathlib.Path
class BayesrunColumnParsed:
"""
class for parsing a bayesrun while pulling certain special fields out
"""
def __init__(self, groupdict: typing.Dict[str, str]):
self.column_field = groupdict["field_name"]
self.model_field_dict = {
k: v for k, v in groupdict.items() if k != "field_name"
}
self._groupdict_str = repr(groupdict)
def __str__(self):
return f"BayesrunColumnParsed[{self.column_field}: {self.model_field_dict}]"
def __repr__(self):
return f"BayesrunColumnParsed({self._groupdict_str})"
def __eq__(self, other):
if isinstance(other, BayesrunColumnParsed):
return (self.column_field == other.column_field) and (
self.model_field_dict == other.model_field_dict
)
return NotImplemented
@dataclasses.dataclass
class BayesrunModelResult:
parsed_model_keys: typing.Dict[str, str]
success: int
count: int
@dataclasses.dataclass @dataclasses.dataclass
class BayesrunOutput: class BayesrunOutput:
filename: BayesrunOutputFilename filename: BayesrunOutputFilename
@ -80,57 +39,11 @@ class BayesrunOutput:
results: typing.Sequence[BayesrunModelResult] results: typing.Sequence[BayesrunModelResult]
def _batch_iterable_into_chunks(iterable, n=1): @dataclasses.dataclass
""" class GeneralOutput:
utility for batching bayesrun files where columns appear in threes filename: BayesrunOutputFilename
""" data: typing.Dict["str", typing.Any]
for ndx in range(0, len(iterable), n): results: typing.Sequence[GeneralModelResult]
yield iterable[ndx : min(ndx + n, len(iterable))]
def _parse_bayesrun_column(
column: str,
) -> typing.Optional[BayesrunColumnParsed]:
"""
Tries one by one all of a predefined list of regexes that I might have used in the past.
Returns the groupdict for the first match, or None if no match found.
"""
for pattern in MODEL_REGEXES:
match = pattern.match(column)
if match:
return BayesrunColumnParsed(match.groupdict())
else:
return None
def _parse_bayesrun_row(
row: typing.Dict[str, str],
) -> typing.Sequence[BayesrunModelResult]:
results = []
batched_keys = _batch_iterable_into_chunks(list(row.keys()), 3)
for model_keys in batched_keys:
parsed = [_parse_bayesrun_column(column) for column in model_keys]
values = [row[column] for column in model_keys]
if parsed[0] is None:
raise ValueError(f"no viable success row found for keys {model_keys}")
if parsed[1] is None:
raise ValueError(f"no viable count row found for keys {model_keys}")
if parsed[0].column_field != "success":
raise ValueError(f"The column {model_keys[0]} is not a success field")
if parsed[1].column_field != "count":
raise ValueError(f"The column {model_keys[1]} is not a count field")
parsed_keys = parsed[0].model_field_dict
success = int(values[0])
count = int(values[1])
results.append(
BayesrunModelResult(
parsed_model_keys=parsed_keys,
success=success,
count=count,
)
)
return results
def _parse_output_filename(file: pathlib.Path) -> BayesrunOutputFilename: def _parse_output_filename(file: pathlib.Path) -> BayesrunOutputFilename:
@ -144,24 +57,26 @@ def _parse_output_filename(file: pathlib.Path) -> BayesrunOutputFilename:
) )
def _parse_file_slug(slug: str) -> typing.Optional[typing.Dict[str, str]]: def _parse_ss_output_filename(file: pathlib.Path) -> BayesrunOutputFilename:
for pattern in FILE_SLUG_REGEXES: filename = file.name
match = pattern.match(slug) match = SUBSET_SIM_FILENAME_REGEX.match(filename)
if match: if not match:
return match.groupdict() raise ValueError(f"{filename} was not a valid subset sim output")
else: groups = match.groupdict()
return None return BayesrunOutputFilename(
filename_slug=groups["filename_slug"], path=file, timestamp=None
)
def read_output_file( def read_subset_sim_file(
file: pathlib.Path, indexifier: typing.Optional[deepdog.indexify.Indexifier] file: pathlib.Path, indexifier: typing.Optional[deepdog.indexify.Indexifier]
) -> BayesrunOutput: ) -> GeneralOutput:
parsed_filename = tag = _parse_output_filename(file) parsed_filename = tag = _parse_ss_output_filename(file)
out = BayesrunOutput(filename=parsed_filename, data={}, results=[]) out = GeneralOutput(filename=parsed_filename, data={}, results=[])
out.data.update(dataclasses.asdict(tag)) out.data.update(dataclasses.asdict(tag))
parsed_tag = _parse_file_slug(parsed_filename.filename_slug) parsed_tag = parse_file_slug(parsed_filename.filename_slug)
if parsed_tag is None: if parsed_tag is None:
_logger.warning( _logger.warning(
f"Could not parse {tag} against any matching regexes. Going to skip tag parsing" f"Could not parse {tag} against any matching regexes. Going to skip tag parsing"
@ -186,8 +101,53 @@ def read_output_file(
row = rows[0] row = rows[0]
else: else:
raise ValueError(f"Confused about having multiple rows in {file.name}") raise ValueError(f"Confused about having multiple rows in {file.name}")
results = _parse_bayesrun_row(row) results = parse_general_row(
row, ("num_finished_runs", "num_runs", None, "estimated_likelihood")
)
out.results = results out.results = results
return out return out
def read_output_file(
file: pathlib.Path, indexifier: typing.Optional[deepdog.indexify.Indexifier]
) -> BayesrunOutput:
parsed_filename = tag = _parse_output_filename(file)
out = BayesrunOutput(filename=parsed_filename, data={}, results=[])
out.data.update(dataclasses.asdict(tag))
parsed_tag = parse_file_slug(parsed_filename.filename_slug)
if parsed_tag is None:
_logger.warning(
f"Could not parse {tag} against any matching regexes. Going to skip tag parsing"
)
else:
out.data.update(parsed_tag)
if indexifier is not None:
try:
job_index = parsed_tag["job_index"]
indexified = indexifier.indexify(int(job_index))
out.data.update(indexified)
except KeyError:
# This isn't really that important of an error, apart from the warning
_logger.warning(
f"Parsed tag to {parsed_tag}, and attempted to indexify but no job_index key was found. skipping and moving on"
)
with file.open() as input_file:
reader = csv.DictReader(input_file)
rows = [r for r in reader]
if len(rows) == 1:
row = rows[0]
else:
raise ValueError(f"Confused about having multiple rows in {file.name}")
results = parse_bayesrun_row(row)
out.results = results
return out
__all__ = ["read_output_file", "BayesrunOutput"]

View File

@ -0,0 +1,21 @@
import re
import typing
FILE_SLUG_REGEXES = [
re.compile(pattern)
for pattern in [
r"(?P<tag>\w+)-(?P<job_index>\d+)",
r"mock_tarucha-(?P<job_index>\d+)",
r"(?:(?P<mock>mock)_)?tarucha(?:_(?P<tarucha_run_id>\d+))?-(?P<job_index>\d+)",
]
]
def parse_file_slug(slug: str) -> typing.Optional[typing.Dict[str, str]]:
for pattern in FILE_SLUG_REGEXES:
match = pattern.match(slug)
if match:
return match.groupdict()
else:
return None

139
deepdog/results/read_csv.py Normal file
View File

@ -0,0 +1,139 @@
import typing
import re
import dataclasses
MODEL_REGEXES = [
re.compile(pattern)
for pattern in [
r"geom_(?P<xmin>-?\d+)_(?P<xmax>-?\d+)_(?P<ymin>-?\d+)_(?P<ymax>-?\d+)_(?P<zmin>-?\d+)_(?P<zmax>-?\d+)-orientation_(?P<orientation>free|fixedxy|fixedz)-dipole_count_(?P<avg_filled>\d+)_(?P<field_name>\w*)",
r"geom_(?P<xmin>-?\d+)_(?P<xmax>-?\d+)_(?P<ymin>-?\d+)_(?P<ymax>-?\d+)_(?P<zmin>-?\d+)_(?P<zmax>-?\d+)-magnitude_(?P<log_magnitude>\d*\.?\d+)-orientation_(?P<orientation>free|fixedxy|fixedz)-dipole_count_(?P<avg_filled>\d+)_(?P<field_name>\w*)",
r"geom_(?P<xmin>-?\d*\.?\d+)_(?P<xmax>-?\d*\.?\d+)_(?P<ymin>-?\d*\.?\d+)_(?P<ymax>-?\d*\.?\d+)_(?P<zmin>-?\d*\.?\d+)_(?P<zmax>-?\d*\.?\d+)-magnitude_(?P<log_magnitude>\d*\.?\d+)-orientation_(?P<orientation>free|fixedxy|fixedz)-dipole_count_(?P<avg_filled>\d+)_(?P<field_name>\w*)",
]
]
@dataclasses.dataclass
class BayesrunModelResult:
parsed_model_keys: typing.Dict[str, str]
success: int
count: int
@dataclasses.dataclass
class GeneralModelResult:
parsed_model_keys: typing.Dict[str, str]
result_dict: typing.Dict[str, str]
class BayesrunColumnParsed:
"""
class for parsing a bayesrun while pulling certain special fields out
"""
def __init__(self, groupdict: typing.Dict[str, str]):
self.column_field = groupdict["field_name"]
self.model_field_dict = {
k: v for k, v in groupdict.items() if k != "field_name"
}
self._groupdict_str = repr(groupdict)
def __str__(self):
return f"BayesrunColumnParsed[{self.column_field}: {self.model_field_dict}]"
def __repr__(self):
return f"BayesrunColumnParsed({self._groupdict_str})"
def __eq__(self, other):
if isinstance(other, BayesrunColumnParsed):
return (self.column_field == other.column_field) and (
self.model_field_dict == other.model_field_dict
)
return NotImplemented
def _parse_bayesrun_column(
column: str,
) -> typing.Optional[BayesrunColumnParsed]:
"""
Tries one by one all of a predefined list of regexes that I might have used in the past.
Returns the groupdict for the first match, or None if no match found.
"""
for pattern in MODEL_REGEXES:
match = pattern.match(column)
if match:
return BayesrunColumnParsed(match.groupdict())
else:
return None
def _batch_iterable_into_chunks(iterable, n=1):
"""
utility for batching bayesrun files where columns appear in threes
"""
for ndx in range(0, len(iterable), n):
yield iterable[ndx : min(ndx + n, len(iterable))]
def parse_general_row(
row: typing.Dict[str, str],
expected_fields: typing.Sequence[typing.Optional[str]],
) -> typing.Sequence[GeneralModelResult]:
results = []
batched_keys = _batch_iterable_into_chunks(list(row.keys()), len(expected_fields))
for model_keys in batched_keys:
parsed = [_parse_bayesrun_column(column) for column in model_keys]
values = [row[column] for column in model_keys]
result_dict = {}
parsed_keys = None
for expected_field, parsed_field, value in zip(expected_fields, parsed, values):
if expected_field is None:
continue
if parsed_field is None:
raise ValueError(
f"No viable row found for {expected_field=} in {model_keys=}"
)
if parsed_field.column_field != expected_field:
raise ValueError(
f"The column {parsed_field.column_field} does not match expected {expected_field}"
)
result_dict[expected_field] = value
if parsed_keys is None:
parsed_keys = parsed_field.model_field_dict
if parsed_keys is None:
raise ValueError(f"Somehow parsed keys is none here, for {row=}")
results.append(
GeneralModelResult(parsed_model_keys=parsed_keys, result_dict=result_dict)
)
return results
def parse_bayesrun_row(
row: typing.Dict[str, str],
) -> typing.Sequence[BayesrunModelResult]:
results = []
batched_keys = _batch_iterable_into_chunks(list(row.keys()), 3)
for model_keys in batched_keys:
parsed = [_parse_bayesrun_column(column) for column in model_keys]
values = [row[column] for column in model_keys]
if parsed[0] is None:
raise ValueError(f"no viable success row found for keys {model_keys}")
if parsed[1] is None:
raise ValueError(f"no viable count row found for keys {model_keys}")
if parsed[0].column_field != "success":
raise ValueError(f"The column {model_keys[0]} is not a success field")
if parsed[1].column_field != "count":
raise ValueError(f"The column {model_keys[1]} is not a count field")
parsed_keys = parsed[0].model_field_dict
success = int(values[0])
count = int(values[1])
results.append(
BayesrunModelResult(
parsed_model_keys=parsed_keys,
success=success,
count=count,
)
)
return results

View File

@ -22,6 +22,7 @@ syrupy = "^4.0.8"
[tool.poetry.scripts] [tool.poetry.scripts]
probs = "deepdog.cli.probs:wrapped_main" probs = "deepdog.cli.probs:wrapped_main"
subset_sim_probs = "deepdog.cli.subset_sim_probs:wrapped_main"
[build-system] [build-system]
requires = ["poetry-core>=1.0.0"] requires = ["poetry-core>=1.0.0"]

View File

@ -1,4 +1,4 @@
import deepdog.results import deepdog.results.read_csv
def test_parse_groupdict(): def test_parse_groupdict():
@ -6,9 +6,9 @@ def test_parse_groupdict():
"geom_-20_20_-10_10_0_5-orientation_free-dipole_count_100_success" "geom_-20_20_-10_10_0_5-orientation_free-dipole_count_100_success"
) )
parsed = deepdog.results._parse_bayesrun_column(example_column_name) parsed = deepdog.results.read_csv._parse_bayesrun_column(example_column_name)
assert parsed is not None assert parsed is not None
expected = deepdog.results.BayesrunColumnParsed( expected = deepdog.results.read_csv.BayesrunColumnParsed(
{ {
"xmin": "-20", "xmin": "-20",
"xmax": "20", "xmax": "20",
@ -29,9 +29,9 @@ def test_parse_groupdict_with_magnitude():
"geom_-20_20_-10_10_0_5-magnitude_3.5-orientation_free-dipole_count_100_success" "geom_-20_20_-10_10_0_5-magnitude_3.5-orientation_free-dipole_count_100_success"
) )
parsed = deepdog.results._parse_bayesrun_column(example_column_name) parsed = deepdog.results.read_csv._parse_bayesrun_column(example_column_name)
assert parsed is not None assert parsed is not None
expected = deepdog.results.BayesrunColumnParsed( expected = deepdog.results.read_csv.BayesrunColumnParsed(
{ {
"xmin": "-20", "xmin": "-20",
"xmax": "20", "xmax": "20",