import pathlib
from pkg_resources import resource_filename
from typing import List, Literal
import numpy as np
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import FunctionTransformer
from .features import IndentationFeatures
from .regressors import reg_dict, reg_names, reg_trees
class IndentationRater(IndentationFeatures):
def __init__(self, regressor=None, scale=None, lda=None,
training_set=None, names=None,
weight=True, sample_weight=None,
*args, **kwargs):
"""Rate quality
Parameters
----------
regressor: scikit-learn RegressorMixin
The regressor used for rating
scale: bool or None
If True, apply a Standard Scaler to the features. If None
(default), the Standard Scaler is enabled unless a
tree-based regressor is used.
lda: bool or None
If True, apply a Linear Discriminant Analysis (LDA). If None
(default), LDA is enabled unless a tree-based regressor
is used.
training_set: tuple of (X, y)
The training set (samples, response)
names: list of str
Feature names to use
weight: bool
Weight the training samples by the number of occurrences
of their rating or with `sample_weight`. For tree-based
regressors, set this to True to avoid bias.
sample_weight: list-like
The sample weights. If set to `None`, the sample weights
are computed from the training set.
*args: list
Positional arguments for :class:`IndentationFeatures`
**kwargs:
Keyword arguments for :class:`IndentationFeatures`
See Also
--------
sklearn.preprocessing.StandardScaler:
Standard scaler
sklearn.discriminant_analysis.LinearDiscriminantAnalysis:
Linear discriminant analysis
nanite.rate.regressors.reg_trees:
List of regressors that are identified as tree-based
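Examples
--------
A minimal construction sketch with a custom regressor and a
synthetic two-feature training set (the regressor choice, the
feature count, and the random data are purely illustrative;
for actual rating, the feature columns must match the
continuous features in ``self.names``):
>>> import numpy as np
>>> from sklearn.svm import SVR
>>> X = np.random.random((100, 2))     # (num_samples, num_features)
>>> y = np.random.randint(0, 11, 100)  # integer ratings from 0 to 10
>>> ir = IndentationRater(regressor=SVR(), training_set=(X, y))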
"""
if regressor is not None:
_name = regressor.__class__.__name__
if lda is None:
lda = _name not in reg_trees
if scale is None:
scale = _name not in reg_trees
# training set
if training_set is None:
# default
training_set = self.load_training_set(names=names)
# sample weights
if sample_weight is None:
sample_weight = self.compute_sample_weight(*training_set)
steps = []
# scaling (does not affect decision trees / random forests)
if scale:
steps.append(StandardScaler())
# linear discriminant analysis
if lda:
steps.append(LinearDiscriminantAnalysis())
if regressor is not None:
steps.append(regressor)
if len(steps) == 0:
# No transforms or regressor given; fall back to an identity
# transform so that the pipeline is never empty.
steps.append(FunctionTransformer())
#: sklearn pipeline with transforms (and regressor if given)
self.pipeline = make_pipeline(*steps)
fit_params = {}
if regressor is not None and weight:
# set weighting for regressor
key = "{}__sample_weight".format(self.pipeline.steps[-1][0])
fit_params[key] = sample_weight
if regressor is not None:
self.pipeline.fit(*training_set, **fit_params)
names = self.get_feature_names(names=names, which_type="all")
#: feature names used by the regressor pipeline
self.names = sorted(names)
super(IndentationRater, self).__init__(*args, **kwargs)
def _pre_rate(self, bsample):
"""exclude based on boolean training set"""
if np.sum(bsample == 0):
# bad curve
return False
else:
# good curve
return True
def _rate(self, sample):
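"""Rate a single continuous-feature sample with the regressor pipeline"""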
gd = self.pipeline.predict(np.atleast_2d(sample))
return gd[0]
@staticmethod
def compute_sample_weight(X, y):
"""Weight samples according to occurrence in y"""
if not np.all(np.array(y, dtype=int) == y):
msg = "Only integer ratings allowed."
raise NotImplementedError(msg)
weight = np.zeros(y.shape[0], dtype=float)
for ii in range(11):
idxii = y == ii
occur = np.sum(idxii)
if occur:
# Sometimes the training set is not large enough.
# If no occurrences were found, the weights remain
# zero.
weight[idxii] = 1 / occur
# normalize
weight /= np.sum(weight)
return weight
@staticmethod
def get_training_set_path(label="zef18"):
"""Return the path to a training set shipped with nanite
Training sets are stored in the `nanite.rate`
module path with ``ts_`` prepended to `label`.
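Examples
--------
For instance, the default training set lives in a directory
named ``ts_zef18`` (the absolute location depends on where
nanite is installed):
>>> path = IndentationRater.get_training_set_path("zef18")
>>> path.endswith("ts_zef18")
True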
"""
data_loc = "nanite.rate"
resp_path = resource_filename(data_loc, "ts_{}".format(label))
return resp_path
@classmethod
def load_training_set(
cls,
path: pathlib.Path | str | None = None,
names: List[str] | None = None,
which_type: Literal["all", "binary", "continuous"] | List[str] | None = None,
replace_inf: bool = True,
impute_zero_rated_nan: bool = True,
remove_nan: bool = True,
ret_names: bool = False):
"""Load a training set from a directory
Parameters
----------
path: pathlib.Path or str
Optional path to the training set directory. If none
is specified, the default "zef18" is loaded.
names: list of str
List of features to use, defaults to all features.
which_type: str or list of str
Which type of features to return; see
:const:`.VALID_FEATURE_TYPES` for valid options. By default,
only the "continuous" features are imported. The "binary"
features are not needed for training; they are only used to
sort out bad curves in new force-distance data.
replace_inf: bool
Replace infinity-valued feature values with
`2 * sign * max(abs(values))`.
impute_zero_rated_nan: bool
If there are nan-valued features that have a zero response
(rated worst), replace those feature values with the mean
of the zero-response features that are not nan-valued.
remove_nan: bool
Remove any nan-valued features (after `impute_zero_rated_nan`
was applied). This is necessary, since scikit-learn cannot
handle nan-valued sample data.
ret_names: bool
Return the names of the features in addition to the samples
and response.
Returns
-------
samples: 2d ndarray
Sample values with axes `(data_size, num_features)`
response: 1d ndarray
Response array of length `data_size`
names: list, optional
List of feature names corresponding to axis `1` in `samples`
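Examples
--------
A short sketch, assuming the default "zef18" training set is
available in this installation of nanite:
>>> X, y, names = IndentationRater.load_training_set(ret_names=True)
>>> X.shape[0] == y.shape[0]  # one response per sample
True
>>> X.shape[1] == len(names)  # one column per feature
True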
"""
if which_type is None:
which_type = ["continuous"]
fnames = cls.get_feature_names(which_type=which_type, names=names)
sample_paths = []
if path is None:
path = cls.get_training_set_path()
path = pathlib.Path(path).resolve()
resp_path = str(path / "train_response.txt")
for fn in fnames:
resf = str(path / "train_{}.txt".format(fn))
sample_paths.append(resf)
samples = [np.loadtxt(sp, dtype=float, ndmin=2) for sp in sample_paths]
samples = np.concatenate(samples, axis=1)
response = np.loadtxt(resp_path, dtype=float)
# Deal with NaN-valued feature data with a response of 0.
if impute_zero_rated_nan:
resp0 = response == 0
# For each feature, find values that are NaN where the
# response is zero. Those values are then set to the mean of
# the values where the response is zero and not NaN.
for ii, fn in enumerate(fnames):
# locations where the feature is nan
fdat = samples[:, ii]
fnans = np.isnan(fdat)
# locations where feature is nan AND response is 0
# (those are the locations we would like to change)
coloc = np.logical_and(resp0, fnans)
# location where the feature is not nan AND response is 0
# (those are the reference locations)
ref = np.logical_and(resp0, ~fnans)
if np.any(coloc) and np.any(ref):
# We have values
refval = np.mean(fdat[ref])
samples[coloc, ii] = refval
# Deal with remaining NaN-valued feature data.
if remove_nan:
# Remove nan-values from training set
valid = ~np.any(np.isnan(samples), axis=1)
samples = samples[valid, :]
# remove corresponding responses
response = response[valid]
# Deal with infinite feature data.
if replace_inf:
for ii in range(len(fnames)):
si = samples[:, ii]
isinf = np.isinf(si)
if np.any(isinf):
extreme = np.nanmax(np.abs(si[~isinf]))
posinf = np.isposinf(si)
if np.any(posinf):
samples[posinf, ii] = 2 * extreme
neginf = np.isneginf(si)
if np.any(neginf):
samples[neginf, ii] = -2 * extreme
res = [samples, response]
if ret_names:
res.append(fnames)
return res
def rate(self, samples=None, datasets=None):
"""Perform rating step
Parameters
----------
samples: 1d or 2d ndarray (cast to 2d ndarray) or None
Measured samples; if set to None, `datasets` must be given.
datasets: nanite.Indentation or list of nanite.Indentation
Full, fitted measurements
Returns
-------
ratings: list
Resulting ratings
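Examples
--------
A sketch of the typical workflow (``idnt`` stands for a fitted
:class:`nanite.Indentation` instance and is not defined here):
>>> ir = get_rater("Extra Trees")  # doctest: +SKIP
>>> ratings = ir.rate(datasets=idnt)  # doctest: +SKIP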
"""
if samples is None and datasets is None:
# use dataset from IndentationFeature
datasets = [self.dataset]
elif datasets is None:
# make sure we always iterate over a 2d array of samples
samples = np.atleast_2d(samples)
# distinguish between binary and other samples
fsamples = []
bsamples = []
fnames = self.get_feature_names(
names=self.names,
which_type=["continuous"])
for samp in samples:
fsamp = [] # continuous samples
bsamp = [] # binary samples
for ii, name in enumerate(self.names):
if name in fnames:
fsamp.append(samp[ii])
else:
assert name.startswith("feat_bin_")
bsamp.append(samp[ii])
fsamples.append(fsamp)
bsamples.append(bsamp)
else:
if not isinstance(datasets, (list, tuple)):
datasets = [datasets]
fsamples = []
bsamples = []
# continuous features
for idnt in datasets:
samp = self.compute_features(
idnt=idnt,
names=self.names,
which_type=["continuous"])
fsamples.append(samp)
# binary features
for idnt in datasets:
bsamp = self.compute_features(idnt=idnt,
names=self.names,
which_type="binary")
bsamples.append(bsamp)
fsamples = np.atleast_2d(fsamples)
bsamples = np.atleast_2d(bsamples)
ratings = []
for bsamp, fsamp in zip(bsamples, fsamples):
if not self._pre_rate(bsamp):
# certainly a bad curve
gd = 0
elif np.isnan(np.sum(fsamp)):
# ignore nan-valued samples
gd = -1
else:
gd = self._rate(fsamp)
ratings.append(gd)
return np.array(ratings).flatten()
def get_available_training_sets():
"""List of internal training sets"""
data_loc = "nanite"
resp_path = resource_filename(data_loc, "rate")
avail = []
for pp in pathlib.Path(resp_path).glob("ts_*"):
avail.append(pp.name[3:])
return sorted(avail)
def get_rater(regressor, training_set="zef18", names=None,
lda=None, **reg_kwargs):
"""Convenience method to get a rater
Parameters
----------
regressor: str or RegressorMixin
If a string, must be in `reg_names`.
training_set: str or pathlib.Path or tuple (X, y)
A string label representing a training set shipped with
nanite, the path to a training set, or a tuple
representing the training set (samples, response)
for use with sklearn.
names: list of str
Only use these features for rating
lda: bool
If True, perform a linear discriminant analysis before
the regression
reg_kwargs:
Additional keyword arguments for the regressor; only used
if `regressor` is given as a string
Returns
-------
irater: nanite.IndentationRater
The rating instance.
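Examples
--------
Obtain a rater based on the default training set (assuming
"Extra Trees" is one of the labels in
:data:`nanite.rate.regressors.reg_names`):
>>> ir = get_rater("Extra Trees", training_set="zef18")
>>> # ir.rate(datasets=...) can then be used to rate fitted data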
"""
avr = get_available_training_sets()
if isinstance(training_set, tuple):
pass
else:
if training_set in avr:
ts_path = IndentationRater.get_training_set_path(
label=training_set)
else:
ts_path = training_set
training_set = IndentationRater.load_training_set(
path=ts_path,
names=names)
if len(training_set) != 2:
raise ValueError("Expected training_set of the form (X, y)!")
if isinstance(regressor, str):
if regressor not in reg_names:
msg = "Unknown regressor name: '{}'!".format(regressor) \
+ " Please pass your own sklearn RegressorMixin."
raise ValueError(msg)
reg_cl, default_kw = reg_dict[regressor]
kw = default_kw.copy()
kw.update(reg_kwargs)
regr = reg_cl(**kw)
else:
regr = regressor
rater = IndentationRater(regressor=regr,
training_set=training_set,
names=names,
lda=lda)
return rater