#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division
from __future__ import print_function
import os
# import itertools
import numpy as np
import pandas as pd
import networkx as nx
# from functools import partial
from operator import itemgetter
from sklearn.utils import check_X_y
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from sklearn.model_selection import check_cv
from sklearn.model_selection import LeaveOneOut
from sklearn.model_selection import cross_val_score
from sklearn.base import is_classifier
from sklearn.base import clone
from sklearn.base import BaseEstimator
from sklearn.base import ClassifierMixin
from sklearn.metrics import check_scoring
from sklearn.preprocessing import LabelEncoder
from sklearn.naive_bayes import GaussianNB
from DNetPRO.lib.DNetPRO import _DNetPRO_couples
try:
# sklearn >= 1.1
from sklearn.utils.metaestimators import available_if
except ImportError:
# fallback for old sklearn versions
from sklearn.utils.metaestimators import if_delegate_has_method
def available_if(check):
# adapter: ignore the predicate and use old decorator
def decorator(func):
return if_delegate_has_method(delegate='estimator')(func)
return decorator
def _estimator_has(attr):
def check(self):
# use estimator_ in DNetPRO since it has its fit method
return hasattr(self.estimator_, attr)
return check
__author__ = ['Nico Curti']
__email__ = ['nico.curti2@unibo.it']
__all__ = ['DNetPRO']
def _estimator_has (attr) :
def check (self) :
return hasattr(self.estimator, attr)
return check
[docs]
class DNetPRO (BaseEstimator, ClassifierMixin):
'''
DNetPRO feature selection algorithm
Parameters
----------
estimator : object
A supervised learning estimator with a ``fit`` method that provides
information about feature importance either through a ``coef_``
attribute or through a ``feature_importances_`` attribute.
cv : int, cross-validation generator or an iterable, optional
Determines the cross-validation splitting strategy.
Possible inputs for cv are:
- None, to use the default 3-fold cross-validation,
- integer, to specify the number of folds.
- An object to be used as a cross-validation generator.
- An iterable yielding train/test splits.
For integer/None inputs, if ``y`` is binary or multiclass,
:class:`sklearn.model_selection.StratifiedKFold` is used. If the
estimator is a classifier or if ``y`` is neither binary nor multiclass,
:class:`sklearn.model_selection.KFold` is used.
Refer to scikit-learn cross_validation for the various
cross-validation strategies that can be used here.
scoring : string, callable or None, optional, default: None
A string (see model evaluation documentation) or
a scorer callable object / function with signature
``scorer(estimator, X, y)``.
max_chunk : int, default=100
Max number of features allowed in performances-chunk. If the size of chunk is greater than max_chunk
and it is not the first one, features selection iteration is stopped.
percentage : float, default=0.1
Percentage of couples to save after sorting
verbose : int, default=0
Controls verbosity of couples evaluation.
n_jobs : int, default 1
Number of cores to run in parallel while fitting across folds.
Defaults to 1 core. If `n_jobs=-1`, then number of jobs is set
to number of cores.
Example
-------
>>> import numpy as np
>>> from DNetPRO import DNetPRO
>>> from sklearn.naive_bayes import GaussianNB
>>>
>>> Nprobe, Nsample = (5, 4)
>>>
>>> X = np.random.uniform(low=0., high=1., size=(Nsample, Nprobe))
>>> y = np.array(['A', 'A', 'B', 'B'])
>>>
>>> dnet = DNetPRO(estimator=GaussianNB(), max_chunk=4)
>>> dnet.fit(X, y)
>>>
>>> print(dnet.signatures)
Notes
-----
.. note::
The full computation of couples is performed via C++ multithreading thus set an appropriated number of threads
to speed up the execution.
References
----------
- Curti N., Giampieri E., Levi G., Castellani G., Remondini D.; DNetPRO: A network approach for low-dimensional signatures from high-throughput data; bioRxiv 773622; doi: https://doi.org/10.1101/773622
- Mizzi C., Fabbri A., Rambaldi S. et al.; Unraveling pedestrian mobility on a road network using ICTs data during great tourist events. EPJ Data Sci. 7, 44 (2018); https://doi.org/10.1140/epjds/s13688-018-0168-2
- Boccardi V., Paolacci L., Remondini D., Giampieri E., Poli G., Curti N., Cecchetti R., Villa A., Ruggiero C., Brancorsini S., Mecocci P.; Cognitive Decline and Alzheimer's Disease in Old Age: A Sex-Specific Cytokinome Signature. J Alzheimers Dis. 2019;72(3):911-918. doi: 10.3233/JAD-190480. PMID: 31658056.
- Malvisi M., Curti N., Remondini D., De Iorio MG., Palazzo F., Gandini G., Vitali S., Polli M., Williams JL., Minozzi G. Combinatorial Discriminant Analysis Applied to RNAseq Data Reveals a Set of 10 Transcripts as Signatures of Exposure of Cattle to Mycobacterium avium subsp. paratuberculosis. Animals (Basel). 2020 Feb 5;10(2):253. doi: 10.3390/ani10020253. PMID: 32033399; PMCID: PMC7070263.
- R. Biondi, G. Gravante, D. Remondini, S. Peluso, S. Cominetti, F. D'Amore, M. Bignami, A.D. Arosio, N. Curti; Towards Precision Medicine in Sinonasal Tumors: Low-Dimensional Radiomic Signature Extraction from MRI. Diagnostics (2025); doi: 10.3390/diagnostics15131675.
'''
def __init__ (self, estimator=GaussianNB(), cv=LeaveOneOut(), scoring=None, max_chunk=100, percentage=.1, verbose=False, n_jobs=1):
if not (0. < percentage <= 1.):
raise ValueError(f"'percentage' must be > 0 and <= 1. Given {percentage}")
if n_jobs < -1 or n_jobs == 0:
raise ValueError(f"'n_jobs' must be a positive integer (use -1 for the maximum number of CPUs). Given {n_jobs}")
# set the correct value of workers
n_jobs = min(n_jobs, os.cpu_count())
if max_chunk < 0:
raise ValueError(f"'max_chunk' must be >= 0. Given: {max_chunk}")
if not is_classifier(estimator):
raise ValueError(f"'estimator' must be a sklearn-like classifier. Given {estimator}")
scoring = check_scoring(estimator, scoring=scoring)
self.estimator = estimator
self.cv = cv
self.scoring = scoring
self.max_chunk = max_chunk
self.percentage = percentage
self.verbose = verbose
self.n_jobs = n_jobs
[docs]
@staticmethod
def pendrem (graph):
'''
Remove pendant node iterativelly
Parameters
----------
graph: graph
A NetworkX graph
Returns
-------
pruned: graph
The same graph without pendant nodes
Example
-------
>>> import networkx as nx
>>> G = nx.star_graph(n=10)
>>> print(G.nodes)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> pruned = penderem(G)
>>> print(pruned.nodes)
[0]
'''
deg = graph.degree()
while min(map(itemgetter(1), deg)) < 2:
G = graph.copy()
graph.remove_nodes_from(
[n for n, d in deg if d < 2]
)
deg = graph.degree()
if len(deg) == 0:
return G
return graph
@property
def _estimator_type (self):
'''
Get the estimator type
'''
return self.estimator._estimator_type
def _check_chunk (self, X):
'''
Check input parameters
Parameters
----------
X : array of shape [n_samples, n_features]
The input samples.
Raises
------
ValueError if max_chunk <= number of combination (evaluated as Nprobe * (Nprobe - 1) / 2)
'''
_, Nprobe = np.shape(X)
Ncomb = Nprobe * (Nprobe - 1) >> 1
if not (self.max_chunk <= Ncomb):
raise ValueError(
f"'max_chunk' must be <= possible features combinations. Given: {self.max_chunk}"
)
[docs]
@staticmethod
def label2numbers (arr):
'''
Convert labels to numerical values
Parameters
----------
arr : array_like
The array of labels
Returns
-------
numeric_labels : np.ndarray
Array of numerical labels obtained by the LabelEncoder transform
Notes
-----
.. note::
The C++ function allows only numerical (integer) values as labels in input.
For more general support refers to the C++ example.
Examples
--------
>>> from DNetPRO import DNetPRO
>>> y = ('A', 'A', 'B', 'B')
>>> num_y = DNetPRO.label2numbers(y)
>>> print(num_y)
[0, 0, 1, 1]
'''
le = LabelEncoder()
le.fit(arr)
numeric_labels = le.transform(arr)
return numeric_labels.astype(np.int32)
def _evaluate_couples (self, X, y):
'''
Evaluate feature couples using GaussianNB classifier using the C++ support function.
Parameters
----------
X : array-like of shape (Nsample, Nprobe)
The training input samples
y : array-like, shape (Nsample, )
The target values (converted to unique integers)
Returns
-------
performances : pandas dataframe
Dataframe of ordered results with columns (feature_1, feature_2, performances).
The variable pairs are sorted in ascending order according to the performance values.
Notes
-----
.. note::
This function prepares the input according to the C++ requirements and it call the
cython wrap of the method
'''
# Nsample, _ = np.shape(X)
y = np.asarray(y)
if not np.issubdtype(y.dtype, np.integer):
y = DNetPRO.label2numbers(y)
self.X = check_array(X, dtype=np.float32)
# set contiguous order memory for c++ compatibility
self.y = np.ascontiguousarray(y)
X = np.ascontiguousarray(self.X.T)
# pay attention to the transposition
score = _DNetPRO_couples(X, self.y, self.percentage, self.verbose, self.n_jobs)
performances = score.score
return performances
# def _couple_evaluation (self, couple, data, labels):
# '''
# Evaluate couples of features with a LOOCV
# '''
# f1, f2 = couple
#
# samples = data[:, [f1, f2]]
# score = cross_val_score(self.estimator, samples, labels,
# cv=LeaveOneOut(), n_jobs=1).mean()
# return (f1, f2, score * 100.)
# def _couple_pooling (self, data, labels):
# '''
# Compute the DNetPRO couples in pure Python
# '''
# Nsample, Nfeature = data.shape
# couples = itertools.combinations(range(0, Nfeature), 2)
#
# couple_eval = partial(self._couple_evaluation, data=data, labels=labels)
#
# scores = list(map(couple_eval, couples))
# scores = sorted(scores, key=lambda x : x[2], reverse=True)
# return scores
[docs]
def connected_component_subgraphs (G):
'''
Generator of connected components compatible
with old and new networkx versions
Parameters
----------
G : graph
A networkx graph
Returns
-------
subgraph : graph
A subgraph (networkx like) of the input
'''
for c in nx.connected_components(G):
yield G.subgraph(c)
[docs]
def fit (self, X, y=None, **fit_params):
'''
Fit the DNetPRO model meta-transformer
Parameters
----------
X : array-like of shape (n_samples, n_features)
The training input samples.
y : array-like, shape (n_samples,)
The target values (integers that correspond to classes in
classification, real numbers in regression).
**fit_params : Other estimator specific parameters
Returns
-------
self : object
Returns self.
'''
X, y = check_X_y(X, y)
self._check_chunk(X)
# Initialization
cv = check_cv(cv=self.cv, y=y, classifier=is_classifier(self.estimator))
scorer = check_scoring(self.estimator, scoring=self.scoring)
couples = self._evaluate_couples(X, y, **fit_params)
couples = pd.DataFrame(
data=couples,
columns=['feature_1', 'feature_2', 'performances']
)
# compute the max of performances
max_perf = couples.performances.max()
# group-by variables by the same performance
chunks = (couples.groupby('performances', sort=False))
# resulting signature
graph = nx.Graph()
# results
self.signatures = []
for perf, chunk in chunks:
if len(chunk) >= self.max_chunk and perf != max_perf:
break
graph.add_weighted_edges_from(chunk.values)
sub_graphs = iter(DNetPRO.connected_component_subgraphs(graph))
for comp in sub_graphs:
g = DNetPRO.pendrem(comp.copy())
if len(g.nodes) != 0:
comp = g
sub_data = X[:, comp.nodes]
score = cross_val_score(self.estimator, sub_data,
y=self.y, cv=cv,
scoring=scorer).mean()
self.signatures.append({
'number_of_features' : len(comp.nodes()),
'performance_couples' : perf,
'features' : list(comp.nodes()),
'signature' : comp,
'score' : score
})
self.estimator_ = clone(self.estimator)
self.estimator_.fit(self.transform(self.X), self.y)
return self
[docs]
def get_signature (self):
'''
Return the computed signature in ascending order (training score value)
'''
check_is_fitted(self, 'estimator_')
return sorted(self.signatures, key=lambda x : x['score'], reverse=True)
[docs]
def set_signature (self, index):
'''
Set the signature as selected features and fit the model
Parameters
----------
index : int
Index of the signatures array
'''
check_is_fitted(self, 'estimator_')
if not 0 <= index < len(self.signatures):
raise ValueError('Signature index out of range')
self.selected_signature = self.get_signature()[index]['features']
[docs]
@available_if(_estimator_has("predict"))
def predict (self, X):
'''
Reduce X to the selected features and then predict using the
underlying estimator.
Parameters
----------
X : array of shape [n_samples, n_features]
The input samples.
Returns
-------
y : array of shape [n_samples]
The predicted target values.
'''
check_is_fitted(self, 'estimator_')
return self.estimator_.predict(self.transform(X))
# def predict_transform (self, X_train, y_train, X_test, y_test):
# '''
# Fit the DNetPRO model meta-transformer and apply the data transformation,
# i.e feature selection, and then compute the score on test set
# Parameters
# ----------
# X : array-like of shape (n_samples, n_features)
# The training input samples.
# y : array-like, shape (n_samples,)
# The target values (integers that correspond to classes in
# classification, real numbers in regression).
# **fit_params : Other estimator specific parameters
# Returns
# -------
# Xnew : array-like of shape (n_sample, n_signature_features)
# The data filtered according to the best features found by the model
# score : float
# Accuracy score over the test set (X_test)
# Notes
# -----
# The signature is selected as the signature with highest score on test (X_test) data.
# '''
# self.fit(X_train, y_train)
# scores = [self.estimator.fit(X_train[:, sign['features']], y_train).score(X_test[:, sign['features']], y_test)
# for sign in self.signatures]
# index = np.argmax(scores)
# self.set_signature(index)
# self.estimator_.fit(self.transform(self.X), self.y)
# return (Xnew, scores[index])
[docs]
@available_if(_estimator_has('score'))
def score (self, X, y):
'''
Reduce X to the selected features and then return the score of the
underlying estimator.
Parameters
----------
X : array of shape [n_samples, n_features]
The input samples.
y : array of shape [n_samples]
The target values.
'''
return self.estimator_.score(self.transform(X), y)
[docs]
@available_if(_estimator_has('decision_function'))
def decision_function (self, X):
check_is_fitted(self, 'estimator_')
return self.estimator_.decision_function(self.transform(X))
[docs]
@available_if(_estimator_has('predict_proba'))
def predict_proba (self, X):
check_is_fitted(self, 'estimator_')
return self.estimator_.predict_proba(self.transform(X))
[docs]
@available_if(_estimator_has("predict_log_proba"))
def predict_log_proba (self, X):
check_is_fitted(self, 'estimator_')
return self.estimator_.predict_log_proba(self.transform(X))
def __repr__ (self):
'''
Object representation
'''
class_name = self.__class__.__qualname__
params = self.__init__.__code__.co_varnames
params = set(params) - {'self'}
args = ', '.join(['{}={}'.format(k, str(getattr(self, k))) for k in params])
return '{}({})'.format(class_name, args)
if __name__ == '__main__':
from sklearn.model_selection import train_test_split
X = pd.read_csv('../example/data.txt', sep='\t', index_col=0, header=0)
y = np.asarray(X.columns.astype(float).astype(int))
X = X.transpose()
X_train, X_test, y_train, y_test = train_test_split(X.values, y, test_size=0.33, random_state=42)
dnet = DNetPRO(estimator=GaussianNB(), n_jobs=4, verbose=True)
Xnew = dnet.fit_transform(X_train, y_train)
print('Best Signature: {}'.format(dnet.get_signature()[0]))
print('Score: {:.3f}'.format(dnet.score(X_test, y_test)))