Cross validation of Pipeline/estimators using MLDataset / xarray.Dataset #221

Closed
wants to merge 27 commits

Changes from 13 commits (of 27 total)

Commits:
55959a5  cross validation of MLDataset Pipeline (Oct 24, 2017)
396f9aa  changes with CV sampling (Oct 26, 2017)
33bac56  changes to cv_cache (Oct 26, 2017)
b422e68  closer to working cross validation for MLDataset (Oct 26, 2017)
d45d4e1  CV / xarray experimentation - work in progress (Oct 31, 2017)
92054c9  MLDataset cross validation working for pipeline of 1 step that is uns… (Nov 1, 2017)
35450c1  wrapped sklearn classes need to wrap score methods as fit, predict, o… (Nov 1, 2017)
f86a079  update tests;fix cross validation with most data structures (Nov 3, 2017)
5cf646f  a couple tests for Python 2.7 (Nov 3, 2017)
744109a  avoid dask-searchcv test in conda.recipe;add test_config.yml to MANIF… (Nov 3, 2017)
1e7bec8  remove print statement (Nov 3, 2017)
83437f5  ensure test_config.yaml included in pkg (Nov 3, 2017)
de9efd0  remove elm.mldataset.cross_validation - modify environment.yml for el… (Nov 3, 2017)
6267041  fix usage of is_arr utility to separate X, y tuple (Nov 3, 2017)
66013e6  1850 passing tests (Nov 4, 2017)
a91caf6  dask-searchcv in meta.yaml (Nov 4, 2017)
e9b5d85  use elm/label/dev and elm for CI installs (Nov 4, 2017)
f6ef7c8  change earthio version for fixing CI build (Nov 4, 2017)
948efe5  ensure EARTHIO_CHANNEL_STR is set correctly in .travis.yml (Nov 6, 2017)
edbe1f5  ensure ANACONDA_UPLOAD_USER is defined in .travis for pkg upload (Nov 6, 2017)
6304e37  change order of channels to ensure dask-searchcv comes from elm (Nov 6, 2017)
8a6d46f  subset the number of tests being run in CI (Nov 6, 2017)
21a18d9  better diagnostics on upload failure in CI (Nov 6, 2017)
8ad7b4c  remove earthio from CI (Nov 6, 2017)
9a1734d  be sure to create env from elm's conda build output (Nov 6, 2017)
dc47f65  remove diagnostic print from deploy section (Nov 6, 2017)
00ea1be  refactor to simplify changes in dask-searchcv (Nov 8, 2017)
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -1,2 +1,3 @@
include elm/config/defaults/environment_vars_spec.yaml
include elm/config/defaults/config_standard.yaml
include elm/tests/test_config.yaml
2 changes: 1 addition & 1 deletion conda.recipe/meta.yaml
@@ -46,7 +46,7 @@ test:
imports:
- elm.config
- elm.mldataset
- elm.model_selection
#- elm.model_selection
- elm.pipeline.pipeline
- elm.pipeline.steps
- elm.scripts
1 change: 1 addition & 0 deletions elm/mldataset/__init__.py
@@ -0,0 +1 @@
from elm.mldataset.util import is_mldataset
37 changes: 37 additions & 0 deletions elm/mldataset/util.py
@@ -0,0 +1,37 @@
import numpy as np
import dask.array as da

from collections import Sequence  # collections.abc.Sequence on Python 3.10+


def is_mldataset(arr, raise_err=False):
    try:
        from xarray_filters import MLDataset
        from xarray import Dataset
    except Exception:
        MLDataset = Dataset = None
        if not raise_err:
            return False
        # Much of the ML logic wrapping xarray would fail if only
        # xarray, and not xarray_filters, is installed.  When
        # xarray_filters is installed, xarray.Dataset can be used.
        raise ValueError('Cannot use cross validation for xarray Dataset without xarray_filters')
    return isinstance(arr, (MLDataset, Dataset))


def is_arr(arr, raise_err=False):
    is_ml = is_mldataset(arr, raise_err=raise_err)
    return is_ml or isinstance(arr, (np.ndarray, da.Array))


def _split_transformer_result(Xt, y):
    '''Split an (Xt, y) tuple returned by a transformer; otherwise
    return Xt and y unchanged (preferring a non-None y).'''
    if isinstance(Xt, Sequence) and len(Xt) == 2 and is_arr(Xt[1]):
        Xt, new_y = Xt
    else:
        new_y = y
    if y is None and new_y is not None:
        y = new_y
    assert not isinstance(y, tuple), repr((Xt, y, new_y))
    return Xt, y
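A minimal sketch of how these helpers behave with plain numpy inputs (hypothetical usage, not part of the diff):

```python
import numpy as np
from elm.mldataset.util import is_arr, _split_transformer_result

X = np.random.rand(4, 3)
y = np.arange(4)

# A transformer that returned only Xt: y passes through unchanged
Xt, y2 = _split_transformer_result(X, y)
assert Xt is X and y2 is y

# A transformer that returned an (Xt, y) tuple: the tuple is split
Xt, y2 = _split_transformer_result((X, y), None)
assert Xt is X and y2 is y

# is_arr accepts numpy/dask arrays as well as MLDataset / xarray.Dataset
assert is_arr(X) and not is_arr('not an array')
```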
59 changes: 50 additions & 9 deletions elm/mldataset/wrap_sklearn.py
@@ -9,9 +9,11 @@
from dask.utils import derived_from # May be useful here?
from sklearn.utils.metaestimators import if_delegate_has_method # May be useful here?
from sklearn.linear_model import LinearRegression as skLinearRegression
from sklearn.metrics import r2_score, accuracy_score
from xarray_filters.mldataset import MLDataset
from xarray_filters.func_signatures import filter_args_kwargs
from xarray_filters.constants import FEATURES_LAYER_DIMS, FEATURES_LAYER
from elm.mldataset.util import _split_transformer_result
import xarray as xr
import yaml

@@ -27,6 +29,7 @@ def get_row_index(X, features_layer=None):
def _as_numpy_arrs(self, X, y=None, **kw):
'''Convert X, y to numpy.ndarrays for a scikit-learn method
'''
X, y = _split_transformer_result(X, y)
if isinstance(X, np.ndarray):
return X, y, None
if isinstance(X, xr.Dataset):
@@ -46,7 +49,7 @@

def _from_numpy_arrs(self, y, row_idx, features_layer=None):
'''Convert a 1D prediction to ND using the row_idx MultiIndex'''
if isinstance(y, MLDataset):
if isinstance(y, MLDataset) or row_idx is None:
return y
features_layer = features_layer or FEATURES_LAYER
coords = [row_idx,
@@ -64,38 +67,46 @@ class SklearnMixin:
_as_numpy_arrs = _as_numpy_arrs
_from_numpy_arrs = _from_numpy_arrs

def _call_sk_method(self, sk_method, X=None, y=None, **kw):
def _call_sk_method(self, sk_method, X=None, y=None, do_split=True, **kw):
Contributor Author: I am currently working on simplifying this function, checking what is actually needed.

'''Call a method of ._cls (typically an sklearn class)
that requires numpy array arguments'''
_cls = self._cls
if _cls is None:
raise ValueError('Define .cls as a scikit-learn estimator')
raise ValueError('Define ._cls as a scikit-learn estimator')
# Get the method of the class instance
func = getattr(_cls, sk_method, None)
if func is None:
raise ValueError('{} is not an attribute of {}'.format(sk_method, _cls))
X, y, row_idx = self._as_numpy_arrs(X, y=y)
if do_split:
X, y = _split_transformer_result(X, y)
if row_idx is not None:
self._temp_row_idx = row_idx
kw.update(dict(self=self, X=X))
if y is not None:
kw['y'] = y
kw = filter_args_kwargs(func, **kw)
return func(**kw)
Xt = func(**kw)
if do_split:
Xt, y = _split_transformer_result(Xt, y)
return Xt, y
return Xt

def _predict_steps(self, X, row_idx=None, sk_method=None, **kw):
def _predict_steps(self, X, y=None, row_idx=None, sk_method=None, **kw):
'''Call a prediction-related method, e.g. predict or score,
extracting the row index of X, if any, so that predictions
can be mapped back onto X's MultiIndex'''
X2, _, temp_row_idx = self._as_numpy_arrs(X, y=None)
X2, y, temp_row_idx = self._as_numpy_arrs(X, y=y)
if temp_row_idx is not None:
row_idx = temp_row_idx
if row_idx is None:
row_idx = getattr(self, '_temp_row_idx', None)
y3 = self._call_sk_method(sk_method, X2, **kw)
if y is not None:
kw['y'] = y
y3 = self._call_sk_method(sk_method, X2, do_split=False, **kw)
return y3, row_idx

def predict(self, X, row_idx=None, **kw):
def predict(self, X, row_idx=None, as_mldataset=True, **kw):
'''Predict from MLDataset X and return an MLDataset with
DataArray called "predict" that has the dimensions of
X's MultiIndex. That MultiIndex typically comes from
@@ -146,7 +157,7 @@ def fit(self, X, y=None, **kw):
def _fit(self, X, y=None, **kw):
'''This private method is expected by some sklearn
models and must take X, y as numpy arrays'''
return self._call_sk_method('_fit', X, y=y, **kw)
return self._call_sk_method('_fit', X, y=y, do_split=False, **kw)

def transform(self, X, y=None, **kw):
if hasattr(self._cls, 'transform'):
@@ -173,3 +184,33 @@ def __repr__(self):
def fit_predict(self, X, y=None, **kw):
return self.fit(X, y=y, **kw).predict(X)

def _regressor_default_score(self, X, y, sample_weight=None, row_idx=None, **kw):
X, y = _split_transformer_result(X, y)
y_pred, row_idx = self._predict_steps(X, row_idx=row_idx, y=y,
sk_method='predict',
**kw)
return r2_score(y, y_pred, sample_weight=sample_weight,
multioutput='variance_weighted')

def _classifier_default_score(self, X, y=None, sample_weight=None, row_idx=None, **kw):
X, y = _split_transformer_result(X, y)
y_pred, row_idx = self._predict_steps(X, row_idx=row_idx, y=y,
sk_method='predict',
**kw)
return accuracy_score(y, y_pred, sample_weight=sample_weight)

def score(self, X, y=None, sample_weight=None, row_idx=None, **kw):
if self._cls._estimator_type == 'regressor':
func = self._regressor_default_score
elif self._cls._estimator_type == 'classifier':
func = self._classifier_default_score
else:
func = None
if func:
return func(X, y, sample_weight=sample_weight, row_idx=row_idx, **kw)
score, row_idx = self._predict_steps(X, row_idx=row_idx, y=y,
sk_method='score',
**kw)
return score
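As a sketch of the default-scoring dispatch above (the estimator name is assumed from the wrappers generated in elm.pipeline.steps; numpy inputs pass through _as_numpy_arrs unchanged):

```python
import numpy as np
from elm.pipeline import steps

X = np.random.rand(20, 3)
y = X.dot([1., 2., 3.])

est = steps.linear_model.LinearRegression()
est.fit(X, y)
# _cls._estimator_type == 'regressor', so score() dispatches to
# _regressor_default_score, a variance-weighted r2_score:
print(est.score(X, y))
```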

17 changes: 12 additions & 5 deletions elm/model_selection/ea_searchcv.py
@@ -8,13 +8,16 @@
RandomizedSearchCV,
DaskBaseSearchCV,
_randomized_parameters)
from dask_searchcv.utils import is_pipeline
import numpy as np
from elm.model_selection.evolve import (fit_ea,
DEFAULT_CONTROL,
ind_to_new_params,
DEFAULT_EVO_PARAMS,)
from elm.mldataset.serialize_mixin import SerializeMixin
from elm.mldataset.wrap_sklearn import SklearnMixin
from dask_searchcv.methods import CVCacheSampler
from elm.mldataset.util import is_arr
from elm.model_selection.sorting import pareto_front
from elm.model_selection.base import base_selection
from elm.pipeline import Pipeline
@@ -132,7 +135,9 @@ class EaSearchCV(RandomizedSearchCV, SklearnMixin, SerializeMixin):
parameters=_ea_parameters,
example=_ea_example)

def __init__(self, estimator, param_distributions, n_iter=10,
def __init__(self, estimator, param_distributions,
n_iter=10,
sampler=None,
random_state=None,
ngen=3, score_weights=None,
sort_fitness=pareto_front,
@@ -143,9 +148,10 @@ def __init__(self, estimator, param_distributions, n_iter=10,
scoring=None,
iid=True, refit=True,
cv=None, error_score='raise', return_train_score=True,
scheduler=None, n_jobs=-1, cache_cv=True):
scheduler=None, n_jobs=-1, cache_cv=CVCacheSampler):
filter_kw_and_run_init(RandomizedSearchCV.__init__, **locals())
self.ngen = ngen
self.sampler = sampler
self.select_with_test = select_with_test
self.model_selection = model_selection
self.model_selection_kwargs = model_selection_kwargs
@@ -264,10 +270,11 @@ def _as_dask_array(self, X, y=None, **kw):

def fit(self, X, y=None, groups=None, **fit_params):
self._open()
X, y = self._as_dask_array(X, y=y)
if not self.sampler:
X, y = self._as_dask_array(X, y=y)
for self._gen in range(self.ngen):
print('Generation', self._gen)
RandomizedSearchCV.fit(self, X, y, groups, **fit_params)
RandomizedSearchCV.fit(self, X, y, groups=groups, **fit_params)
fitnesses = self._get_cv_scores()
self.cv_results_all_gen_ = _concat_cv_results(self.cv_results_all_gen_,
self.cv_results_,
@@ -289,7 +296,7 @@ def fit(self, X, y=None, groups=None, **fit_params):
return self

def _get_param_iterator(self):
if self._is_ea and not getattr(self, '_invalid_ind', None):
if self._gen != 0 and self._is_ea and not getattr(self, '_invalid_ind', None):
return iter(())
if not self._is_ea and self._gen == 0:
self.next_params_ = tuple(RandomizedSearchCV._get_param_iterator(self))
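A hedged sketch of the new sampler hook (the exact sampler contract lives in the modified dask-searchcv CVCacheSampler; the sampler callable and the fit call with a list of sampler arguments below are assumptions, not confirmed API):

```python
import numpy as np
from sklearn.linear_model import SGDRegressor
from elm.model_selection.ea_searchcv import EaSearchCV

def sampler(arg, **kw):
    # Hypothetical sampler: synthesize one (X, y) batch per argument
    rng = np.random.RandomState(arg)
    X = rng.rand(50, 3)
    return X, X.dot([1., 2., 3.])

ea = EaSearchCV(SGDRegressor(),
                param_distributions={'alpha': [1e-4, 1e-3, 1e-2]},
                n_iter=4, ngen=2, sampler=sampler, cv=3)
# With sampler=None, fit(X, y) converts X, y to dask arrays first;
# with a sampler, X may instead be the arguments the sampler consumes:
ea.fit([0, 1, 2])
```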
1 change: 0 additions & 1 deletion elm/model_selection/multilayer.py
@@ -39,7 +39,6 @@ def concat_features(method):
'''Decorator to run an estimator method on
predictions of estimators'''
def new_func(self, X, y=None, **kw):
nonlocal method
X, y = MultiLayer._concat_features(self, X, y=y)
func = getattr(self.estimator, method)
if 'predict' in method:
44 changes: 8 additions & 36 deletions elm/pipeline/pipeline.py
@@ -21,7 +21,8 @@
from elm.mldataset.wrap_sklearn import (_as_numpy_arrs,
_from_numpy_arrs,
get_row_index,
SklearnMixin)
SklearnMixin,)
from elm.mldataset.util import _split_transformer_result

from sklearn.utils.metaestimators import _BaseComposition
from xarray_filters.pipeline import Step
@@ -44,37 +45,12 @@ def _sk_method(self, method):
def _astype(self, step, X, y=None):
astype = 'numpy'
if not isinstance(step, Step):
print('Numpy')
X, y, row_idx = self._as_numpy_arrs(X, y)
if row_idx is not None:
self.row_idx = row_idx
return X, y

#def _validate_steps(self):
# return True

def _do_this_step(self, step_idx):
name, est = self.steps[step_idx]
self._generic = {}
for name, est in self.steps:
if isinstance(est, Step):
self._generic[name] = True
else:
self._generic[name] = False
print('GEn', self._generic, name)
do_step = True
if getattr(self, '_run_generic_only', None) is None:
pass
else:
if self._run_generic_only and not name in self._generic:
do_step = False
if getattr(self, '_skip_generic', None) is None:
pass
else:
if self._skip_generic and name in self._generic:
do_step = False
print('do_step', name, do_step)
return do_step
# Check to see if Xt is actually an (Xt, y) tuple
Xt, y = _split_transformer_result(X, y)
return Xt, y

def _fit_generic_only(self, X, y, **fit_params):
self._generic = {}
@@ -84,7 +60,6 @@ def _fit_generic_only(self, X, y, **fit_params):
else:
self._generic[name] = False


def _fit(self, X, y=None, **fit_params):

self._validate_steps()
@@ -108,9 +83,7 @@ def _fit(self, X, y=None, **fit_params):
fit_params_steps[step][param] = pval
Xt = X
for step_idx, (name, transformer) in enumerate(self.steps[:-1]):
#if self._do_this_step(step_idx):
Xt, y = self._astype(transformer, Xt, y=y)
print('Types', step_idx, [type(_) for _ in (Xt, y)])
if transformer is None:
pass
else:
@@ -177,13 +150,12 @@ def _before_predict(self, method, X, y=None, **fit_params):
Xt = X
for step_idx, (name, transform) in enumerate(self.steps[:-1]):
if transform is not None:
#if not self._do_this_step(step_idx):
# continue
Xt, y = self._astype(transform, Xt, y=y)
Xt = transform.transform(Xt)
row_idx = self.row_idx
Xt, y = _split_transformer_result(Xt, y)
row_idx = getattr(self, 'row_idx', fit_params.get('row_idx'))
else:
row_idx = getattr(self, 'row_idx', None)
row_idx = getattr(self, 'row_idx', fit_params.get('row_idx'))
final_estimator = self.steps[-1][-1]
fit_params = dict(row_idx=row_idx, **fit_params)
if y is not None:
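For context, a minimal sketch of the Pipeline flow these changes support (step names assumed from elm.pipeline.steps; numpy inputs used for simplicity, though MLDataset inputs are the point of the PR):

```python
import numpy as np
from elm.pipeline import Pipeline, steps

X = np.random.rand(20, 3)
y = X.dot([1., 2., 3.])

pipe = Pipeline([
    ('scale', steps.preprocessing.StandardScaler()),
    ('est', steps.linear_model.LinearRegression()),
])
# _astype converts MLDataset inputs to numpy before sklearn steps, and
# _split_transformer_result unpacks any (Xt, y) a transformer returns
pipe.fit(X, y)
pred = pipe.predict(X)
```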
28 changes: 12 additions & 16 deletions elm/pipeline/steps.py
@@ -36,17 +36,6 @@ def get_module_classes(m):
return {attr: getattr(module, attr) for attr in attrs}


def patch_cls(cls):

class Wrapped(SklearnMixin, cls):
_cls = cls
__init__ = cls.__init__
_cls_name = cls.__name__
name = 'Elm{}'.format(cls.__name__)
globals()[name] = Wrapped
return globals()[name]


_all = []
_seen = set()
ALL_STEPS = {}
@@ -55,12 +44,20 @@ class Wrapped(SklearnMixin, cls):
for cls in get_module_classes(m).values():
if cls.__name__ in _seen:
continue
if m not in cls.__module__:
Contributor Author: This is just checking that we are getting StandardScaler or similar from the sklearn module where it is actually defined, not some other one where it is imported for internal usage (see the sketch after this diff).
continue
_seen.add(cls.__name__)
w = patch_cls(cls)
if any(s in cls.__name__ for s in SKIP):
name = cls.__name__
if any(s in name for s in SKIP):
continue
this_module[cls.__name__] = w
ALL_STEPS[(m, cls.__name__)] = w
class Wrapped(SklearnMixin, cls):
_cls = cls
__init__ = cls.__init__
_cls_name = name

globals()[name] = Wrapped
this_module[cls.__name__] = globals()[name]
ALL_STEPS[(m, cls.__name__)] = globals()[name]
this_module = Namespace(**this_module)
if m == 'cluster.bicluster':
bicluster = this_module # special case (dotted name)
@@ -75,5 +72,4 @@ class Wrapped(SklearnMixin, cls):
del _all
del m
del this_module
del w
del _seen
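To illustrate the module-origin check flagged in the review comment above, a small hypothetical probe (not part of the diff):

```python
from sklearn import preprocessing

# StandardScaler is defined under sklearn.preprocessing, so the check
# 'preprocessing' in cls.__module__ passes and the class gets wrapped
cls = preprocessing.StandardScaler
assert 'preprocessing' in cls.__module__

# A class merely imported into another sklearn module for internal use
# fails the check there and is skipped, avoiding duplicate wrappers
```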
1 change: 1 addition & 0 deletions elm/tests/test_config.yaml
@@ -2,3 +2,4 @@ SKIP: [label_propagation, semi_supervised, multiclass, multioutput, ensemble, ke
covariance, naive_bayes, calibration, cross_decomposition, IsotonicRegression, MultiTaskLassoCV,
MultiTaskLasso, MultiTaskElasticNetCV, MultiTaskElasticNet, RANSACRegressor, OneHotEncoder,
RFE, RFECV, Birch, SparseCoder, OrthogonalMatchingPursuitCV]
SKIP_CV: [LeavePGroupsOut, StratifiedKFold, StratifiedShuffleSplit]