Source code for sa.methods

"""
########################
methods (``sa.methods``)
########################

This script contains techniques for strain analysis.
    
"""
import numpy as np
import sklearn.preprocessing
import sklearn.decomposition
import sklearn.cluster
import sklearn.covariance
import sklearn.manifold
import sklearn.mixture
import math

from sklearn.metrics import silhouette_score
from sklearn import svm
from sklearn.metrics.pairwise import euclidean_distances
from collections import defaultdict
from matplotlib import pyplot as plt
from matplotlib import cm
from matplotlib import font_manager

import utilities as utils
import plotting

#distance

def compute_distance(dnp, title, out_name, labels, labelsize = 6, rot = "vertical"):
    """
    Considering the rows of :param:`dnp` as vectors, compute the Euclidean distance
    matrix between each pair of vectors. Save the computed distances to a file named
    :param:`out_name` and return the distance matrix.
    """
    print "Computing distance matrix"
    dist_matrix = euclidean_distances(dnp, dnp)
    print "Mean Euclidean distance between observations: %5.3f" % np.mean(dist_matrix)
    fig = plt.figure(figsize = (10, 8))
    ax1 = fig.add_subplot(111)
    imax = ax1.imshow(dist_matrix, interpolation = "nearest")
    ax1.set_title("Heatmap (Eucl.) for %s" % title)
    if "joined" not in title:
        plt.yticks(np.arange(dnp.shape[0]), labels)
        plt.xticks(np.arange(dnp.shape[0]), labels, rotation = rot)
        ax1.tick_params(axis = "x", labelsize = labelsize)
        ax1.tick_params(axis = "y", labelsize = labelsize)
    ax1.yaxis.tick_left()
    ax1.xaxis.tick_bottom()
    #the smallest values are lighter and the larger values are darker shades of grey
    fig.colorbar(imax)
    fname = out_name + "_heatmap.pdf"
    plt.savefig(fname)
    plt.close(fig)
    print "Heatmap saved to file: %s" % fname
    return dist_matrix
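#Usage sketch (illustrative, not part of the original pipeline): compute and plot the
#pairwise distance heatmap for a small, randomly generated profile matrix. The data,
#labels and output name below are hypothetical.
def _demo_compute_distance():
    demo_dnp = np.random.rand(12, 6)                      #12 strains, 6 features
    demo_labels = ["strain_%d" % i for i in xrange(12)]
    dist = compute_distance(demo_dnp, title = "demo", out_name = "demo",
                            labels = demo_labels, labelsize = 8)
    print dist.shape                                      #(12, 12), symmetric, zero diagonal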
def analyze_repl(input_file, plates, dnp, res_path, keys = ["RT", "37"], save_dist_mat = False):
    """
    Estimate the robustness of measurements by analyzing mutants that occur multiple
    times in the data set. For each such ORF, save the mean distance between all ORF
    pairs and the list of between-replicate distances to a file named
    `distance_reps.csv` in directory :param:`res_path`.

    :param input_file: File path to the multi-occurring mutants specification.
    :type input_file: `str`
    :param plates: Plates data as returned from :mod:`utilities.read`.
    :type plates: `list`
    :param dnp: Preprocessed computational profiles.
    :type dnp: `numpy.array`
    :param res_path: Full path to the directory where results are to be saved.
    :type res_path: `str`
    :param keys: Names of TS (temperature sensitive mutants) plates' extensions. By default, these are ["RT", "37"].
    :type keys: `list`
    :param save_dist_mat: Indicator whether to plot a distance heatmap for each multiply occurring mutant.
    :type save_dist_mat: `bool`
    """
    print "Analysing repeating mutants"
    names, reps = utils.read_repl(input_file, keys)
    dnp_d = utils.plates2dict(plates, dnp)
    orf2p = defaultdict(dict)
    i = 0
    for (orf, pn, r, c) in reps:
        if (pn, r, c) not in dnp_d:
            print "\tMissed in plates data", (orf, pn, r, c)
            i += 1
        else:
            orf2p[orf][(pn, r, c)] = dnp_d[(pn, r, c)]
    print "Number of missed duplicates in plate data %d" % i
    orf_reps = 0
    reps_distl = []
    f = open(res_path + "distance_reps.csv", "w")
    for orf, multi_occ in orf2p.iteritems():
        if len(multi_occ) == 1:
            continue
        orf_reps += 1
        met, profiles = zip(*multi_occ.items())
        reps_dnp = np.vstack(profiles)
        reps_dist = np.tril(euclidean_distances(reps_dnp[:, 1:], reps_dnp[:, 1:]), -1)
        reps_dist_hvp = reps_dist.ravel()[np.flatnonzero(reps_dist)]
        f.write("%s,%5.4f,(%s)\n" % (orf, np.mean(reps_dist_hvp), ",".join(map(str, reps_dist_hvp))))
        reps_distl.append(reps_dist_hvp)
        if save_dist_mat:
            out_name = res_path + orf
            labels = [",".join(pnrc) for pnrc in met]
            compute_distance(reps_dnp, title = orf, out_name = out_name, labels = labels,
                             labelsize = 12, rot = "horizontal")
        print "\tMultiple occurring mutant %s" % orf
    print "Replicates exist for %d ORFs" % orf_reps
    f.close()
    #plot of two overlaid histograms: (1) distances between replicates of the same mutants,
    #(2) distances between all observations
    #do not take into account the number of cells (cells #) field when computing distances
    tot_dist = np.tril(euclidean_distances(dnp[:, 1:], dnp[:, 1:]), -1)
    tot_dist_hv = tot_dist.ravel()[np.flatnonzero(tot_dist)]
    reps_dist_hv = np.hstack(tuple(reps_distl))
    fig = plt.figure(figsize = (10, 8))
    ax1 = fig.add_subplot(111)
    rn = reps_dist_hv.shape[0]
    tn = tot_dist_hv.shape[0]
    print "Mean observation distance in total: %5.3f, No. dist. %d" % (np.mean(tot_dist_hv), tn)
    print "Mean observation distance between replicates: %5.3f, No. dist. %d" % (np.mean(reps_dist_hv), rn)
    plt.hist([tot_dist_hv, reps_dist_hv], 100,
             weights = [rn/float(tn) * np.ones_like(tot_dist_hv), np.ones_like(reps_dist_hv)],
             histtype = 'bar',
             label = ["All observations (mean=%5.3f)" % np.mean(tot_dist_hv),
                      "Multiple occurring observations (mean=%5.3f)" % np.mean(reps_dist_hv)],
             color = ["green", "orange"])
    ax1.set_xlabel("Distance")
    ax1.set_ylabel("Observation pairs")
    plt.legend()
    plt.savefig(res_path + "distance_reps_tot_hist.pdf")
    plt.close(fig)
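#Minimal sketch (not part of the original module) of the core computation in
#analyze_repl: the strictly lower triangle of the pairwise Euclidean distances
#between replicate profiles of one ORF, excluding the first (cell count) column.
#The replicate matrix below is hypothetical.
def _demo_replicate_distances():
    reps_dnp = np.random.rand(4, 7)                       #4 replicates of one ORF
    reps_dist = np.tril(euclidean_distances(reps_dnp[:, 1:], reps_dnp[:, 1:]), -1)
    reps_dist_hvp = reps_dist.ravel()[np.flatnonzero(reps_dist)]
    print "Mean between-replicate distance: %5.4f" % np.mean(reps_dist_hvp)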
#outlier detection

def detect_outliers(dnp, plate, out_name = None, save = True):
    """
    Detect outliers by fitting an elliptic envelope and normalizing the decision
    function by the number of cells in the strain.

    The goal is to separate a core of regular observations from some polluting ones
    and to decide whether a new observation belongs to the same distribution as the
    existing observations (it is an inlier) or should be considered different (it is
    an outlier). The decision function score is additionally normalized by the number
    of cells per strain that was used to compute the feature values of an observation.

    :param dnp: Preprocessed computational profiles.
    :type dnp: `numpy.array`
    :param plate: Plate data as returned from :mod:`utilities.read`.
    :type plate: `list`
    :param out_name: The full name of the file where outliers will be saved.
    :type out_name: `str`
    :param save: Indicator whether to save outliers' identification to files. True by default.
    :type save: `bool`

    Return the outlyingness of observations in :param:`dnp` and the decision function
    according to the fitted elliptic envelope model, respectively, and the reduced
    data set without outliers as a tuple (reduced_plate, reduced_dnp).
    """
    print "Detecting outliers"
    envelope = sklearn.covariance.EllipticEnvelope(contamination = 0.1)
    print "\tFitting elliptic envelope"
    envelope.fit(dnp)
    #distance of the samples dnp to the separating hyperplane
    base = np.max(dnp[:, 0])
    dec_envelope = correct_outlier_dec(envelope.decision_function(dnp).ravel(), base)
    #predicted values: 1 (inliers), -1 (outliers)
    y_pred_envelope = np.array([1 if s > 0 else -1 for s in dec_envelope])
    print "\tOutliers by elliptic env. (wo corr):"
    env_idx_wo = np.flatnonzero(envelope.predict(dnp) == -1)
    print "\n".join("\t\t%s" % "\t".join(plate[i][1:5] + [plate[i][-1]]) for i in env_idx_wo)
    print "\tOutliers by elliptic env. (w corr):"
    env_idx = np.flatnonzero(y_pred_envelope == -1)
    print "\n".join("\t\t%s" % "\t".join(plate[i][1:5] + [plate[i][-1]]) for i in env_idx)
    if save:
        print "Saving outliers to file: %s" % out_name
        fout = open(out_name, "a")
        for i in env_idx_wo:
            fout.write("%s\n" % ",".join(plate[i][1:5] + [plate[i][-1]]))
        fout.close()
    #reduced data set according to elliptic envelope without weight correction
    r_plate = [ex for i, ex in enumerate(plate) if i not in env_idx_wo]
    r_dnp = dnp[[i for i in xrange(dnp.shape[0]) if i not in env_idx_wo], :]
    assert len(r_plate) == r_dnp.shape[0], "Dimension mismatch. Plate data and computational features dimension does not match."
    return (y_pred_envelope, dec_envelope), (r_plate, r_dnp)
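#Minimal sketch (assumptions: synthetic Gaussian data with a few planted outliers)
#of the elliptic envelope estimator the function above relies on, without the plate
#bookkeeping and the cell-count correction.
def _demo_elliptic_envelope():
    X = np.random.randn(200, 5)
    X[:5] += 6.                                           #plant 5 obvious outliers
    env = sklearn.covariance.EllipticEnvelope(contamination = 0.1)
    env.fit(X)
    pred = env.predict(X)                                 #1 = inlier, -1 = outlier
    print "Flagged outliers: %d" % np.sum(pred == -1)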
def correct_outlier_dec(X, base):
    """
    Outlier decision function correction. The more cells a strain has, the greater
    the increase in its decision value.
    """
    m = np.min(X)
    return X + np.log(X - m + 1)/np.log(base)
def outlier2cluster(out_labels, c_labels):
    """
    Determine the intersection between outliers and the cluster mapping (the
    percentage of outliers in each cluster).

    :param out_labels: Outliers labelling (1 = inlier, -1 = outlier).
    :type out_labels: `numpy.array`
    :param c_labels: Cluster indices of observations.
    :type c_labels: `numpy.array`
    """
    print "Cluster to outliers intersection"
    K = np.max(c_labels)
    out_idx = out_labels == -1
    for c in xrange(K+1):
        print "\tcls: %d\tintersect: %5.3f" % (c, sum(c_labels[out_idx] == c)/float(sum(c_labels == c)))

#preprocessing
def standardize(dnp):
    """
    Standardize features by removing the mean and scaling to unit variance.

    :param dnp: Computational profiles of strains.
    :type dnp: `numpy.array`
    """
    print "Standardization"
    scaler = sklearn.preprocessing.Scaler(copy = True)
    scaler.fit(dnp)
    return scaler.transform(dnp)
def normalize(dnp):
    """
    Normalize observations individually to unit L2 norm.

    :param dnp: Computational profiles of strains.
    :type dnp: `numpy.array`
    """
    print "Normalization"
    normalizer = sklearn.preprocessing.Normalizer(norm = "l2", copy = True)
    normalizer.fit(dnp)
    return normalizer.transform(dnp)
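#Usage sketch (illustrative only): the two preprocessing helpers above applied to a
#hypothetical random profile matrix. Standardization acts per feature (column),
#normalization per observation (row). Note that newer scikit-learn releases name the
#estimator used by standardize() StandardScaler rather than Scaler.
def _demo_preprocessing():
    demo_dnp = np.random.rand(50, 10)
    dnp_std = standardize(demo_dnp)
    print dnp_std.mean(axis = 0)                          #approximately zero per feature
    dnp_nrm = normalize(demo_dnp)
    print np.sum(dnp_nrm**2, axis = 1)                    #approximately one per observation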
#dimensionality reduction techniques

def decompose_PCA(dnp, plate, n_components, title, out_name):
    """
    Apply PCA decomposition to the data to reduce its dimensionality. Decompose a
    multivariate data set into a set of successive orthogonal components that explain
    a maximum amount of variance. In addition, probabilistic PCA can provide a
    probabilistic interpretation of PCA that gives a likelihood of the data based on
    the amount of variance it explains.

    Return the transformed data of shape [n_samples, n_components] and save a 3D PCA
    projection to :param:`out_name`.
    """
    pca = sklearn.decomposition.PCA(n_components = n_components)
    print "Fitting PCA"
    pca.fit(dnp)
    print "Explained variance ratio:"
    print "\n".join("\t%s: %5.3f" % (i, var) for (i, var) in enumerate(pca.explained_variance_ratio_))
    trans_pca = pca.transform(dnp)
    plotting.plot_pca_projection(trans_pca, plate, title, out_name)
    return trans_pca
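#Minimal sketch (not the project's plotting pipeline) of the PCA core used above:
#fit, inspect the explained variance ratio and project the data. The input matrix is
#hypothetical.
def _demo_pca():
    X = np.random.rand(100, 8)
    pca = sklearn.decomposition.PCA(n_components = 3)
    pca.fit(X)
    print pca.explained_variance_ratio_                   #fraction of variance per component
    X_low = pca.transform(X)
    print X_low.shape                                     #(100, 3)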
def decompose_manifold(dnp, n_components):
    """
    Apply the Isomap algorithm to seek a lower dimensional embedding which maintains
    geodesic distances between all points. Isometric mapping is a nonlinear
    dimensionality reduction method.

    Manifold learning is an approach to nonlinear dimensionality reduction. The idea
    is that the dimensionality of the data set is only artificially high. Manifold
    learning attempts to generalize linear dimensionality reduction methods such as
    PCA to be sensitive to nonlinear structure in the data.

    :param dnp: Preprocessed computational profiles of strains.
    :type dnp: `numpy.array`
    :param n_components: Number of coordinates for the manifold.
    :type n_components: `int`

    Return the transformed data of shape [n_observations, n_components].
    """
    isomap = sklearn.manifold.Isomap(n_neighbors = int(math.sqrt(dnp.shape[0])), n_components = n_components)
    isomap.fit(dnp)
    dnpi = isomap.transform(dnp)
    return dnpi
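#Usage sketch (illustrative only): embed a hypothetical random profile matrix into
#two dimensions with the Isomap-based helper above.
def _demo_isomap():
    demo_dnp = np.random.rand(100, 6)
    dnpi = decompose_manifold(demo_dnp, n_components = 2)
    print dnpi.shape                                      #(100, 2)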
def decompose_MDS(dnp, c_labels = None, out_dir = None, save_coordinates = True):
    """
    Apply metric MDS to obtain a low dimensional representation of the data in which
    the distances respect the distances in the original high dimensional space. The
    method constructs a distance matrix with the Euclidean distance and runs MDS. The
    similarities are embedded in 2 dimensions (n_components = 2).

    Final coordinates are plotted and saved to the file `mds_scatterplot.pdf` in the
    :param:`out_dir` directory. Colors in the plot denote classes (clusters) passed
    in :param:`c_labels`.

    :param dnp: Preprocessed computational profiles of strains.
    :type dnp: `numpy.array`
    :param c_labels: Cluster indices of observations.
    :type c_labels: `numpy.array`
    :param out_dir: Full path to the output directory where the plot will be saved.
    :type out_dir: `str`
    :param save_coordinates: Indicator whether to plot and save final coordinates. By default, it is set to True.
    :type save_coordinates: `bool`

    Returned are the positions of the observations in the embedding space.
    """
    print "Computing Euclidean distances"
    K = 3000
    dnp = dnp[:K if dnp.shape[0] >= K else dnp.shape[0], :]
    dist_matrix = euclidean_distances(dnp)
    import mds
    print "Starting MDS computation"
    p_mds = mds.MDS(n_components = 2, n_init = 1, max_iter = 500, verbose = 2)
    print "Fitting MDS"
    embd = p_mds.fit_transform(dist_matrix)
    #plotting
    if save_coordinates:
        fig = plt.figure()
        ax = fig.add_subplot(111) #, projection='3d')
        points = []
        for i in xrange(dnp.shape[0]):
            points.append((embd[i, 0], embd[i, 1], c_labels[i]))
        K = int(np.max(c_labels))
        cols = np.random.rand(K, 3)
        proxies = []
        for k in xrange(K):
            sel = filter(lambda x: x[-1] == k, points)
            ax.scatter([s[0] for s in sel], [s[1] for s in sel], c = cols[k, :])
            proxies.append((plt.Rectangle((0, 0), 1, 1, fc = cols[k, :]), "Cluster %d" % k))
        fname = out_dir + "mds_scatterplot_%d_2d.pdf" % dnp.shape[0]
        plt.legend(zip(*proxies)[0], zip(*proxies)[1], scatterpoints = 1)
        plt.savefig(fname)
        print "MDS scatterplot saved to file: %s" % fname
        plt.close(fig)
    return embd
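#Usage sketch (illustrative only; assumes the local `mds` module imported inside the
#function above is available): embed hypothetical profiles in 2D and color them by
#hypothetical cluster labels.
def _demo_mds():
    demo_dnp = np.random.rand(60, 6)
    demo_labels = np.random.randint(0, 3, 60)             #3 hypothetical clusters
    embd = decompose_MDS(demo_dnp, c_labels = demo_labels, out_dir = "./")
    print embd.shape                                      #(60, 2)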
#clustering

def k_means(dnp, plates, k_range, out_name_silhouette = None, out_dir_predictions = None,
            out_name_predictions = None, save_silhouette = True, save_predictions = True,
            wt_name = ["YOR202W"]):
    """
    Apply k-means clustering to the data.

    :param dnp: Preprocessed computational profiles of strains.
    :type dnp: `numpy.array`
    :param plates: Plates data as returned from :mod:`utilities.read`.
    :type plates: `list`
    :param k_range: The range used for testing the number of clusters.
    :type k_range: `iterable`
    :param out_name_silhouette: Fully qualified name of the file where the silhouette plot will be saved.
    :type out_name_silhouette: `str`
    :param out_dir_predictions: Full path to the output directory where files with clustering predictions will be stored.
    :type out_dir_predictions: `str`
    :param out_name_predictions: Identifier of the data used in the name of the clustering predictions.
    :type out_name_predictions: `str`
    :param save_silhouette: Indicator whether to plot the silhouette plot and save it to file :param:`out_name_silhouette`. By default, it is set to True.
    :type save_silhouette: `bool`
    :param save_predictions: Indicator whether to save predictions (cluster membership for each observation) for each number of clusters in :param:`k_range`.
    :type save_predictions: `bool`
    :param wt_name: Names of the wild-type ORFs.
    :type wt_name: `list`

    For each number of clusters compute the mean silhouette coefficient of all
    observations. Return the clustering results for the clustering with the highest
    mean silhouette coefficient. A silhouette coefficient close to 1 means the datum
    is appropriately clustered; a coefficient close to -1 means the observation has
    been assigned to the wrong cluster.

    Returned are the predictions for all observations in the data (the closest cluster
    each observation belongs to), the mean silhouette coefficient of the best
    clustering and a matrix of the data transformed to cluster-distance space of shape
    [n_observations, k]. In the new space, each dimension is the distance to the
    cluster centers.
    """
    k_best = 0
    score_best = -1
    print "Starting K-means clustering"
    for k in k_range:
        kmeans = sklearn.cluster.KMeans(k = k, n_init = 6, n_jobs = 1)
        kmeans.fit(dnp)
        pred = kmeans.predict(dnp)
        scores = silhouette(dnp, pred, k)
        mscore = np.mean(scores)
        if save_silhouette:
            plotting.plot_silhouette(scores = scores, c_idx = pred, K = k, out_name = out_name_silhouette)
        if save_predictions:
            f = open(out_dir_predictions + "%s_clustering_%d.csv" % (out_name_predictions, k), "w")
            assert len(pred) == len(plates), "Error. Number of predictions does not match with observations in plates."
            NK = np.max(pred)
            for j in xrange(NK + 1):
                wtn = np.sum([1 for i, el in enumerate(plates) if el[-1] in wt_name and pred[i] == j])
                f.write("Cluster %d: size = %d (WT = %d)\n" % (j, np.sum(pred == j), wtn))
            f.write("%s\n" % (",".join(["ORF", "plate", "date", "row", "col", "cluster"])))
            for i, strain in enumerate(plates):
                f.write("%s,%d\n" % (",".join([strain[-1]] + strain[1:5]), pred[i]))
            f.close()
        print "\tk: %d\tscore: %5.3f\tsize: %s" % (k, mscore,
              " ".join("%d:%d" % (i, sum(1 for el in pred if el == i)) for i in xrange(k)))
        if mscore > score_best:
            score_best = mscore
            k_best = k
            pred_best = pred
            trans_best = kmeans.transform(dnp)
    print "Best K-means with k: %d and score: %5.3f" % (k_best, score_best)
    return pred_best, score_best, trans_best
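#Usage sketch (illustrative only): scan a small range of cluster numbers on a
#hypothetical profile matrix. Saving of silhouette plots and prediction files is
#disabled, so no plate data are required here.
def _demo_k_means():
    demo_dnp = np.random.rand(120, 5)
    pred, score, trans = k_means(demo_dnp, plates = [], k_range = range(2, 5),
                                 save_silhouette = False, save_predictions = False)
    print "Best clustering: %d clusters, mean silhouette %5.3f" % (np.max(pred) + 1, score)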
def silhouette(X, c_idx, K):
    """
    Compute the silhouette score for each observation in :param:`X`.

    :param c_idx: Cluster indices for all observations in X.
    :type c_idx: `list`
    """
    dist_matrix = euclidean_distances(X, X)
    k_idx = [np.flatnonzero(c_idx == k) for k in xrange(K)]
    #compute a, b for each observation
    a = np.zeros(X.shape[0])
    b = np.zeros(X.shape[0])
    for i in xrange(X.shape[0]):
        #instances in the same cluster other than the instance itself
        a[i] = np.mean([dist_matrix[i, ind] for ind in k_idx[c_idx[i]] if ind != i])
        a[i] = 0. if np.isnan(a[i]) else a[i]
        #instances in other clusters
        b[i] = np.min([np.mean(dist_matrix[i, ind]) for k, ind in enumerate(k_idx) if c_idx[i] != k])
        b[i] = 0. if np.isnan(b[i]) else b[i]
    return (b-a)/np.maximum(a, b)
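#Minimal sketch (illustrative only): the hand-written silhouette above can be
#cross-checked against scikit-learn's silhouette_score on hypothetical data; the
#mean of the per-observation scores should be close to the library value.
def _demo_silhouette():
    X = np.random.rand(80, 4)
    c_idx = np.random.randint(0, 3, 80)
    scores = silhouette(X, c_idx, K = 3)
    print "mean(own) = %5.3f, sklearn = %5.3f" % (np.mean(scores),
                                                  silhouette_score(X, c_idx, metric = "euclidean"))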
#feature selection for unsupervised learning

def fss_wrapper(dnp, plates, attr_names, out_dir):
    """
    Wrapper approach to unsupervised feature subset selection.

    The idea is to cluster the data as well as we can in each candidate feature
    subspace and to select the most "interesting" subspace with the minimum number of
    features. Each candidate subspace is evaluated by assessing the resulting clusters
    and the feature subset with the chosen feature selection criterion. This process
    is repeated until the best feature subset with its corresponding clusters is found.

    The wrapper approach divides the task into three components:

    #. feature search: sequential forward (greedy)
    #. clustering algorithm: k-means with inertia scoring
    #. feature subset evaluation: silhouette coefficient

    :param dnp: Preprocessed computational profiles of strains.
    :type dnp: `numpy.array`
    :param plates: Plates data as returned from :mod:`utilities.read`.
    :type plates: `list`
    :param attr_names: Names of features corresponding to columns in :param:`dnp`.
    :type attr_names: `list`
    :param out_dir: Full path to the output directory where feature subspace descriptions and cluster memberships will be saved.
    :type out_dir: `str`

    To directory :param:`out_dir` is saved a file named `fss_subsets_scores.csv` which
    contains descriptions of the feature subspaces (the names of the features used in
    each candidate space) and the achieved mean silhouette coefficient.

    To directory :param:`out_dir` is saved a file named `fss_best_subset.csv` which
    contains the description of the feature subspace with the highest silhouette
    coefficient and its score.

    To directory :param:`out_dir` is saved a file named `fss_best_clustering.csv`
    which contains strain identifiers and their cluster memberships for all
    observations.

    Returned are the predictions for all observations in :param:`dnp` (the cluster
    each observation belongs to) as specified by the best evaluated feature subspace,
    the score of the best candidate feature space and the names of the features from
    the best candidate space.
    """
    attr_cur = set()
    score_cur = -1e6
    attr_best = set()
    pred_best = []
    score_best = score_cur
    attr2idx = {attr: i for i, attr in enumerate(attr_names)}
    f_subsets = open(out_dir + "fss_subsets_scores.csv", "w")
    f_bsubset = open(out_dir + "fss_best_subset.csv", "w")
    f_bcls = open(out_dir + "fss_best_clustering.csv", "w")
    f_bcls.write("%s\n" % ",".join(["ORF", "plate", "date", "row", "col", "cluster"]))
    print "Starting with FSS wrapper approach"
    for step in xrange(50):
        res_tmp = []
        seq, tot = 0, len(attr_names) - len(attr_cur)
        for attr in set(attr_names).difference(attr_cur):
            seq += 1
            idxs = [attr2idx[at] for at in attr_cur] + [attr2idx[attr]]
            dnp_fss = dnp[:, idxs]
            k_tmp = []
            for k in xrange(3, 8):
                _, pred, inertia = sklearn.cluster.k_means(dnp_fss, k = k, n_init = 3)
                k_tmp.append((inertia, pred, k))
            k_tmp.sort()
            pred = k_tmp[0][1]
            fidx = []
            for i in xrange(k_tmp[0][2]):
                ridx = np.nonzero(pred == i)[0]
                np.random.shuffle(ridx)
                fidx.extend(ridx[:200].tolist() if len(ridx) > 200 else ridx.tolist())
            score = silhouette_score(dnp[fidx, :], pred[fidx], metric = "euclidean")
            print "\t\tadding %d/%d attr: %s gives silhouette: %5.3f" % (seq, tot, attr, score)
            res_tmp.append((score, attr, pred))
        res_tmp.sort(reverse = True)
        #sequentially add one feature at a time. The feature added is the one that
        #provides the largest criterion value when used in combination with the
        #features already chosen.
        print "\tCurrent feature space: %s" % ", ".join(attr_cur)
        attr_cur.add(res_tmp[0][1])
        print "\tAdded feature: %s" % res_tmp[0][1]
        score_cur = res_tmp[0][0]
        f_subsets.write("%5.4f,%s\n" % (score_cur, ",".join(attr_cur)))
        if score_cur > score_best:
            attr_best = attr_cur.copy()
            score_best = score_cur
            pred_best = res_tmp[0][2]
        else:
            pass
            #print "The FSS is stopped because adding more features does not improve the feature criterion."
            #break
        print "\tFinished step: %d, best score: %5.4f, space: %s" % (step, score_best, ",".join(attr_best))
        print
    f_bsubset.write("%5.4f,%s\n" % (score_best, ",".join(attr_best)))
    assert len(pred_best) == dnp.shape[0] == len(plates), "Dimension mismatch."
    for i, strain in enumerate(plates):
        cls = pred_best[i]
        f_bcls.write("%s,%d\n" % (",".join([strain[-1]] + strain[1:5]), cls))
    f_subsets.close()
    f_bsubset.close()
    f_bcls.close()
    return pred_best, score_best, attr_best
#novelty detection

def detect_novelties_SVM(dnp_wt, dnp_mt, plates_wt, plates_mt, out_dir, save_visualization = True):
    """
    Novelty detection to decide whether a new observation belongs to the same
    distribution as existing observations or should be considered different.

    The training data are profiles of WT strains (preprocessed, that is, with outliers
    removed and features standardized). We are interested in detecting anomalies in
    mutant strains, i.e. in identifying strains with a non-WT phenotype. In addition
    to clustering WT and non-WT strains and analyzing cluster memberships, one can
    perform novelty detection to address whether a new observation (non-WT strain) is
    so different from the training set (WT strains) that we can doubt it is regular
    (e.g. it comes from a different distribution). A one-class SVM is used, a
    semi-supervised algorithm that learns a decision function for novelty detection.

    :param dnp_wt: Preprocessed computational profiles of WT strains.
    :type dnp_wt: `numpy.array`
    :param dnp_mt: Preprocessed computational profiles of MT strains.
    :type dnp_mt: `numpy.array`
    :param plates_wt: Plates data of WT strains as returned from :mod:`utilities.read`.
    :type plates_wt: `list`
    :param plates_mt: Plates data of MT strains as returned from :mod:`utilities.read`.
    :type plates_mt: `list`
    :param out_dir: Full path to the output directory where novelty detection results will be saved.
    :type out_dir: `str`
    :param save_visualization: Indicator whether to visualize training and test observations with labels in a low dimensional representation. By default, it is set to True.
    :type save_visualization: `bool`

    To directory :param:`out_dir` is saved a file named `novelty_detection_SVM.csv`
    which contains strain identifiers and predictions (the predicted class for each
    observation and the observation's distance to the separating hyperplane) for all
    non-WT observations. Predicted classes are +1 (regular novel observations) and -1
    (abnormal novel observations). A higher distance to the separating hyperplane
    indicates greater confidence in the prediction.

    To directory :param:`out_dir` is saved the plot `novelty_detection_SVM.pdf`
    visualizing training and test observations with learned labels if
    :param:`save_visualization` is set. A low (2D) dimensional representation is
    obtained by MDS.

    .. note:: If the visualization option is enabled, the optimization process in MDS can increase the duration.

    .. seealso:: See also function :func:`sa.methods.detect_novelties_GMM`.
    """
    assert dnp_wt.shape[0] == len(plates_wt), "Dimension mismatch"
    assert dnp_mt.shape[0] == len(plates_mt), "Dimension mismatch"
    print "Fitting one-class SVM"
    clf = svm.OneClassSVM(nu = 0.1, kernel = "sigmoid", gamma = 0.1)
    clf.fit(dnp_wt)
    print "Predicting one-class SVM for WT and MT strains"
    pred_wt = clf.predict(dnp_wt)
    pred_mt = clf.predict(dnp_mt)
    #distance of the samples to the separating hyperplane
    dec_fun = clf.decision_function(dnp_mt)
    print "Training error (spurious WT strains): %d" % pred_wt[pred_wt == -1].size
    print "Normal MT strains: %d " % pred_mt[pred_mt == 1].size
    print "Abnormal MT strains: %d" % pred_mt[pred_mt == -1].size
    #save predictions to file
    fname = out_dir + "novelty_detection_SVM.csv"
    print "Saving one-class SVM novelties predictions to file: %s" % fname
    f = open(fname, "w")
    f.write("%s\n" % ",".join(["ORF", "plate", "date", "row", "col", "class", "score"]))
    for i in xrange(len(pred_mt)):
        f.write("%s,%d,%5.5f\n" % (",".join([plates_mt[i][-1]] + plates_mt[i][1:5]), pred_mt[i], dec_fun[i]))
    f.close()
    #cluster abnormal MT strains
    dnp_mt_abn = dnp_mt[pred_mt == -1, :]
    idx_mt_abn = [i for i in xrange(len(pred_mt)) if pred_mt[i] == -1]
    plates_mt_abn = [el for i, el in enumerate(plates_mt) if pred_mt[i] == -1]
    kmeans = sklearn.cluster.KMeans(k = 3, n_init = 6, n_jobs = 1)
    kmeans.fit(dnp_mt_abn)
    pred_mt_abn = kmeans.predict(dnp_mt_abn)
    embd_wt = decompose_MDS(dnp_wt, save_coordinates = False)
    embd_mt = decompose_MDS(dnp_mt, save_coordinates = False)
    idx_c0 = [idx_mt_abn[i] for i in xrange(len(idx_mt_abn)) if pred_mt_abn[i] == 0]
    idx_c1 = [idx_mt_abn[i] for i in xrange(len(idx_mt_abn)) if pred_mt_abn[i] == 1]
    idx_c2 = [idx_mt_abn[i] for i in xrange(len(idx_mt_abn)) if pred_mt_abn[i] == 2]
    for cls, idxs in [(0, idx_c0), (1, idx_c1), (2, idx_c2)]:
        fname = out_dir + "novelty_detection_abnormal_MT_cluster%d_SVM.csv" % cls
        f = open(fname, "w")
        f.write("%s\n" % ",".join(["ORF", "plate", "date", "row", "col", "class", "score"]))
        for i in idxs:
            f.write("%s,%d,%5.5f\n" % (",".join([plates_mt[i][-1]] + plates_mt[i][1:5]), pred_mt[i], dec_fun[i]))
        f.close()
    n = embd_mt.shape[0]
    idx_c0 = [el for el in idx_c0 if el < n]
    idx_c1 = [el for el in idx_c1 if el < n]
    idx_c2 = [el for el in idx_c2 if el < n]
    fig = plt.figure()
    plt.title("Profiles of abnormal MT strains clustered (yellow, magenta, blue)")
    b1 = plt.scatter(embd_wt[:, 0], embd_wt[:, 1], c = "green")
    b2 = plt.scatter(embd_mt[:, 0], embd_mt[:, 1], c = "orange")
    c0 = plt.scatter(embd_mt[idx_c0, 0], embd_mt[idx_c0, 1], c = "yellow")
    c1 = plt.scatter(embd_mt[idx_c1, 0], embd_mt[idx_c1, 1], c = "magenta")
    c2 = plt.scatter(embd_mt[idx_c2, 0], embd_mt[idx_c2, 1], c = "blue")
    plt.axis('tight')
    plt.xlim((np.min(np.r_[embd_wt[:, 0], embd_mt[:, 0]]), np.max(np.r_[embd_wt[:, 0], embd_mt[:, 0]])))
    plt.ylim((np.min(np.r_[embd_wt[:, 1], embd_mt[:, 1]]), np.max(np.r_[embd_wt[:, 1], embd_mt[:, 1]])))
    plt.legend([b1, b2, c0, c1, c2],
               ["WT strains", "regular MT strains", "abnormal MT strains 0", "abnormal MT strains 1", "abnormal MT strains 2"],
               loc = "upper left", prop = font_manager.FontProperties(size = 11))
    fname = out_dir + "novelty_detection_abnormal_MT_clusters_SVM.pdf"
    plt.savefig(fname)
    plt.close(fig)
    #visualize
    if save_visualization:
        print "Low dimensional projection and one-class SVM visualization."
        embd_wt = decompose_MDS(dnp_wt, save_coordinates = False)
        embd_mt = decompose_MDS(dnp_mt, save_coordinates = False)
        fig = plt.figure()
        plt.title("Novelty Detection")
        b1 = plt.scatter(embd_wt[:, 0], embd_wt[:, 1], c = "green")
        b2 = plt.scatter(embd_mt[:, 0], embd_mt[:, 1], c = "orange")
        c = plt.scatter(embd_mt[pred_mt[:embd_mt.shape[0]] == -1, 0], embd_mt[pred_mt[:embd_mt.shape[0]] == -1, 1], c = "red")
        plt.axis('tight')
        plt.xlim((np.min(np.r_[embd_wt[:, 0], embd_mt[:, 0]]), np.max(np.r_[embd_wt[:, 0], embd_mt[:, 0]])))
        plt.ylim((np.min(np.r_[embd_wt[:, 1], embd_mt[:, 1]]), np.max(np.r_[embd_wt[:, 1], embd_mt[:, 1]])))
        plt.legend([b1, b2, c], ["WT strains", "regular MT strains", "abnormal MT strains"],
                   loc = "upper left", prop = font_manager.FontProperties(size = 11))
        fname = out_dir + "novelty_detection_SVM.pdf"
        print "Saving one-class SVM novelties detection plot to file: %s" % fname
        plt.savefig(fname)
        plt.close(fig)
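#Minimal sketch (illustrative only) of the one-class SVM novelty detector used above,
#on synthetic data: train on WT-like observations, then flag MT-like observations that
#fall outside the learned region. An RBF kernel is used here for the synthetic data;
#the function above uses a sigmoid kernel.
def _demo_one_class_svm():
    X_wt = np.random.randn(200, 4)                        #training (WT-like) profiles
    X_mt = np.vstack([np.random.randn(80, 4), np.random.randn(20, 4) + 4.])
    clf = svm.OneClassSVM(nu = 0.1, kernel = "rbf", gamma = 0.1)
    clf.fit(X_wt)
    pred_mt = clf.predict(X_mt)                           #+1 = regular, -1 = abnormal
    print "Abnormal MT-like observations: %d / %d" % (np.sum(pred_mt == -1), X_mt.shape[0])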
def detect_novelties_GMM(dnp_wt, dnp_mt, plates_wt, plates_mt, out_dir, save_visualization = True, wt_name = ["YOR202W"]):
    """
    Novelty detection using variational inference for the GMM (Gaussian Mixture Model).

    :param dnp_wt: Preprocessed computational profiles of WT strains.
    :type dnp_wt: `numpy.array`
    :param dnp_mt: Preprocessed computational profiles of MT strains.
    :type dnp_mt: `numpy.array`
    :param plates_wt: Plates data of WT strains as returned from :mod:`utilities.read`.
    :type plates_wt: `list`
    :param plates_mt: Plates data of MT strains as returned from :mod:`utilities.read`.
    :type plates_mt: `list`
    :param out_dir: Full path to the output directory where novelty detection results will be saved.
    :type out_dir: `str`
    :param save_visualization: Indicator whether to visualize training and test observations with labels in a low dimensional representation. By default, it is set to True.
    :type save_visualization: `bool`
    :param wt_name: Names of the wild-type ORFs.
    :type wt_name: `list`

    To directory :param:`out_dir` is saved a file named `novelty_detection_GMM.csv`
    which contains strain identifiers and predictions (the predicted class for each
    observation and the predicted posterior probability of the observation under each
    Gaussian state in the model) for all non-WT observations.

    To directory :param:`out_dir` is saved the plot `novelty_detection_GMM.pdf`
    visualizing training and test observations with learned labels if
    :param:`save_visualization` is set. A low (2D) dimensional representation is
    obtained by MDS.

    .. seealso:: function :func:`sa.methods.detect_novelties_SVM`.
    """
    assert dnp_wt.shape[0] == len(plates_wt), "Dimension mismatch"
    assert dnp_mt.shape[0] == len(plates_mt), "Dimension mismatch"
    print "Fitting GMM"
    clf = sklearn.mixture.GMM(n_components = 2, covariance_type = "full")
    clf.fit(dnp_wt)
    print "Predicting GMM for WT strains"
    #predict labels for the data
    pred_wt = clf.predict(dnp_wt)
    #predict the posterior probability of the data under each Gaussian in the model
    proba_wt = clf.predict_proba(dnp_wt)
    print "Predicting GMM for MT strains"
    #predict labels for the data
    pred_mt = clf.predict(dnp_mt)
    #predict the posterior probability of the data under each Gaussian in the model
    proba_mt = clf.predict_proba(dnp_mt)
    cls = []
    print "No. of available components: %d" % clf.n_components
    print "Components sizes:"
    for i in xrange(clf.n_components):
        wtn = np.sum([1 for k, el in enumerate(plates_wt) if el[-1] in wt_name and pred_wt[k] == i])
        cls.append((np.sum(pred_mt == i), i))
        print "\t comp. no. %d: size = %d (WT = %d)" % (i, cls[-1][0], wtn)
    cls.sort()
    cls = cls[0][1]
    #save predictions to file
    fname = out_dir + "novelty_detection_GMM.csv"
    print "Saving GMM predictions to file: %s" % fname
    f = open(fname, "w")
    f.write("%s\n" % ",".join(["ORF", "plate", "date", "row", "col", "class", "proba"]))
    for plates, pred, proba in [[plates_mt, pred_mt, proba_mt], [plates_wt, pred_wt, proba_wt]]:
        for i in xrange(len(pred)):
            f.write("%s,%d,%s\n" % (",".join([plates[i][-1]] + plates[i][1:5]), pred[i],
                                    ",".join(["%5.5f" % el for el in proba[i]])))
    f.close()
    #visualize
    if save_visualization:
        print "Low dimensional projection and GMM visualization."
        embd_wt = decompose_MDS(dnp_wt, save_coordinates = False)
        embd_mt = decompose_MDS(dnp_mt, save_coordinates = False)
        fig = plt.figure()
        plt.title("Novelty Detection")
        b1 = plt.scatter(embd_wt[:, 0], embd_wt[:, 1], c = "green")
        b2 = plt.scatter(embd_mt[:, 0], embd_mt[:, 1], c = "orange")
        c = plt.scatter(embd_mt[pred_mt[:embd_mt.shape[0]] == cls, 0], embd_mt[pred_mt[:embd_mt.shape[0]] == cls, 1], c = "red")
        plt.axis('tight')
        plt.xlim((np.min(np.r_[embd_wt[:, 0], embd_mt[:, 0]]), np.max(np.r_[embd_wt[:, 0], embd_mt[:, 0]])))
        plt.ylim((np.min(np.r_[embd_wt[:, 1], embd_mt[:, 1]]), np.max(np.r_[embd_wt[:, 1], embd_mt[:, 1]])))
        plt.legend([b1, b2, c], ["WT strains", "regular MT strains", "abnormal MT strains"],
                   loc = "upper left", prop = font_manager.FontProperties(size = 11))
        fname = out_dir + "novelty_detection_GMM.pdf"
        print "Saving GMM novelties detection plot to file: %s" % fname
        plt.savefig(fname)
        plt.close(fig)
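#Minimal sketch (illustrative only) of GMM-based novelty scoring on synthetic data,
#using the same estimator the function above relies on (sklearn.mixture.GMM in the
#scikit-learn version this module targets; newer releases call it GaussianMixture).
def _demo_gmm_novelty():
    X_wt = np.random.randn(200, 4)                        #training (WT-like) profiles
    X_mt = np.vstack([np.random.randn(80, 4), np.random.randn(20, 4) + 4.])
    clf = sklearn.mixture.GMM(n_components = 2, covariance_type = "full")
    clf.fit(X_wt)
    pred_mt = clf.predict(X_mt)                           #most likely Gaussian component
    proba_mt = clf.predict_proba(X_mt)                    #posterior per component
    print pred_mt[:5], proba_mt[:5]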