Source code for pmotools.pmo_engine.pmo_reader

#!/usr/bin/env python3
import copy
import datetime
import json
import gzip
import os
import sys
from collections import defaultdict
from pmotools import __version__ as __pmotools_version__


[docs]class PMOReader: """ A class for reading in PMO from files """
[docs] @staticmethod def read_in_pmo(fnp: str | os.PathLike[str]): """ Read in a PMO file, can either be compressed(.gz) or uncompressed :param fnp: the file name path of the PMO file to read in :return: a PMO like object """ if "STDIN" == fnp: pmo_data = json.load(sys.stdin) else: if fnp.endswith(".gz"): with gzip.open(fnp) as f: pmo_data = json.load(f) else: with open(fnp) as f: pmo_data = json.load(f) return pmo_data
[docs] @staticmethod def read_in_pmos(fnps: list[str] | list[os.PathLike[str]]): """ Read in a PMO file, can either be compressed(.gz) or uncompressed :param fnps: the file name path of the PMO file to read in :return: a list of PMO like object """ ret = [] for fnp in fnps: ret.append(PMOReader.read_in_pmo(fnp)) return ret
[docs] @staticmethod def combine_multiple_pmos(pmos: list[dict]): """ Combine multiple PMOs into one pmo :param pmos: a list of PMO objects :return: a combined PMO """ if len(pmos) <= 1: raise Exception( "Only supplied " + str(len(pmos)) + " but multiple PMO objects were expected" ) # create new pmo out pmo_out = {} # create new pmo_header # currently losing all info about previous header info, # consider coming up with something in standard that might preserve this info if needed pmo_out["pmo_header"] = { "pmo_version": __pmotools_version__, "creation_date": datetime.datetime.now().strftime("%Y-%m-%d"), "generation_method": { "program_name": "pmotools-python.PMOReader.combine_multiple_pmos", "program_version": __pmotools_version__, }, } # combine targeted_genomes fields if present # key: genome name + _ + genome_version, val: index targeted_genomes_out_index_key = {} if "targeted_genomes" in pmos[0]: pmo_out["targeted_genomes"] = copy.deepcopy(pmos[0]["targeted_genomes"]) for genome_info_index, genome in enumerate(pmos[0]["targeted_genomes"]): targeted_genomes_out_index_key[ genome["name"] + "_" + genome["genome_version"] ] = genome_info_index # key1 pmo_index, key2 old_index, val new_index targeted_genomes_old_index_key = defaultdict(dict) for pmo_index, pmo in enumerate(pmos[1:], start=1): if "targeted_genomes" in pmo: for genome_index, genome in enumerate(pmo["targeted_genomes"]): genome_id = genome["name"] + "_" + genome["genome_version"] if genome_id in targeted_genomes_out_index_key: targeted_genomes_old_index_key[pmo_index][ genome_index ] = targeted_genomes_out_index_key[genome_id] else: if "targeted_genomes" not in pmo_out: pmo_out["targeted_genomes"] = [] new_index = len(pmo_out["targeted_genomes"]) pmo_out["targeted_genomes"].append(copy.deepcopy(genome)) targeted_genomes_out_index_key[genome_id] = new_index targeted_genomes_old_index_key[pmo_index][ genome_index ] = new_index # combine target_info fields pmo_out["target_info"] = copy.deepcopy(pmos[0]["target_info"]) # key: target_name, val: index target_info_out_index_key = {} for target_info_index, target_info in enumerate(pmos[0]["target_info"]): target_info_out_index_key[target_info["target_name"]] = target_info_index # key1 pmo_index, key2 old_index, val new_index target_info_old_index_key = defaultdict(dict) for pmo_index, pmo in enumerate(pmos[1:], start=1): for target_index, target in enumerate(pmo["target_info"]): if target["target_name"] in target_info_out_index_key: target_info_old_index_key[pmo_index][ target_index ] = target_info_out_index_key[target["target_name"]] else: new_index = len(pmo_out["target_info"]) target_copy = copy.deepcopy(target) # update genome_id if adding new target if ( "targeted_genomes" in pmo_out and len(pmo_out["targeted_genomes"]) > 1 ): if "insert_location" in target_copy: # update genome_id target_copy["insert_location"][ "genome_id" ] = targeted_genomes_old_index_key[pmo_index][ target_copy["insert_location"]["genome_id"] ] if "location" in target_copy["forward_primer"]: # update genome_id target_copy["forward_primer"]["location"][ "genome_id" ] = targeted_genomes_old_index_key[pmo_index][ target_copy["forward_primer"]["location"]["genome_id"] ] if "location" in target_copy["reverse_primer"]: # update genome_id target_copy["reverse_primer"]["location"][ "genome_id" ] = targeted_genomes_old_index_key[pmo_index][ target_copy["reverse_primer"]["location"]["genome_id"] ] pmo_out["target_info"].append(target_copy) target_info_out_index_key[target_copy["target_name"]] = new_index target_info_old_index_key[pmo_index][target_index] = new_index # combine panel_info # todo, more extensive testing than just panel name, make sure reactions and targets are the same pmo_out["panel_info"] = copy.deepcopy(pmos[0]["panel_info"]) # key: panel_name, val: index panel_info_out_index_key = {} for panel_info_index, panel_info in enumerate(pmos[0]["panel_info"]): panel_info_out_index_key[panel_info["panel_name"]] = panel_info_index # key1 pmo_index, key2 old_index, val new_index panel_info_old_index_key = defaultdict(dict) for pmo_index, pmo in enumerate(pmos[1:], start=1): for panel_index, panel in enumerate(pmo["panel_info"]): panel_copy = copy.deepcopy(panel) if panel_copy["panel_name"] in panel_info_out_index_key: panel_info_old_index_key[pmo_index][ panel_index ] = panel_info_out_index_key[panel_copy["panel_name"]] else: new_index = len(pmo_out["panel_info"]) # update target indexes to make sure reaction points to the right target indexes for reaction in panel_copy["reactions"]: for target_id_reaction in range(len(reaction["panel_targets"])): reaction["panel_targets"][ target_id_reaction ] = target_info_old_index_key[pmo_index][ reaction["panel_targets"][target_id_reaction] ] pmo_out["panel_info"].append(panel_copy) panel_info_out_index_key[panel_copy["panel_name"]] = new_index panel_info_old_index_key[pmo_index][panel_index] = new_index # combine sequencing_info # really shouldn't be possible to have the same sequencing_info in different pmos so # just concatenate sequencing infos. Only way this could have happened is if files were split into different # pmos and then rejoined but even if we concatenate sequencing_info of the same, they will still properly # have the right info per library # key1 pmo_index, key2 old_index, val new_index sequencing_info_old_index_key = defaultdict(dict) if "sequencing_info" in pmos[0]: pmo_out["sequencing_info"] = copy.deepcopy(pmos[0]["sequencing_info"]) for pmo_index, pmo in enumerate(pmos[1:], start=1): if "sequencing_info" not in pmo: continue for sequencing_info_index, sequencing_info in enumerate( pmo["sequencing_info"] ): if "sequencing_info" not in pmo_out: pmo_out["sequencing_info"] = [] new_index = len(pmo_out["sequencing_info"]) pmo_out["sequencing_info"].append(copy.deepcopy(sequencing_info)) sequencing_info_old_index_key[pmo_index][ sequencing_info_index ] = new_index # combine project_info # could be possible to be combining PMOs across one project so check if project already exists project_info_old_index_key = defaultdict(dict) if "project_info" in pmos[0]: pmo_out["project_info"] = copy.deepcopy(pmos[0]["project_info"]) for pmo_index, pmo in enumerate(pmos[1:], start=1): if "project_info" not in pmo: continue for project_info_index, project_info in enumerate(pmo["project_info"]): # check to see if the project already exists found_project_info = False if "project_info" in pmo_out: for current_project_id, current_project_info in enumerate( pmo_out["project_info"] ): if ( current_project_info["project_name"] == project_info["project_name"] ): if ( current_project_info["project_description"] != project_info["project_description"] ): raise Exception( "Project description mismatch for project_name: " + project_info["project_name"] ) else: project_info_old_index_key[pmo_index][ project_info_index ] = current_project_id found_project_info = True if not found_project_info: if "project_info" not in pmo_out: pmo_out["project_info"] = [] new_index = len(pmo_out["project_info"]) pmo_out["project_info"].append(copy.deepcopy(project_info)) project_info_old_index_key[pmo_index][ project_info_index ] = new_index # combine specimen_info and library_sample_info # update project_id pmo_out["specimen_info"] = copy.deepcopy(pmos[0]["specimen_info"]) specimen_names = [] specimen_index_key = {} duplicate_specimen_names = [] for specimen in pmo_out["specimen_info"]: if specimen["specimen_name"] in specimen_names: duplicate_specimen_names.append(specimen["specimen_name"]) specimen_index_key[specimen["specimen_name"]] = len(specimen_names) specimen_names.append(specimen["specimen_name"]) # key1 pmo_index, key2 old_index, val new_index specimen_info_old_index_key = defaultdict(dict) for pmo_index, pmo in enumerate(pmos[1:], start=1): for specimen_info_index, specimen_info in enumerate(pmo["specimen_info"]): # checkin for duplicates if specimen_info["specimen_name"] in specimen_names: # if specimen is exactly the same, then no issues # @todo allow merging of info and as long as the meta present in both are the same then it should be fine if ( specimen_info != pmo_out["specimen_info"][ specimen_index_key[specimen_info["specimen_name"]] ] ): duplicate_specimen_names.append(specimen_info["specimen_name"]) specimen_info_old_index_key[pmo_index][ specimen_info_index ] = specimen_index_key[specimen_info["specimen_name"]] else: # update key for the already present id specimen_info_old_index_key[pmo_index][ specimen_info_index ] = specimen_index_key[specimen_info["specimen_name"]] else: specimen_index_key[specimen_info["specimen_name"]] = len( specimen_names ) specimen_names.append(specimen_info["specimen_name"]) new_index = len(pmo_out["specimen_info"]) specimen_info_copy = copy.deepcopy(specimen_info) # update project_id if "project_id" in specimen_info_copy: specimen_info_copy["project_id"] = project_info_old_index_key[ pmo_index ][specimen_info_copy["project_id"]] pmo_out["specimen_info"].append(specimen_info_copy) specimen_info_old_index_key[pmo_index][ specimen_info_index ] = new_index ## library_sample_info pmo_out["library_sample_info"] = copy.deepcopy(pmos[0]["library_sample_info"]) # key1 pmo_index, key2 old_index, val new_index library_sample_info_old_index_key = defaultdict(dict) duplicate_library_sample_names = [] library_sample_names = [] # have to add the library_sample_names already added in the first PMO for library_sample in pmo_out["library_sample_info"]: if library_sample["library_sample_name"] in library_sample_names: duplicate_library_sample_names.append( library_sample["library_sample_name"] ) library_sample_names.append(library_sample["library_sample_name"]) for pmo_index, pmo in enumerate(pmos[1:], start=1): for library_sample_info_index, library_sample_info in enumerate( pmo["library_sample_info"] ): # checkin for duplicates if library_sample_info["library_sample_name"] in library_sample_names: duplicate_library_sample_names.append( library_sample_info["library_sample_name"] ) library_sample_names.append(library_sample_info["library_sample_name"]) # update indexes library_sample_info_copy = copy.deepcopy(library_sample_info) library_sample_info_copy["specimen_id"] = specimen_info_old_index_key[ pmo_index ][library_sample_info_copy["specimen_id"]] library_sample_info_copy["panel_id"] = panel_info_old_index_key[ pmo_index ][library_sample_info_copy["panel_id"]] if "sequencing_info_id" in library_sample_info_copy: library_sample_info_copy[ "sequencing_info_id" ] = sequencing_info_old_index_key[pmo_index][ library_sample_info_copy["sequencing_info_id"] ] # append to the out library_sample_info_copy after getting new index new_index = len(pmo_out["library_sample_info"]) pmo_out["library_sample_info"].append(library_sample_info_copy) library_sample_info_old_index_key[pmo_index][ library_sample_info_index ] = new_index warnings = [] if len(duplicate_specimen_names) > 0: warnings.append( "Duplicate specimen names were supplied for the following specimens: " + ",".join(duplicate_specimen_names) ) if len(duplicate_library_sample_names) > 0: warnings.append( "Duplicate library sample names were supplied for the following librarys: " + ",".join(duplicate_library_sample_names) ) if len(warnings) > 0: raise Exception("\n".join(warnings)) # update bioinformatics_methods_info # the different bioinformatics_methods_info might be the same but there's no easy way to perfectly match up right now if "bioinformatics_methods_info" in pmos[0]: pmo_out["bioinformatics_methods_info"] = copy.deepcopy( pmos[0]["bioinformatics_methods_info"] ) # key1 pmo_index, key2 old_index, val new_index bioinformatics_methods_info_old_index_key = defaultdict(dict) for pmo_index, pmo in enumerate(pmos[1:], start=1): if "bioinformatics_methods_info" not in pmo: continue for ( bioinformatics_methods_info_index, bioinformatics_methods_info, ) in enumerate(pmo["bioinformatics_methods_info"]): if "bioinformatics_methods_info" not in pmo_out: pmo_out["bioinformatics_methods_info"] = [] new_index = len(pmo_out["bioinformatics_methods_info"]) pmo_out["bioinformatics_methods_info"].append( copy.deepcopy(bioinformatics_methods_info) ) bioinformatics_methods_info_old_index_key[pmo_index][ bioinformatics_methods_info_index ] = new_index # update bioinformatics_run_info if "bioinformatics_run_info" in pmos[0]: pmo_out["bioinformatics_run_info"] = copy.deepcopy( pmos[0]["bioinformatics_run_info"] ) # key1 pmo_index, key2 old_index, val new_index bioinformatics_run_info_old_index_key = defaultdict(dict) for pmo_index, pmo in enumerate(pmos[1:], start=1): if "bioinformatics_run_info" not in pmo: continue for bioinformatics_run_info_index, bioinformatics_run_info in enumerate( pmo["bioinformatics_run_info"] ): bioinformatics_run_info_copy = copy.deepcopy(bioinformatics_run_info) bioinformatics_run_info_copy[ "bioinformatics_methods_id" ] = bioinformatics_methods_info_old_index_key[pmo_index][ bioinformatics_run_info_index ] if "bioinformatics_run_info" not in pmo_out: pmo_out["bioinformatics_run_info"] = [] new_index = len(pmo_out["bioinformatics_run_info"]) pmo_out["bioinformatics_run_info"].append(bioinformatics_run_info_copy) bioinformatics_run_info_old_index_key[pmo_index][ bioinformatics_run_info_index ] = new_index # update representative_microhaplotypes pmo_out["representative_microhaplotypes"] = pmos[0][ "representative_microhaplotypes" ] # key: target_name (not index), val: index in representative_microhaplotypes representative_microhaplotypes_out_index_key = {} for ( representative_microhaplotypes_index, representative_microhaplotypes, ) in enumerate(pmo_out["representative_microhaplotypes"]["targets"]): representative_microhaplotypes_out_index_key[ pmo_out["target_info"][representative_microhaplotypes["target_id"]][ "target_name" ] ] = representative_microhaplotypes_index # key1: pmo_index, key2: old_mhaps_target_id, val: new_mhaps_target_id representative_microhaplotypes_old_index_key = defaultdict(dict) # key1: pmo_index, key2: old_mhaps_target_id, key3: old_mhap_id, val: new_mhap_id representative_microhaplotypes_hmap_for_target_index_old_index_key = ( defaultdict(lambda: defaultdict(dict)) ) # @todo need to check for mhap_location and update the genome_id if not the same genome for pmo_index, pmo in enumerate(pmos[1:], start=1): for ( representative_microhaplotypes_index, representative_microhaplotypes, ) in enumerate(pmo["representative_microhaplotypes"]["targets"]): if ( pmo["target_info"][representative_microhaplotypes["target_id"]][ "target_name" ] in representative_microhaplotypes_out_index_key ): representative_microhaplotypes_old_index_key[pmo_index][ representative_microhaplotypes_index ] = representative_microhaplotypes_out_index_key[ pmo["target_info"][representative_microhaplotypes["target_id"]][ "target_name" ] ] # now update per microhaplotype for adding_microhap_index, adding_microhap in enumerate( representative_microhaplotypes["microhaplotypes"] ): found = False # print(pmo_out["representative_microhaplotypes"]["targets"][representative_microhaplotypes_out_index_key[pmo["target_info"][representative_microhaplotypes["target_id"]]["target_name"]]]["microhaplotypes"]) for ( already_have_microhap_index, already_have_microhap, ) in enumerate( pmo_out["representative_microhaplotypes"]["targets"][ representative_microhaplotypes_out_index_key[ pmo["target_info"][ representative_microhaplotypes["target_id"] ]["target_name"] ] ]["microhaplotypes"] ): # print(already_have_microhap) if adding_microhap["seq"] == already_have_microhap["seq"]: representative_microhaplotypes_hmap_for_target_index_old_index_key[ pmo_index ][representative_microhaplotypes_index][ adding_microhap_index ] = already_have_microhap_index found = True break if not found: new_index = len( pmo_out["representative_microhaplotypes"]["targets"][ representative_microhaplotypes_out_index_key[ pmo["target_info"][ representative_microhaplotypes["target_id"] ]["target_name"] ] ]["microhaplotypes"] ) pmo_out["representative_microhaplotypes"]["targets"][ representative_microhaplotypes_out_index_key[ pmo["target_info"][ representative_microhaplotypes["target_id"] ]["target_name"] ] ]["microhaplotypes"].append(copy.deepcopy(adding_microhap)) representative_microhaplotypes_hmap_for_target_index_old_index_key[ pmo_index ][representative_microhaplotypes_index][ adding_microhap_index ] = new_index else: # if not currently in representative_microhaplotypes, update keys and look-ups new_mhaps_target_index = len( pmo_out["representative_microhaplotypes"]["targets"] ) pmo_out["representative_microhaplotypes"]["targets"].append( copy.deepcopy(representative_microhaplotypes) ) representative_microhaplotypes_old_index_key[pmo_index][ representative_microhaplotypes_index ] = new_mhaps_target_index representative_microhaplotypes_out_index_key[ pmo["target_info"][representative_microhaplotypes["target_id"]][ "target_name" ] ] = new_mhaps_target_index for adding_microhap_index, adding_microhap in enumerate( representative_microhaplotypes["microhaplotypes"] ): representative_microhaplotypes_hmap_for_target_index_old_index_key[ pmo_index ][representative_microhaplotypes_index][ adding_microhap_index ] = adding_microhap_index # print(representative_microhaplotypes_hmap_for_target_index_old_index_key) # update detected_microhaplotypes pmo_out["detected_microhaplotypes"] = copy.deepcopy( pmos[0]["detected_microhaplotypes"] ) for pmo_index, pmo in enumerate(pmos[1:], start=1): # update indexes for detected_microhaplotypes in pmo["detected_microhaplotypes"]: detected_microhaplotypes_copy = copy.deepcopy(detected_microhaplotypes) for library_sample in detected_microhaplotypes_copy["library_samples"]: for target_result in library_sample["target_results"]: for hap in target_result["mhaps"]: hap[ "mhap_id" ] = representative_microhaplotypes_hmap_for_target_index_old_index_key[ pmo_index ][target_result["mhaps_target_id"]][hap["mhap_id"]] target_result[ "mhaps_target_id" ] = representative_microhaplotypes_old_index_key[pmo_index][ target_result["mhaps_target_id"] ] library_sample[ "library_sample_id" ] = library_sample_info_old_index_key[pmo_index][ library_sample["library_sample_id"] ] if "bioinformatics_run_id" in detected_microhaplotypes_copy: detected_microhaplotypes_copy[ "bioinformatics_run_id" ] = bioinformatics_run_info_old_index_key[pmo_index][ detected_microhaplotypes_copy["bioinformatics_run_id"] ] # append after the indexes have been updated pmo_out["detected_microhaplotypes"].append( detected_microhaplotypes_copy ) pmo_indexes_with_read_counts_by_stage = [] for pmo_index, pmo in enumerate(pmos): if "read_counts_by_stage" in pmo: pmo_indexes_with_read_counts_by_stage.append(pmo_index) if 0 not in pmo_indexes_with_read_counts_by_stage: pmo_out["read_counts_by_stage"] = [] for pmo_index in pmo_indexes_with_read_counts_by_stage: # if read_counts_by_stage is in pmos[0] then no indexes need to be updated if 0 == pmo_index: pmo_out["read_counts_by_stage"] = copy.deepcopy( pmos[pmo_index]["read_counts_by_stage"] ) else: # update index and then append to out for read_counts_by_stage in pmos[pmo_index]["read_counts_by_stage"]: read_counts_by_stage_copy = copy.deepcopy(read_counts_by_stage) for ( read_counts_by_library_sample_by_stage ) in read_counts_by_stage_copy[ "read_counts_by_library_sample_by_stage" ]: if ( "read_counts_for_targets" in read_counts_by_library_sample_by_stage ): for ( read_counts_for_target ) in read_counts_by_library_sample_by_stage[ "read_counts_for_targets" ]: read_counts_for_target[ "target_id" ] = target_info_old_index_key[pmo_index][ read_counts_for_target["target_id"] ] read_counts_by_library_sample_by_stage[ "library_sample_id" ] = library_sample_info_old_index_key[pmo_index][ read_counts_by_library_sample_by_stage["library_sample_id"] ] if "bioinformatics_run_id" in read_counts_by_stage_copy: read_counts_by_stage_copy[ "bioinformatics_run_id" ] = bioinformatics_run_info_old_index_key[pmo_index][ read_counts_by_stage_copy["bioinformatics_run_id"] ] pmo_out["read_counts_by_stage"].append(read_counts_by_stage_copy) return pmo_out