#!/usr/bin/env python3
import os
import copy
import pandas as pd
from collections import defaultdict
[docs]class PMOProcessor:
"""
A class to extract info out of a loaded PMO object
"""
[docs] @staticmethod
def get_index_key_of_specimen_names(pmodata):
"""
Get key of specimen_name to index in pmodata["specimen_info"]
:param pmodata: the PMO to get indexes from
:return: a dictionary of indexes keyed by specimen_name
"""
ret = {}
for idx, specimen in enumerate(pmodata["specimen_info"]):
ret[specimen["specimen_name"]] = idx
return ret
[docs] @staticmethod
def get_index_key_of_library_sample_names(pmodata):
"""
Get key of library_sample_name to index in pmodata["library_sample_info"]
:param pmodata: the PMO to get indexes from
:return: a dictionary of indexes keyed by library_sample_name
"""
ret = {}
for idx, library in enumerate(pmodata["library_sample_info"]):
ret[library["library_sample_name"]] = idx
return ret
[docs] @staticmethod
def get_index_key_of_target_names(pmodata):
"""
Get key of target_name to index in pmodata["target_info"]
:param pmodata: the PMO to get indexes from
:return: a dictionary of indexes keyed by target_name
"""
ret = {}
for idx, target in enumerate(pmodata["target_info"]):
ret[target["target_name"]] = idx
return ret
[docs] @staticmethod
def get_index_key_of_panel_names(pmodata):
"""
Get key of panel_name to index in pmodata["panel_info"]
:param pmodata: the PMO to get indexes from
:return: a dictionary of indexes keyed by panel_name
"""
ret = {}
for idx, panel in enumerate(pmodata["panel_info"]):
ret[panel["panel_name"]] = idx
return ret
[docs] @staticmethod
def get_sorted_specimen_names(pmodata) -> list[str]:
"""
Get a name sorted list of specimen_names in pmodata["specimen_info"]
:param pmodata: the PMO to get specimen_names from
:return: a list of all specimen_names
"""
return sorted(PMOProcessor.get_index_key_of_specimen_names(pmodata).keys())
[docs] @staticmethod
def get_sorted_library_sample_names(pmodata) -> list[str]:
"""
Get a name sorted list of library_sample_names in pmodata["library_sample_info"]
:param pmodata: the PMO to get library_sample_names from
:return: a list of all library_sample_names
"""
return sorted(
PMOProcessor.get_index_key_of_library_sample_names(pmodata).keys()
)
[docs] @staticmethod
def get_sorted_target_names(pmodata) -> list[str]:
"""
Get a name sorted list of target_names in pmodata["target_info"]
:param pmodata: the PMO to get target_names from
:return: a list of all target_names
"""
return sorted(PMOProcessor.get_index_key_of_target_names(pmodata).keys())
[docs] @staticmethod
def get_sorted_panel_names(pmodata) -> list[str]:
"""
Get a name sorted list of panel_names in pmodata["panel_info"]
:param pmodata: the PMO to get panel_names from
:return: a list of all panel_names
"""
return sorted(PMOProcessor.get_index_key_of_panel_names(pmodata).keys())
[docs] @staticmethod
def get_specimen_names(pmodata) -> list[str]:
"""
Get a list of specimen_names in pmodata["specimen_info"] in the order they appear
:param pmodata: the PMO to get specimen_names from
:return: a list of all specimen_names
"""
ret = []
for specimen in pmodata["specimen_info"]:
ret.append(specimen["specimen_name"])
return ret
[docs] @staticmethod
def get_library_sample_names(pmodata) -> list[str]:
"""
Get a list of library_sample_names in pmodata["library_sample_info"] in the order they appear
:param pmodata: the PMO to get library_sample_names from
:return: a list of all library_sample_names
"""
ret = []
for library_sample in pmodata["library_sample_info"]:
ret.append(library_sample["library_sample_name"])
return ret
[docs] @staticmethod
def get_target_names(pmodata) -> list[str]:
"""
Get a list of target_names in pmodata["target_info"] in the order they appear
:param pmodata: the PMO to get target_names from
:return: a list of all target_names
"""
ret = []
for target in pmodata["target_info"]:
ret.append(target["target_name"])
return ret
[docs] @staticmethod
def get_panel_names(pmodata) -> list[str]:
"""
Get a list of panel_names in pmodata["panel_info"] in the order they appear
:param pmodata: the PMO to get panel_names from
:return: a list of all panel_names
"""
ret = []
for panel in pmodata["panel_info"]:
ret.append(panel["panel_name"])
return ret
[docs] @staticmethod
def get_index_key_of_target_in_representative_microhaplotypes(pmodata):
"""
Get key of target_name to index for the representative microhaplotypes for the target_name in pmodata["representative_microhaplotypes"]
:param pmodata: the PMO to get indexes from
:return: a dictionary of indexes keyed by target_name
"""
ret = {}
for idx, representative_microhaplotypes_for_target in enumerate(
pmodata["representative_microhaplotypes"]["targets"]
):
ret[
pmodata["target_info"][
representative_microhaplotypes_for_target["target_id"]
]["target_name"]
] = idx
return ret
[docs] @staticmethod
def get_index_of_specimen_names(pmodata, specimen_names: list[str]):
"""
Get index of specimen_name in pmodata["specimen_info"]
:param pmodata: the PMO to get indexes from
:param specimen_names: a list of specimen_names
:return: the index of specimen_names in pmodata["specimen_info"] returned in the same order as specimen_names
"""
specimen_key = PMOProcessor.get_index_key_of_specimen_names(pmodata)
return [specimen_key[name] for name in specimen_names]
[docs] @staticmethod
def get_index_of_library_sample_names(pmodata, library_sample_names: list[str]):
"""
Get index of library_sample_name in pmodata["library_sample_info"]
:param pmodata: the PMO to get indexes from
:param library_sample_names: a list of library_sample_names
:return: the index of library_sample_names in pmodata["library_sample_info"] returned in the same order as library_sample_names
"""
library_sample_key = PMOProcessor.get_index_key_of_library_sample_names(pmodata)
return [library_sample_key[name] for name in library_sample_names]
[docs] @staticmethod
def get_index_of_target_names(pmodata, target_names: list[str]):
"""
Get index of target_name in pmodata["target_info"]
:param pmodata: the PMO to get indexes from
:param target_names: a list of target_names
:return: the index of target_names in pmodata["target_info"] returned in the same order as target_names
"""
target_key = PMOProcessor.get_index_key_of_target_names(pmodata)
return [target_key[name] for name in target_names]
[docs] @staticmethod
def get_index_of_panel_names(pmodata, panel_names: list[str]):
"""
Get index of panel_name in pmodata["panel_info"]
:param pmodata: the PMO to get indexes from
:param panel_names: a list of panel_names
:return: the index of panel_names in pmodata["panel_info"] returned in the same order as panel_names
"""
panel_key = PMOProcessor.get_index_key_of_panel_names(pmodata)
return [panel_key[name] for name in panel_names]
[docs] @staticmethod
def get_index_of_target_in_representative_microhaplotypes(
pmodata, target_names: list[str]
):
"""
Get index of target_name in pmodata["representative_microhaplotypes"]["targets"]
:param pmodata: the PMO to get indexes from
:param target_names: a list of target_names
:return: the index of target_names in pmodata["representative_microhaplotypes"]["targets"] returned in the same order as target_names
"""
microhap_target_key = (
PMOProcessor.get_index_key_of_target_in_representative_microhaplotypes(
pmodata
)
)
return [microhap_target_key[name] for name in target_names]
[docs] @staticmethod
def get_library_ids_for_specimen_ids(pmodata, specimen_ids: set[int]):
"""
get a dictionary that lists the library_ids for a specimen_id
:param pmodata: the PMO to get indexes from
:param specimen_ids: a set of specimen_ids
:return: a dictionary that lists the library_ids for a specimen_id
"""
ret = defaultdict(set)
# check to make sure the supplied specimens actually exist within the data
warnings = []
for specimen_id in specimen_ids:
if specimen_id > len(pmodata["specimen_info"]):
warnings.append(
f"{specimen_id} id is beyond the length of specimen_info: "
+ str(len(pmodata["specimen_info"]))
)
if len(warnings) > 0:
raise Exception("\n".join(warnings))
for library_sample_id, library_sample in enumerate(
pmodata["library_sample_info"]
):
if library_sample["specimen_id"] in specimen_ids:
ret[library_sample["specimen_id"]].add(library_sample_id)
return ret
[docs] @staticmethod
def count_targets_per_library_sample(
pmodata, min_reads: float = 0.0
) -> pd.DataFrame:
"""
Count the number of targets per library sample.
:param pmodata: the loaded PMO
:param min_reads: a minimum number of reads for a target in order for it to be counted
:return: a pandas DataFrame, columns = [bioinformatics_run_id, library_sample_name, target_number]
"""
records = []
library_sample_info = pmodata["library_sample_info"]
detected_microhaplotypes_count = 0
for result in pmodata["detected_microhaplotypes"]:
run_id = (
f"detected_microhaplotypes_count_idx_{detected_microhaplotypes_count}"
)
if "bioinformatics_run_id" in result:
run_id = result["bioinformatics_run_id"]
detected_microhaplotypes_count += 1
for sample in result["library_samples"]:
sample_id = sample["library_sample_id"]
sample_name = library_sample_info[sample_id]["library_sample_name"]
target_count = sum(
sum(hap["reads"] for hap in target["mhaps"]) >= min_reads
for target in sample["target_results"]
)
record = {
"bioinformatics_run_id": run_id,
"library_sample_name": sample_name,
"target_number": target_count,
}
records.append(record)
return pd.DataFrame.from_records(records)
[docs] @staticmethod
def count_library_samples_per_target(
pmodata, min_reads: float = 0.0, collapse_across_runs: bool = False
) -> pd.DataFrame:
"""
Count the number of library samples per target, optionally collapsing across bioinformatics runs.
:param pmodata: the loaded PMO
:param min_reads: the minimum number of reads for a target in order for it to be counted
:param collapse_across_runs: if True, sums across bioinformatics_run_id per target
:return: a pandas dataframe:
- if ``collapse_across_runs=False``: columns are ``bioinformatics_run_id``, ``target_name``, ``sample_count``
- if ``collapse_across_runs=True``: columns are ``target_name``, ``sample_count``
"""
records = []
microhap_targets = pmodata["representative_microhaplotypes"]["targets"]
target_info = pmodata["target_info"]
detected_microhaplotypes_count = 0
for result in pmodata["detected_microhaplotypes"]:
run_id = (
f"detected_microhaplotypes_count_idx_{detected_microhaplotypes_count}"
)
if "bioinformatics_run_id" in result:
run_id = result["bioinformatics_run_id"]
detected_microhaplotypes_count += 1
target_sample_counts = defaultdict(int)
for sample in result["library_samples"]:
for target_result in sample["target_results"]:
if sum(hap["reads"] for hap in target_result["mhaps"]) >= min_reads:
mhaps_target_id = target_result["mhaps_target_id"]
target_id = microhap_targets[mhaps_target_id]["target_id"]
target_name = target_info[target_id]["target_name"]
target_sample_counts[target_name] += 1
for target_name, count in target_sample_counts.items():
records.append(
{
"bioinformatics_run_id": run_id,
"target_name": target_name,
"sample_count": count,
}
)
ret = pd.DataFrame.from_records(records)
if collapse_across_runs:
ret = ret.groupby("target_name", as_index=False)["sample_count"].sum()
ret = ret[["target_name", "sample_count"]]
return ret.sort_values(by="target_name").reset_index(drop=True)
return ret.sort_values(by=["bioinformatics_run_id", "target_name"]).reset_index(
drop=True
)
[docs] @staticmethod
def count_targets_per_panel(pmodata) -> pd.DataFrame:
"""
Count the targets per panel.
:param pmodata: the pmo to count from
:return: counts for each panel
"""
# how many targets in each panel
panels = []
target_count = []
for panel in pmodata["panel_info"]:
panel_targets = []
panels.append(panel["panel_name"])
for reaction in panel["reactions"]:
panel_targets.extend(reaction["panel_targets"])
target_count.append(len(set(panel_targets)))
return pd.DataFrame(
data={"panel_name": panels, "panel_target_count": target_count}
)
[docs] @staticmethod
def count_specimen_by_field_value(pmodata, meta_fields: list[str]) -> pd.DataFrame:
"""
Count the values of the meta fields. If a specimen doesn't have a field, it is marked as 'NA'.
Groups are combinations of all given meta fields.
:param pmodata: the pmo to count from
:param meta_fields: a list of meta fields to count
:type meta_fields: list[str]
:return: counts for all sub-field groups, with metadata
"""
total_specimens = len(pmodata["specimen_info"])
field_counts = defaultdict(int)
for specimen in pmodata["specimen_info"]:
key = tuple(str(specimen.get(field, "NA")) for field in meta_fields)
field_counts[key] += 1
records = []
for key, count in field_counts.items():
record = dict(zip(meta_fields, key))
record.update(
{
"specimens_count": count,
"specimens_freq": count / total_specimens,
"total_specimen_count": total_specimens,
}
)
records.append(record)
return (
pd.DataFrame.from_records(records)
.sort_values(by=meta_fields)
.reset_index(drop=True)
)
[docs] @staticmethod
def filter_pmo_by_library_sample_ids(pmodata, library_sample_ids: set[int]):
"""
Extract out of a load PMO the data associated with select library_sample_ids
:param pmodata:the loaded PMO
:param library_sample_ids: the library_sample_ids to extract the info for
:return: a new PMO with only the data associated with the supplied library_sample_ids
"""
# create a new pmo out
# pmo_name, panel_info, sequencing_info, taramp_bioinformatics_info will stay the same
# specimen_info, library_sample_info, detected_microhaplotypes, representative_microhaplotype_sequences will be
# created based on the supplied library ids
# check to make sure the supplied specimens actually exist within the data
warnings = []
for library_sample_id in library_sample_ids:
if library_sample_id > len(pmodata["library_sample_info"]):
warnings.append(
f"{library_sample_id} id is beyond the length of library_sample_info: "
+ str(len(pmodata["library_sample_info"]))
)
if len(warnings) > 0:
raise Exception("\n".join(warnings))
pmo_out = {
"pmo_header": copy.deepcopy(pmodata["pmo_header"]),
"panel_info": copy.deepcopy(pmodata["panel_info"]),
"target_info": copy.deepcopy(pmodata["target_info"]),
"representative_microhaplotypes": copy.deepcopy(
pmodata["representative_microhaplotypes"]
),
"specimen_info": [],
"library_sample_info": [],
"detected_microhaplotypes": [],
}
if "sequencing_info" in pmodata:
pmo_out["sequencing_info"] = copy.deepcopy(pmodata["sequencing_info"])
if "project_info" in pmodata:
pmo_out["project_info"] = copy.deepcopy(pmodata["project_info"])
if "bioinformatics_methods_info" in pmodata:
pmo_out["bioinformatics_methods_info"] = copy.deepcopy(
pmodata["bioinformatics_methods_info"]
)
if "bioinformatics_run_info" in pmodata:
pmo_out["bioinformatics_run_info"] = copy.deepcopy(
pmodata["bioinformatics_run_info"]
)
if "targeted_genomes" in pmodata:
pmo_out["targeted_genomes"] = copy.deepcopy(pmodata["targeted_genomes"])
if "read_counts_by_stage" in pmodata:
pmo_out["read_counts_by_stage"] = []
# need to update read_counts_by_stage, library_sample_info, specimen_info, detected_microhaplotypes
# specimen_info
# first get the specimen_ids needed and then build
specimen_ids = set()
specimen_id_index_key = {}
for library_sample_id in library_sample_ids:
specimen_ids.add(
pmodata["library_sample_info"][library_sample_id]["specimen_id"]
)
for specimen_id in specimen_ids:
specimen_id_index_key[specimen_id] = len(pmo_out["specimen_info"])
pmo_out["specimen_info"].append(
copy.deepcopy(pmodata["specimen_info"][specimen_id])
)
# library_sample_info
library_id_index_key = {}
for library_sample_id in library_sample_ids:
library_id_index_key[library_sample_id] = len(
pmo_out["library_sample_info"]
)
pmo_out["library_sample_info"].append(
copy.deepcopy(pmodata["library_sample_info"][library_sample_id])
)
# update specimen_id
pmo_out["library_sample_info"][len(pmo_out["library_sample_info"]) - 1][
"specimen_id"
] = specimen_id_index_key[
pmodata["library_sample_info"][library_sample_id]["specimen_id"]
]
# detected_microhaplotypes
for detected_microhaplotypes in pmodata["detected_microhaplotypes"]:
new_detected_microhaplotypes = {
"library_samples": [],
}
if "bioinformatics_run_id" in detected_microhaplotypes:
new_detected_microhaplotypes[
"bioinformatics_run_id"
] = detected_microhaplotypes["bioinformatics_run_id"]
for sample in detected_microhaplotypes["library_samples"]:
if sample["library_sample_id"] in library_sample_ids:
new_detected_microhaplotypes["library_samples"].append(
copy.deepcopy(sample)
)
# update library_sample_id
new_detected_microhaplotypes["library_samples"][
len(new_detected_microhaplotypes["library_samples"]) - 1
]["library_sample_id"] = library_id_index_key[
sample["library_sample_id"]
]
pmo_out["detected_microhaplotypes"].append(new_detected_microhaplotypes)
# read_counts_by_stage
if "read_counts_by_stage" in pmodata:
for read_count in pmodata["read_counts_by_stage"]:
new_read_count = {
"read_counts_by_library_sample_by_stage": [],
}
if "bioinformatics_run_id" in read_count:
new_read_count["bioinformatics_run_id"] = read_count[
"bioinformatics_run_id"
]
for sample in read_count["read_counts_by_library_sample_by_stage"]:
if sample["library_sample_id"] in library_sample_ids:
new_read_count["read_counts_by_library_sample_by_stage"].append(
copy.deepcopy(sample)
)
# update library_sample_id
new_read_count["read_counts_by_library_sample_by_stage"][
len(
new_read_count["read_counts_by_library_sample_by_stage"]
)
- 1
]["library_sample_id"] = library_id_index_key[
sample["library_sample_id"]
]
return pmo_out
[docs] @staticmethod
def filter_pmo_by_library_sample_names(pmodata, library_sample_names: set[str]):
"""
Filters pmodata by library sample names
:param pmodata: the pmodata object
:param library_sample_names: set of library sample names, will be converted into indexes to extract out
:return: filtered pmodata object containing only the indexes
"""
library_sample_names_list = sorted(list(library_sample_names))
library_sample_ids_list = PMOProcessor.get_index_of_library_sample_names(
pmodata, library_sample_names_list
)
return PMOProcessor.filter_pmo_by_library_sample_ids(
pmodata, set(library_sample_ids_list)
)
[docs] @staticmethod
def filter_pmo_by_specimen_ids(pmodata, specimen_ids: set[int]):
"""
Extract out of a load PMO the data associated with select specimen_ids
:param pmodata:the loaded PMO
:param specimen_ids: the specimen_ids to extract the info for
:return: a new PMO with only the data associated with the supplied specimen_ids
"""
# check to make sure the supplied specimens actually exist within the data
warnings = []
for specimen_id in specimen_ids:
if specimen_id > len(pmodata["specimen_info"]):
warnings.append(
f"{specimen_id} id is beyond the length of specimen_info: "
+ str(len(pmodata["specimen_info"]))
)
if len(warnings) > 0:
raise Exception("\n".join(warnings))
library_sample_ids_for_specimen_ids = (
PMOProcessor.get_library_ids_for_specimen_ids(pmodata, specimen_ids)
)
all_library_sample_ids = {
exp_samp
for spec in library_sample_ids_for_specimen_ids.values()
for exp_samp in spec
}
return PMOProcessor.filter_pmo_by_library_sample_ids(
pmodata, all_library_sample_ids
)
[docs] @staticmethod
def filter_pmo_by_specimen_names(pmodata, specimen_names: set[str]):
"""
Extract out of a loaded PMO the data associated with select specimen_names
:param pmodata:the loaded PMO
:param specimen_names: the specimen_names to extract the info for
:return: a new PMO with only the data associated with the supplied specimen_names
"""
specimen_names_list = sorted(list(specimen_names))
specimen_ids_list = PMOProcessor.get_index_of_specimen_names(
pmodata, specimen_names_list
)
return PMOProcessor.filter_pmo_by_specimen_ids(pmodata, set(specimen_ids_list))
[docs] @staticmethod
def filter_pmo_by_target_ids(pmodata, target_ids: set[int]):
"""
Extract out data from the PMO for only select target IDs
:param pmodata: the pmo to extract data from
:param target_ids: the target_ids to extract
:return: a new pmo with the data for only the targets supplied
"""
# create a new pmo out
# check to make sure the supplied specimens actually exist within the data
warnings = []
for target_id in target_ids:
if target_id >= len(pmodata["target_info"]):
warnings.append(
f"{target_id} out of range of target_info, length is {len(pmodata['target_info'])}"
)
target_ids_in_representative_microhaplotypes = []
for target in pmodata["representative_microhaplotypes"]["targets"]:
target_ids_in_representative_microhaplotypes.append(target["target_id"])
for target_id in target_ids:
if target_id not in target_ids_in_representative_microhaplotypes:
warnings.append(
f'{target_id} not in pmodata["representative_microhaplotypes"]'
)
if len(warnings) > 0:
raise Exception("\n".join(warnings))
pmo_out = {
"pmo_header": copy.deepcopy(pmodata["pmo_header"]),
"specimen_info": copy.deepcopy(pmodata["specimen_info"]),
"library_sample_info": copy.deepcopy(pmodata["library_sample_info"]),
"target_info": [],
}
if "sequencing_info" in pmodata:
pmo_out["sequencing_info"] = copy.deepcopy(pmodata["sequencing_info"])
if "project_info" in pmodata:
pmo_out["project_info"] = copy.deepcopy(pmodata["project_info"])
if "bioinformatics_methods_info" in pmodata:
pmo_out["bioinformatics_methods_info"] = copy.deepcopy(
pmodata["bioinformatics_methods_info"]
)
if "bioinformatics_run_info" in pmodata:
pmo_out["bioinformatics_run_info"] = copy.deepcopy(
pmodata["bioinformatics_run_info"]
)
if "targeted_genomes" in pmodata:
pmo_out["targeted_genomes"] = copy.deepcopy(pmodata["targeted_genomes"])
# function will update target_info, panel_info, representative_microhaplotypes, detected_microhaplotypes, read_counts_by_stage based
# on target_ids selecting for first update representative_microhaplotypes, detected_microhaplotypes, read_counts_by_stage
# then update target_info, panel_info
# then update the target_ids
# target_info
target_info_index_key = {}
for target_info_id, target_info in enumerate(pmodata["target_info"]):
if target_info_id in target_ids:
target_info_index_key[target_info_id] = len(pmo_out["target_info"])
pmo_out["target_info"].append(copy.deepcopy(target_info))
# panel_info
pmo_out["panel_info"] = []
for panel_info in pmodata["panel_info"]:
new_panel_info = {"panel_name": panel_info["panel_name"], "reactions": []}
for reaction in panel_info["reactions"]:
new_reaction = {
"reaction_name": reaction["reaction_name"],
"panel_targets": [],
}
for panel_target_id in reaction["panel_targets"]:
if panel_target_id in target_ids:
# add new updated target_id index
new_reaction["panel_targets"].append(
target_info_index_key[panel_target_id]
)
if len(new_reaction["panel_targets"]) > 0:
new_panel_info["reactions"].append(new_reaction)
pmo_out["panel_info"].append(new_panel_info)
# representative_microhaplotypes
pmo_out["representative_microhaplotypes"] = {"targets": []}
# key=old_mhaps_target_id, value = new_mhaps_target_id
mhaps_target_id_new_key = {}
for microhap_info_index, microhap_info in enumerate(
pmodata["representative_microhaplotypes"]["targets"]
):
if microhap_info["target_id"] in target_ids:
mhaps_target_id_new_key[microhap_info_index] = len(
pmo_out["representative_microhaplotypes"]["targets"]
)
# update new target_id index
microhap_info["target_id"] = target_info_index_key[
microhap_info["target_id"]
]
pmo_out["representative_microhaplotypes"]["targets"].append(
copy.deepcopy(microhap_info)
)
# representative_microhaplotypes
pmo_out["detected_microhaplotypes"] = []
for detected_microhaplotypes in pmodata["detected_microhaplotypes"]:
new_detected_microhaplotypes = {
"library_samples": [],
}
if "bioinformatics_run_id" in detected_microhaplotypes:
new_detected_microhaplotypes[
"bioinformatics_run_id"
] = detected_microhaplotypes["bioinformatics_run_id"]
for sample in detected_microhaplotypes["library_samples"]:
new_sample = {
"library_sample_id": sample["library_sample_id"],
"target_results": [],
}
for target in sample["target_results"]:
if target["mhaps_target_id"] in mhaps_target_id_new_key:
# update with new mhaps_target_id id
target["mhaps_target_id"] = mhaps_target_id_new_key[
target["mhaps_target_id"]
]
new_sample["target_results"].append(copy.deepcopy(target))
new_detected_microhaplotypes["library_samples"].append(new_sample)
pmo_out["detected_microhaplotypes"].append(new_detected_microhaplotypes)
# read_counts_by_stage
if "read_counts_by_stage" in pmodata:
pmo_out["read_counts_by_stage"] = []
for read_counts_by_bioid in pmodata["read_counts_by_stage"]:
new_read_counts_by_bioid = {
"read_counts_by_library_sample_by_stage": [],
}
if "bioinformatics_run_id" in read_counts_by_bioid:
new_read_counts_by_bioid[
"bioinformatics_run_id"
] = read_counts_by_bioid["bioinformatics_run_id"]
for sample in read_counts_by_bioid[
"read_counts_by_library_sample_by_stage"
]:
new_samples = {
"library_sample_id": sample["library_sample_id"],
"total_raw_count": sample["total_raw_count"],
}
if "read_counts_for_targets" in sample:
new_samples["read_counts_for_targets"] = []
for target in sample["read_counts_for_targets"]:
if target["target_id"] in target_ids:
# update with new target_id index
target["target_id"] = target_info_index_key[
target["target_id"]
]
new_samples["read_counts_for_targets"].append(
copy.deepcopy(target)
)
new_read_counts_by_bioid[
"read_counts_by_library_sample_by_stage"
].append(new_samples)
pmo_out["read_counts_by_stage"].append(new_read_counts_by_bioid)
return pmo_out
[docs] @staticmethod
def filter_pmo_by_target_names(pmodata, target_names: set[str]):
"""
Extract out data from the PMO for only select target names
:param pmodata: the pmo to extract data from
:param target_names: the target_names to extract
:return: a new pmo with the data for only the targets supplied
"""
target_names_list = sorted(list(target_names))
target_ids_list = PMOProcessor.get_index_of_target_names(
pmodata, target_names_list
)
return PMOProcessor.filter_pmo_by_target_ids(pmodata, set(target_ids_list))