Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hashprep/checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
_check_high_missing_values,
_check_missing_patterns,
)
from .mutual_info import _check_low_mutual_information
from .outliers import (
_check_constant_length,
_check_datetime_skew,
Expand Down Expand Up @@ -60,6 +61,7 @@ def _check_dataset_drift(analyzer):
"empty_dataset": _check_empty_dataset,
"normality": _check_normality,
"variance_homogeneity": _check_variance_homogeneity,
"low_mutual_information": _check_low_mutual_information,
}

CORRELATION_CHECKS = {"feature_correlation", "categorical_correlation", "mixed_correlation"}
Expand Down
52 changes: 52 additions & 0 deletions hashprep/checks/mutual_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""
Check for features with near-zero mutual information with the target column.
Near-zero MI means the feature carries almost no information about the target
and is likely useless (or worse — noise) for a predictive model.
"""

from ..config import DEFAULT_CONFIG
from ..summaries.mutual_info import summarize_mutual_information
from .core import Issue

_MI = DEFAULT_CONFIG.mutual_info


def _check_low_mutual_information(analyzer) -> list[Issue]:
"""
Flag features whose mutual information with the target column is below
the configured warning threshold. Requires target_col to be set.
"""
if analyzer.target_col is None:
return []

mi_result = summarize_mutual_information(analyzer.df, analyzer.target_col, analyzer.column_types)
if not mi_result or not mi_result.get("scores"):
return []

issues = []
scores = mi_result["scores"]
task = mi_result["task"]

for col, score in scores.items():
if score < _MI.low_mi_warning:
issues.append(
Issue(
category="low_mutual_information",
severity="warning",
column=col,
description=(
f"Column '{col}' has near-zero mutual information with target "
f"'{analyzer.target_col}' (MI={score:.4f} nats, task={task})"
),
impact_score="medium",
quick_fix=(
"Options:\n"
"- Drop feature: Near-zero MI suggests no predictive signal for the target.\n"
"- Investigate interactions: Feature may be useful combined with others.\n"
"- Check encoding: Categorical features may need different encoding.\n"
"- Retain for now: MI is marginal; feature interactions may matter."
),
)
)

return issues
15 changes: 15 additions & 0 deletions hashprep/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ class ImbalanceThresholds:
majority_class_ratio: float = 0.9


@dataclass(frozen=True)
class MutualInfoThresholds:
    """Configuration values for the mutual-information and entropy checks."""

    # Features whose MI score (in nats) against the target is below this
    # value are flagged as potentially uninformative.
    low_mi_warning: float = 0.01
    # Categorical columns with more distinct values than this are excluded
    # from the MI computation.
    max_categories_for_mi: int = 50
    # MI is skipped entirely when fewer rows than this have a target value.
    min_samples_for_mi: int = 20
    # Equal-width bin count used to discretise numeric columns for entropy.
    entropy_bins: int = 10


@dataclass(frozen=True)
class StatisticalTestThresholds:
"""Thresholds for normality and variance homogeneity tests."""
Expand Down Expand Up @@ -206,6 +220,7 @@ class HashPrepConfig:
drift: DriftThresholds = field(default_factory=DriftThresholds)
distribution: DistributionThresholds = field(default_factory=DistributionThresholds)
imbalance: ImbalanceThresholds = field(default_factory=ImbalanceThresholds)
mutual_info: MutualInfoThresholds = field(default_factory=MutualInfoThresholds)
statistical_tests: StatisticalTestThresholds = field(default_factory=StatisticalTestThresholds)
datetime: DateTimeThresholds = field(default_factory=DateTimeThresholds)
type_inference: TypeInferenceConfig = field(default_factory=TypeInferenceConfig)
Expand Down
7 changes: 7 additions & 0 deletions hashprep/core/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
summarize_variable_types,
summarize_variables,
)
from ..summaries.mutual_info import summarize_mutual_information
from ..utils.sampling import DatasetSampler, SamplingConfig
from ..utils.type_inference import infer_types
from .visualizations import (
Expand Down Expand Up @@ -61,6 +62,7 @@ class DatasetAnalyzer:
"constant_length",
"normality",
"variance_homogeneity",
"low_mutual_information",
]

def __init__(
Expand Down Expand Up @@ -125,6 +127,11 @@ def analyze(self) -> dict:
self.summaries.update(summarize_interactions(self.df))
self.summaries.update(summarize_missing_values(self.df))

if self.target_col is not None:
mi_result = summarize_mutual_information(self.df, self.target_col, self.column_types)
if mi_result:
self.summaries["mutual_information"] = mi_result

if self.sampler:
self.summaries["sampling_info"] = self.sampler.get_sampling_info()

Expand Down
1 change: 1 addition & 0 deletions hashprep/summaries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
)
from .interactions import summarize_interactions as summarize_interactions
from .missing import summarize_missing_values as summarize_missing_values
from .mutual_info import summarize_mutual_information as summarize_mutual_information
from .variables import summarize_variables as summarize_variables
103 changes: 103 additions & 0 deletions hashprep/summaries/mutual_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""
Mutual information between each feature and the target column.

Uses sklearn's mutual_info_classif (categorical target) or
mutual_info_regression (numeric target). Categorical features are
label-encoded before scoring.
"""

import pandas as pd
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from sklearn.preprocessing import LabelEncoder

from ..config import DEFAULT_CONFIG
from ..utils.logging import get_logger

_log = get_logger("summaries.mutual_info")
_MI = DEFAULT_CONFIG.mutual_info


def summarize_mutual_information(
    df: pd.DataFrame,
    target_col: str,
    column_types: dict[str, str],
) -> dict:
    """
    Compute mutual information between every feature and the target column.

    Numeric targets are scored with mutual_info_regression; all other target
    types with mutual_info_classif. Features included are Numeric columns
    (with at least one non-null value) and Categorical columns with at most
    ``max_categories_for_mi`` distinct values.

    Returns a dict:
        {
            "target": target_col,
            "task": "classification" | "regression",
            "scores": {col: mi_score, ...},  # nats, sorted descending
        }
    or an empty dict when MI cannot be computed (too few samples, no usable
    features, sklearn failure, etc.).
    """
    if target_col not in df.columns:
        return {}

    # Rows without a target value carry no information about the target.
    # Drop them once up front and reuse the filtered frame below (the
    # original computed this dropna twice — once just to count rows).
    sub = df.dropna(subset=[target_col])
    if len(sub) < _MI.min_samples_for_mi:
        return {}

    # Numeric target -> regression; anything else -> classification.
    target_type = column_types.get(target_col, "Unsupported")
    if target_type == "Numeric":
        task = "regression"
        mi_fn = mutual_info_regression
    else:
        task = "classification"
        mi_fn = mutual_info_classif

    # Select features: Numeric columns plus low-cardinality Categorical ones.
    feature_cols = []
    discrete_mask = []
    for col in df.columns:
        if col == target_col:
            continue
        typ = column_types.get(col, "Unsupported")
        if typ == "Numeric":
            # An all-NaN numeric column cannot be median-filled; previously
            # such a column made the sklearn call raise and silently
            # discarded the whole MI summary. Skip just that column instead.
            if sub[col].notna().any():
                feature_cols.append(col)
                discrete_mask.append(False)
        elif typ == "Categorical" and df[col].nunique() <= _MI.max_categories_for_mi:
            feature_cols.append(col)
            discrete_mask.append(True)

    if not feature_cols:
        return {}

    # Build X: label-encode categoricals (NaN -> sentinel category),
    # median-fill numerics.
    X = sub[feature_cols].copy()
    for col, is_discrete in zip(feature_cols, discrete_mask):
        if is_discrete:
            le = LabelEncoder()
            X[col] = le.fit_transform(X[col].fillna("__missing__").astype(str))
        else:
            X[col] = X[col].fillna(X[col].median())

    # Target vector: label-encode for classification, raw values for
    # regression. Target NaNs were already dropped above, so no fill needed.
    y_raw = sub[target_col]
    if task == "classification":
        y = LabelEncoder().fit_transform(y_raw.astype(str))
    else:
        y = y_raw.values

    try:
        mi_scores = mi_fn(X.values, y, discrete_features=discrete_mask, random_state=0)
    except Exception as e:
        # Best-effort summary: any sklearn failure just disables this check.
        _log.debug("Mutual information computation failed: %s", e)
        return {}

    # Sort descending by MI score.
    scores = dict(
        sorted(
            ((col, float(s)) for col, s in zip(feature_cols, mi_scores)),
            key=lambda kv: kv[1],
            reverse=True,
        )
    )

    return {
        "target": target_col,
        "task": task,
        "scores": scores,
    }
28 changes: 28 additions & 0 deletions hashprep/summaries/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,32 @@

_SUMMARY = DEFAULT_CONFIG.summaries
_ST = DEFAULT_CONFIG.statistical_tests
_MI = DEFAULT_CONFIG.mutual_info


def _shannon_entropy(series: pd.Series, bins: int | None = None) -> dict | None:
"""
Compute Shannon entropy (bits) for a series.
- Categorical/text: uses value-count probabilities directly.
- Numeric: discretises into `bins` equal-width bins first.
Returns a dict with 'entropy_bits' and 'normalized_entropy' (0–1),
or None when there are fewer than 2 distinct values.
"""
if series.empty:
return None
if bins is not None:
# Discretise numeric series into bins
try:
series = pd.cut(series, bins=bins, labels=False, duplicates="drop")
except Exception:
return None
probs = series.dropna().value_counts(normalize=True)
if len(probs) < 2:
return None
entropy_bits = float(-np.sum(probs * np.log2(probs)))
max_entropy = float(np.log2(len(probs)))
normalized = entropy_bits / max_entropy if max_entropy > 0 else 0.0
return {"entropy_bits": entropy_bits, "normalized_entropy": normalized}


def get_monotonicity(series: pd.Series) -> str:
Expand Down Expand Up @@ -159,6 +185,7 @@ def _summarize_numeric(df, col):
"common_values": common_values,
"extreme_values": extremes,
"normality": normality,
"entropy": _shannon_entropy(finite, bins=_MI.entropy_bins),
}
return stats

Expand Down Expand Up @@ -341,6 +368,7 @@ def _summarize_categorical(df, col):
},
"words": text_summary["words"],
"characters": text_summary["characters"],
"entropy": _shannon_entropy(series),
}
return stats

Expand Down
Loading