#!/usr/bin/env python3
import argparse
import json
from collections import defaultdict
import pandas as pd
from pmotools.utils.small_utils import Utils
[docs]def get_parser_terra_amp_output_to_json() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="pmotools-python terra_amp_output_to_json",
description="Convert Terra output to JSON sequence table",
)
parser.add_argument("--file", type=str, required=True, help="Input excel file path")
parser.add_argument(
"--gt_sheet",
type=str,
default="gt",
required=False,
help="The gt sheet to convert, if none provided will default to gt",
)
parser.add_argument(
"--asv_table_sheet",
type=str,
default="asv_table",
required=False,
help="The asv_table sheet to convert, if none provided will default to asv_table",
)
parser.add_argument(
"--asv_seqs_sheet",
type=str,
default="asv_seqs",
required=False,
help="The asv_seqs sheet to convert, if none provided will default to asv_seqs",
)
# parser.add_argument('--index_col_name', type=str, required=False, help='by default output is a list, if an index column name is supplied it will be a dict with this column as index')
parser.add_argument(
"--output", type=str, required=True, help="Output json file path"
)
parser.add_argument(
"--overwrite", action="store_true", help="If output file exists, overwrite it"
)
return parser
[docs]def parse_args_terra_amp_output_to_json():
parser = get_parser_terra_amp_output_to_json()
return parser.parse_args()
[docs]def terra_amp_output_to_json():
args = parse_args_terra_amp_output_to_json()
args.output = Utils.appendStrAsNeeded(args.output, ".json")
# check if input file exists and if output file exists check if --overwrite flag is set
Utils.inputOutputFileCheckFromArgParse(args)
# print(asv_table_look_up)
representative_microhaplotype_id = "terra"
tar_amp_bioinformatics_info_id = "terra"
asv_seqs = pd.read_excel(
args.file, sheet_name=args.asv_seqs_sheet, index_col="asv_id"
)
asv_seq_table_look_up = dict()
for index, row in asv_seqs.iterrows():
asv_seq_table_look_up[index] = row["asv_seq"]
representative_ref_seqs = {
"representative_microhaplotype_id": representative_microhaplotype_id,
"targets": {},
}
asv_table = pd.read_excel(
args.file, sheet_name=args.asv_table_sheet, index_col="hapid"
)
asv_table_look_up = dict()
asv_table_look_up_by_target_by_cigar = defaultdict(lambda: defaultdict(str))
for index, row in asv_table.iterrows():
asv_table_look_up.update(
{
index: {
"asv_id": index,
"CIGAR": row["CIGAR"],
"CIGAR_masked": row["CIGAR_masked"],
"Amplicon": row["Amplicon"],
}
}
)
asv_table_look_up_by_target_by_cigar[row["Amplicon"]][
row["CIGAR_masked"]
] = index
if row["Amplicon"] not in representative_ref_seqs["targets"]:
representative_ref_seqs["targets"][row["Amplicon"]] = {
"target_id": row["Amplicon"],
"seqs": {},
}
CIGAR_masked = row["CIGAR_masked"]
if isinstance(row["CIGAR_masked"], float) and pd.isna(row["CIGAR_masked"]):
CIGAR_masked = "."
representative_ref_seqs["targets"][row["Amplicon"]]["seqs"].update(
{
index: {
"microhaplotype_id": index,
"seq": asv_seq_table_look_up[index],
"CIGAR": row["CIGAR"],
"CIGAR_masked": CIGAR_masked,
}
}
)
microhaplotypes_detected = {
"representative_microhaplotype_id": representative_microhaplotype_id,
"tar_amp_bioinformatics_info_id": tar_amp_bioinformatics_info_id,
"experiment_samples": {},
}
gt_contents = pd.read_excel(
args.file, sheet_name=args.gt_sheet, index_col="Sample_id"
)
warnings = []
for index, row in gt_contents.iterrows():
sample_data = {"experiment_sample_id": index, "target_results": {}}
for amplicon_id in row.keys():
amplicon_data = []
# print(row[amplicon_id])
# print(type(row[amplicon_id]))
# print(str(row[amplicon_id]))
# # print(row[amplicon_id].isna())
# print(isinstance(row[amplicon_id], float))
# print(isinstance(row[amplicon_id], float) and pd.isna(row[amplicon_id]))
if not (isinstance(row[amplicon_id], float) and pd.isna(row[amplicon_id])):
current_haps = row[amplicon_id].split("_")
for current_hap in current_haps:
current_hap_split = current_hap.split(":")
if len(current_hap_split) != 2:
warnings.append(
"sample: "
+ str(index)
+ " target: "
+ amplicon_id
+ "failed to process "
+ current_hap
+ " expected two values separated by :"
)
else:
read_cnt = float(current_hap_split[1])
cigar = current_hap_split[0]
if (
cigar
not in asv_table_look_up_by_target_by_cigar[amplicon_id]
):
warnings.append(
"sample: "
+ str(index)
+ " target: "
+ amplicon_id
+ "failed to find "
+ cigar
+ " in cigar look up"
)
else:
amplicon_data.append(
{
"microhaplotype_id": asv_table_look_up_by_target_by_cigar[
amplicon_id
][cigar],
"read_count": read_cnt,
}
)
if len(amplicon_data) > 0:
sample_data["target_results"].update(
{
amplicon_id: {
"target_id": amplicon_id,
"microhaplotypes": amplicon_data,
}
}
)
microhaplotypes_detected["experiment_samples"].update({index: sample_data})
if len(warnings) > 0:
raise Exception("\n".join(warnings))
output_json = {
"microhaplotypes_detected": {
tar_amp_bioinformatics_info_id: microhaplotypes_detected
},
"representative_microhaplotype_sequences": {
representative_microhaplotype_id: representative_ref_seqs
},
}
json.dump(output_json, open(args.output, "w"), indent=4)
if __name__ == "__main__":
terra_amp_output_to_json()