Source code for pmotools.pmo_builder.pmo_updater

#!/usr/bin/env python3

import pandas as pd
from pmotools.pmo_engine.pmo_processor import PMOProcessor
from datetime import datetime
import copy
import logging
from typing import Any

logger = logging.getLogger(__name__)


[docs]class PMOUpdater(object): @staticmethod def _check_if_date_yyyy_mm_or_yyyy_mm_dd(date_string: str) -> bool: """ Checks if a string is in YYYY-MM or YYYY-MM-DD format. :param date_string: the string to be checked """ try: datetime.strptime(date_string, "%Y-%m-%d") return True # Matches YYYY-MM-DD except ValueError: try: datetime.strptime(date_string, "%Y-%m") return True # Matches YYYY-MM except ValueError: return False # Does not match either format
[docs] @staticmethod def update_specimen_meta_with_traveler_info( pmo, traveler_info: pd.DataFrame, specimen_name_col: str = "specimen_name", travel_country_col: str = "travel_country", travel_start_col: str = "travel_start_date", travel_end_col: str = "travel_end_date", bed_net_usage_col: str = None, geo_admin1_col: str = None, geo_admin2_col: str = None, geo_admin3_col: str = None, lat_lon_col: str = None, replace_current_traveler_info: bool = False, ): """ Update a PMO's specimen's metadata with travel info :param pmo: the PMO to update, will directly modify this PMO :param traveler_info: the traveler info :param specimen_name_col: the specimen name column within the traveler input table :param travel_country_col: the column name containing the traveled to country :param travel_start_col: the column name containing the traveled start date, format YYYY-MM-DD or YYYY-MM :param travel_end_col: the column name containing the traveled end date, format YYYY-MM-DD or YYYY-MM :param bed_net_usage_col: (Optional) a number between 0 - 1 for rough frequency of bednet usage while traveling :param geo_admin1_col: (Optional) the column name containing the traveled to country admin level 1 info :param geo_admin2_col: (Optional) the column name containing the traveled to country admin level 2 info :param geo_admin3_col: (Optional) the column name containing the traveled to country admin level 3 info :param lat_lon_col: (Optional) the latitude and longitude column name containing the region traveled to latitude and longitude :param replace_current_traveler_info: whether to replace current travel info :return: a reference to the updated PMO """ required_cols = [ specimen_name_col, travel_country_col, travel_start_col, travel_end_col, ] if bed_net_usage_col is not None: required_cols.append(bed_net_usage_col) if geo_admin1_col is not None: required_cols.append(geo_admin1_col) if geo_admin2_col is not None: required_cols.append(geo_admin2_col) if geo_admin3_col is not None: required_cols.append(geo_admin3_col) if lat_lon_col is not None: required_cols.append(lat_lon_col) if not set(required_cols).issubset(traveler_info.columns): raise Exception( "missing traveler_info columns: " + ",".join(required_cols), " columns in table: " + ",".join(traveler_info.columns), ) specimen_names_in_pmo = set(PMOProcessor.get_specimen_names(pmo)) specimen_names_in_traveler_info = set( traveler_info[specimen_name_col].astype(str).tolist() ) # check to see if provided traveler info for a specimen that cannot be found in PMO missing_traveler_specs = specimen_names_in_traveler_info - specimen_names_in_pmo if missing_traveler_specs: raise ValueError( f"Provided traveler info for the following specimens but they are missing from the PMO: {sorted(missing_traveler_specs)}" ) traveler_info_records = traveler_info[required_cols].to_dict(orient="records") spec_indexs = PMOProcessor.get_index_key_of_specimen_names(pmo) # prep traveler info lists, clear the list if we are replacing or start an empty list to append to if none exist already for specimen_name in specimen_names_in_traveler_info: if ( replace_current_traveler_info or "travel_out_six_month" not in pmo["specimen_info"][spec_indexs[specimen_name]] ): pmo["specimen_info"][spec_indexs[specimen_name]][ "travel_out_six_month" ] = [] for travel_rec in traveler_info_records: specimen_name = str(travel_rec[specimen_name_col]) # Validate date formats for date_col in (travel_start_col, travel_end_col): val = travel_rec[date_col] if pd.isna(val): raise ValueError( f"Missing required date value in column '{date_col}' for specimen '{specimen_name}'" ) val_str = str(val) if not PMOUpdater._check_if_date_yyyy_mm_or_yyyy_mm_dd(val_str): raise ValueError( f"Invalid date format in '{date_col}' for specimen '{specimen_name}': '{val_str}'. " f"Expected YYYY-MM or YYYY-MM-DD" ) # add in travel_rec travel_rec.pop(specimen_name_col, None) pmo["specimen_info"][spec_indexs[specimen_name]][ "travel_out_six_month" ].append(travel_rec) return pmo
[docs] @staticmethod def merge_dicts_by_key( main_list: list[dict], update_list: list[dict], key_field: str, replace: bool = False, ignore_fields: list[str] | None = None, ) -> list[dict]: """ Merge two lists of dicts by a shared key field. The first list is treated as the main/base data source. The second list provides updates that are applied on top. Both input lists are left untouched (deep copies are used internally). Args: main_list: The primary list of dicts (source of truth). update_list: The list of dicts whose values will be merged in. key_field: The dict key used to match records across lists. replace: If True, existing values in main are overwritten by update values. If False, a conflict raises a ValueError. ignore_fields: Optional list of field names to skip entirely during the merge (they are never read from update_list). Returns: A new list of dicts with updates applied. Raises: ValueError: If either list contains duplicate values for key_field. KeyError: If any dict in either list is missing key_field. KeyError: If update_list contains a key_field value that does not exist in main_list. ValueError: If replace=False and an update would overwrite an existing field. """ ignore_fields = set(ignore_fields or []) # check to see if any of the input (the main or the update lists) have missing key_field def _check_missing_key(lst: list[dict], label: str) -> None: bad = [i for i, d in enumerate(lst) if key_field not in d] if bad: raise KeyError(f"{label} is missing '{key_field}' at index(es): {bad}") _check_missing_key(main_list, "main_list") _check_missing_key(update_list, "update_list") # check if there are duplicate key_field values def _check_duplicates(lst: list[dict], label: str) -> None: seen: set = set() dupes: set = set() for d in lst: val = d[key_field] (dupes if val in seen else seen).add(val) if dupes: raise ValueError( f"{label} contains duplicate '{key_field}' values: {sorted(dupes)}" ) _check_duplicates(main_list, "main_list") _check_duplicates(update_list, "update_list") # Build lookup from deep copies main_map: dict[Any, dict] = {d[key_field]: copy.deepcopy(d) for d in main_list} update_map: dict[Any, dict] = { d[key_field]: copy.deepcopy(d) for d in update_list } # update keys must exist in main extra_keys = set(update_map) - set(main_map) if extra_keys: raise KeyError( f"update_list contains '{key_field}' values not found in " f"main_list: {sorted(extra_keys)}" ) # Warn if any of the main keys absent from update, this way can update some of the values but # not necessary to update all of them missing_from_update = set(main_map) - set(update_map) if missing_from_update: logger.warning( "The following '%s' values are in main_list but not in " "update_list (skipping): %s", key_field, sorted(missing_from_update), ) # now merge for key, update_dict in update_map.items(): main_dict = main_map[key] for field, value in update_dict.items(): if field == key_field or field in ignore_fields: continue if field in main_dict: if not replace: raise ValueError( f"Field '{field}' already exists in record " f"'{key_field}={key}' and replace=False." ) main_dict[field] = value else: main_dict[field] = value return list(main_map.values())