Source code for pmotools.scripts.pmo_to_tables.extract_allele_table
#!/usr/bin/env python3
import argparse
import json
import os
import sys
from pmotools.pmo_engine.pmo_reader import PMOReader
from pmotools.utils.small_utils import Utils
from pmotools.pmo_engine.pmo_checker import PMOChecker
from pmotools.pmo_engine.pmo_processor import PMOProcessor
from pmotools.pmo_engine.pmo_exporter import PMOExporter
from pmotools import __version__ as __pmotools_version__
[docs]def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="pmotools-python extract_allele_table",
description="Extract allele tables for tools like dcifer or moire",
epilog="""
Examples:
%(prog)s --file input.pmo --output output.tsv
%(prog)s --file input.pmo.gz --output output.tsv --delim comma
%(prog)s --file input.pmo --output output.tsv --allele_freqs_output freqs.tsv --overwrite
%(prog)s --file input.pmo --output output.tsv --microhap_fields reads,mhap_id
%(prog)s --file input.pmo --output output.tsv --specimen_info_meta_fields collection_date,collection_country
""",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--file", type=str, required=True, help="PMO file")
parser.add_argument(
"--jsonschema",
default=os.path.join(
os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
),
"schemas/",
f"portable_microhaplotype_object_v{__pmotools_version__}.schema.json",
),
type=str,
required=False,
help="the jsonschema to check the PMO against",
)
parser.add_argument(
"--delim",
default="tab",
type=str,
required=False,
help="the delimiter of the input text file, examples tab,comma",
)
parser.add_argument(
"--output", type=str, required=True, help="Output allele table file name path"
)
parser.add_argument(
"--overwrite", action="store_true", help="If output file exists, overwrite it"
)
parser.add_argument(
"--skip_validation", action="store_true", help="skip validation of PMO"
)
parser.add_argument(
"--allele_freqs_output",
type=str,
help="if also writing out allele frequencies, write to this file",
)
parser.add_argument(
"--specimen_info_meta_fields",
type=str,
required=False,
help="Meta Fields if any to include from the specimen table",
)
parser.add_argument(
"--library_sample_info_meta_fields",
type=str,
required=False,
help="Meta Fields if any to include from the library sample table",
)
parser.add_argument(
"--microhap_fields",
type=str,
required=False,
help="additional optional fields from the detected microhaplotype object to include",
)
parser.add_argument(
"--representative_haps_fields",
type=str,
required=False,
help="additional optional fields from the detected representative object to include",
)
parser.add_argument(
"--default_base_col_names",
type=str,
required=False,
default="library_sample_name,target_name,mhap_id",
help="default base column names, must be length 3",
)
return parser
[docs]def extract_for_allele_table():
args = parse_args_extract_for_allele_table()
compressed_output = (
"." not in args.output and args.file.endswith(".gz")
) or args.output.endswith(".gz")
output_delim, output_extension = Utils.process_delimiter_and_output_extension(
args.delim, gzip=compressed_output
)
allele_per_sample_table_out_fnp = (
args.output
if "STDOUT" == args.output
else Utils.appendStrAsNeeded(args.output, output_extension)
)
Utils.inputOutputFileCheck(
args.file, allele_per_sample_table_out_fnp, args.overwrite
)
allele_freq_output = ""
if args.allele_freqs_output is not None:
allele_freq_output = Utils.appendStrAsNeeded(
args.allele_freqs_output, output_extension
)
Utils.inputOutputFileCheck(args.file, allele_freq_output, args.overwrite)
pmodata = PMOReader.read_in_pmo(args.file)
if not args.skip_validation:
with open(args.jsonschema, "r") as f:
schema_dict = json.load(f)
checker = PMOChecker(schema_dict)
# make sure PMO is valid
checker.validate_pmo_json(pmodata)
sys.stderr.write("PMO is valid\n")
if args.specimen_info_meta_fields is not None:
args.specimen_info_meta_fields = Utils.parse_delimited_input_or_file(
args.specimen_info_meta_fields, ","
)
if args.microhap_fields is not None:
args.microhap_fields = Utils.parse_delimited_input_or_file(
args.microhap_fields, ","
)
if args.library_sample_info_meta_fields is not None:
args.library_sample_info_meta_fields = Utils.parse_delimited_input_or_file(
args.library_sample_info_meta_fields, ","
)
if args.representative_haps_fields is not None:
args.representative_haps_fields = Utils.parse_delimited_input_or_file(
args.representative_haps_fields, ","
)
allele_table = PMOExporter.extract_alleles_per_sample_table(
pmodata,
additional_specimen_info_fields=args.specimen_info_meta_fields,
additional_library_sample_info_fields=args.library_sample_info_meta_fields,
additional_microhap_fields=args.microhap_fields,
additional_representative_info_fields=args.representative_haps_fields,
default_base_col_names=args.default_base_col_names.split(","),
)
with Utils.smart_open_write(allele_per_sample_table_out_fnp) as f:
allele_table.to_csv(f, sep=output_delim, index=False)
if args.allele_freqs_output is not None:
allele_freqs = PMOProcessor.extract_allele_counts_freq_from_pmo(pmodata)
with Utils.smart_open_write(allele_freq_output) as f:
allele_freqs.to_csv(f, sep=output_delim, index=False)
if __name__ == "__main__":
extract_for_allele_table()