Source code for pmotools.pmo_builder.merge_to_pmo

#!/usr/bin/env python3
from datetime import date
import numpy as np
from pmotools import __version__ as __pmotools_version__
from pmotools import __schema_version__

from pmotools.pmo_builder.mhap_table_to_pmo import (
    create_minimum_library_specimen_dict_from_mhap_table,
)
import warnings
import copy


def _convert_numpy_scalars(obj):
    """Return ``obj`` with every numpy scalar swapped for its Python equivalent.

    Dicts and lists are rebuilt recursively (dict keys are kept as they are,
    only the values are visited). A numpy scalar, i.e. any ``np.generic``
    instance, is unwrapped with ``.item()``. Every other object, including
    tuples, is handed back untouched.
    """
    # A numpy scalar is never a list or a dict, so the order of these checks
    # does not affect the result.
    if isinstance(obj, np.generic):
        return obj.item()

    if isinstance(obj, list):
        return list(map(_convert_numpy_scalars, obj))

    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            converted[key] = _convert_numpy_scalars(value)
        return converted

    return obj


def merge_to_pmo(
    mhap_info: dict,
    panel_target_info: dict,
    specimen_info: list | None = None,
    library_sample_info: list | None = None,
    sequencing_info: list | None = None,
    bioinfo_method_info: list | None = None,
    bioinfo_run_info: list | None = None,
    project_info: list | None = None,
    read_counts_by_stage_info: list | None = None,
):
    """
    Merge components into a PMO, replacing names with indices.

    The required inputs are ``mhap_info`` (must have fields
    ``detected_microhaplotypes`` and ``representative_microhaplotypes``) and
    ``panel_target_info`` (must have fields ``target_info`` and
    ``panel_info``).

    How ``specimen_info`` / ``library_sample_info`` are filled in:

    * neither provided: both are generated from the detected microhaplotypes
      using the name of the single panel (more than one panel is an error);
    * only ``specimen_info`` provided: ``library_sample_info`` is generated
      from the detected microhaplotypes using the first panel's name. Library
      sample names without a supplied specimen are appended to
      ``specimen_info`` as name-only entries (with a warning); supplied
      specimens without a library sample only trigger a warning;
    * only ``library_sample_info`` provided: every library sample gets
      ``specimen_name`` set to its ``library_sample_name`` and a name-only
      specimen is created for each library sample.

    None of the caller's objects are mutated; all work is done on deep copies.

    Args:
        mhap_info (dict): microhaplotypes within this project, both detected
            and representative; must contain ``detected_microhaplotypes`` and
            ``representative_microhaplotypes``.
        panel_target_info (dict): panel and target information; must contain
            ``target_info`` and ``panel_info``.
        specimen_info (list, optional): all the specimens within this project.
        library_sample_info (list, optional): library samples within this
            project.
        sequencing_info (list, optional): sequencing info for this project.
        bioinfo_method_info (list, optional): bioinformatics pipeline/methods.
        bioinfo_run_info (list, optional): runtime info for the pipeline.
        project_info (list, optional): info about projects stored in this PMO.
        read_counts_by_stage_info (list, optional): read counts by stage.

    Returns:
        dict: the assembled PMO, with numpy scalars converted to native
        Python types.

    Raises:
        ValueError: if a required field is missing from ``mhap_info`` or
            ``panel_target_info``, or if ``bioinfo_run_info`` is given without
            ``bioinfo_method_info``. ``_replace_names_with_IDs`` also raises
            it when names cannot be resolved against their reference table.
        Exception: if several panels are present but neither
            ``specimen_info`` nor ``library_sample_info`` was provided.
    """
    # Deep copy every input up front so the caller's objects are never mutated
    # by anything below (name replacement, appends, numpy conversion, etc.).
    # copy.deepcopy(None) is None, so optional inputs need no special casing.
    mhap_info = copy.deepcopy(mhap_info)
    panel_target_info = copy.deepcopy(panel_target_info)
    specimen_info = copy.deepcopy(specimen_info)
    library_sample_info = copy.deepcopy(library_sample_info)
    sequencing_info = copy.deepcopy(sequencing_info)
    bioinfo_method_info = copy.deepcopy(bioinfo_method_info)
    bioinfo_run_info = copy.deepcopy(bioinfo_run_info)
    project_info = copy.deepcopy(project_info)
    read_counts_by_stage_info = copy.deepcopy(read_counts_by_stage_info)

    # Required-field validation; the order fixes the order in the message.
    required_fields = (
        (panel_target_info, "panel_info"),
        (panel_target_info, "target_info"),
        (mhap_info, "representative_microhaplotypes"),
        (mhap_info, "detected_microhaplotypes"),
    )
    missing_fields = [
        field for container, field in required_fields if field not in container
    ]
    if missing_fields:
        raise ValueError(
            f"Missing required fields for panel_target_info or mhap_info: {missing_fields}"
        )

    if bioinfo_run_info is not None and bioinfo_method_info is None:
        raise ValueError(
            "bioinfo_method_info must be provided if bioinfo_run_info is provided"
        )

    if specimen_info is not None and library_sample_info is None:
        # NOTE(review): unlike the "nothing provided" case below, several
        # panels are not rejected here; the first panel's name is used.
        spec_and_lib_info = create_minimum_library_specimen_dict_from_mhap_table(
            mhap_info["detected_microhaplotypes"],
            panel_target_info["panel_info"][0]["panel_name"],
        )
        library_sample_info = spec_and_lib_info["library_sample_info"]

        # Compare the generated library sample names with the supplied
        # specimen names, in both directions.
        library_sample_names = {
            item["library_sample_name"] for item in library_sample_info
        }
        specimen_names = {item["specimen_name"] for item in specimen_info}
        missing_in_specimen = library_sample_names - specimen_names
        missing_in_library = specimen_names - library_sample_names

        # Append placeholders in sorted order: positions in specimen_info
        # become specimen IDs, and raw set iteration order is not stable
        # between runs.
        unmatched_library_names = sorted(missing_in_specimen)
        for missing_lib_name in unmatched_library_names:
            specimen_info.append({"specimen_name": missing_lib_name})
        if missing_in_specimen:
            warnings.warn(
                "library_sample_names found in the detected_microhaplotypes "
                "that don't have corresponding supplied specimen_names: "
                f"{unmatched_library_names}, will be added to specimen_info "
                "with no meta"
            )
        if missing_in_library:
            warnings.warn(
                "specimen_name were supplied that don't have corresponding "
                "library_sample_names in detected_microhaplotypes: "
                f"{sorted(missing_in_library)}"
            )

    if specimen_info is None and library_sample_info is None:
        if len(panel_target_info["panel_info"]) > 1:
            raise Exception(
                "If multiple panels are included in the panel information,"
                "specimen_info and library_sample_info must also be provided "
                "to indicate which panel was used for each specimen. "
                "Otherwise, provide only a single panel. Panels found: "
                + str(len(panel_target_info["panel_info"]))
            )
        spec_and_lib_info = create_minimum_library_specimen_dict_from_mhap_table(
            mhap_info["detected_microhaplotypes"],
            panel_target_info["panel_info"][0]["panel_name"],
        )
        specimen_info = spec_and_lib_info["specimen_info"]
        library_sample_info = spec_and_lib_info["library_sample_info"]
    elif specimen_info is None:
        # Only library samples were given: default each specimen to the
        # library sample's own name.
        for library_sample in library_sample_info:
            library_sample["specimen_name"] = library_sample["library_sample_name"]
        specimen_info = [
            {"specimen_name": library_sample["library_sample_name"]}
            for library_sample in library_sample_info
        ]

    # Convert numpy scalars to native Python types; _convert_numpy_scalars
    # returns None unchanged, so absent optional inputs stay None.
    panel_target_info = _convert_numpy_scalars(panel_target_info)
    mhap_info = _convert_numpy_scalars(mhap_info)
    sequencing_info = _convert_numpy_scalars(sequencing_info)
    bioinfo_method_info = _convert_numpy_scalars(bioinfo_method_info)
    bioinfo_run_info = _convert_numpy_scalars(bioinfo_run_info)
    project_info = _convert_numpy_scalars(project_info)
    if read_counts_by_stage_info is not None:
        read_counts_by_stage_info = [
            _convert_numpy_scalars(d) for d in read_counts_by_stage_info
        ]
    specimen_info = _convert_numpy_scalars(specimen_info)
    library_sample_info = _convert_numpy_scalars(library_sample_info)

    # Swap names for indices in place on the local copies; raises if any
    # name cannot be resolved.
    _replace_names_with_IDs(
        specimen_info=specimen_info,
        project_info=project_info,
        library_sample_info=library_sample_info,
        sequencing_info=sequencing_info,
        panel_target_info=panel_target_info,
        mhap_info=mhap_info,
        bioinfo_run_info=bioinfo_run_info,
        read_counts_by_stage_info=read_counts_by_stage_info,
    )

    # Build PMO
    pmo_header = _generate_pmo_header()
    pmo = (
        {
            "pmo_header": pmo_header,
            "library_sample_info": library_sample_info,
            "specimen_info": specimen_info,
        }
        | panel_target_info
        | mhap_info
    )
    # Optional sections are only added when non-empty ...
    if sequencing_info:
        pmo["sequencing_info"] = sequencing_info
    if bioinfo_method_info:
        pmo["bioinformatics_methods_info"] = bioinfo_method_info
    if bioinfo_run_info:
        pmo["bioinformatics_run_info"] = bioinfo_run_info
    if project_info:
        pmo["project_info"] = project_info
    # ... except read counts, which are added whenever they were provided.
    if read_counts_by_stage_info is not None:
        pmo["read_counts_by_stage"] = read_counts_by_stage_info

    return _convert_numpy_scalars(pmo)
def _make_lookup(dict, key):
    """Map ``str(entry[key])`` to the entry's position in the list ``dict``.

    Keys are stringified because ``_replace_key_with_id`` stringifies every
    name before looking it up; with raw keys, a non-string name (for example
    an integer ID) could never be matched. If a name occurs more than once,
    the last position wins.

    The first parameter keeps its original name for backward compatibility
    even though it shadows the builtin.
    """
    return {str(entry[key]): idx for idx, entry in enumerate(dict)}


def _replace_key_with_id(target_list, reference_list, name_key, id_key, lookup=None):
    """
    Replaces name_key in target_list with id_key, based on lookup from reference_list.

    Each entry of ``target_list`` is modified in place: ``name_key`` is
    removed and ``id_key`` is set to the index found for that name, or to
    ``None`` when the name is unknown.

    Args:
        target_list: list of dicts, each of which must contain ``name_key``.
        reference_list: list of dicts used to build the lookup when none is
            supplied.
        name_key: key holding the name to replace.
        id_key: key that receives the index.
        lookup: optional prebuilt name-to-index mapping, such as the one
            returned by ``_make_lookup``.

    Returns:
        list: sorted, de-duplicated names (as strings) that were not found.
    """
    if not lookup:
        lookup = _make_lookup(reference_list, name_key)

    unique_names = set()
    for entry in target_list:
        name = str(entry.pop(name_key))
        unique_names.add(name)
        entry[id_key] = lookup.get(name)

    # Sorted so the error report is stable from run to run.
    return sorted(unique_names - lookup.keys())


def _generate_pmo_header(
    pmotools_version=__pmotools_version__, pmo_schema_version=__schema_version__
):
    """Build the ``pmo_header`` dict: schema version, today's ISO date and
    the generating program's name and version."""
    today = date.today().isoformat()
    pmo_header = {
        "pmo_version": pmo_schema_version,
        "creation_date": today,
        "generation_method": {
            "program_name": "pmotools-python",
            "program_version": pmotools_version,
        },
    }
    return pmo_header


def _report_missing_IDs(
    missing_projects,
    missing_sequencing,
    missing_specimen,
    missing_panels,
    missing_targets,
    missing_bioinfo_runs,
    missing_libs,
    missing_read_counts_bioinfo_runs,
    missing_read_counts_libs,
    missing_read_counts_targets,
):
    """Raise ``ValueError`` listing every non-empty collection of missing names.

    Each argument holds names that were referenced in one table but absent
    from the table they should resolve against. Nothing happens when all of
    them are empty.
    """
    # (description, names) pairs; the order here is the order in the message.
    checks = (
        ("Project names in Specimen Info not in Project Info", missing_projects),
        (
            "Sequencing names in Library Sample Info not in Sequencing Info",
            missing_sequencing,
        ),
        (
            "Specimen names in Library Sample Info not in Specimen Info",
            missing_specimen,
        ),
        ("Panel names in Library Sample Info not in Panel Info", missing_panels),
        (
            "Target names in Representative Microhaplotypes not in Target Info",
            missing_targets,
        ),
        (
            "Bioinformatics run names in Detected Microhaplotypes not in Bioinformatic Run Info",
            missing_bioinfo_runs,
        ),
        (
            "Library Sample names in Detected Microhaplotypes not in Library Sample Info",
            missing_libs,
        ),
        (
            "Bioinformatics run names in Read Counts by Stage not in Bioinformatic Run Info",
            missing_read_counts_bioinfo_runs,
        ),
        (
            "Library Sample names in Read Counts by Stage not in Library Sample Info",
            missing_read_counts_libs,
        ),
        (
            "Target names in Read Counts by Stage not in Target Info",
            missing_read_counts_targets,
        ),
    )
    problems = [f"{label}: {missing}\n" for label, missing in checks if missing]
    if problems:
        raise ValueError(
            "The following fields were found in one table and not another:\n"
            + "".join(problems)
        )


def _replace_names_with_IDs(
    specimen_info: list,
    panel_target_info: dict,
    mhap_info: dict,
    project_info: list | None = None,
    library_sample_info: list | None = None,
    sequencing_info: list | None = None,
    bioinfo_run_info: list | None = None,
    read_counts_by_stage_info: list | None = None,
):
    """Replace cross-table names with list indices, in place.

    An ID is the position of the referenced entry in its reference list.
    The following replacements are made:

    * specimen ``project_name`` -> ``project_id`` (only with ``project_info``)
    * library sample ``specimen_name`` -> ``specimen_id``,
      ``panel_name`` -> ``panel_id`` and, only with ``sequencing_info``,
      ``sequencing_info_name`` -> ``sequencing_info_id``
    * representative microhaplotype ``target_name`` -> ``target_id``
    * detected microhaplotype ``bioinformatics_run_name`` ->
      ``bioinformatics_run_id`` (only with ``bioinfo_run_info``) and, for each
      of its ``library_samples``, ``library_sample_name`` ->
      ``library_sample_id``
    * the same run, library sample and target replacements inside
      ``read_counts_by_stage_info`` when it is provided

    Raises:
        Exception: if an optional reference table was provided (non-empty)
            but some entries lack the name field that points into it.
        ValueError: if any name is missing from its reference table.
    """
    # SPECIMEN INFO
    # replace name with project ID
    any_missing_project_names = False
    missing_projects = []
    if project_info is not None:
        any_missing_project_names = any(
            "project_name" not in spec for spec in specimen_info
        )
        if not any_missing_project_names:
            missing_projects = _replace_key_with_id(
                specimen_info, project_info, "project_name", "project_id"
            )

    # LIBRARY SAMPLE INFO
    # replace with sequencing_info_id, specimen_id, panel_id
    missing_specimen = _replace_key_with_id(
        library_sample_info, specimen_info, "specimen_name", "specimen_id"
    )
    missing_panels = _replace_key_with_id(
        library_sample_info,
        panel_target_info["panel_info"],
        "panel_name",
        "panel_id",
    )
    any_missing_sequence_info_names = False
    missing_sequencing = []
    if sequencing_info is not None:
        any_missing_sequence_info_names = any(
            "sequencing_info_name" not in lib_sample
            for lib_sample in library_sample_info
        )
        if not any_missing_sequence_info_names:
            missing_sequencing = _replace_key_with_id(
                library_sample_info,
                sequencing_info,
                "sequencing_info_name",
                "sequencing_info_id",
            )

    # REP MHAPS
    # replace target_name with ID
    missing_targets = _replace_key_with_id(
        mhap_info["representative_microhaplotypes"]["targets"],
        panel_target_info["target_info"],
        "target_name",
        "target_id",
    )

    # DETECTED MHAPS
    # Replace library_sample_name and bioinformatics_run_name
    detected_mhaps = mhap_info["detected_microhaplotypes"]
    any_missing_bioinfo_run_names = False
    missing_bioinfo_runs = []
    if bioinfo_run_info is not None:
        any_missing_bioinfo_run_names = any(
            "bioinformatics_run_name" not in detected for detected in detected_mhaps
        )
        if not any_missing_bioinfo_run_names:
            missing_bioinfo_runs = _replace_key_with_id(
                detected_mhaps,
                bioinfo_run_info,
                "bioinformatics_run_name",
                "bioinformatics_run_id",
            )

    # Built once and shared by every per-entry replacement below.
    lib_sample_lookup = _make_lookup(library_sample_info, "library_sample_name")
    # Sets, so a name missing from several entries is reported only once.
    missing_libs = set()
    for detected in detected_mhaps:
        missing_libs.update(
            _replace_key_with_id(
                detected["library_samples"],
                library_sample_info,
                "library_sample_name",
                "library_sample_id",
                lookup=lib_sample_lookup,
            )
        )

    # READ COUNTS BY STAGE
    # Replace bioinformatics_run_name and library_sample_name if provided
    missing_read_counts_bioinfo_runs = []
    missing_read_counts_libs = set()
    missing_read_counts_targets = set()
    target_lookup = _make_lookup(panel_target_info["target_info"], "target_name")
    any_read_counts_by_stage_missing_bioinfo_run_names = False
    if read_counts_by_stage_info is not None:
        any_read_counts_by_stage_missing_bioinfo_run_names = any(
            "bioinformatics_run_name" not in read_counts_run
            for read_counts_run in read_counts_by_stage_info
        )
        if (
            not any_read_counts_by_stage_missing_bioinfo_run_names
            and bioinfo_run_info is not None
        ):
            # Replace bioinformatics_run_name with bioinformatics_run_id
            missing_read_counts_bioinfo_runs = _replace_key_with_id(
                read_counts_by_stage_info,
                bioinfo_run_info,
                "bioinformatics_run_name",
                "bioinformatics_run_id",
            )

        # Library sample and target replacement does not depend on the run
        # names, so it runs for every provided read-count entry.
        for read_counts_run in read_counts_by_stage_info:
            library_entries = read_counts_run["read_counts_by_library_sample_by_stage"]
            missing_read_counts_libs.update(
                _replace_key_with_id(
                    library_entries,
                    library_sample_info,
                    "library_sample_name",
                    "library_sample_id",
                    lookup=lib_sample_lookup,
                )
            )
            for library_entry in library_entries:
                # Per-target counts are optional for a library sample.
                target_entries = library_entry.get("read_counts_for_targets") or []
                for target_entry in target_entries:
                    target_name = target_entry.pop("target_name", None)
                    if target_name is None:
                        continue
                    # Lookup keys are strings, so compare the string form.
                    target_id = target_lookup.get(str(target_name))
                    if target_id is None:
                        missing_read_counts_targets.add(str(target_name))
                    else:
                        target_entry["target_id"] = target_id

    # A reference table was supplied but some entries cannot point into it.
    # The table itself is tested for truthiness, so an empty table does not
    # trigger these messages.
    merging_warnings = []
    if any_missing_project_names and project_info:
        merging_warnings.append(
            "project_info provided but there are specimens missing project_name field"
        )
    if any_missing_sequence_info_names and sequencing_info:
        merging_warnings.append(
            "sequencing_info provided but there are library samples missing sequencing_info_name field"
        )
    if any_missing_bioinfo_run_names and bioinfo_run_info:
        merging_warnings.append(
            "bioinformatics_run_info provided but there are detected microhaplotypes missing bioinformatics_run_name field"
        )
    if any_read_counts_by_stage_missing_bioinfo_run_names and bioinfo_run_info:
        merging_warnings.append(
            "bioinformatics_run_info provided but there are read counts by stage missing bioinformatics_run_name field"
        )
    if merging_warnings:
        warnings_text = "\n".join(merging_warnings)
        raise Exception(
            f"The following warnings were encountered during merging:\n{warnings_text}"
        )

    # If any names were missing from reference tables error
    _report_missing_IDs(
        missing_projects,
        missing_sequencing,
        missing_specimen,
        missing_panels,
        missing_targets,
        missing_bioinfo_runs,
        sorted(missing_libs),
        missing_read_counts_bioinfo_runs,
        sorted(missing_read_counts_libs),
        sorted(missing_read_counts_targets),
    )