"""Nested target encoder"""
__author__ = "david26694", "cmougan"
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator
from category_encoders.ordinal import OrdinalEncoder
from category_encoders.m_estimate import MEstimateEncoder
import category_encoders.utils as util
from sklearn.utils.random import check_random_state
class NestedTargetEncoder(BaseEstimator, util.TransformerWithTargetMixin):
    """Estimate of likelihood for nested data.

    This is a generalization of the m-probability estimate. The main difference
    is that instead of using a global prior, it can use a more fine-tuned prior.
    This only works for nested data. For instance, I have individuals who live
    in counties, that are inside states. If I want to estimate the likelihood
    encoding for a county, it is better to use as prior the estimate for the
    state instead of the global estimate.

    Parameters
    ----------
    verbose: int
        integer indicating verbosity of the output. 0 for none.
    cols: list
        a list of columns to encode, if None, all string columns will be encoded.
    drop_invariant: bool
        boolean for whether or not to drop encoded columns with 0 variance.
    feature_mapping: dict
        dictionary representing the child - parent relationship. keys are children.
        ``None`` (the default) is treated as an empty mapping.
    return_df: bool
        boolean for whether to return a pandas DataFrame from transform (otherwise it will be a numpy array).
    handle_missing: str
        options are 'return_nan', 'error' and 'value', defaults to 'value', which returns the prior probability.
    handle_unknown: str
        options are 'return_nan', 'error' and 'value', defaults to 'value', which returns the prior probability.
    random_state: int, RandomState instance or None
        seed passed to ``check_random_state`` when ``randomized`` is True; it is
        also forwarded to the parent encoder.
    randomized: bool
        adds normal (Gaussian) distribution noise into training data in order to decrease overfitting (testing data are untouched).
    sigma: float
        standard deviation (spread or "width") of the normal distribution.
    m_prior: float
        this is the "m" in the m-probability estimate for the global mean. Higher value of m results into stronger shrinking.
        It is used whenever we estimate a likelihood using the global mean as a prior.
        M is non-negative.
    m_parent: float
        this is the "m" in the m-probability estimate. Higher value of m results into stronger shrinking.
        It is used whenever we estimate a likelihood using the parent mean as a prior.
        M is non-negative.

    Example
    -------
    >>> from sktools import NestedTargetEncoder
    >>> import pandas as pd
    >>> X = pd.DataFrame(
    ...     {
    ...         "child": ["a", "a", "b", "b", "b", "c", "c", "d", "d", "d"],
    ...         "parent": ["e", "e", "e", "e", "e", "f", "f", "f", "f", "f",]
    ...     }
    ... )
    >>> y = pd.Series([1, 2, 3, 1, 2, 4, 4, 5, 4, 4.5])
    >>> ne = NestedTargetEncoder(feature_mapping={"child": "parent"}, m_prior=0)
    >>> ne.fit_transform(X, y)
          child  parent
    0  2.016667     1.8
    1  2.016667     1.8
    2  2.262500     1.8
    3  2.262500     1.8
    4  2.262500     1.8
    5  3.683333     4.3
    6  3.683333     4.3
    7  4.137500     4.3
    8  4.137500     4.3
    9  4.137500     4.3

    References
    ----------
    .. [1] Additive smoothing, from https://en.wikipedia.org/wiki/Additive_smoothing#Generalized_to_the_case_of_known_incidence_rates
    """

    def __init__(
        self,
        verbose=0,
        cols=None,
        drop_invariant=False,
        feature_mapping=None,
        return_df=True,
        handle_unknown="value",
        handle_missing="value",
        random_state=None,
        randomized=False,
        sigma=0.05,
        m_prior=1.0,
        m_parent=1.0,
    ):
        self.verbose = verbose
        self.return_df = return_df
        self.drop_invariant = drop_invariant
        self.drop_cols = []
        self.cols = cols
        # A fresh dict per instance: a `{}` default in the signature would be
        # one shared object across every encoder created without a mapping.
        self.feature_mapping = {} if feature_mapping is None else feature_mapping
        self.ordinal_encoder = None
        self._dim = None
        self.mapping = None
        self.handle_unknown = handle_unknown
        self.handle_missing = handle_missing
        self._sum = None
        self._count = None
        self.random_state = random_state
        self.randomized = randomized
        self.sigma = sigma
        self.m_prior = m_prior
        self.m_parent = m_parent
        self.feature_names = None
        self.parent_cols = None
        self.parent_encoder = None

    # noinspection PyUnusedLocal
    def fit(self, X, y, **kwargs):
        """Fit encoder according to X and y.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Training vectors, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like, shape = [n_samples]
            Target values; they are cast to float before fitting.

        Returns
        -------
        self : encoder
            Returns self.

        Raises
        ------
        ValueError
            If X and y differ in length, if ``handle_missing == "error"`` and
            an encoded column contains nulls, if a column is both a child and
            a parent, or if a child has more than one parent.
        """
        # Create parent encoder and fit it
        self.parent_cols = list(self.feature_mapping.values())
        self.parent_encoder = MEstimateEncoder(
            verbose=self.verbose,
            cols=self.parent_cols,
            drop_invariant=self.drop_invariant,
            return_df=self.return_df,
            handle_unknown=self.handle_unknown,
            handle_missing=self.handle_missing,
            random_state=self.random_state,
            randomized=self.randomized,
            sigma=self.sigma,
            m=self.m_prior,
        )
        self.parent_encoder.fit(X, y)

        # Unite parameters into pandas types
        X = util.convert_input(X)
        y = util.convert_input_vector(y, X.index).astype(float)

        # The lengths must be equal
        if X.shape[0] != y.shape[0]:
            raise ValueError(
                "The length of X is "
                + str(X.shape[0])
                + " but length of y is "
                + str(y.shape[0])
                + "."
            )

        self._dim = X.shape[1]

        # If columns aren't passed, just use every string column
        if self.cols is None:
            self.cols = util.get_obj_cols(X)
        else:
            self.cols = util.convert_cols_to_list(self.cols)

        if self.handle_missing == "error":
            if X[self.cols].isnull().any().any():
                raise ValueError("Columns to be encoded can not contain null")

        # Check that children and parents are disjoint
        children = set(self.feature_mapping.keys())
        parents = set(self.feature_mapping.values())
        if len(children.intersection(parents)) > 0:
            raise ValueError("No column should be a child and a parent")

        self.ordinal_encoder = OrdinalEncoder(
            verbose=self.verbose,
            cols=self.cols,
            handle_unknown="value",
            handle_missing="value",
        )
        self.ordinal_encoder = self.ordinal_encoder.fit(X)
        X_ordinal = self.ordinal_encoder.transform(X)

        # Training
        self.mapping = self._train(X_ordinal, y)

        X_temp = self.transform(X, override_return_df=True)
        self.feature_names = X_temp.columns.tolist()

        # Store column names with approximately constant variance on the training data
        if self.drop_invariant:
            self.drop_cols = []
            generated_cols = util.get_generated_cols(X, X_temp, self.cols)
            self.drop_cols = [x for x in generated_cols if X_temp[x].var() <= 10e-5]
            for dropped in self.drop_cols:
                try:
                    self.feature_names.remove(dropped)
                except ValueError as e:
                    # list.remove raises ValueError (not KeyError) when the
                    # item is absent; report it and carry on with the rest.
                    if self.verbose > 0:
                        print(
                            "Could not remove column from feature names."
                            "Not found in generated cols.\n{}".format(e)
                        )

        return self

    def _train(self, X, y):
        """Compute the per-column mapping from ordinal code to target estimate.

        Parameters
        ----------
        X : pandas.DataFrame
            Ordinal-encoded training frame, as produced in ``fit``.
        y : pandas.Series
            Target values aligned with ``X``.

        Returns
        -------
        dict
            Maps each encoded column name to a Series indexed by ordinal code.

        Raises
        ------
        ValueError
            If a (non-missing) child value appears under more than one parent.
        """
        # Initialize the output
        mapping = {}

        # Calculate global statistics
        self._sum = y.sum()
        self._count = y.count()
        prior = self._sum / self._count

        for switch in self.ordinal_encoder.category_mapping:
            col = switch.get("col")
            values = switch.get("mapping")

            # Easy case, the child is not in the child - parent dictionary.
            # We just use the plain m-estimator with the global prior
            if col not in self.feature_mapping:
                stats = y.groupby(X[col]).agg(["sum", "count", "mean"])
                estimate = (stats["sum"] + prior * self.m_prior) / (
                    stats["count"] + self.m_prior
                )

            # Not so easy case, we have to deal with the parent
            else:
                parent_col = self.feature_mapping[col]

                # Check son-parent unique relation
                unique_parents = X.groupby([col]).agg({parent_col: "nunique"})[
                    parent_col
                ]
                # Negative codes are the missing/unknown sentinels (-2 / -1);
                # those may legitimately sit under several parents. The filter
                # is element-wise: using the index comparison as a plain bool
                # fails as soon as more than one child is affected.
                more_1_parent = unique_parents[
                    (unique_parents > 1) & (unique_parents.index >= 0)
                ]
                if not more_1_parent.empty:
                    raise ValueError(
                        f"There are children with more than one parent, {more_1_parent}"
                    )

                # Get parent stats
                # NOTE(review): X is the ordinal-encoded frame. If the parent
                # column is also listed in `cols`, its values here are ordinal
                # codes rather than the raw categories the parent encoder was
                # fitted on - confirm the parent prior resolves as intended.
                te_parent = self.parent_encoder.transform(X)[parent_col]
                parent_mapping = pd.DataFrame(
                    {"te_parent": te_parent, parent_col: X[parent_col]}
                ).drop_duplicates()

                # Compute child statistics
                stats = y.groupby(X[col]).agg(["sum", "count", "mean"])

                # Relate parent and child stats
                groups = X.loc[:, [parent_col, col]].drop_duplicates()
                stats = stats.merge(groups, how="left", on=col).merge(
                    parent_mapping, how="left", on=parent_col
                )

                # In case of numpy array
                stats = stats.rename(columns={"key_0": col})
                stats = stats.set_index(col)

                # Calculate the m-probability estimate using the parent prior
                estimate = (stats["sum"] + stats["te_parent"] * self.m_parent) / (
                    stats["count"] + self.m_parent
                )

            # Ignore unique columns. This helps to prevent overfitting on id-like columns
            if len(stats["count"]) == self._count:
                estimate[:] = prior

            # Column doesn't have parent - handle imputation as always
            if col not in self.feature_mapping:
                if self.handle_unknown == "return_nan":
                    estimate.loc[-1] = np.nan
                elif self.handle_unknown == "value":
                    estimate.loc[-1] = prior

                if self.handle_missing == "return_nan":
                    estimate.loc[values.loc[np.nan]] = np.nan
                elif self.handle_missing == "value":
                    estimate.loc[-2] = prior

            # With parents - we leave the imputation for afterwards
            else:
                # Unknown
                estimate.loc[-1] = np.nan
                # Missing
                estimate.loc[values.loc[np.nan]] = np.nan

            # Store the m-probability estimate for transform() function
            mapping[col] = estimate

        return mapping

    def _score(self, X, y):
        """Replace ordinal codes in ``X`` with the fitted estimates (in place).

        Parameters
        ----------
        X : pandas.DataFrame
            Frame whose encoded columns hold ordinal codes; it is modified and
            returned.
        y : array-like or None
            When not None and ``randomized`` is set, multiplicative Gaussian
            noise is applied to every encoded column.

        Returns
        -------
        pandas.DataFrame
            The same ``X`` object with encoded columns replaced.
        """
        X_parents = self.parent_encoder.transform(X)

        for col in self.cols:
            # Easy case - not having parents (as m estimator)
            if col not in self.feature_mapping:
                # Score the column
                X[col] = X[col].map(self.mapping[col])

            # Harder case - having parents
            else:
                # Split missing and unknown values
                unknown = X[col] == -1
                missing = X[col] == -2

                # Apply regular transformation. Series.map needs a unique
                # index, so duplicated *codes* are dropped. Dropping duplicated
                # *values* would also discard any category whose estimate
                # equals an earlier one, turning it into NaN.
                child_mapping = self.mapping[col]
                child_mapping = child_mapping[
                    ~child_mapping.index.duplicated(keep="first")
                ]
                X[col] = X[col].map(child_mapping)

                # Impute unknown with parent
                parent_col = self.feature_mapping[col]
                if self.handle_unknown == "value":
                    X[col] = X[col].mask(unknown, X_parents[parent_col])

                # Impute missing with parent
                if self.handle_missing == "value":
                    X[col] = X[col].mask(missing, X_parents[parent_col])

            # Randomization is meaningful only for training data -> we do it only if y is present
            if self.randomized and y is not None:
                random_state_generator = check_random_state(self.random_state)
                X[col] = X[col] * random_state_generator.normal(
                    1.0, self.sigma, X[col].shape[0]
                )

        return X

    def get_feature_names(self):
        """
        Returns the names of all transformed / added columns.

        Returns
        -------
        feature_names: list
            A list with all feature names transformed or added.
            Note: potentially dropped features are not included!

        Raises
        ------
        ValueError
            If the encoder has not been fitted yet.
        """
        if not isinstance(self.feature_names, list):
            raise ValueError("Estimator has to be fitted to return feature names.")
        else:
            return self.feature_names
class QuantileEncoder(BaseEstimator, util.TransformerWithTargetMixin):
    """Quantile Encoding for categorical features.

    This is a statistically modified version of the target MEstimate encoder where
    selected features are replaced by the statistical quantile instead of the mean.
    Replacing with the median is a particular case where self.quantile = 0.5.
    In comparison to MEstimateEncoder it has two tunable parameters, `m` and `quantile`.

    Parameters
    ----------
    verbose: int
        integer indicating verbosity of the output. 0 for none.
    quantile: float
        statistical quantile to compute, passed to ``numpy.quantile``. `0.5` for median.
    m: float
        the smoothing parameter. 0 for no smoothing.
    cols: list
        a list of columns to encode, if None, all string columns will be encoded.
    drop_invariant: bool
        boolean for whether or not to drop columns with 0 variance.
    return_df: bool
        boolean for whether to return a pandas DataFrame from transform (otherwise it will be a numpy array).
    handle_missing: str
        options are 'error', 'return_nan' and 'value', defaults to 'value', which returns the target quantile.
    handle_unknown: str
        options are 'error', 'return_nan' and 'value', defaults to 'value', which returns the target quantile.

    Example
    -------
    >>> from sktools import QuantileEncoder
    >>> import pandas as pd
    >>> from sklearn.datasets import load_boston
    >>> bunch = load_boston()
    >>> y = bunch.target
    >>> X = pd.DataFrame(bunch.data, columns=bunch.feature_names)
    >>> enc = QuantileEncoder(cols=['CHAS', 'RAD']).fit(X, y)
    >>> numeric_dataset = enc.transform(X)
    >>> print(numeric_dataset.info())
    <class 'pandas.core.frame.DataFrame'>
    RangeIndex: 506 entries, 0 to 505
    Data columns (total 13 columns):
    CRIM       506 non-null float64
    ZN         506 non-null float64
    INDUS      506 non-null float64
    CHAS       506 non-null float64
    NOX        506 non-null float64
    RM         506 non-null float64
    AGE        506 non-null float64
    DIS        506 non-null float64
    RAD        506 non-null float64
    TAX        506 non-null float64
    PTRATIO    506 non-null float64
    B          506 non-null float64
    LSTAT      506 non-null float64
    dtypes: float64(13)
    memory usage: 51.5 KB
    None

    References
    ----------
    .. [1] Quantile Encoder: Tackling High Cardinality Categorical Features in Regression Problems, https://arxiv.org/abs/2105.13783
    .. [2] A Preprocessing Scheme for High-Cardinality Categorical Attributes in Classification and Prediction Problems, equation 7, from https://dl.acm.org/citation.cfm?id=507538
    .. [3] On estimating probabilities in tree pruning, equation 1, from https://link.springer.com/chapter/10.1007/BFb0017010
    .. [4] Additive smoothing, from https://en.wikipedia.org/wiki/Additive_smoothing#Generalized_to_the_case_of_known_incidence_rates
    .. [5] Target encoding done the right way https://maxhalford.github.io/blog/target-encoding/
    """

    def __init__(
        self,
        verbose=0,
        cols=None,
        drop_invariant=False,
        return_df=True,
        handle_missing="value",
        handle_unknown="value",
        quantile=0.5,
        m=1.0,
    ):
        self.return_df = return_df
        self.drop_invariant = drop_invariant
        self.drop_cols = []
        self.verbose = verbose
        self.cols = cols
        self.ordinal_encoder = None
        self._dim = None
        self.mapping = None
        self.handle_unknown = handle_unknown
        self.handle_missing = handle_missing
        self.feature_names = None
        self.quantile = quantile
        self.m = m

    def fit(self, X, y, **kwargs):
        """Fit encoder according to X and y.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Training vectors, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like, shape = [n_samples]
            Target values.

        Returns
        -------
        self : encoder
            Returns self.

        Raises
        ------
        ValueError
            If X and y differ in length, or if ``handle_missing == "error"``
            and an encoded column contains nulls.
        """
        # unite the input into pandas types
        X = util.convert_input(X)
        y = util.convert_input_vector(y, X.index)

        if X.shape[0] != y.shape[0]:
            raise ValueError(
                "The length of X is "
                + str(X.shape[0])
                + " but length of y is "
                + str(y.shape[0])
                + "."
            )

        self._dim = X.shape[1]

        # if columns aren't passed, just use every string column
        if self.cols is None:
            self.cols = util.get_obj_cols(X)
        else:
            self.cols = util.convert_cols_to_list(self.cols)

        if self.handle_missing == "error":
            if X[self.cols].isnull().any().any():
                raise ValueError("Columns to be encoded can not contain null")

        self.ordinal_encoder = OrdinalEncoder(
            verbose=self.verbose,
            cols=self.cols,
            handle_unknown="value",
            handle_missing="value",
        )
        self.ordinal_encoder = self.ordinal_encoder.fit(X)
        X_ordinal = self.ordinal_encoder.transform(X)

        self.mapping = self.fit_quantile_encoding(X_ordinal, y)

        X_temp = self.transform(X, override_return_df=True)
        self.feature_names = list(X_temp.columns)

        if self.drop_invariant:
            self.drop_cols = []
            # Force a DataFrame regardless of `return_df`: the per-column
            # variance lookups below index X_temp by column label.
            X_temp = self.transform(X, override_return_df=True)
            generated_cols = util.get_generated_cols(X, X_temp, self.cols)
            self.drop_cols = [x for x in generated_cols if X_temp[x].var() <= 10e-5]
            for dropped in self.drop_cols:
                try:
                    self.feature_names.remove(dropped)
                except ValueError as e:
                    # list.remove raises ValueError (not KeyError) when the
                    # item is absent; report it and carry on with the rest.
                    if self.verbose > 0:
                        print(
                            "Could not remove column from feature names."
                            "Not found in generated cols.\n{}".format(e)
                        )

        return self

    def fit_quantile_encoding(self, X, y):
        """Compute the per-column mapping from ordinal code to smoothed quantile.

        Parameters
        ----------
        X : pandas.DataFrame
            Ordinal-encoded training frame, as produced in ``fit``.
        y : pandas.Series
            Target values aligned with ``X``.

        Returns
        -------
        dict
            Maps each encoded column name to a Series indexed by ordinal code.
        """
        mapping = {}

        # Calculate global statistics
        prior = self._quantile = np.quantile(y, self.quantile)
        self._sum = y.sum()
        self._count = y.count()

        for switch in self.ordinal_encoder.category_mapping:
            col = switch.get("col")
            values = switch.get("mapping")

            # Calculate sum, count and quantile of the target for each unique value in the feature col
            stats = y.groupby(X[col]).agg(
                [lambda x: np.quantile(x, self.quantile), "sum", "count"]
            )
            stats.columns = ["quantile", "sum", "count"]

            # Calculate the m-probability estimate of the quantile
            estimate = (stats["count"] * stats["quantile"] + prior * self.m) / (
                stats["count"] + self.m
            )

            if self.handle_unknown == "return_nan":
                estimate.loc[-1] = np.nan
            elif self.handle_unknown == "value":
                estimate.loc[-1] = prior

            if self.handle_missing == "return_nan":
                estimate.loc[values.loc[np.nan]] = np.nan
            elif self.handle_missing == "value":
                estimate.loc[-2] = prior

            mapping[col] = estimate

        return mapping

    def quantile_encode(self, X_in):
        """Return a deep copy of ``X_in`` with encoded columns mapped to estimates.

        Parameters
        ----------
        X_in : pandas.DataFrame
            Frame whose encoded columns hold ordinal codes.

        Returns
        -------
        pandas.DataFrame
            Copy of ``X_in`` with each column in ``self.cols`` passed through
            the fitted mapping.
        """
        X = X_in.copy(deep=True)

        for col in self.cols:
            X[col] = X[col].map(self.mapping[col])

        return X

    def get_feature_names(self):
        """
        Returns the names of all transformed / added columns.

        Returns
        -------
        feature_names: list
            A list with all feature names transformed or added.
            Note: potentially dropped features are not included!

        Raises
        ------
        ValueError
            If the encoder has not been fitted yet.
        """
        if not isinstance(self.feature_names, list):
            raise ValueError(
                "Must fit data first. Affected feature names are not known " "before."
            )
        else:
            return self.feature_names
class SummaryEncoder(BaseEstimator, util.TransformerWithTargetMixin):
    """Fit one ``QuantileEncoder`` per requested quantile.

    Every column in ``cols`` is duplicated once per quantile under the name
    ``"<col>_<percentile>"`` (percentile = ``round(quantile * 100)``), and each
    quantile's encoder is fitted on its own set of duplicated columns.

    Parameters
    ----------
    cols: list
        columns to encode.
    quantiles: list
        quantiles to compute; one encoder is fitted for each.
    m: float
        smoothing parameter forwarded to every ``QuantileEncoder``.
    """

    def __init__(self, cols, quantiles, m=1.0):
        self.cols = cols
        self.quantiles = quantiles
        self.m = m
        self.encoder_list = None

    def fit(self, X, y):
        """Fit the per-quantile encoders on a copy of ``X``.

        Parameters
        ----------
        X : pandas.DataFrame
            Training frame; it is copied, the caller's object is not modified.
        y : array-like
            Target values, passed through to each ``QuantileEncoder.fit``.

        Returns
        -------
        self : encoder
            Returns self, with the fitted encoders stored in ``encoder_list``.
        """
        X = X.copy()

        # First pass: add every duplicated column, so that each encoder below
        # is fitted on the frame containing all of them.
        for quantile in self.quantiles:
            for col in self.cols:
                suffix = round(quantile * 100)
                X[f"{col}_{suffix}"] = X[col]

        # Second pass: one encoder per quantile over that quantile's columns.
        fitted = []
        for quantile in self.quantiles:
            suffix = round(quantile * 100)
            targets = [f"{col}_{suffix}" for col in self.cols]
            encoder = QuantileEncoder(cols=targets, quantile=quantile, m=self.m)
            encoder.fit(X, y)
            fitted.append(encoder)

        self.encoder_list = fitted
        return self