feat: add probs cli command

Deepak Mallubhotla 2024-04-27 18:43:25 -05:00
parent 12e6916ab2
commit 4b2e573715
Signed by: deepak
GPG Key ID: BEBAEBF28083E022
12 changed files with 555 additions and 0 deletions

5
deepdog/cli/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from deepdog.cli.probs.main import main

__all__ = [
    "main",
]

63
deepdog/cli/probs/args.py Normal file
View File

@@ -0,0 +1,63 @@
import argparse
import os


def parse_args() -> argparse.Namespace:
    def dir_path(path):
        if os.path.isdir(path):
            return path
        else:
            raise argparse.ArgumentTypeError(f"readable_dir:{path} is not a valid path")

    parser = argparse.ArgumentParser(
        "probs", description="Calculating probability from finished bayesrun"
    )
    parser.add_argument(
        "--log_file",
        type=str,
        help="A filename to log to; if not provided, logs only to stderr",
        default=None,
    )
    parser.add_argument(
        "--bayesrun-directory",
        "-d",
        type=dir_path,
        help="The directory to search for bayesrun files, defaulting to cwd if not passed",
        default=".",
    )
    parser.add_argument(
        "--indexify-json",
        help="A json file with the indexify config for parsing job indexes",
        default="indexes.json",
    )
    parser.add_argument(
        "--seed-index",
        type=int,
        help='An integer to append as a "seed" key with range at the end of the indexify dict. Skipped if <= 0',
        default=0,
    )
    parser.add_argument(
        "--seed-fieldname",
        type=str,
        help='If --seed-index is set, the fieldname to append to the indexifier. "seed" by default',
        default="seed",
    )
    parser.add_argument(
        "--coalesced-keys",
        type=str,
        help="A comma-separated list of strings over which to coalesce data. By default, coalesce over all fields within model names and ignore file-level names",
        default="",
    )
    parser.add_argument(
        "--uncoalesced-outfile",
        type=str,
        help="Output filename for uncoalesced data. If not provided, it will not be written",
        default=None,
    )
    parser.add_argument(
        "--coalesced-outfile",
        type=str,
        help="Output filename for coalesced data. If not provided, it will not be written",
        default=None,
    )
    return parser.parse_args()

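For context, a sketch of a typical invocation and the namespace it produces; the flag values and paths below are invented for illustration, not part of this commit:

# hypothetical shell invocation, assuming the probs entry point is installed:
#   probs -d ./bayesruns --seed-index 100 --coalesced-outfile coalesced.csv
# parse_args() then yields roughly:
#   Namespace(log_file=None, bayesrun_directory="./bayesruns", indexify_json="indexes.json",
#             seed_index=100, seed_fieldname="seed", coalesced_keys="",
#             uncoalesced_outfile=None, coalesced_outfile="coalesced.csv")
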
137
deepdog/cli/probs/dicts.py Normal file
View File

@@ -0,0 +1,137 @@
import typing
from deepdog.results import BayesrunOutput
import logging
import csv

_logger = logging.getLogger(__name__)


def build_model_dict(
    bayes_outputs: typing.Sequence[BayesrunOutput],
) -> typing.Dict[
    typing.Tuple, typing.Dict[typing.Tuple, typing.Dict["str", typing.Any]]
]:
    """
    Maybe someday do something smarter with the coalescing and stuff, but don't want to so I won't.
    """
    # Assume that everything is well formatted and the keys are the same across the entire list.
    # model_dict maps model_key -> {calculation_key: calculation}, where each calculation represents
    # a single calculation for that model: the uncoalesced version, keyed by the specific file keys.
    model_dict: typing.Dict[
        typing.Tuple, typing.Dict[typing.Tuple, typing.Dict["str", typing.Any]]
    ] = {}

    for out in bayes_outputs:
        for model_result in out.results:
            model_key = tuple(v for v in model_result.parsed_model_keys.values())
            if model_key not in model_dict:
                model_dict[model_key] = {}
            calculation_dict = model_dict[model_key]
            calculation_key = tuple(v for v in out.data.values())
            if calculation_key not in calculation_dict:
                calculation_dict[calculation_key] = {
                    "_model_key_dict": model_result.parsed_model_keys,
                    "_calculation_key_dict": out.data,
                    "success": model_result.success,
                    "count": model_result.count,
                }
            else:
                raise ValueError(
                    f"Got {calculation_key} twice for model_key {model_key}"
                )

    return model_dict


def write_uncoalesced_dict(
    uncoalesced_output_filename: typing.Optional[str],
    uncoalesced_model_dict: typing.Dict[
        typing.Tuple, typing.Dict[typing.Tuple, typing.Dict["str", typing.Any]]
    ],
):
    if uncoalesced_output_filename is None or uncoalesced_output_filename == "":
        _logger.warning("No uncoalesced filename provided, skipping uncoalesced output")
        return

    first_value = next(iter(next(iter(uncoalesced_model_dict.values())).values()))
    model_field_names = set(first_value["_model_key_dict"].keys())
    calculation_field_names = set(first_value["_calculation_key_dict"].keys())

    if not model_field_names.isdisjoint(calculation_field_names):
        _logger.info(f"Detected model field names {model_field_names}")
        _logger.info(f"Detected calculation field names {calculation_field_names}")
        raise ValueError(
            f"model field names {model_field_names} and calculation field names {calculation_field_names} overlap, which is possibly a problem"
        )
    collected_fieldnames = list(model_field_names)
    collected_fieldnames.extend(calculation_field_names)
    collected_fieldnames.extend(["success", "count"])

    _logger.info(f"Full uncoalesced fieldnames are {collected_fieldnames}")
    with open(uncoalesced_output_filename, "w", newline="") as uncoalesced_output_file:
        writer = csv.DictWriter(
            uncoalesced_output_file, fieldnames=collected_fieldnames
        )
        writer.writeheader()

        for model_dict in uncoalesced_model_dict.values():
            for calculation in model_dict.values():
                row = calculation["_model_key_dict"].copy()
                row.update(calculation["_calculation_key_dict"].copy())
                row.update(
                    {
                        "success": calculation["success"],
                        "count": calculation["count"],
                    }
                )
                writer.writerow(row)


def coalesced_dict(
    uncoalesced_model_dict: typing.Dict[
        typing.Tuple, typing.Dict[typing.Tuple, typing.Dict["str", typing.Any]]
    ]
):
    # local accumulator named distinctly so it doesn't shadow this function's own name
    coalesced_output = {}
    for model_key, model_dict in uncoalesced_model_dict.items():
        for calculation in model_dict.values():
            if model_key not in coalesced_output:
                coalesced_output[model_key] = {
                    "_model_key_dict": calculation["_model_key_dict"].copy(),
                    "calculations_coalesced": 0,
                    "count": 0,
                    "success": 0,
                }
            sub_dict = coalesced_output[model_key]
            sub_dict["calculations_coalesced"] += 1
            sub_dict["count"] += calculation["count"]
            sub_dict["success"] += calculation["success"]
    return coalesced_output


def write_coalesced_dict(
    coalesced_output_filename: typing.Optional[str],
    coalesced_model_dict: typing.Dict[typing.Tuple, typing.Dict["str", typing.Any]],
):
    if coalesced_output_filename is None or coalesced_output_filename == "":
        _logger.warning("No coalesced filename provided, skipping coalesced output")
        return

    first_value = next(iter(coalesced_model_dict.values()))
    model_field_names = set(first_value["_model_key_dict"].keys())
    _logger.info(f"Detected model field names {model_field_names}")

    collected_fieldnames = list(model_field_names)
    collected_fieldnames.extend(["calculations_coalesced", "success", "count"])
    with open(coalesced_output_filename, "w", newline="") as coalesced_output_file:
        writer = csv.DictWriter(coalesced_output_file, fieldnames=collected_fieldnames)
        writer.writeheader()

        for model_dict in coalesced_model_dict.values():
            row = model_dict["_model_key_dict"].copy()
            row.update(
                {
                    "calculations_coalesced": model_dict["calculations_coalesced"],
                    "success": model_dict["success"],
                    "count": model_dict["count"],
                }
            )
            writer.writerow(row)

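To make the shapes concrete, a minimal hand-built sketch of the uncoalesced structure and what coalesced_dict does with it; every value here is invented for illustration:

# model_key -> {calculation_key: calculation}, as produced by build_model_dict
uncoalesced = {
    ("-20", "free", "100"): {
        ("20240101-000000", "mock_tarucha-5"): {
            "_model_key_dict": {"xmin": "-20", "orientation": "free", "avg_filled": "100"},
            "_calculation_key_dict": {"timestamp": "20240101-000000", "filename_slug": "mock_tarucha-5"},
            "success": 7,
            "count": 100,
        },
        ("20240102-000000", "mock_tarucha-6"): {
            "_model_key_dict": {"xmin": "-20", "orientation": "free", "avg_filled": "100"},
            "_calculation_key_dict": {"timestamp": "20240102-000000", "filename_slug": "mock_tarucha-6"},
            "success": 8,
            "count": 100,
        },
    },
}
# coalesced_dict(uncoalesced) sums across the inner calculations per model_key, giving roughly:
# {("-20", "free", "100"): {"_model_key_dict": {...}, "calculations_coalesced": 2, "success": 15, "count": 200}}
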
80
deepdog/cli/probs/main.py Normal file
View File

@@ -0,0 +1,80 @@
import logging
import argparse
import json
import typing

import deepdog.cli.probs.args
import deepdog.cli.probs.dicts
import deepdog.results
import deepdog.indexify
import pathlib

_logger = logging.getLogger(__name__)


def set_up_logging(log_file: typing.Optional[str]):
    log_pattern = "%(asctime)s | %(levelname)-7s | %(name)s:%(lineno)d | %(message)s"
    if log_file is None:
        handlers = [
            logging.StreamHandler(),
        ]
    else:
        handlers = [logging.StreamHandler(), logging.FileHandler(log_file)]
    logging.basicConfig(
        level=logging.DEBUG,
        format=log_pattern,
        # it's okay to ignore this mypy error because who cares about logger handler types
        handlers=handlers,  # type: ignore
    )
    logging.captureWarnings(True)


def wrapped_main(args: argparse.Namespace):
    """
    Main function with passed-in arguments and no additional logging setup, in case we want to extract it out later.
    """
    _logger.info(f"args: {args}")
    if args.coalesced_keys:
        raise NotImplementedError(
            "Currently not supporting coalesced keys, but maybe in future"
        )
    with open(args.indexify_json, "r") as indexify_json_file:
        indexify_data = json.load(indexify_json_file)
        if args.seed_index > 0:
            indexify_data[args.seed_fieldname] = list(range(args.seed_index))
        # _logger.debug(f"Indexifier data looks like {indexify_data}")
        indexifier = deepdog.indexify.Indexifier(indexify_data)

    bayes_dir = pathlib.Path(args.bayesrun_directory)
    out_files = [f for f in bayes_dir.iterdir() if f.name.endswith("bayesrun.csv")]
    _logger.info(
        f"Found {len(out_files)} bayesrun.csv files in directory {args.bayesrun_directory}"
    )
    # _logger.info(out_files)
    parsed_output_files = [
        deepdog.results.read_output_file(f, indexifier) for f in out_files
    ]

    _logger.info("building uncoalesced dict")
    uncoalesced_dict = deepdog.cli.probs.dicts.build_model_dict(parsed_output_files)

    if args.uncoalesced_outfile:
        deepdog.cli.probs.dicts.write_uncoalesced_dict(
            args.uncoalesced_outfile, uncoalesced_dict
        )
    else:
        _logger.info("Skipping writing uncoalesced")

    _logger.info("building coalesced dict")
    coalesced = deepdog.cli.probs.dicts.coalesced_dict(uncoalesced_dict)

    if args.coalesced_outfile:
        deepdog.cli.probs.dicts.write_coalesced_dict(args.coalesced_outfile, coalesced)
    else:
        _logger.info("Skipping writing coalesced")


def main():
    args = deepdog.cli.probs.args.parse_args()
    set_up_logging(args.log_file)
    wrapped_main(args)

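A rough sketch of programmatic use that skips argument parsing; the attribute names mirror args.py, and the file names here are invented:

import argparse

import deepdog.cli.probs.main

args = argparse.Namespace(
    log_file=None,
    bayesrun_directory=".",
    indexify_json="indexes.json",
    seed_index=0,
    seed_fieldname="seed",
    coalesced_keys="",
    uncoalesced_outfile=None,
    coalesced_outfile="coalesced.csv",
)
deepdog.cli.probs.main.set_up_logging(args.log_file)
deepdog.cli.probs.main.wrapped_main(args)
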
58
deepdog/indexify/__init__.py Normal file
View File

@ -0,0 +1,58 @@
"""
Probably should just include a way to handle the indexify function I reuse so much.
All about breaking an integer into a tuple of values from lists, which is useful because of how we do CHTC runs.
"""
import itertools
import typing
import logging
import math
_logger = logging.getLogger(__name__)
# from https://stackoverflow.com/questions/5228158/cartesian-product-of-a-dictionary-of-lists
def _dict_product(dicts):
"""
>>> list(dict_product(dict(number=[1,2], character='ab')))
[{'character': 'a', 'number': 1},
{'character': 'a', 'number': 2},
{'character': 'b', 'number': 1},
{'character': 'b', 'number': 2}]
"""
return list(dict(zip(dicts.keys(), x)) for x in itertools.product(*dicts.values()))
class Indexifier:
"""
The order of keys is very important, but collections.OrderedDict is no longer needed in python 3.7.
I think it's okay to rely on that.
"""
def __init__(self, list_dict: typing.Dict[str, typing.Sequence]):
self.dict = list_dict
def indexify(self, n: int) -> typing.Dict[str, typing.Any]:
product_dict = _dict_product(self.dict)
return product_dict[n]
def _indexify_indices(self, n: int) -> typing.Sequence[int]:
"""
legacy indexify from old scripts, copypast.
could be used like
>>> ret = {}
>>> for k, i in zip(self.dict.keys(), self._indexify_indices):
>>> ret[k] = self.dict[k][i]
>>> return ret
"""
weights = [len(v) for v in self.dict.values()]
N = math.prod(weights)
curr_n = n
curr_N = N
out = []
for w in weights[:-1]:
# print(f"current: {curr_N}, {curr_n}, {curr_n // w}")
curr_N = curr_N // w # should be int division anyway
out.append(curr_n // curr_N)
curr_n = curr_n % curr_N
return out

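Indexifier.indexify is mixed-radix decomposition of n over the list lengths; a small worked example, matching the test further down:

import deepdog.indexify

# weights are [3, 3], so N = 9 total combinations; n = 5 decomposes as 5 = 1 * 3 + 2,
# giving indices [1, 2]: key_1[1] == 2 and key_2[2] == "c"
indexifier = deepdog.indexify.Indexifier({"key_1": [1, 2, 3], "key_2": ["a", "b", "c"]})
assert indexifier.indexify(5) == {"key_1": 2, "key_2": "c"}
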
169
deepdog/results/__init__.py Normal file
View File

@@ -0,0 +1,169 @@
import dataclasses
import re
import typing
import logging
import deepdog.indexify
import pathlib
import csv

_logger = logging.getLogger(__name__)

FILENAME_REGEX = r"(?P<timestamp>\d{8}-\d{6})-(?P<filename_slug>.*)\.realdata\.fast_filter\.bayesrun\.csv"

MODEL_REGEXES = [
    r"geom_(?P<xmin>-?\d+)_(?P<xmax>-?\d+)_(?P<ymin>-?\d+)_(?P<ymax>-?\d+)_(?P<zmin>-?\d+)_(?P<zmax>-?\d+)-orientation_(?P<orientation>free|fixedxy|fixedz)-dipole_count_(?P<avg_filled>\d+)_(?P<field_name>\w*)"
]

FILE_SLUG_REGEXES = [
    r"mock_tarucha-(?P<job_index>\d+)",
]


@dataclasses.dataclass
class BayesrunOutputFilename:
    timestamp: str
    filename_slug: str
    path: pathlib.Path


class BayesrunColumnParsed:
    """
    class for parsing a bayesrun column name while pulling certain special fields out
    """

    def __init__(self, groupdict: typing.Dict[str, str]):
        self.column_field = groupdict["field_name"]
        self.model_field_dict = {
            k: v for k, v in groupdict.items() if k != "field_name"
        }

    def __str__(self):
        return f"BayesrunColumnParsed[{self.column_field}: {self.model_field_dict}]"

    def __eq__(self, other):
        # compare by parsed contents, not identity, so tests can assert on parse results
        if isinstance(other, BayesrunColumnParsed):
            return (self.column_field == other.column_field) and (
                self.model_field_dict == other.model_field_dict
            )
        return NotImplemented


@dataclasses.dataclass
class BayesrunModelResult:
    parsed_model_keys: typing.Dict[str, str]
    success: int
    count: int


@dataclasses.dataclass
class BayesrunOutput:
    filename: BayesrunOutputFilename
    data: typing.Dict["str", typing.Any]
    results: typing.Sequence[BayesrunModelResult]


def _batch_iterable_into_chunks(iterable, n=1):
    """
    utility for batching bayesrun columns, which appear in threes
    """
    for ndx in range(0, len(iterable), n):
        yield iterable[ndx : min(ndx + n, len(iterable))]


def _parse_bayesrun_column(
    column: str,
) -> typing.Optional[BayesrunColumnParsed]:
    """
    Tries one by one all of a predefined list of regexes that I might have used in the past.
    Returns the groupdict for the first match, or None if no match is found.
    """
    for pattern in MODEL_REGEXES:
        match = re.match(pattern, column)
        if match:
            return BayesrunColumnParsed(match.groupdict())
    # only give up after every pattern has been tried
    return None


def _parse_bayesrun_row(
    row: typing.Dict[str, str],
) -> typing.Sequence[BayesrunModelResult]:
    results = []
    batched_keys = _batch_iterable_into_chunks(list(row.keys()), 3)
    for model_keys in batched_keys:
        parsed = [_parse_bayesrun_column(column) for column in model_keys]
        values = [row[column] for column in model_keys]
        if parsed[0] is None:
            raise ValueError(f"no viable success column found for keys {model_keys}")
        if parsed[1] is None:
            raise ValueError(f"no viable count column found for keys {model_keys}")
        if parsed[0].column_field != "success":
            raise ValueError(f"The column {model_keys[0]} is not a success field")
        if parsed[1].column_field != "count":
            raise ValueError(f"The column {model_keys[1]} is not a count field")
        parsed_keys = parsed[0].model_field_dict
        success = int(values[0])
        count = int(values[1])
        results.append(
            BayesrunModelResult(
                parsed_model_keys=parsed_keys,
                success=success,
                count=count,
            )
        )
    return results


def _parse_output_filename(file: pathlib.Path) -> BayesrunOutputFilename:
    filename = file.name
    match = re.match(FILENAME_REGEX, filename)
    if not match:
        raise ValueError(f"{filename} was not a valid bayesrun output")
    groups = match.groupdict()
    return BayesrunOutputFilename(
        timestamp=groups["timestamp"], filename_slug=groups["filename_slug"], path=file
    )


def _parse_file_slug(slug: str) -> typing.Optional[typing.Dict[str, str]]:
    for pattern in FILE_SLUG_REGEXES:
        match = re.match(pattern, slug)
        if match:
            return match.groupdict()
    return None


def read_output_file(
    file: pathlib.Path, indexifier: typing.Optional[deepdog.indexify.Indexifier]
) -> BayesrunOutput:
    parsed_filename = _parse_output_filename(file)
    out = BayesrunOutput(filename=parsed_filename, data={}, results=[])
    out.data.update(dataclasses.asdict(parsed_filename))

    parsed_tag = _parse_file_slug(parsed_filename.filename_slug)
    if parsed_tag is None:
        _logger.warning(
            f"Could not parse {parsed_filename.filename_slug} against any matching regexes. Going to skip tag parsing"
        )
    else:
        out.data.update(parsed_tag)
        if indexifier is not None:
            try:
                job_index = parsed_tag["job_index"]
                indexified = indexifier.indexify(int(job_index))
                out.data.update(indexified)
            except KeyError:
                # This isn't really that important of an error, apart from the warning
                _logger.warning(
                    f"Parsed tag to {parsed_tag}, and attempted to indexify but no job_index key was found. Skipping and moving on"
                )

    with file.open() as input_file:
        reader = csv.DictReader(input_file)
        rows = [r for r in reader]
    if len(rows) == 1:
        row = rows[0]
    else:
        raise ValueError(f"Expected exactly one row in {file.name} but found {len(rows)}")
    results = _parse_bayesrun_row(row)

    out.results = results
    return out

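To show what MODEL_REGEXES actually captures, a small sketch using the same column name as the test at the bottom of this commit:

import deepdog.results

parsed = deepdog.results._parse_bayesrun_column(
    "geom_-20_20_-10_10_0_5-orientation_free-dipole_count_100_success"
)
# parsed.column_field == "success"
# parsed.model_field_dict == {
#     "xmin": "-20", "xmax": "20", "ymin": "-10", "ymax": "10",
#     "zmin": "0", "zmax": "5", "orientation": "free", "avg_filled": "100",
# }
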
pyproject.toml
View File

@@ -19,6 +19,9 @@ python-semantic-release = "^7.24.0"
black = "^22.3.0"
syrupy = "^4.0.8"

[tool.poetry.scripts]
probs = "deepdog.cli.probs:main"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

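The [tool.poetry.scripts] entry generates a probs console script; a rough Python equivalent of what it runs, assuming deepdog.cli.probs re-exports main the same way deepdog/cli/__init__.py does above:

import sys

from deepdog.cli.probs import main

if __name__ == "__main__":
    sys.exit(main())
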
View File

@@ -0,0 +1,12 @@
import deepdog.indexify
import logging

_logger = logging.getLogger(__name__)


def test_indexifier():
    weight_dict = {"key_1": [1, 2, 3], "key_2": ["a", "b", "c"]}
    indexifier = deepdog.indexify.Indexifier(weight_dict)
    _logger.debug(f"setting up indexifier {indexifier}")
    assert indexifier.indexify(0) == {"key_1": 1, "key_2": "a"}
    assert indexifier.indexify(5) == {"key_1": 2, "key_2": "c"}

View File

@@ -0,0 +1,28 @@
import deepdog.results


def test_parse_groupdict():
    example_column_name = (
        "geom_-20_20_-10_10_0_5-orientation_free-dipole_count_100_success"
    )

    parsed = deepdog.results._parse_bayesrun_column(example_column_name)
    expected = deepdog.results.BayesrunColumnParsed(
        {
            "xmin": "-20",
            "xmax": "20",
            "ymin": "-10",
            "ymax": "10",
            "zmin": "0",
            "zmax": "5",
            "orientation": "free",
            "avg_filled": "100",
            "field_name": "success",
        }
    )
    assert parsed == expected


# def test_parse_no_match_column_name():
#     parsed = deepdog.results._parse_bayesrun_column("There's nothing here")
#     assert parsed is None