test: refactor header parsing during measurement csv reading to add tests
This commit is contained in:
parent
e3d3625c92
commit
aea26dfa16
@ -125,6 +125,7 @@ def _reshape_dots_dict(dots_dict: typing.Sequence[typing.Dict]) -> typing.Dict:
|
||||
|
||||
|
||||
# Pattern for a single-dot binned column header of the form
#   APSD_<measurement_type>_<dot_name>_<mean|stdev>
# with optional whitespace on either side. The named groups are
# measurement_type, dot_name and summary_stat.
# NOTE(review): \w also matches "_", so the split between measurement_type and
# dot_name is decided by greedy backtracking when either part contains an
# underscore -- confirm that real headers never do.
BINNED_HEADER_REGEX = r"\s*APSD_(?P<measurement_type>\w+)_(?P<dot_name>\w+)_(?P<summary_stat>mean|stdev)\s*"
|
||||
# Pattern for a two-dot (pair) binned column header of the form
#   APSD_<measurement_type>_<dot_name1>_<dot_name2>_<mean|stdev>
# with optional whitespace on either side. The named groups are
# measurement_type, dot_name1, dot_name2 and summary_stat.
# NOTE(review): because \w matches "_", any header this pattern accepts is
# also accepted by the single-dot pattern -- verify callers try them in the
# intended order.
PAIR_MEASUREMENT_BINNED_HEADER_REGEX = r"\s*APSD_(?P<measurement_type>\w+)_(?P<dot_name1>\w+)_(?P<dot_name2>\w+)_(?P<summary_stat>mean|stdev)\s*"
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
@ -135,6 +136,25 @@ class ParsedBinHeader:
|
||||
summary_stat: str
|
||||
|
||||
|
||||
def _parse_bin_header(field: str) -> typing.Optional[ParsedBinHeader]:
    """
    Parse a binned header field into a ParsedBinHeader object.

    The entire field must match BINNED_HEADER_REGEX (which itself tolerates
    leading and trailing whitespace). Fields with extra text after the summary
    statistic are rejected instead of being parsed from their prefix.

    :param field: the raw column header text to parse.
    :return: a ParsedBinHeader built from the original field and the regex's
        measurement_type, dot_name and summary_stat groups, or None if the
        field does not match the expected format.
    """
    # fullmatch, not match: re.match only anchors at the start, which would make
    # the pattern's trailing \s* meaningless and let trailing garbage through.
    match = re.fullmatch(BINNED_HEADER_REGEX, field)
    if match is None:
        # Debug level only; the caller decides how loudly to report a skip.
        _logger.debug(f"Could not parse {field=}")
        return None

    match_groups = match.groupdict()
    return ParsedBinHeader(
        field,
        match_groups["measurement_type"],
        match_groups["dot_name"],
        match_groups["summary_stat"],
    )
|
||||
|
||||
|
||||
def read_bin_csv(
|
||||
csv_file: pathlib.Path,
|
||||
) -> typing.Tuple[str, typing.Dict[str, typing.Any]]:
|
||||
@ -161,17 +181,10 @@ def read_bin_csv(
|
||||
}
|
||||
|
||||
for field in remaining_fields:
|
||||
match = re.match(BINNED_HEADER_REGEX, field)
|
||||
if match is None:
|
||||
parsed_header = _parse_bin_header(field)
|
||||
if parsed_header is None:
|
||||
_logger.warning(f"Could not parse {field=}")
|
||||
continue
|
||||
match_groups = match.groupdict()
|
||||
parsed_header = ParsedBinHeader(
|
||||
field,
|
||||
match_groups["measurement_type"],
|
||||
match_groups["dot_name"],
|
||||
match_groups["summary_stat"],
|
||||
)
|
||||
parsed_headers[field] = parsed_header
|
||||
|
||||
if parsed_header.dot_name not in aggregated_dict:
|
||||
|
17
tests/__snapshots__/test_read_bin_csv.ambr
Normal file
17
tests/__snapshots__/test_read_bin_csv.ambr
Normal file
@ -0,0 +1,17 @@
|
||||
# serializer version: 1
|
||||
# name: test_parse_headers
|
||||
list([
|
||||
ParsedBinHeader(original_field='APSD_V_dot1_mean', measurement_type='V', dot_name='dot1', summary_stat='mean'),
|
||||
ParsedBinHeader(original_field='APSD_V_dot1_stdev', measurement_type='V', dot_name='dot1', summary_stat='stdev'),
|
||||
ParsedBinHeader(original_field='APSD_V_dot2_mean', measurement_type='V', dot_name='dot2', summary_stat='mean'),
|
||||
ParsedBinHeader(original_field='APSD_V_dot2_stdev', measurement_type='V', dot_name='dot2', summary_stat='stdev'),
|
||||
ParsedBinHeader(original_field='APSD_V_line_mean', measurement_type='V', dot_name='line', summary_stat='mean'),
|
||||
ParsedBinHeader(original_field='APSD_V_line_stdev', measurement_type='V', dot_name='line', summary_stat='stdev'),
|
||||
ParsedBinHeader(original_field='APSD_V_triangle1_mean', measurement_type='V', dot_name='triangle1', summary_stat='mean'),
|
||||
ParsedBinHeader(original_field='APSD_V_triangle1_stdev', measurement_type='V', dot_name='triangle1', summary_stat='stdev'),
|
||||
ParsedBinHeader(original_field='APSD_V_triangle2_mean', measurement_type='V', dot_name='triangle2', summary_stat='mean'),
|
||||
ParsedBinHeader(original_field='APSD_V_triangle2_stdev', measurement_type='V', dot_name='triangle2', summary_stat='stdev'),
|
||||
ParsedBinHeader(original_field='APSD_V_uprise1_mean', measurement_type='V', dot_name='uprise1', summary_stat='mean'),
|
||||
None,
|
||||
])
|
||||
# ---
|
40
tests/test_read_bin_csv.py
Normal file
40
tests/test_read_bin_csv.py
Normal file
@ -0,0 +1,40 @@
|
||||
import re
|
||||
import kalpaa.read_bin_csv
|
||||
|
||||
# import logging
|
||||
|
||||
# _logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_regex_matches():
    """BINNED_HEADER_REGEX splits a well-formed header into its named parts."""
    header = "APSD_V_dot1_mean"

    found = re.match(kalpaa.read_bin_csv.BINNED_HEADER_REGEX, header)
    assert found is not None

    # Check each named group individually so a failure points at the bad part.
    expected_parts = (
        ("measurement_type", "V"),
        ("dot_name", "dot1"),
        ("summary_stat", "mean"),
    )
    captured = found.groupdict()
    for group_name, expected_value in expected_parts:
        assert captured[group_name] == expected_value
|
||||
|
||||
|
||||
def test_parse_headers(snapshot):
    """Parse a representative set of headers and compare against the snapshot."""
    # Headers modelled on a recent run, followed by one deliberately malformed
    # entry that is expected to come back as None.
    example_headers = [
        "APSD_V_dot1_mean",
        "APSD_V_dot1_stdev",
        "APSD_V_dot2_mean",
        "APSD_V_dot2_stdev",
        "APSD_V_line_mean",
        "APSD_V_line_stdev",
        "APSD_V_triangle1_mean",
        "APSD_V_triangle1_stdev",
        "APSD_V_triangle2_mean",
        "APSD_V_triangle2_stdev",
        "APSD_V_uprise1_mean",
        "This is not a valid header",
    ]

    actual_parsed = []
    for header in example_headers:
        actual_parsed.append(kalpaa.read_bin_csv._parse_bin_header(header))

    assert actual_parsed == snapshot
|
Loading…
x
Reference in New Issue
Block a user