8. Cost-sensitive and imbalanced learning

Now for another of the key ways that modeling fraud differs from other modeling problems. Fraud data is imbalanced in two senses: fraud is rare, and the cost of a false negative (missing fraud) is typically much greater than the cost of a false positive (flagging a legitimate transaction as fraud).

8.1 Cost-sensitive learning

Suppose you estimate each false positive to cost $C_{FP}$ dollars and each false negative to cost $C_{FN}$ dollars. An obvious way to incorporate these costs into the loss function is to set the sample weight $s_i$ for each $i\in\mathcal{D}$ to $C_{FN}$ for fraudulent transactions and $C_{FP}$ for legitimate transactions. That is:

\[s_i := \begin{cases} C_{FN} & \text{if } y_i = 1\\ C_{FP} & \text{if } y_i = 0 \end{cases}\]

With these class weights, the cost-adjusted regularized log-loss function becomes:

\[\text{RegLogLoss}(f_{\mathbf{w}}, \mathbf{\lambda}) = \Omega (\mathbf{w},\mathbf{\lambda}) - \frac{\sum_{i=1}^n \left( C_{FN} \ y_i \ln f_{\mathbf{w}}(\mathbf{X}_i) + C_{FP} (1 - y_i) \ln(1 - f_{\mathbf{w}}(\mathbf{X}_i)) \right)}{\sum_{i=1}^n \left( C_{FN} \ y_i + C_{FP} (1 - y_i)\right)}\]

When training the model from already-tuned hyperparameters, the differential costs make assigning a fraud case a low probability a more costly error, all else being equal, than assigning a legitimate transaction a high probability. And if the measure of validation loss also incorporates these costs, they will similarly influence the hyperparameter tuning.
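
To make the formula concrete, here is a minimal NumPy sketch of the cost-weighted log-loss above, with the regularization term $\Omega$ omitted; the function name and the probability clipping are my own additions. In practice you can get the same weighted average by passing the weights $s_i$ to scikit-learn's `log_loss` via its `sample_weight` argument.

```python
import numpy as np

def cost_weighted_log_loss(y_true, p_pred, cost_fn=20.0, cost_fp=1.0, eps=1e-15):
    """Cost-weighted log-loss: each sample's log-loss term is scaled by C_FN
    for fraud (y=1) or C_FP for legit (y=0), then averaged over the total weight."""
    y = np.asarray(y_true, dtype=float)
    p = np.clip(np.asarray(p_pred, dtype=float), eps, 1 - eps)  # avoid log(0)
    s = np.where(y == 1, cost_fn, cost_fp)                      # the weights s_i
    terms = -(y * np.log(p) + (1 - y) * np.log(1 - p))          # per-sample log-loss
    return np.sum(s * terms) / np.sum(s)
```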

Suppose, for instance, that a false negative costs 20 times as much as a false positive. For simplicity, let’s say $C_{FN}=20$ and $C_{FP}=1$, and let’s ignore the normalizing factor (which depends on the number of samples). Then predicting a fraudulent transaction to have only a 10% chance of being fraudulent adds about 46 to the log-loss, whereas predicting a legitimate transaction to have a 90% chance of being fraudulent adds only about 2.3.

Click to expand/hide Python code to generate the table and plot
 ```python
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from matplotlib.colors import ListedColormap
import matplotlib.ticker as ticker

p = np.linspace(0.001, 0.999, 500)
cost_ratios = [1, 2, 5, 10, 20]

viridis_full = mpl.colormaps['viridis'](np.linspace(0, 1, 256))
start = int(0.1 * 256)
end = int(0.85 * 256)
viridis_trimmed = viridis_full[start:end]
custom_cmap = ListedColormap(viridis_trimmed)
colors = custom_cmap(np.linspace(0, 1, len(cost_ratios)))

plt.figure(figsize=(8, 5))
handles = []
labels = []

# Plot fraud curves and collect handles and labels
for cost_ratio, color in zip(cost_ratios, colors):
    log_loss_fraud = -np.log(p) * cost_ratio
    h, = plt.plot(p * 100, log_loss_fraud, label=f'Fraud (cost ratio={cost_ratio})', linestyle='--', color=color)
    handles.append(h)
    labels.append(f'Fraud (cost ratio={cost_ratio})')

# Plot legit curve
log_loss_legit = -np.log(1 - p)
h_legit, = plt.plot(p * 100, log_loss_legit, label='Legit (weight=1)', color='brown')

# Append the legit curve handle and label at the end
handles.append(h_legit)
labels.append('Legit (weight=1)')

# Create an ordering list:
# Indices of fraud curves sorted by decreasing cost ratio (since cost_ratios is increasing, reverse the order)
fraud_indices_desc = list(range(len(cost_ratios)-1, -1, -1))

# Append the legit curve index last
legend_order = fraud_indices_desc + [len(cost_ratios)]  # legit is last

# Reorder handles and labels according to desired legend order
handles_ordered = [handles[i] for i in legend_order]
labels_ordered = [labels[i] for i in legend_order]

plt.legend(handles_ordered, labels_ordered)
plt.title('Cost-Weighted Log-Loss Curves vs Predicted Probability')
plt.xlabel('Predicted Probability of Fraud (%)')
plt.ylabel('Log-Loss')
plt.grid(True)

# Add percent signs on the x-axis tick labels
plt.gca().xaxis.set_major_formatter(ticker.PercentFormatter(xmax=100))

plt.savefig("cost-wgted-log-loss-curves-vs-pred-prob.png", bbox_inches='tight')
plt.show()

p_values = [0.1, 0.9]
cost_ratios = [1, 2, 5, 10, 20]

def log_loss_fraud(p, cost_ratio):
    return -np.log(p) * cost_ratio

def log_loss_legit(p):
    return -np.log(1 - p)

rows = []
headers = ["Class / Cost Ratio", "Log-Loss at p=10%", "Log-Loss at p=90%"]

for cost_ratio in cost_ratios:
    row = [
        f"Fraud (cost ratio={cost_ratio})",
        f"{log_loss_fraud(0.1, cost_ratio):.4f}",
        f"{log_loss_fraud(0.9, cost_ratio):.4f}"
    ]
    rows.append(row)

# Add legit row
rows.append([
    "Legit (weight=1)",
    f"{log_loss_legit(0.1):.4f}",
    f"{log_loss_legit(0.9):.4f}"
])

# Print markdown table
print("| " + " | ".join(headers) + " |")
print("|" + "|".join(["---"] * len(headers)) + "|")
for row in rows:
    print("| " + " | ".join(row) + " |")

``` 


| Class / Cost Ratio | Log-Loss at P(fraud)=10% | Log-Loss at P(fraud)=90% |
|---|---|---|
| Fraud (cost ratio=1) | 2.3026 | 0.1054 |
| Fraud (cost ratio=2) | 4.6052 | 0.2107 |
| Fraud (cost ratio=5) | 11.5129 | 0.5268 |
| Fraud (cost ratio=10) | 23.0259 | 1.0536 |
| Fraud (cost ratio=20) | 46.0517 | 2.1072 |
| Legit (weight=1) | 0.1054 | 2.3026 |


Figure: cost-weighted log-loss curves vs. predicted probability of fraud

The Handbook says that it is difficult to estimate the costs of false positives and false negatives. (I’m guessing a card issuer would have a good sense of the cost of missing fraud, at least in terms of refunding cardholders for transactions they didn’t authorize, and a good sense of the cost of investigating wrongly flagged transactions that turn out to be legitimate. But maybe it’s hard to estimate the cost of losing customers who get annoyed by declined transactions and holds placed on their cards?)

When these costs can’t be estimated reliably, the Handbook notes that a popular heuristic is to assume that false negatives cost $1/IR$ times as much as false positives, where the imbalance ratio $IR$ is defined as the ratio of fraudulent transactions to legitimate transactions.
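
In code, the heuristic amounts to using the inverse imbalance ratio as the fraud-to-legit cost ratio. A minimal sketch, assuming `y_train` is a 0/1 NumPy array of fraud labels (the variable names are mine):

```python
import numpy as np

n_fraud = np.sum(y_train == 1)
n_legit = np.sum(y_train == 0)

ir = n_fraud / n_legit        # imbalance ratio: fraudulent / legitimate transactions
cost_ratio = 1 / ir           # heuristic: C_FN / C_FP = 1/IR

# as sample weights (1 for legit, 1/IR for fraud) ...
sample_weights = np.where(y_train == 1, cost_ratio, 1.0)
# ... or, equivalently for XGBoost, scale_pos_weight = n_legit / n_fraud
```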

This heuristic doesn’t sound unreasonable, but I don’t want to mix up imbalanced learning and cost-sensitive learning. That is, one way to handle class imbalance is to upweight the minority class, regardless of whether you also incorporate the differential costs of misclassification. But you can also incorporate both concepts at once, learning in a manner that addresses both class imbalance and cost imbalance. So let’s explore this.

8.2 Imbalanced learning

The Handbook explains various imbalance techniques, including:

  • oversampling: enlarging the training data with random duplicates of fraudulent transactions
  • undersampling: shrinking the training data by randomly removing legitimate transactions
  • imbalance ratio: assigning sample weights of 1 to legitimate transactions and $1/IR$ to fraudulent transactions
  • SMOTE: enlarging the training data with synthetically-generated fraud cases. The synthetic cases are made by selecting a fraudulent transaction and interpolating between it and one of its k nearest neighbor fraudulent transactions (a bare-bones sketch of this interpolation step follows the list).
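
Here is a bare-bones NumPy/scikit-learn sketch of that interpolation step, just for intuition; it assumes `X_fraud` is a 2-D NumPy array holding only the fraud rows' features, and the function name is mine. The experiment below uses imblearn's `SMOTE` rather than this toy function.

```python
import numpy as np
from sklearn.neighbors import NearestNeighbors

def smote_like_samples(X_fraud, n_new, k=5, random_state=0):
    """Create synthetic fraud rows by interpolating between a fraud sample
    and one of its k nearest fraud neighbors (the core idea behind SMOTE)."""
    rng = np.random.default_rng(random_state)
    nn = NearestNeighbors(n_neighbors=k + 1).fit(X_fraud)  # +1: each point is its own nearest neighbor
    _, idx = nn.kneighbors(X_fraud)

    base = rng.integers(0, len(X_fraud), size=n_new)           # pick base fraud samples
    nbr = idx[base, rng.integers(1, k + 1, size=n_new)]        # pick one of their k neighbors
    gaps = rng.random((n_new, 1))                               # interpolation fractions in [0, 1)
    return X_fraud[base] + gaps * (X_fraud[nbr] - X_fraud[base])
```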

As an experiment, I applied each of these to XGBoost, both alone (called “vanilla” in the plots) and together with a cost of 1,500 dollars per false negative and 75 dollars per false positive (called “cost_sensitive” in the plots). For comparison, I also added XGBoost with just the cost-sensitive adjustment and XGBoost with neither adjustment. So, there were a total of 10 models.

In terms of AUC, average precision, CardPrecision@30, and CardRecall@30, most combinations outperformed the baseline with neither adjustment. But not always, and not by slam-dunk margins.

Click to expand/hide Python code to fit XGBoost with the 10 combinations of imbalance adjustment and cost-sensitive learning
 ```python
""" Run XGBoost 10 combinations of imbalance adjustment and cost-sensitive learning """

import datetime

import numpy as np
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler
from scipy.special import expit as sigmoid  # Sigmoid function
from scipy.stats import randint, uniform
from sklearn.base import clone
from sklearn.metrics import log_loss
from sklearn.model_selection import ParameterSampler
from sklearn.pipeline import Pipeline
from sklearn.utils import check_random_state
from xgboost import XGBClassifier

# preprocessor, input_features, transactions_df, and get_train_val_split
# are defined in earlier sections

# Define sampler mapping
def get_sampler(method):
    if method == 'undersampling':
        return RandomUnderSampler(random_state=0)
    elif method == 'oversampling':
        return RandomOverSampler(random_state=0)
    elif method == 'smote':
        return SMOTE(random_state=0)
    else:
        return None  # For 'imbalance_ratio' and 'none'

# Setup pipelines (no sampling in pipeline yet)
pipelines = {
    'XGBoost': Pipeline([
        ('preprocessor', preprocessor),
        ('clf', XGBClassifier(random_state=0, use_label_encoder=False, eval_metric='logloss', n_jobs=-1))
    ])
}

# Function to calculate sample weights
def calc_sample_weights(y, cost_FP, cost_FN):
    return np.where(y == 1, cost_FN, cost_FP)


def run_imbalance_and_cost_sensitive_exp(
    df, pipelines, input_features, output_col,
    imbalance_methods=['imbalance_ratio', 'undersampling', 'oversampling', 'smote', 'none'],
    cost_FP=75, cost_FN=1500,
    start_date=datetime.datetime(2018, 7, 25),
    delta_train=7, delta_delay=7, delta_assessment=7,
    n_folds=4,
    n_iter=20,
    random_state=0
):
    rng = check_random_state(random_state)
    optimized_pipelines = {}

    for imb_method in imbalance_methods:
        for cost_sensitive in [False, True]:
            run_name = f"{imb_method}_{'cost_sensitive' if cost_sensitive else 'vanilla'}"
            print(f"Running experiment: {run_name}")

            pipeline_template = pipelines['XGBoost']  # Only XGB

            param_dist = {
                'clf__n_estimators': randint(50, 101),       # 50 to 100
                'clf__max_depth': randint(3, 8),             # 3 to 7
                'clf__learning_rate': uniform(0.01, 0.19),  # 0.01 to 0.2 approx
            }

            def fit_and_score(params):
                fold_scores = []
                fold_start = start_date
                for fold in range(n_folds):
                    train_df, val_df = get_train_val_split(df, fold, fold_start, delta_train, delta_delay, delta_assessment)
                    if train_df.empty or val_df.empty:
                        fold_start += datetime.timedelta(days=delta_assessment)
                        continue

                    X_train = train_df[input_features]
                    y_train = train_df[output_col]
                    X_val = val_df[input_features]
                    y_val = val_df[output_col]

                    model = clone(pipeline_template)
                    model.set_params(**params)

                    # Set scale_pos_weight or sample weights according to imb_method and cost_sensitive
                    if imb_method == 'none':
                        if cost_sensitive:
                            sample_weights = np.where(y_train==1, cost_FN, cost_FP)
                            model.set_params(clf__scale_pos_weight=1)
                            model.fit(X_train, y_train, clf__sample_weight=sample_weights)
                        else:
                            model.fit(X_train, y_train)
                    elif imb_method == 'imbalance_ratio':
                        n_pos = np.sum(y_train==1)
                        n_neg = np.sum(y_train==0)
                        spw = n_neg / max(1,n_pos)
                        spw *= (cost_FP / cost_FN if cost_sensitive else 1)
                        model.set_params(clf__scale_pos_weight=spw)
                        model.fit(X_train, y_train)
                    else:
                        # Use sampling methods
                        sampler = get_sampler(imb_method)
                        X_res, y_res = sampler.fit_resample(X_train, y_train)
                        sample_weights = np.where(y_res==1, cost_FN, cost_FP) if cost_sensitive else None
                        spw = (np.sum(y_res==0)/max(1,np.sum(y_res==1)))*(cost_FP/cost_FN if cost_sensitive else 1)
                        model.set_params(clf__scale_pos_weight=spw)
                        model.fit(X_res, y_res, clf__sample_weight=sample_weights)

                    y_pred_prob = model.predict_proba(X_val)
                    sample_weights_val = np.where(y_val==1, cost_FN, cost_FP) if cost_sensitive else None
                    loss = log_loss(y_val, y_pred_prob, sample_weight=sample_weights_val)
                    fold_scores.append(loss)
                    fold_start += datetime.timedelta(days=delta_assessment)

                return np.mean(fold_scores) if fold_scores else float('inf')

            best_score = float('inf')
            best_params = None
            best_model = None

            for params in list(ParameterSampler(param_dist, n_iter=n_iter, random_state=rng)):
                score = fit_and_score(params)
                print(f"Params {params} -> Score {score:.5f}")
                if score < best_score:
                    best_score = score
                    best_params = params

            # Fit model on full data
            final_model = clone(pipeline_template)
            final_model.set_params(**best_params)
            if imb_method == 'none':
                if cost_sensitive:
                    sample_weights_full = np.where(df[output_col]==1, cost_FN, cost_FP)
                    final_model.set_params(clf__scale_pos_weight=1)
                    final_model.fit(df[input_features], df[output_col], clf__sample_weight=sample_weights_full)
                else:
                    final_model.fit(df[input_features], df[output_col])
            elif imb_method == 'imbalance_ratio':
                n_pos_full = np.sum(df[output_col]==1)
                n_neg_full = np.sum(df[output_col]==0)
                spw_full = n_neg_full / max(1,n_pos_full)
                spw_full *= (cost_FP / cost_FN if cost_sensitive else 1)
                final_model.set_params(clf__scale_pos_weight=spw_full)
                final_model.fit(df[input_features], df[output_col])
            else:
                sampler = get_sampler(imb_method)
                X_res_full, y_res_full = sampler.fit_resample(df[input_features], df[output_col])
                sample_weights_full = np.where(y_res_full==1, cost_FN, cost_FP) if cost_sensitive else None
                spw_full = (np.sum(y_res_full==0)/max(1,np.sum(y_res_full==1)))*(cost_FP/cost_FN if cost_sensitive else 1)
                final_model.set_params(clf__scale_pos_weight=spw_full)
                final_model.fit(X_res_full, y_res_full, clf__sample_weight=sample_weights_full)

            optimized_pipelines[run_name] = {
                'model': final_model,
                'best_params': best_params,
                'best_score': best_score
            }

    return optimized_pipelines

result_pipelines = run_imbalance_and_cost_sensitive_exp(
    transactions_df,
    pipelines,
    input_features,
    'TX_FRAUD',
    imbalance_methods=['imbalance_ratio', 'undersampling', 'oversampling', 'smote', 'none'],
    cost_FP=75,
    cost_FN=1500,
    start_date=datetime.datetime(2018,7,25),
    delta_train=7,
    delta_delay=7,
    delta_assessment=7,
    n_folds=4,
    n_iter=20
)


``` 


Click to expand/hide Python code to plot the 10 ROC curves and 10 precision-recall curves
 ```python
X_test, y_test = test_df[input_features], test_df['TX_FRAUD']

model_name = 'XGBoost'
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score

def plot_roc_pr_curves(pipelines, X_test, y_test):
    plt.figure(figsize=(14, 6))

    # Gather ROC data and sort by decreasing AUC
    roc_data = []
    for name, pipe_info in pipelines.items():
        model = pipe_info['model']  # Extract the actual model
        probs = model.predict_proba(X_test)[:, 1]
        fpr, tpr, _ = roc_curve(y_test, probs)
        roc_auc = auc(fpr, tpr)
        roc_data.append((name, fpr, tpr, roc_auc))
    roc_data.sort(key=lambda x: x[3], reverse=True)

    # Plot ROC Curve
    plt.subplot(1, 2, 1)
    for name, fpr, tpr, roc_auc in roc_data:
        plt.plot(fpr, tpr, label=f'{name} (AUC = {roc_auc:.2f})')
    random_roc_auc = 0.5
    plt.plot([0, 1], [0, 1], 'r--', label=f'Random Guessing (AUC = {random_roc_auc:.2f})')  
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend(loc='lower right')

    # Gather PR data and sort by decreasing AP
    pr_data = []
    for name, pipe_info in pipelines.items():
        model = pipe_info['model']
        probs = model.predict_proba(X_test)[:, 1]
        precision, recall, _ = precision_recall_curve(y_test, probs)
        ap = average_precision_score(y_test, probs)
        pr_data.append((name, recall, precision, ap))
    pr_data.sort(key=lambda x: x[3], reverse=True)

    # Plot Precision-Recall Curve
    plt.subplot(1, 2, 2)
    for name, recall, precision, ap in pr_data:
        plt.plot(recall, precision, label=f'{name} (AP = {ap:.2f})')
    # Add dashed line for random guessing (constant model)
    positive_rate = y_test.mean()
    random_ap = positive_rate
    plt.plot([0, 1], [positive_rate, positive_rate], 'r--', label=f'Random Guessing (AP = {random_ap:.2f})')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    # Place legend in lower left for PR curve with sorted entries
    plt.legend(loc='lower left')

    plt.tight_layout(rect=[0, 0, 0.85, 1])  # Make room on right for ROC legend
    plt.savefig(f"ROC-and-PR-curves-for-{model_name}-w-imb-from-test-data.png", bbox_inches='tight')
    plt.show()

plot_roc_pr_curves(result_pipelines, X_test, y_test)
``` 


Figure: ROC and precision-recall curves on the test data for XGBoost with the 10 imbalance / cost-sensitive combinations


Click to expand/hide Python code to plot the CardPrecision@30 vs CardRecall@30 confidence ellipses for the 10 models
 ```python
import matplotlib.patches as patches

def plot_confidence_ellipse(
    x, y, ax, n_std=1.96, edgecolor='black', **kwargs
):
    if x.size != y.size:
        raise ValueError("x and y must be the same size")
    cov = np.cov(x, y)
    mean_x, mean_y = np.mean(x), np.mean(y)
    eigvals, eigvecs = np.linalg.eigh(cov)
    order = eigvals.argsort()[::-1]
    eigvals, eigvecs = eigvals[order], eigvecs[:, order]
    theta = np.degrees(np.arctan2(*eigvecs[:, 0][::-1]))
    width, height = 2 * n_std * np.sqrt(eigvals)
    ellipse = patches.Ellipse(
        (mean_x, mean_y), width, height, angle=theta,
        edgecolor=edgecolor, facecolor='none', **kwargs
    )
    ax.add_patch(ellipse)
    ax.scatter(mean_x, mean_y, color=edgecolor, s=60, edgecolors='k', zorder=10)

def plot_card_precision_recall_ellipse_by_time_and_model(
    optimized_pipelines,
    X_test,
    y_test,
    A,
    n_a,
    m_a,
    time_unit
):
    time_column_map = {
        'days': 'TX_TIME_DAYS',
        'hours': 'TX_TIME_HOURS',
        'minutes': 'TX_TIME_MINUTES',
        'seconds': 'TX_TIME_SECONDS',
    }
    time_unit_divisor = {
        'days': 30,
        'hours': 30 * 24,
        'minutes': 30 * 24 * 60,
        'seconds': 30 * 24 * 60 * 60
    }
    if time_unit not in time_column_map:
        raise ValueError(f"time_unit must be one of {list(time_column_map.keys())}")

    time_col = time_column_map[time_unit]
    divisor = time_unit_divisor.get(time_unit, 30)
    k_card = max(1, int((A * n_a) / divisor))

    df = pd.DataFrame({
        'time_period': X_test[time_col],
        'y_true': y_test,
    })

    has_customer = 'CUSTOMER_ID' in X_test.columns
    if has_customer:
        df['customer_id'] = X_test['CUSTOMER_ID']

    results_time = []
    drop_cols = time_columns + (['CUSTOMER_ID'] if has_customer else [])

    for name, pipe_info in optimized_pipelines.items():
        model = pipe_info['model']  # Extract the actual model
        X_pred = X_test.drop(columns=drop_cols)
        if hasattr(model, "predict_proba"):
            scores = model.predict_proba(X_pred)[:, 1]
        elif hasattr(model, "decision_function"):
            scores = model.decision_function(X_pred)
        else:
            scores = model.predict(X_pred)
        df['score'] = scores
        for t_val, grp in df.groupby('time_period'):
            if has_customer:
                agg = grp.groupby('customer_id').agg(
                    max_score=('score', 'max'),
                    card_true=('y_true', 'max')
                ).reset_index()
                p_c = card_precision_at_k(agg['card_true'], agg['max_score'], k_card)
                r_c = card_recall_at_k(agg['card_true'], agg['max_score'], k_card)
            else:
                p_c, r_c = np.nan, np.nan
            results_time.append({
                'time_period': t_val,
                'model': name,
                'card_precision@k': p_c,
                'card_recall@k': r_c
            })

    df_time = pd.DataFrame(results_time)
    models = df_time['model'].unique()

    # Collect mean recall and precision, compute distance for sorting
    mean_values = []
    for model in models:
        subset = df_time[df_time['model'] == model]
        mean_recall = np.mean(subset['card_recall@k'])
        mean_precision = np.mean(subset['card_precision@k'])
        dist = np.sqrt(mean_recall**2 + mean_precision**2)
        mean_values.append((model, mean_recall, mean_precision, dist))

    # Sort by distance descending
    mean_values.sort(key=lambda x: x[3], reverse=True)

    cmap = plt.get_cmap('tab10')
    colors = {mv[0]: cmap(i) for i, mv in enumerate(mean_values)}

    fig, ax1 = plt.subplots(1, 1, figsize=(10, 8))
    for model, mean_recall, mean_precision, _ in mean_values:
        subset = df_time[df_time['model'] == model]
        if len(subset) >= 2:
            plot_confidence_ellipse(
                subset['card_recall@k'].values,
                subset['card_precision@k'].values,
                ax1,
                n_std=1.96,
                edgecolor=colors[model]
            )
        ax1.scatter(
            mean_recall,
            mean_precision,
            color=colors[model],
            edgecolors='k',
            s=60,
            zorder=10,
            label=model
        )
        print(f"{model}: recall = {mean_recall:.4f}, precision = {mean_precision:.4f}")

    ax1.set_xlabel(f"CardRecall@{k_card}")
    ax1.set_ylabel(f"CardPrecision@{k_card}")
    ax1.set_title(f"95%-Confidence Ellipse for CardPrecision@{k_card} vs CardRecall@{k_card}, "
                  f"by Model (for a model deployed every {time_unit[:-1]})")
    handles, labels = ax1.get_legend_handles_labels()
    ax1.legend(handles, labels)
    ax1.grid(True)

    plt.savefig("Card-precision-and-recall-at-30-conf-ellipse-for-{model_name}-w-imb.png", bbox_inches='tight')
    plt.tight_layout()
    plt.show()

m_a = 2300                 # Average num txs reviewed per analyst per month (example)
n_a = 2160                 # Average num cards reviewed per analyst per month (example)
A = 10                    # Number of analysts employed 24/7

# Add TX_TIME_HOURS and TX_TIME_MINUTES (// rounds down, i.e. "floor division"), the hours & minutes from midnight on the day of the 1st tx
transactions_df['TX_TIME_MINUTES'] = transactions_df['TX_TIME_SECONDS'] // 60
transactions_df['TX_TIME_HOURS'] = transactions_df['TX_TIME_MINUTES'] // 60

(train_df, test_df)=get_train_test_set(transactions_df,start_date_training,
                                       delta_train=7,delta_delay=7,delta_test=7)

time_columns = ['TX_DATETIME', 'TX_TIME_DAYS', 'TX_TIME_HOURS', 'TX_TIME_MINUTES', 'TX_TIME_SECONDS']

X_test, y_test = test_df[input_features + time_columns + ['CUSTOMER_ID']], test_df['TX_FRAUD']


plot_card_precision_recall_ellipse_by_time_and_model(
    result_pipelines,
    X_test,
    y_test,
    A=A, n_a=n_a, m_a=m_a, time_unit='hours'
)

``` 


Figure: 95%-confidence ellipses for CardPrecision@30 vs CardRecall@30 for XGBoost with the 10 imbalance / cost-sensitive combinations