Source code for pmotools.pmo_builder.mhap_table_to_pmo

#!/usr/bin/env python3
import pandas as pd
import warnings

from ..pmo_builder.json_convert_utils import check_additional_columns_exist


[docs]def mhap_table_to_pmo( microhaplotype_table: pd.DataFrame, bioinformatics_run_name: str | None = None, library_sample_name_col: str = "library_sample_name", target_name_col: str = "target_name", seq_col: str = "seq", reads_col: str = "reads", genome_id: int = 0, umis_col: str | None = None, chrom_col: str | None = None, start_col: str | None = None, end_col: str | None = None, ref_seq_col: str | None = None, strand_col: str | None = None, alt_annotations_col: str | None = None, masking_seq_start_col: str | None = None, masking_seq_segment_size_col: str | None = None, masking_replacement_size_col: str | None = None, masking_delim: str = ",", microhaplotype_name_col: str | None = None, pseudocigar_col: str | None = None, quality_col: str | None = None, additional_representative_mhap_cols: list | None = None, additional_mhap_detected_cols: list | None = None, ): """ Convert a dataframe of microhaplotype calls into a dictionary containing a dictionary for the haplotypes_detected and a dictionary for the representative_haplotype_sequences. :param microhaplotype_table: the dataframe containing microhaplotype calls :type microhaplotype_table: pd.DataFrame :param bioinformatics_run_name: unique name for the bioinformatics run that generated the data (column name or individual run name). Default: None :type bioinformatics_run_name: str, optional :param library_sample_name_col: the name of the column containing the library sample names. Default: library_sample_name :type library_sample_name_col: str :param target_name_col: the name of the column containing the targets. Default: target_name :type target_name_col: str :param seq_col: the name of the column containing the microhaplotype sequences. Default: seq :type seq_col: str :param reads_col: the name of the column containing the read counts. Default: reads :type reads_col: str :param genome_id: the ID of the genome used as reference. Default: None :type genome_id: int, optional :param umis_col: the name of the column with the unique molecular identifier count associated with this microhaplotype :type umis_col: str, optional :param chrom_col: the name of the column containing the chromosome name of the microhaplotype :type chrom_col: str, optional :param start_col: the name of the column containing the start of the microhaplotype :type start_col: str, optional :param end_col: the name of the column containing the end of the microhaplotype :type end_col: str, optional :param ref_seq_col: the name of the column containing the reference sequence for the microhaplotype :type ref_seq_col: str, optional :param strand_col: the name of the column containing the strand of the microhaplotype :type strand_col: str, optional :param alt_annotations_col: the name of the column containing any alternative annotations :type alt_annotations_col: str, optional :param masking_seq_start_col: the name of the column containing a list of start positions for masking :type masking_seq_start_col: str, optional :param masking_seq_segment_size_col: the name of the column containing a list of lengths of the segments in seq being masked :type masking_seq_segment_size_col: str, optional :param masking_replacement_size_col: the name of the column containing a list of lengths of the masking replacements :type masking_replacement_size_col: str, optional :param masking_delim: delimiter of the masking information. Default: ',' :type masking_delim: str, optional :param microhaplotype_name_col: the name of the column containing an optional name for this microhaplotype :type microhaplotype_name_col: str, optional :param pseudocigar_col: the name of the column containing a pseudocigar for the microhaplotype :type pseudocigar_col: str, optional :param quality_col: the name of the column containing the ANSI FASTQ per-base quality score for this sequence :type quality_col: str, optional :param additional_representative_mhap_cols: additional columns to add to the representative microhaplotypes table :type additional_representative_mhap_cols: list of str, optional :param additional_mhap_detected_cols: additional columns to add to the detected microhaplotypes table :type additional_mhap_detected_cols: list of str, optional :return: a dict of both the haplotypes_detected and representative_haplotype_sequences :rtype: dict """ representative_microhaplotype_dict = create_representative_microhaplotype_dict( microhaplotype_table=microhaplotype_table, target_name_col=target_name_col, seq_col=seq_col, genome_id=genome_id, chrom_col=chrom_col, start_col=start_col, end_col=end_col, ref_seq_col=ref_seq_col, strand_col=strand_col, alt_annotations_col=alt_annotations_col, masking_seq_start_col=masking_seq_start_col, masking_seq_segment_size_col=masking_seq_segment_size_col, masking_replacement_size_col=masking_replacement_size_col, masking_delim=masking_delim, microhaplotype_name_col=microhaplotype_name_col, pseudocigar_col=pseudocigar_col, quality_col=quality_col, additional_representative_mhap_cols=additional_representative_mhap_cols, ) detected_mhap_dict_list = [] if bioinformatics_run_name in microhaplotype_table.columns: for bioinfo_run in microhaplotype_table[bioinformatics_run_name].unique(): microhaplotype_table_per_run = microhaplotype_table[ microhaplotype_table[bioinformatics_run_name] == bioinfo_run ] detected_mhap_dict = create_detected_microhaplotype_dict( microhaplotype_table=microhaplotype_table_per_run, representative_microhaplotype_dict=representative_microhaplotype_dict, bioinformatics_run_name=bioinfo_run, library_sample_name_col=library_sample_name_col, target_name_col=target_name_col, seq_col=seq_col, reads_col=reads_col, umis_col=umis_col, additional_mhap_detected_cols=additional_mhap_detected_cols, ) detected_mhap_dict_list.append(detected_mhap_dict) else: detected_mhap_dict = create_detected_microhaplotype_dict( microhaplotype_table=microhaplotype_table, representative_microhaplotype_dict=representative_microhaplotype_dict, bioinformatics_run_name=bioinformatics_run_name, library_sample_name_col=library_sample_name_col, target_name_col=target_name_col, seq_col=seq_col, reads_col=reads_col, umis_col=umis_col, additional_mhap_detected_cols=additional_mhap_detected_cols, ) detected_mhap_dict_list.append(detected_mhap_dict) output_data_dict = { "representative_microhaplotypes": representative_microhaplotype_dict, "detected_microhaplotypes": detected_mhap_dict_list, } return output_data_dict
[docs]def create_representative_microhaplotype_dict( microhaplotype_table: pd.DataFrame, target_name_col: str = "target_name", seq_col: str = "seq", genome_id: int = 0, chrom_col: str | None = None, start_col: str | None = None, end_col: str | None = None, ref_seq_col: str | None = None, strand_col: str | None = None, alt_annotations_col: str | None = None, masking_seq_start_col: str | None = None, masking_seq_segment_size_col: str | None = None, masking_replacement_size_col: str | None = None, masking_delim: str = ",", microhaplotype_name_col: str | None = None, pseudocigar_col: str | None = None, quality_col: str | None = None, additional_representative_mhap_cols: list[str] | None = None, ): """ Convert the read-in microhaplotype calls table into a representative microhaplotype JSON-like dictionary. :param microhaplotype_table: the dataframe containing microhaplotype calls :type microhaplotype_table: pd.DataFrame :param target_name_col: the name of the column containing the targets. Default: target_name :type target_name_col: str :param seq_col: the name of the column containing the microhaplotype sequences. Default: seq :type seq_col: str :param genome_id: the genome ID :type genome_id: int :param chrom_col: the name of the column containing the chromosome name of the microhaplotype :type chrom_col: str, optional :param start_col: the name of the column containing the start of the microhaplotype :type start_col: str, optional :param end_col: the name of the column containing the end of the microhaplotype :type end_col: str, optional :param ref_seq_col: the name of the column containing the reference sequence for the microhaplotype :type ref_seq_col: str, optional :param strand_col: the name of the column containing the strand of the microhaplotype :type strand_col: str, optional :param alt_annotations_col: the name of the column containing any alternative annotations :type alt_annotations_col: str, optional :param masking_seq_start_col: the name of the column containing a list of start positions for masking :type masking_seq_start_col: str, optional :param masking_seq_segment_size_col: the name of the column containing a list of lengths of the segments in seq being masked :type masking_seq_segment_size_col: str, optional :param masking_replacement_size_col: the name of the column containing a list of lengths of the masking replacements :type masking_replacement_size_col: str, optional :param masking_delim: delimiter of the masking information. Default: ',' :type masking_delim: str, optional :param microhaplotype_name_col: the name of the column containing an optional name for this microhaplotype :type microhaplotype_name_col: str, optional :param pseudocigar_col: the name of the column containing a pseudocigar for the microhaplotype :type pseudocigar_col: str, optional :param quality_col: the name of the column containing the ANSI FASTQ per-base quality score for this sequence :type quality_col: str, optional :param additional_representative_mhap_cols: additional columns to add to the representative microhaplotypes table :type additional_representative_mhap_cols: list of str, optional :return: a dictionary formatted for JSON output with representative microhaplotype sequences :rtype: dict """ if additional_representative_mhap_cols: check_additional_columns_exist( microhaplotype_table, additional_representative_mhap_cols ) def get_if_present(row, col): return row[col] if col and pd.notna(row[col]) else None def extract_masking(row): if not ( masking_seq_start_col and masking_seq_segment_size_col and masking_replacement_size_col ): return [] if all( [ pd.notna(row[masking_seq_start_col]), pd.notna(row[masking_seq_segment_size_col]), pd.notna(row[masking_replacement_size_col]), ] ): starts = str(row[masking_seq_start_col]).split(masking_delim) sizes = str(row[masking_seq_segment_size_col]).split(masking_delim) replacements = str(row[masking_replacement_size_col]).split(masking_delim) return [ { "seq_start": int(s), "seq_segment_size": int(sz), "replacement_size": int(r), } for s, sz, r in zip(starts, sizes, replacements) if s and sz and r ] else: return [] def warn_if_duplicated_seqs(df, target_col, seq_col): dup_counts = df.groupby([target_col, seq_col]).size() duplicate_combos = dup_counts[dup_counts > 1] if not duplicate_combos.empty: warnings.warn( f"Duplicate (target, asv) combinations found:\n{duplicate_combos}", UserWarning, ) # Determine which columns to keep optional_cols = [ chrom_col, start_col, end_col, ref_seq_col, strand_col, alt_annotations_col, microhaplotype_name_col, pseudocigar_col, quality_col, ] masking_cols = [ masking_seq_start_col, masking_seq_segment_size_col, masking_replacement_size_col, ] # Check location cols are set correctly if any(masking_cols): if not all(masking_cols): raise ValueError( "If one of masking_seq_start_col, masking_seq_segment_size_col, masking_replacement_size_col is set, then all must be." ) if any([chrom_col, start_col, end_col, ref_seq_col, strand_col]): if not all([chrom_col, start_col, end_col]): raise ValueError( "If any location columns set (chrom_col, start_col, end_col, ref_seq_col, strand_col), then all required ones must be (chrom_col, start_col, end_col)." ) all_cols = [target_name_col, seq_col] + [ c for c in optional_cols + masking_cols if c ] if additional_representative_mhap_cols: all_cols += additional_representative_mhap_cols all_cols = list(set(all_cols)) unique_table = ( microhaplotype_table[all_cols].drop_duplicates().reset_index(drop=True) ) warn_if_duplicated_seqs(unique_table, target_name_col, seq_col) mhap_data = {"targets": []} for target, group in unique_table.groupby(target_name_col): target_dict = {"target_name": target, "microhaplotypes": []} first_row = group.iloc[0] if chrom_col and pd.notna(group[chrom_col].iloc[0]): loc = { "genome_id": genome_id, "chrom": first_row[chrom_col], "start": first_row[start_col], "end": first_row[end_col], } if ref_seq_col and pd.notna(first_row[ref_seq_col]): loc["ref_seq"] = first_row[ref_seq_col] if strand_col and pd.notna(first_row[strand_col]): loc["strand"] = first_row[strand_col] target_dict["mhap_location"] = loc for _, row in group.iterrows(): mhap = {"seq": row[seq_col]} if val := get_if_present(row, alt_annotations_col): mhap["alt_annotations"] = val if val := get_if_present(row, microhaplotype_name_col): mhap["microhaplotype_name"] = val if val := get_if_present(row, pseudocigar_col): mhap["pseudo_cigar"] = val if val := get_if_present(row, quality_col): mhap["quality"] = val if additional_representative_mhap_cols: for col in additional_representative_mhap_cols: if val := get_if_present(row, col): mhap[col] = val # Add masking if present masking = extract_masking(row) if masking: mhap["masking"] = masking target_dict["microhaplotypes"].append(mhap) mhap_data["targets"].append(target_dict) return mhap_data
[docs]def create_detected_microhaplotype_dict( microhaplotype_table: pd.DataFrame, representative_microhaplotype_dict: dict, bioinformatics_run_name: str | None = None, library_sample_name_col: str = "library_sample_name", target_name_col: str = "target_name", seq_col: str = "seq", reads_col: str = "reads", umis_col: str | None = None, additional_mhap_detected_cols: list | None = None, ): """ Convert the read-in microhaplotype calls table into the detected microhaplotype dictionary. :param microhaplotype_table: Parsed microhaplotype calls table. :param representative_microhaplotype_dict: Dictionary of representative microhaplotypes. :param bioinformatics_run_name: Optional Unique name for the bioinformatics run that generated the data. :param library_sample_name_col: Column containing the sample IDs. :param target_name_col: Column containing the locus IDs. :param seq_col: Column containing the microhaplotype sequences. :param reads_col: Column containing the read counts. :param umis_col: Optional Column with unique molecular identifier count associated with this microhaplotype :param additional_mhap_detected_cols: Optional additional columns to add to the microhaplotypes detected, the key is the pandas column and the value is what to name it in the output. :return: A dictionary of detected microhaplotype results. """ # Rename columns in dataframe and gather columns column_mapping = { library_sample_name_col: "library_sample_name", target_name_col: "target_name", seq_col: "seq", reads_col: "reads", } mhap_cols = ["mhap_id", "reads"] if umis_col: column_mapping[umis_col] = "umis" mhap_cols.append("umis") df = microhaplotype_table.rename(columns=column_mapping).copy() # Validate additional columns if provided if additional_mhap_detected_cols: check_additional_columns_exist(df, additional_mhap_detected_cols) mhap_cols += additional_mhap_detected_cols # Find IDs for targets and mhaps in the representative table df = get_target_id_in_representative_mhaps(df, representative_microhaplotype_dict) df = get_mhap_index_in_representative_mhaps(df, representative_microhaplotype_dict) # Build detected mhap table mhap_detected = build_detected_mhap_dict(df, bioinformatics_run_name, mhap_cols) return mhap_detected
[docs]def build_detected_mhap_dict( df, bioinformatics_run_name, mhap_cols, always_include=None ): if always_include is None: always_include = ["mhap_id", "reads"] mhap_detected = { "library_samples": [], } if bioinformatics_run_name is not None: mhap_detected["bioinformatics_run_name"] = bioinformatics_run_name for sample, sample_df in df.groupby("library_sample_name"): target_results = [] for target_id, target_df in sample_df.groupby("mhaps_target_id"): mhaps = target_df.apply( lambda row: { col: row[col] for col in mhap_cols if col in always_include or pd.notna(row[col]) }, axis=1, ).to_list() target_results.append({"mhaps_target_id": target_id, "mhaps": mhaps}) mhap_detected["library_samples"].append( {"library_sample_name": sample, "target_results": target_results} ) return mhap_detected
[docs]def get_target_id_in_representative_mhaps(df, representative_dict): target_name_to_mhaps_target_id = { entry["target_name"]: i for i, entry in enumerate(representative_dict["targets"]) } df["mhaps_target_id"] = df["target_name"].map(target_name_to_mhaps_target_id) if df["mhaps_target_id"].isnull().any(): missing_targets = df[df.mhaps_target_id.isnull()]["target_name"].unique() raise ValueError( f"Missing target_name(s) in representative microhaplotype table: {missing_targets}" ) return df
[docs]def get_mhap_index_in_representative_mhaps(df, representative_dict): target_seq_to_mhap_id = { (target_id, mhap["seq"]): i for target_id, target_entry in enumerate(representative_dict["targets"]) for i, mhap in enumerate(target_entry["microhaplotypes"]) } df["mhap_id"] = df.apply( lambda row: target_seq_to_mhap_id.get((row["mhaps_target_id"], row["seq"])), axis=1, ) if df["mhap_id"].isnull().any(): missing_seqs = df[df["mhap_id"].isnull()][ ["target_name", "seq"] ].drop_duplicates() raise ValueError( f"Some seq values not found in representative microhaplotype table:\n{missing_seqs}" ) return df
[docs]def create_minimum_library_specimen_dict_from_mhap_table( detected_microhaps: list[dict], panel_name: str, library_sample_field_name: str = "library_sample_name", library_sample_specimen_key: dict[str, str] | pd.DataFrame | None = None, library_sample_name_col: str = "library_sample_name", specimen_name_col: str = "specimen_name", missing_library_sample_becomes_specimen_name: bool = False, ): """ Create a minimum library_sample_info and specimen_info dicts from the detected microhaps :param detected_microhaps: the detected microhaps object created by create_detected_microhaplotype_dict :param panel_name: the panel_name for the library_sample :param library_sample_field_name: the field name to use to extract the library_sample_name from the detected_michrohaplotypes :param library_sample_specimen_key: a dict mapping library_sample_name -> specimen_name, or a pandas DataFrame with two columns for renaming controlled by library_sample_name_col and specimen_name_col if None, specimen_name == library_sample_name :param library_sample_name_col: the column name in library_sample_specimen_key that contains the library_sample_name :param specimen_name_col: the column name in library_sample_specimen_key that contains the specimen_name :param missing_library_sample_becomes_specimen_name: if True and a library_sample_name is missing from library_sample_specimen_key, fall back to using the library_sample_name as the specimen_name; if False, raise an error :return: dict with keys 'library_sample_info' and 'specimen_info' """ # Collect all sample dicts across every entry in detected_microhaps all_samples: list[dict] = [] for entry in detected_microhaps: all_samples.extend(entry.get("library_samples", [])) # check that every sample has the expected key missing_key_indices = [ i for i, s in enumerate(all_samples) if library_sample_field_name not in s ] if missing_key_indices: raise KeyError( f"The following sample indices are missing the field name '{library_sample_field_name}': " f"{missing_key_indices}" ) # check that all library_sample_name values are unique raw_names: list[str] = [s[library_sample_field_name] for s in all_samples] seen: set[str] = set() duplicates: set[str] = set() for name in raw_names: if name in seen: duplicates.add(name) seen.add(name) if duplicates: raise ValueError(f"Duplicate library sample names found: {sorted(duplicates)}") actual_library_sample_specimen_key = None if library_sample_specimen_key is not None and isinstance( library_sample_specimen_key, dict ): actual_library_sample_specimen_key = library_sample_specimen_key elif library_sample_specimen_key is not None and isinstance( library_sample_specimen_key, pd.DataFrame ): actual_library_sample_specimen_key = library_sample_specimen_key.set_index( library_sample_name_col )[specimen_name_col].to_dict() # now construct library_sample_info library_sample_info: list[dict] = [] for sample in all_samples: lib_name: str = sample[library_sample_field_name] # use look up table to get specimen_name if provided, otherwise use library_sample_name as specimen_name if actual_library_sample_specimen_key is not None: if lib_name in actual_library_sample_specimen_key: specimen_name = actual_library_sample_specimen_key[lib_name] elif missing_library_sample_becomes_specimen_name: # if not in key but allowing missing to become specimen_name, use library_sample_name as specimen_name specimen_name = lib_name else: raise KeyError( f"library_sample_name '{lib_name}' not found in library_sample_specimen_key " f"and missing_library_sample_becomes_specimen_name is False." ) else: specimen_name = lib_name library_sample_info.append( { "library_sample_name": lib_name, "panel_name": panel_name, "specimen_name": specimen_name, } ) # build specimen_info from unique specimen_names (preserving first-seen order) seen_specimens: set[str] = set() specimen_info: list[dict] = [] for entry in library_sample_info: sp = entry["specimen_name"] if sp not in seen_specimens: seen_specimens.add(sp) specimen_info.append({"specimen_name": sp}) return { "library_sample_info": library_sample_info, "specimen_info": specimen_info, }