Monotonic WOE Binning Algorithm for Credit Scoring
About
The following WOE binning class is by far the most stable WOE binning algorithm I have ever used. I got it from https://github.com/PedroHBM/woe-monotonic-binning
It is based on this paper: https://www.researchgate.net/publication/322520135_Monotone_optimal_binning_algorithm_for_credit_risk_modeling
The following rules are important for binning in credit scoring applications:
1) Binning should be monotonic, i.e. the feature should be binned so that the bins are ordinal. This lets us prepare and present our scorecards more effectively to non-technical audiences: as the feature increases, the bad rate should move in one direction only, giving a monotonic relationship between the feature and the target. This has several benefits, the most important being that outliers are handled well and the resulting model generalizes better.
2) Missing values are binned separately. Sometimes, however, we can adjust this based on the target ratio, because being null may carry the same information as being numerically low. For example, in a housing price dataset, a missing value for a feature like PoolArea probably means the house doesn't have a pool, so we can group the missing values with the zeroes (see the sketch after this list).
3) Each bin should contain at least 5% of the observations. This again reduces overfitting, and extreme values are handled better this way.
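A minimal sketch of rule 2, assuming a DataFrame df with a PoolArea feature and a binary target column (both names are placeholders for this illustration):

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "PoolArea": [0, np.nan, 512, np.nan, 648, 0],
    "target":   [0, 0, 1, 0, 1, 0],
})

# Compare the bad rate of the missing group with that of the zero group; if they
# are close, treat "missing" as "no pool" and merge it into the zero bin.
rate_nan = df.loc[df["PoolArea"].isna(), "target"].mean()
rate_zero = df.loc[df["PoolArea"] == 0, "target"].mean()
if abs(rate_nan - rate_zero) < 0.05:  # the tolerance here is an arbitrary choice
    df["PoolArea"] = df["PoolArea"].fillna(0)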
This class applies all of these rules. It also runs fast and supports parallel processing.
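Before the code, a quick refresher on the two quantities the class reports per bin. The class follows the convention WOE = ln(dist_bad / dist_good) (some references swap good and bad, which only flips the sign); a minimal sketch:

import numpy as np

def woe_iv(goods, bads, total_goods, total_bads):
    # WOE = ln(dist_bad / dist_good); IV contribution = (dist_bad - dist_good) * WOE
    dist_good = goods / total_goods
    dist_bad = bads / total_bads
    woe = np.log(dist_bad / dist_good)
    return woe, (dist_bad - dist_good) * woe

# e.g. a bin holding 200 goods and 50 bads out of 4500 goods / 500 bads overall:
woe, iv = woe_iv(200, 50, 4500, 500)  # woe = ln(0.100 / 0.044) ≈ 0.81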
import numpy as np
import pandas as pd
from scipy import stats
import multiprocessing as mp
from joblib import Parallel, delayed
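# Small helper so a (target, dataset, ...) argument tuple can be unpacked when
# dispatching woe_binning through map-style multiprocessing APIs.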
def unpack_woe(args):
return woe_binning(*args)
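# Merge the bins at the given index labels into a single row and recompute its
# size, bad rate, good/bad distributions, WOE and IV contribution.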
def merge_bins(df, bins_index):
bins_index.sort()
interval_start_include = df.loc[bins_index[0]].interval_start_include
interval_end_exclude = df.loc[bins_index[-1]].interval_end_exclude
df_indexed = df[bins_index[0]: bins_index[-1] + 1]
size = df_indexed['size'].sum()
bads = df_indexed.bads.sum()
goods = df_indexed.goods.sum()
mean = bads / size
dist_good = goods / df.goods.sum()
dist_bad = bads / df.bads.sum()
woe = np.log(dist_bad / dist_good)
iv = (dist_bad - dist_good) * woe
df = df.drop(bins_index)
df.loc[bins_index[0]] = [df.variable.values[0], interval_start_include, interval_end_exclude, size, mean, bads, goods, dist_good, dist_bad, woe, iv]
return df.sort_index().reset_index(drop=True)
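# Bin a special sentinel value (sep_value, e.g. a code such as -999) separately
# from the rest of the feature, then stitch the interval bounds of the two
# binning results back together, keeping any NaN row at the end.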
def woe_binning_sep(target, column, dataset, sep_value, n_threshold, n_occurences=1, p_threshold=0.1,
merge_threshold=None):
nan = None
a = woe_binning(target, dataset[dataset[column] == sep_value][[target, column]], n_threshold,
n_occurences=n_occurences, p_threshold=p_threshold, merge_threshold=merge_threshold)
dist_bad = a.loc[0].bads / dataset[target].sum()
dist_good = a.loc[0].goods / (dataset.shape[0] - dataset[target].sum())
a.at[0, 'woe'] = np.log(dist_bad / dist_good)
a.at[0, 'dist_good'] = dist_good
a.at[0, 'dist_bad'] = dist_bad
a.at[0, 'iv_components'] = (dist_bad - dist_good) * a.at[0, 'woe']
b = woe_binning(target, dataset[dataset[column] != sep_value][[target, column]], n_threshold,
n_occurences=n_occurences, p_threshold=p_threshold, merge_threshold=merge_threshold)
if np.isnan(b.loc[b.shape[0] - 1, 'interval_start_include']):
nan_line = b.loc[b.shape[0] - 1]
b = b[:-1]
nan = 1
if b.loc[0, 'interval_start_include'] < b.loc[b.shape[0] - 1, 'interval_start_include']:
if sep_value < b.loc[0, 'interval_end_exclude']:
a.at[0, 'interval_end_exclude'] = sep_value + 1e-5
a.at[0, 'interval_start_include'] = -np.inf
b.at[0, 'interval_start_include'] = sep_value + 1e-5
ret = pd.concat([a, b]).reset_index(drop=True)
else:
a.at[0, 'interval_start_include'] = sep_value
a.at[0, 'interval_end_exclude'] = np.inf
b.at[b.shape[0] - 1, 'interval_end_exclude'] = sep_value
ret = pd.concat([b, a]).reset_index(drop=True)
else:
if sep_value < b.loc[0, 'interval_end_exclude']:
a.at[0, 'interval_start_include'] = sep_value
a.at[0, 'interval_end_exclude'] = -np.inf
b.at[b.shape[0] - 1, 'interval_end_exclude'] = sep_value
ret = pd.concat([b, a]).reset_index(drop=True)
else:
a.at[0, 'interval_end_exclude'] = sep_value - 1e-5
a.at[0, 'interval_start_include'] = np.inf
b.at[0, 'interval_start_include'] = sep_value - 1e-5
ret = pd.concat([a, b]).reset_index(drop=True)
if nan:
ret.loc[ret.shape[0]] = nan_line
return ret
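# Bin every feature column of `dataset` against `target`, optionally in parallel
# via joblib. The default minimum bin size is ceil(n / 20), i.e. 5% of the rows,
# which enforces rule 3 above.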
def batch_woe_binning(target, dataset, n_threshold=None, n_occurences=1, p_threshold=0.1, sep_value=None,
merge_threshold=None):
from math import ceil
    nprocs = max(1, mp.cpu_count() // 2)  # use half the available cores; set to 1 to disable parallelism
columns = dataset.columns[dataset.columns != target]
if n_threshold is None:
min_bin_size = ceil(dataset.shape[0] / 20)
else:
min_bin_size = n_threshold
parallel = Parallel(n_jobs=nprocs, verbose=5)
if sep_value:
df_list = parallel(delayed(woe_binning_sep)
(target, column, dataset[[column, target]], sep_value=sep_value,
n_threshold=min_bin_size, n_occurences=n_occurences,
p_threshold=p_threshold, merge_threshold=merge_threshold)
for column in columns)
else:
df_list = parallel(delayed(woe_binning)
(target, dataset[[column, target]],
n_threshold=min_bin_size, n_occurences=n_occurences,
p_threshold=p_threshold, merge_threshold=merge_threshold)
for column in columns)
del parallel
return {i.variable[0]: i for i in df_list}
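# Core monotone optimal binning for a single feature: first merge adjacent value
# groups until the bad rate is monotone in the feature, then keep merging
# neighbouring bins whose bad rates are not statistically different (two-sample
# z-test controlled by p_threshold), subject to the minimum bin size.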
def woe_binning(target, dataset, n_threshold, n_occurences=1, p_threshold=0.1, sort_overload=None,
merge_threshold=None):
column = dataset.columns[dataset.columns != target][0]
sorted_dataset = dataset.sort_values(by=[column])
size = sorted_dataset.shape[0]
if sorted_dataset[:int(size / 4)][target].sum() > sorted_dataset[int(size * 3 / 4):][target].sum():
order = True
interval_end = np.inf
else:
order = False
interval_end = -np.inf
summary = dataset.dropna().groupby([column]).agg(["mean", "size", "std"])
summary.columns = summary.columns.droplevel(level=0)
summary = summary[["mean", "size", "std"]]
summary = summary.reset_index()
summary["del_flag"] = 0
summary["std"] = summary["std"].fillna(0)
summary = summary.sort_values(by=[column], ascending=(sort_overload or order)).reset_index(drop=True)
while True:
i = 0
summary = summary[summary.del_flag == 0]
summary = summary.reset_index(drop=True)
while True:
j = i + 1
if j >= len(summary):
break
if summary.iloc[j]['mean'] < summary.iloc[i]['mean']:
i = i + 1
continue
else:
while True:
n = summary.iloc[j]['size'] + summary.iloc[i]['size']
m = (summary.iloc[j]['size'] * summary.iloc[j]['mean'] +
summary.iloc[i]['size'] * summary.iloc[i]['mean']) / n
if n == 2:
s = np.std([summary.iloc[j]['mean'], summary.iloc[i]['mean']])
else:
s = np.sqrt((summary.iloc[j]['size'] * ((summary.iloc[j]['std']) ** 2) +
summary.iloc[i]['size'] * ((summary.iloc[i]['std']) ** 2)) / n)
summary.loc[i, "size"] = n
summary.loc[i, "mean"] = m
summary.loc[i, "std"] = s
summary.loc[j, "del_flag"] = 1
j = j + 1
if j >= len(summary):
break
if summary.loc[j, "mean"] < summary.loc[i, "mean"]:
i = j
break
if j >= len(summary):
break
dels = np.sum(summary["del_flag"])
if dels == 0:
break
while True:
summary["next_mean"] = summary["mean"].shift(-1)
summary["next_size"] = summary["size"].shift(-1)
summary["next_std"] = summary["std"].shift(-1)
summary["updated_size"] = summary["next_size"] + summary["size"]
summary["updated_mean"] = (summary["next_mean"] * summary["next_size"] +
summary["mean"] * summary["size"]) / summary["updated_size"]
summary["updated_std"] = (summary["next_size"] * summary["next_std"] ** 2 +
summary["size"] * summary["std"] ** 2) / (summary["updated_size"] - 2)
summary["z_value"] = (summary["mean"] - summary["next_mean"]) / np.sqrt(
summary["updated_std"] * (1 / summary["size"] + 1 / summary["next_size"]))
summary["p_value"] = 1 - stats.norm.cdf(summary["z_value"])
        condition = (summary["size"] < n_threshold) | (summary["next_size"] < n_threshold) | (
                summary["mean"] * summary["size"] < n_occurences) | (
                summary["next_mean"] * summary["next_size"] < n_occurences)
        # Force bins that are too small, or that contain too few bad cases, to merge
        # first by bumping their p-value. Note that chained assignment such as
        # summary[condition].p_value = ... modifies a temporary copy and is a silent
        # no-op, so the update has to go through .loc.
        summary.loc[condition, "p_value"] = summary.loc[condition, "p_value"] + 1
max_p = max(summary["p_value"])
row_of_maxp = summary['p_value'].idxmax()
row_delete = row_of_maxp + 1
if max_p > p_threshold:
summary = summary.drop(summary.index[row_delete])
summary = summary.reset_index(drop=True)
else:
break
summary["mean"] = summary.apply(lambda row: row["updated_mean"] if row["p_value"] == max_p else row["mean"],
axis=1)
summary["size"] = summary.apply(lambda row: row["updated_size"] if row["p_value"] == max_p else row["size"],
axis=1)
summary["std"] = summary.apply(
lambda row: np.sqrt(row["updated_std"]) if row["p_value"] == max_p else row["std"], axis=1)
    woe_summary = summary[[column, "size", "mean"]].copy()  # copy to avoid chained-assignment issues
    woe_summary.columns = ["interval_start_include", "size", "mean"]
    woe_summary["interval_end_exclude"] = woe_summary.interval_start_include.shift(-1).fillna(interval_end)
    woe_summary.loc[0, "interval_start_include"] = -interval_end
woe_summary["variable"] = column
woe_summary = woe_summary[["variable", "interval_start_include", "interval_end_exclude", "size", "mean"]]
if dataset[column].isna().sum() > 0:
nan_line = list(
dataset[dataset[column].isna()].fillna(0).groupby([column]).agg(["size", "mean"]).reset_index(
drop=True).loc[0].fillna(0).values)
nan_line = [column, np.nan, np.nan] + nan_line
woe_summary.loc[woe_summary.index.max() + 1] = nan_line
woe_summary["bads"] = woe_summary["mean"] * woe_summary["size"]
woe_summary["goods"] = woe_summary["size"] - woe_summary["bads"]
total_goods = np.sum(woe_summary["goods"])
total_bads = np.sum(woe_summary["bads"])
woe_summary["dist_good"] = woe_summary["goods"] / total_goods
woe_summary["dist_bad"] = woe_summary["bads"] / total_bads
woe_summary["woe"] = np.log(woe_summary["dist_bad"] / woe_summary["dist_good"])
woe_summary["iv_components"] = (woe_summary["dist_bad"] - woe_summary["dist_good"]) * woe_summary["woe"]
    # Optionally merge adjacent bins whose absolute WOE values differ by less than merge_threshold (relative difference)
if merge_threshold:
while True:
if woe_summary.dropna().shape[0] <= 1:
break
for i in range(woe_summary.dropna().shape[0] - 1):
if abs(abs(woe_summary.loc[i, 'woe']) - abs(woe_summary.loc[i + 1, 'woe'])) \
/ abs(woe_summary.loc[i, 'woe']) <= merge_threshold:
woe_summary = merge_bins(woe_summary, [i, i + 1])
break
if i == woe_summary.dropna().shape[0] - 2:
break
return woe_summary
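# Replace raw feature values with the WOE of the bin they fall into (missing
# values are imputed with the column median first). Features whose total IV is
# below iv_threshold or that end up with a single bin are dropped; identical IV
# totals serve as a cheap proxy for detecting 100%-correlated duplicates.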
def apply_bins(dataset, dict_woe, iv_threshold=0.02, bin_threshold=2, is_df=False, remove_100_corr=True):
df_bin = pd.DataFrame()
ivs_list = []
if is_df:
values = dict_woe.variable.values
else:
values = dict_woe.values()
for df_col in values:
if is_df:
df_col = dict_woe[dict_woe.variable == df_col]
iv_total = df_col.dropna().iv_components.sum()
if iv_total < iv_threshold or df_col.shape[0] < bin_threshold or iv_total == np.inf:
continue
column = df_col.variable.loc[0]
df_col_dropped = df_col.dropna()
bin_cuts = list(df_col_dropped.interval_start_include.values) + [
df_col_dropped.interval_end_exclude.values[-1]]
labels_woe = list(df_col_dropped.woe.values)
if bin_cuts[0] > bin_cuts[-1]:
bin_cuts.reverse()
labels_woe.reverse()
include_left = False
include_right = True
else:
include_left = True
include_right = False
if remove_100_corr:
if iv_total not in ivs_list:
df_bin[column + '_bin'] = pd.to_numeric(
pd.cut(dataset[column].fillna(dataset[column].median()), bin_cuts, include_lowest=include_left,
right=include_right, labels=labels_woe))
ivs_list.append(iv_total)
else:
df_bin[column + '_bin'] = pd.to_numeric(
pd.cut(dataset[column].fillna(dataset[column].median()), bin_cuts, include_lowest=include_left,
right=include_right, labels=labels_woe))
return df_bin
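A minimal usage sketch for the two entry points above, assuming DataFrames train and test with a binary target column named "default" (all names here are placeholders):

woe_dict = batch_woe_binning("default", train)   # {feature_name: bin table}
train_woe = apply_bins(train, woe_dict)          # columns renamed to "<feature>_bin"
test_woe = apply_bins(test, woe_dict)            # same cuts and WOE values as train
print(woe_dict["age"])                           # inspect the bin table of one feature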
I integrated this algorithm into my credit scoring class as follows:
def woe_bin(self, use_train_set=True, cols=None, thresh=25000):
    print("WOE Monotonic binning started...\n")
    start_time = time.time()  # assumes `import time` at module level
    if self.traintest_used:
        local_df = pd.concat([self.x_train, self.y_train], axis=1)
        # Fit the bins on a random sample of at most `thresh` rows to keep runtime bounded.
        if local_df.shape[0] < thresh:
            df_sample = local_df
        else:
            df_sample = local_df.sample(n=thresh).reset_index(drop=True)
        if cols is None:
            self.woe_dict = batch_woe_binning(self.target_name, df_sample)
            self.x_train = apply_bins(self.x_train, self.woe_dict)
            self.x_test = apply_bins(self.x_test, self.woe_dict)
        else:
            self.woe_dict = batch_woe_binning(self.target_name, df_sample[cols + [self.target_name]])
            # apply_bins returns columns suffixed with '_bin'; assign positionally via .values
            # (this assumes every column in `cols` survives the IV filter inside apply_bins).
            self.x_train[cols] = apply_bins(self.x_train, self.woe_dict).values
            self.x_test[cols] = apply_bins(self.x_test, self.woe_dict).values
    print("WOE Monotonic binning completed in %.1f seconds.\n" % (time.time() - start_time))
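A hypothetical call sequence, assuming the surrounding class instance (here scorer) has already populated x_train, x_test, y_train, target_name and traintest_used, and that model is any scikit-learn style estimator:

scorer.woe_bin(thresh=25000)                   # fit WOE bins on (a sample of) the train set
model.fit(scorer.x_train, scorer.y_train)      # x_train now holds the WOE-encoded columns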