Source code for starlingrt.functions

"""
Functions to manipulate and compute properties of GCMS data.

Author: Nathan A. Mahynski
"""

import starlingrt
from starlingrt import data
import scipy

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from numpy.typing import NDArray
from typing import Union, Any


[docs]def flag_entry_rt( entries: list[tuple["starlingrt.data.Entry", str]], min_entries: int = 10, k: float = 3.0, cv: int = 5, style: str = "classical", ) -> NDArray[np.bool_]: """ Flag entries with anomolous retention times based on the group's consensus. If a point is considered an outlier in any single fold, it is flagged. Parameters ---------- entries : list(tuple(Entry, str)) List of entries (e.g., grouped by name) to examine. min_entries : int, optional(default=10) Minimum length of `entries` to do KFold cross validation (CV), otherwise ignore `cv` and do LOOCV is instead. k : float, optional(default=3.0) Number of standard deviations from center allowed. `k` = 1.5 is more appropriate if using the `robust` approach based on the IQR as a measure of "spread"; `k` = 3 is more appropriate for `classical`, but can be reasonable for the `robust` approach as well. cv : int, optional(default=5) Number of folds to use in cross-validation. If len(`entries`) > `cv`, LOOCV is used instead. style: str, optional(default="classical") When `classical` use mean and std vs. `robust` which uses median and iqr for center and spread, respectively. Inliers are considered those for which: `center - k*spread < x < center + k*spread`. The rest are flagged. Returns ------- mask : ndarray(bool) Mask of outliers corresponding to the ordering in `entries`. """ from sklearn.model_selection import KFold if style == "classical": center_f = np.mean spread_f = np.std elif style == "robust": center_f = np.median # type: ignore [assignment] spread_f = scipy.stats.iqr # rng = (25,75) by default else: raise ValueError(f"Unrecognized style : {style}") entries_ = np.asarray(entries) outlier_mask = np.zeros(len(entries_), dtype=bool) if (len(entries_) < min_entries) or (len(entries_) < cv): # Do LOOCV instead of KFold CV cv = len(entries_) if len(entries_) < 2: # Not enough examples to estimate distribution, assuming inliers return outlier_mask else: kf = KFold(n_splits=cv, shuffle=False) retention_times = np.array([e[0].rt for e in entries_]) for train_index, test_index in kf.split(entries_): # Estimate center and spread from training fraction rt_train = retention_times[train_index] center = center_f(rt_train) spread = spread_f(rt_train) # Test all (including training set) inliers = (center - k * spread < retention_times) & ( retention_times < center + k * spread ) for i in np.where(~inliers)[0]: outlier_mask[ i ] = True # If any fold finds a point to be an outlier, it is flagged that way return outlier_mask
[docs]def make_histograms( by_name: dict[str, list[tuple["starlingrt.data.Entry", str]]], k_values: NDArray[np.floating], bins: int = 10, cv: int = 3, style: str = "robust", min_entries: int = 5, ) -> tuple[dict, dict, dict]: """ Make histograms of retention times for each compound by name. Parameters ---------- by_name : dict(str, list(tuple(Entry, str))) Dictionary of Entry whose keys are hit names and values are tuples of (Entry, hash). k_values : array-like List of `k` values to use to flag "outlying" retention times. bins : int Number of histogram bins to use for retention times. cv : int, optional(default=3) Number of folds to use in cross-validation. style: str, optional(default="classical") When `classical` use mean and std vs. `robust` which uses median and iqr for center and spread, respectively. Inliers are considered those for which: `center - k*spread < x < center + k*spread`. The rest are flagged. min_entries : int, optional(default=5) Minimum length of `entries` to do KFold cross validation (CV), otherwise ignore `cv` and do LOOCV is instead. Returns ------- histograms : dict(str, list) Values of the histogram for each compound. bin_edges : dict(str, list) Bin edges for the histogram of each compound. points : dict(str, dict(str, dict(str, dict(str, list)))) Nested dictionary of points which are of concern or not of concern for each `k` value; e.g., points['methane']['3.0']['concern'] = {'x': rention_times, 'y': staggered_bin_counts}. """ histograms = {} bin_edges = {} for name in by_name: rt = [entry[0].rt for entry in by_name[name]] hist, edges = np.histogram(rt, bins=bins) histograms[name] = hist.tolist() bin_edges[name] = edges.tolist() points: dict[str, dict[str, dict[str, dict[str, list]]]] = {} for name in by_name: points[name] = {} for k in k_values: points[name][str(k)] = {} potential_issues = flag_entry_rt( by_name[name], min_entries=min_entries, k=k, cv=cv, style=style ) concerns = [ x[0].rt for x in np.array(by_name[name])[potential_issues] ] not_concern = [ x[0].rt for x in np.array(by_name[name])[~potential_issues] ] def get_bin(x, edges): bin_ = 0 while x > edges[bin_ + 1]: bin_ += 1 return bin_ def stagger_y(points, bin_edges, name): bin_counter = {} y = [] for x in points: bin_ = get_bin(x, bin_edges[name]) if bin_ in bin_counter: bin_counter[bin_] += 1 else: bin_counter[bin_] = 1 y.append(bin_counter[bin_]) return y if len(concerns) > 0: points[name][str(k)]["concern"] = { "x": concerns, "y": stagger_y(concerns, bin_edges, name), } else: points[name][str(k)]["concern"] = {"x": [], "y": []} if len(not_concern) > 0: points[name][str(k)]["not_concern"] = { "x": not_concern, "y": stagger_y(not_concern, bin_edges, name), } else: points[name][str(k)]["not_concern"] = {"x": [], "y": []} return histograms, bin_edges, points
[docs]def make_dataframe(entries: dict) -> "pd.DataFrame": """ Create a dataframe out of the entries. Parameters ---------- entries : dict(str, Entry) Dictionary of Entry whose keys are sha1 hashes and values are Entry objects. Returns ------- dataframe : pd.DataFrame DataFrame of Entries sorted by Hit name. """ by_name = starlingrt.data.Utilities.group_entries_by_name(entries) columns = sorted(by_name[list(by_name.keys())[0]][0][0].get_params().keys()) data = [] for name in by_name.keys(): for entry, hash in by_name[name]: raw = entry.get_params() data.append([raw[k] for k in columns] + [str(hash)]) df = pd.DataFrame(data=data, columns=columns + ["hash"]) return df
[docs]def get_dataframe( entries: dict, target: Union[str, None] = None, pm: int = 0 ) -> tuple["pd.DataFrame", "pd.api.typing.DataFrameGroupBy", list]: # type: ignore [name-defined] """ Get dataframe centered on a target. Parameters ---------- entries : dict(str, Entry) Dictionary of Entry whose keys are sha1 hashes and values are Entry objects. target : str, optional(default=None) Name of retention time group to target. pm : int, optional(default=0) Number of neighbors around `target` to select. Returns ------- results : pd.DataFrame DataFrame of retention times for selected target and any selected neighbors. name_groups : pd.api.typing.DataFrameGroupBy Pandas GroupBy object containing the `entries` grouped by Hit name. order_cats_used : list Ordered list of names of the groups selected. """ df = make_dataframe(entries) # Add the number of observations to the x-axis labels name_groups = df.groupby("hit_name") ordered_cats = name_groups["rt"].median().sort_values().index.tolist() df["origin"] = df.sample_filename.apply( lambda x: x.strip().split("/")[-2:-1][0] ) def select(target, pm=0): """Select the pm number of neighbors.""" if target is None: # Look at all compounds return ordered_cats else: if target not in ordered_cats: raise ValueError(f"{target} not found") else: center = ordered_cats.index(target) return ordered_cats[ np.max([0, center - pm]) : np.min( [len(ordered_cats), center + pm + 1] ) ] order_cats_used = select(target, pm) mask = df["hit_name"].apply(lambda x: x in order_cats_used) return df.loc[mask], name_groups, order_cats_used
[docs]def get_quantiles_df( name_groups: "pd.api.typing.DataFrameGroupBy", # type: ignore [name-defined] ) -> "pd.DataFrame": """ Get the 0.25, 0.50, and 0.75 percentiles of the groups. Parameters ---------- name_groups : pd.api.typing.DataFrameGroupBy Pandas GroupBy object containing the `entries` grouped by Hit name. Returns ------- dataframe : pd.DataFrame DataFrame summarizing the IQR and whisker bounds for the `name_groups`. """ q1 = name_groups["rt"].quantile(q=0.25) q2 = name_groups["rt"].quantile(q=0.5) q3 = name_groups["rt"].quantile(q=0.75) iqr = q3 - q1 upper = q3 + 1.5 * iqr # Do not bound by max/min, just report ranges lower = q1 - 1.5 * iqr # Do not bound by max/min, just report ranges df = pd.concat( [ pd.DataFrame(q1).rename(columns={"rt": "q1"}), pd.DataFrame(q2).rename(columns={"rt": "q2"}), pd.DataFrame(q3).rename(columns={"rt": "q3"}), pd.DataFrame(upper).rename(columns={"rt": "upper"}), pd.DataFrame(lower).rename(columns={"rt": "lower"}), ], axis=1, ) df["new_name"] = df.index.copy() return df
[docs]def closest_rt(rt: float, df_iqr: "pd.DataFrame") -> str: """ Suggest the (visible) species with the closest median retention time. Parameters ---------- rt : float Retention time targeted. df_iqr : pd.DataFrame DataFrame summarizing the IQR and whisker bounds for the `name_groups`. Returns ------- guess : str Name of the Hit with the closes retention time. """ return ( df_iqr.sort_values(by="q2", axis=0, key=lambda x: (x - rt) ** 2) .iloc[0] .new_name )
[docs]def group_by_rt_step( df: "pd.DataFrame", threshold: float = 0.04 ) -> list[list[tuple[Any, str, float, float]]]: """ Create groups based on similar retention times. This algorithm simply sorts based on retention time, then creates a new group when a gap between consecutive (sorted) points exceeds the threshold. Parameters ---------- df : pd.DataFrame DataFrame of retention times (see `get_dataframe`). threshold : float, optional(default=0.04) Minimum retention time gap between consecutive compounds to be resolved as different. Returns ------- rt_groups : list(list(tuples)) List of tuples of (index, hit_name, quality, rt) by group. """ rt_sorted_df = df.sort_values(by="rt", ascending=True) rt_groups = [] for i, (index, row) in enumerate(rt_sorted_df.iterrows()): if i == 0: # First time level = row["rt"] rt_groups.append( [(index, row["hit_name"], row["quality"], row["rt"])] ) else: if (row["rt"] - level) > threshold: rt_groups.append( [(index, row["hit_name"], row["quality"], row["rt"])] ) else: rt_groups[-1].append( (index, row["hit_name"], row["quality"], row["rt"]) ) level = row["rt"] return rt_groups
[docs]def suggest_names( rt_groups: list[list[tuple[int, str, float, float]]] ) -> tuple[list, dict, list]: """ Suggest the best name for group compounds with similar retention times. Computes a "probability" using the quality of each observation in a group to determine the most likely name. Parameters ---------- rt_groups : list(tuple) List of tuples of (index, hit_name, quality, rt) by group (see `group_by_rt_step`). Returns ------- suggested_name : list Suggested name of each group. ties : dict Dictionary of ties. entropy : list Entropy of each group. """ ties = {} suggested_name = [] entropy = [] for i, group in enumerate(rt_groups): probs: dict[str, float] = {} for entry in group: hit_name, quality = entry[1], entry[2] if hit_name in probs: probs[hit_name] += quality else: probs[hit_name] = float(quality) sum_ = np.sum(list(probs.values())) entropy.append(0) for hit_name in probs: probs[hit_name] /= sum_ entropy[-1] -= probs[hit_name] * np.log(probs[hit_name]) sorted_probs = sorted(probs.items(), key=lambda x: x[1], reverse=True) best_guess, best_prob = sorted_probs[0] if len(sorted_probs) > 1: # Check if there is a tie if best_prob == sorted_probs[1][1]: ties[i] = sorted_probs suggested_name.append(best_guess) return suggested_name, ties, entropy
[docs]def assign_suggestions(df, rt_groups, suggested_name, ties): """ Unroll the suggestions and assign them to the dataframe. Parameters ---------- df : pd.DataFrame DataFrame of retention times (see `get_dataframe`). rt_groups : list(tuple) List of tuples of (index, hit_name, quality, rt) by group (see `group_by_rt_step`). suggested_name : list Suggested name of each group (see `suggest_names`). ties : dict Dictionary of ties (see `suggest_names`). Returns ------- dataframe : pd.DataFrame DataFrame with "suggested_name" and "flag" columns added. """ new_df = df.copy() suggested_name = np.asarray(suggested_name) unrolled_names = [""] * len(df) second_suggestion = [""] * len(df) flags = [0] * len(df) for i, group in enumerate(rt_groups): flag = "" suggestion = suggested_name[i] if ( i in ties ): # Flag if this group's assignment was tied b.w multiple compounds flag = "Tied for most likely compound" if ( np.sum(suggested_name == suggestion) > 1 ): # Flag if multiple groups being assigned this name flag = "Suggested compound has multiple RT groups" for entry in group: index = entry[0] assert unrolled_names[index] == "" unrolled_names[index] = suggestion if i in ties: assert second_suggestion[index] == "" second_suggestion[index] = ties[i][1][0] flags[index] = flag new_df["suggested_name"] = unrolled_names new_df["second_suggestion"] = second_suggestion new_df["flag"] = flags return new_df
[docs]def estimate_threshold( df: "pd.DataFrame", thresholds: NDArray = np.logspace(-4, 1, 100), display: bool = False, ) -> float: """ Estimate the minimum gap (threshold) to separate groups from each other. Parameters ---------- df : pd.DataFrame DataFrame of retention times for selected target and any selected neighbors (see `get_dataframe`). thresholds : ndarray(float, ndim=1), optional(default=np.logspace(-4, 1, 100)) Sequence of thresholds to try. display : bool, optional(default=False) Whether or not to display the results visually. Returns ------- threshold : float Choice of threshold. """ rt_sorted_df = df.sort_values(by="rt", ascending=True) duplicates = [] confusion = [] size = [] disorder = [] for res in thresholds: rt_groups = group_by_rt_step(rt_sorted_df, res) suggested_name, ties, entropy = suggest_names(rt_groups) names, counts = np.unique(suggested_name, return_counts=True) confusion.append(len(ties)) duplicates.append(np.sum(counts > 1)) size.append(len(rt_groups)) disorder.append(np.median(entropy)) product = np.array(duplicates) * np.array(disorder) if display: fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(12, 5)) ax = axes[0] ax.plot(thresholds, duplicates, label="Duplicates") ax.plot(thresholds, confusion, label="Group Assignments with Ties") ax.plot(thresholds, size, label="Retention Time Groups") ax.plot(thresholds, disorder, label="Median Entropy of Groups") ax.set_xlabel("Threshold") ax.set_ylabel("Value") ax.legend(loc="best") ax.set_yscale("log") ax.set_xscale("log") ax.axhline( 1, color="green", ls="--" ) # Everything lumped into one group ax.axhline( len(rt_sorted_df["rt"].unique()), color="green", ls="--" ) # Everything on its own up to some numerical precision ax = axes[1] ax.plot(thresholds, product) ax.set_xscale("log") ax.text( thresholds[np.argmax(product)], np.max(product) * 1.01, "(%.3f, %.3f)" % (thresholds[np.argmax(product)], np.max(product)), ) ax.set_ylabel(r"Duplicates $\times$ Median Entropy") ax.set_xlabel("Threshold") return thresholds[np.argmax(product)]