#
# Created by Lukas Lüftinger on 2/5/19.
#
import numpy as np
from typing import List, Dict, Tuple
from collections import Counter
import json
from pica.util.logging import get_logger
from pica.structure.records import GenotypeRecord, PhenotypeRecord, GroupRecord, TrainingRecord
DEFAULT_TRAIT_SIGN_MAPPING = {"YES": 1, "NO": 0}
[docs]def load_genotype_file(input_file: str) -> List[GenotypeRecord]:
"""
Loads a genotype .tsv file and returns a list of GenotypeRecord for each entry.
:param input_file: The path to the input genotype file.
:return: List[GenotypeRecord] of records in the genotype file
"""
with open(input_file) as genotype_file:
genotype_records = []
for line in genotype_file:
identifier, *features = line.strip().split("\t")
genotype_records.append(GenotypeRecord(identifier=identifier,
features=features))
dupcount = Counter([x.identifier for x in genotype_records])
if dupcount.most_common()[0][1] > 1:
raise RuntimeError(f"Duplicate entries found in genotype file: {dupcount}")
return sorted(genotype_records, key=lambda x: x.identifier)
[docs]def load_phenotype_file(input_file: str, sign_mapping: Dict[str, int] = None) -> List[PhenotypeRecord]:
"""
Loads a phenotype .tsv file and returns a list of PhenotypeRecord for each entry.
:param input_file: The path to the input phenotype file.
:param sign_mapping: an optional Dict to change mappings of trait sign. Default: {"YES": 1, "NO": 0}
:return: List[PhenotypeRecord] of records in the phenotype file
"""
with open(input_file) as phenotype_file:
identifiers = []
trait_signs = []
_, trait_name = phenotype_file.readline().strip().split("\t")
for line in phenotype_file:
identifier, trait_sign = line.strip().split("\t")
identifiers.append(identifier)
trait_signs.append(trait_sign)
dupcount = Counter(identifiers)
if dupcount.most_common()[0][1] > 1:
raise RuntimeError(f"Duplicate entries found in genotype file: {dupcount}")
if sign_mapping is None:
sign_mapping = DEFAULT_TRAIT_SIGN_MAPPING
trait_signs = [sign_mapping.get(x, None) for x in trait_signs]
phenotype_records = [PhenotypeRecord(identifier=x,
trait_name=trait_name,
trait_sign=y) for x, y in zip(identifiers, trait_signs)]
ret = sorted(phenotype_records, key=lambda x: x.identifier)
return ret
[docs]def load_groups_file(input_file: str, selected_rank: str = None) -> List[GroupRecord]:
"""
Loads a .tsv file which contains group or taxid for each sample in the other training files.
Group-Ids may be ncbi-taxon-ids or arbitrary group names.
Taxon-Ids are only used if a standard rank is selected,
otherwise user-specified group-ids are assumed.
Automatically classifies the [TODO missing text?]
:param input_file: path to the file that is processed
:param selected_rank: the standard rank that is selected (optional) if not set,
the input file is assumed to contain groups,
i.e., each unique entry of the ID will be a new group
:return: a list of GroupRecords
"""
with open(input_file) as groups_file:
identifiers = []
group_ids = []
for line in groups_file:
identifier, group_id = line.strip().split("\t")
identifiers.append(identifier)
group_ids.append(group_id)
dupcount = Counter(identifiers)
if dupcount.most_common()[0][1] > 1:
raise RuntimeError(f"Duplicate entries found in groups file: {dupcount}")
if selected_rank:
try:
from pica.util.taxonomy import get_taxonomic_group_mapping
group_name_mapping, group_id_mapping = get_taxonomic_group_mapping(group_ids=group_ids,
selected_rank=selected_rank)
group_records = [GroupRecord(identifier=x, group_id=group_id_mapping[y],
group_name=group_name_mapping[y])
for x, y in zip(identifiers, group_ids)]
except ImportError:
raise RuntimeError(
"A required package was not found. ete3 is required to support taxonomic ids for"
" grouping. Please install or divide your samples into groups manually")
else:
group_id_mapping = {x: group_id for group_id, x in enumerate(set(group_ids))}
group_records = [GroupRecord(identifier=x, group_id=group_id_mapping[y], group_name=y)
for x, y in zip(identifiers, group_ids)]
ret = sorted(group_records, key=lambda x: x.identifier)
return ret
[docs]def collate_training_data(genotype_records: List[GenotypeRecord],
phenotype_records: List[PhenotypeRecord],
group_records: List[GroupRecord],
universal_genotype: bool = False, verb: bool = False) -> List[
TrainingRecord]:
"""
Returns a list of TrainingRecord from two lists of GenotypeRecord and PhenotypeRecord.
To be used for training and CV of TrexClassifier.
Checks if 1:1 mapping of phenotypes and genotypes exists,
and if all PhenotypeRecords pertain to same trait.
:param genotype_records: List[GenotypeRecord]
:param phenotype_records: List[PhenotypeRecord]
:param group_records: List[GroupRecord] optional, if leave one group out is the split strategy
:param universal_genotype: Whether to use an universal genotype file.
:param verb: toggle verbosity.
:return: List[TrainingRecord]
"""
logger = get_logger(__name__, verb=verb)
gr_dict = {x.identifier: x for x in genotype_records}
pr_dict = {x.identifier: x for x in phenotype_records}
gp_dict = {x.identifier: x for x in group_records}
traits = set(x.trait_name for x in phenotype_records)
if universal_genotype:
if not set(gr_dict.keys()).issuperset(set(pr_dict.keys())):
raise RuntimeError(
"Not all identifiers of phenotype records were found in the universal genotype."
"Cannot collate to TrainingRecords.")
else:
different_identifiers = set(gr_dict.keys()).symmetric_difference(set(pr_dict.keys()))
if different_identifiers:
logger.error(f"Identifiers not present in all record types: {different_identifiers}")
raise RuntimeError("Different identifiers found among genotype and phenotype records. "
"Cannot collate to TrainingRecords.")
if group_records:
if len(gp_dict) != len(pr_dict):
raise RuntimeError("Group and phenotype/genotype records are of unequal length."
"Cannot collate to TrainingRecords.")
if set(gp_dict.keys()) != set(pr_dict.keys()):
raise RuntimeError(
"Different identifiers found among groups and phenotype/genotype records. "
"Cannot collate to TrainingRecords.")
if len(traits) > 1:
raise RuntimeError("More than one traits have been found in phenotype records. "
"Cannot collate to TrainingRecords.")
ret = [TrainingRecord(identifier=pr_dict[x].identifier,
trait_name=pr_dict[x].trait_name,
trait_sign=pr_dict[x].trait_sign,
features=gr_dict[x].features,
group_name=gp_dict[x].group_name,
group_id=gp_dict[x].group_id) for x in pr_dict.keys()]
logger.info(f"Collated genotype and phenotype records into {len(ret)} TrainingRecord.")
return ret
[docs]def load_training_files(genotype_file: str, phenotype_file: str, groups_file: str = None,
selected_rank: str = None,
universal_genotype: bool = False, verb=False) -> Tuple[List[TrainingRecord],
List[GenotypeRecord],
List[PhenotypeRecord],
List[GroupRecord]]:
"""
Convenience function to load phenotype and genotype file together, and return a list of TrainingRecord.
:param genotype_file: The path to the input genotype file.
:param phenotype_file: The path to the input phenotype file.
:param groups_file: The path to the input groups file.
:param selected_rank: The selected standard rank to use for taxonomic grouping
:param universal_genotype: Whether to use an universal genotype file.
:param verb: toggle verbosity.
:return: Tuple[List[TrainingRecord], List[GenotypeRecord], List[PhenotypeRecord]]
"""
logger = get_logger(__name__, verb=verb)
gr = load_genotype_file(genotype_file)
pr = load_phenotype_file(phenotype_file)
if groups_file:
gp = load_groups_file(groups_file, selected_rank=selected_rank)
else:
# if not set, each sample gets its own group (not used currently)
gp = [GroupRecord(identifier=x.identifier, group_name=x.identifier, group_id=y)
for y, x in enumerate(pr)]
logger.info("Genotype and Phenotype records successfully loaded from file.")
return collate_training_data(gr, pr, gp, universal_genotype=universal_genotype,
verb=verb), gr, pr, gp
[docs]def write_weights_file(weights_file: str, weights: Dict):
"""
Function to write the weights to specified file in tab-separated fashion with header
:param weights_file: The path to the file to which the output will be written
:param weights: sorted dictionary storing weights with feature names as indices
:return: nothing
"""
header = ["Rank", "Feature_name", "Weight"]
with open(weights_file, "w") as output_file:
output_file.write("%s\n" % "\t".join(header))
for rank, (name, weight) in enumerate(weights.items()):
output_file.write(f"{rank + 1}\t{name.upper()}\t{weight}\n")
[docs]def write_cccv_accuracy_file(output_file: str, cccv_results):
"""
Function to write the cccv accuracies in the exact format that phendb uses as input.
:param output_file: file
:param cccv_results:
:return: nothing
"""
write_list = []
for completeness, data in cccv_results.items():
for contamination, nested_data in data.items():
write_item = {
"mean_balanced_accuracy" : nested_data["score_mean"],
"stddev_balanced_accuracy": nested_data["score_sd"],
"contamination" : contamination,
"completeness" : completeness
}
write_list.append(write_item)
with open(output_file, "w") as outf_handler:
json.dump(write_list, outf_handler, indent="\t")
[docs]def write_misclassifications_file(output_file: str, records: List[TrainingRecord],
misclassifications,
use_groups: bool = False):
"""
Function to write the misclassifications file.
:param output_file: name of the outputfile
:param records: List of trainingRecord objects
:param misclassifications: List of percentages of misclassifications
:param use_groups: toggles average over groups and groups output
:return:
"""
identifier_list = [record.identifier for record in records]
trait_sign_list = [record.trait_sign for record in records]
if use_groups:
group_names = [record.group_name for record in records]
identifier_list = list(set(group_names))
grouped_mcs = []
grouped_signs = []
for group in identifier_list:
group_mcs = [mcs for mcs, group_name in zip(misclassifications, group_names)
if group == group_name]
group_sign = [trait_sign for trait_sign, group_name in zip(trait_sign_list, group_names)
if group == group_name]
grouped_mcs.append(np.mean(group_mcs))
grouped_signs.append(np.mean(group_sign))
trait_sign_list = grouped_signs
misclassifications = grouped_mcs
sorted_tuples = sorted(zip(identifier_list, trait_sign_list, misclassifications),
key=lambda k: k[2], reverse=True)
header = ["Identifier", "Trait present", "Mis-classifications [frac.]"]
trait_translation = {y: x for x, y in DEFAULT_TRAIT_SIGN_MAPPING.items()}
with open(output_file, "w") as outf:
outf.write("%s\n" % "\t".join(header))
for identifier, trait_sign, mcs in sorted_tuples:
outf.write(f'{identifier}\t{trait_translation.get(trait_sign, "MIXED")}\t{mcs}\n')