def _check_low_mutual_information(analyzer) -> list[Issue]:
    """
    Report every feature whose mutual information with the target column is
    below ``_MI.low_mi_warning``.

    Reads ``target_col``, ``df`` and ``column_types`` from *analyzer*.
    Returns an empty list when no target column is set, or when the mutual
    information summary is empty or carries no scores.
    """
    target = analyzer.target_col
    if target is None:
        return []

    summary = summarize_mutual_information(analyzer.df, target, analyzer.column_types)
    if not summary or not summary.get("scores"):
        return []

    task_name = summary["task"]
    threshold = _MI.low_mi_warning
    # The remediation text is identical for every flagged column.
    advice = (
        "Options:\n"
        "- Drop feature: Near-zero MI suggests no predictive signal for the target.\n"
        "- Investigate interactions: Feature may be useful combined with others.\n"
        "- Check encoding: Categorical features may need different encoding.\n"
        "- Retain for now: MI is marginal; feature interactions may matter."
    )

    return [
        Issue(
            category="low_mutual_information",
            severity="warning",
            column=name,
            description=(
                f"Column '{name}' has near-zero mutual information with target "
                f"'{target}' (MI={mi:.4f} nats, task={task_name})"
            ),
            impact_score="medium",
            quick_fix=advice,
        )
        for name, mi in summary["scores"].items()
        if mi < threshold
    ]
DistributionThresholds = field(default_factory=DistributionThresholds) imbalance: ImbalanceThresholds = field(default_factory=ImbalanceThresholds) + mutual_info: MutualInfoThresholds = field(default_factory=MutualInfoThresholds) statistical_tests: StatisticalTestThresholds = field(default_factory=StatisticalTestThresholds) datetime: DateTimeThresholds = field(default_factory=DateTimeThresholds) type_inference: TypeInferenceConfig = field(default_factory=TypeInferenceConfig) diff --git a/hashprep/core/analyzer.py b/hashprep/core/analyzer.py index 2c44047..a7fe3a2 100644 --- a/hashprep/core/analyzer.py +++ b/hashprep/core/analyzer.py @@ -17,6 +17,7 @@ summarize_variable_types, summarize_variables, ) +from ..summaries.mutual_info import summarize_mutual_information from ..utils.sampling import DatasetSampler, SamplingConfig from ..utils.type_inference import infer_types from .visualizations import ( @@ -61,6 +62,7 @@ class DatasetAnalyzer: "constant_length", "normality", "variance_homogeneity", + "low_mutual_information", ] def __init__( @@ -125,6 +127,11 @@ def analyze(self) -> dict: self.summaries.update(summarize_interactions(self.df)) self.summaries.update(summarize_missing_values(self.df)) + if self.target_col is not None: + mi_result = summarize_mutual_information(self.df, self.target_col, self.column_types) + if mi_result: + self.summaries["mutual_information"] = mi_result + if self.sampler: self.summaries["sampling_info"] = self.sampler.get_sampling_info() diff --git a/hashprep/summaries/__init__.py b/hashprep/summaries/__init__.py index fb3da47..761638c 100644 --- a/hashprep/summaries/__init__.py +++ b/hashprep/summaries/__init__.py @@ -18,4 +18,5 @@ ) from .interactions import summarize_interactions as summarize_interactions from .missing import summarize_missing_values as summarize_missing_values +from .mutual_info import summarize_mutual_information as summarize_mutual_information from .variables import summarize_variables as summarize_variables diff --git 
def summarize_mutual_information(
    df: pd.DataFrame,
    target_col: str,
    column_types: dict[str, str],
) -> dict:
    """
    Compute mutual information between every usable feature and the target.

    Usable features are the columns typed "Numeric" plus the columns typed
    "Categorical" whose distinct-value count does not exceed
    ``_MI.max_categories_for_mi``. Rows with a missing target are dropped.
    Numeric gaps are filled with the column median; categorical gaps become
    the "__missing__" label before label-encoding.

    Numeric features with no finite value at all are skipped, so that one
    unusable column cannot make the estimator reject the whole matrix.

    Returns a dict:
        {
            "target": target_col,
            "task": "classification" | "regression",
            "scores": {col: mi_score, ...},   # nats, sorted descending
        }
    or an empty dict when MI cannot be computed (unknown target column, too
    few rows with a target, no usable feature, or the estimator failed).
    """
    if target_col not in df.columns:
        return {}

    # Rows without a target value cannot contribute; drop them once and reuse.
    sub = df.dropna(subset=[target_col])
    if len(sub) < _MI.min_samples_for_mi:
        return {}

    # A target typed "Numeric" is scored as regression; every other type is
    # label-encoded and scored as classification.
    if column_types.get(target_col, "Unsupported") == "Numeric":
        task, mi_fn = "regression", mutual_info_regression
    else:
        task, mi_fn = "classification", mutual_info_classif

    infinities = [float("inf"), float("-inf")]
    encoded: dict = {}
    discrete_mask: list[bool] = []

    for col in df.columns:
        if col == target_col:
            continue
        typ = column_types.get(col, "Unsupported")
        if typ == "Numeric":
            # Treat +/-inf like missing values: the estimator rejects both.
            values = sub[col]
            values = values.mask(values.isin(infinities))
            fill = values.median()
            if pd.isna(fill):
                # No finite observation at all: nothing to score.
                continue
            encoded[col] = values.fillna(fill).to_numpy()
            discrete_mask.append(False)
        elif typ == "Categorical" and df[col].nunique() <= _MI.max_categories_for_mi:
            # Cast to object first: filling a pandas ``category`` column with
            # a label that is not an existing category raises.
            labels = sub[col].astype(object).fillna("__missing__").astype(str)
            encoded[col] = LabelEncoder().fit_transform(labels)
            discrete_mask.append(True)

    if not encoded:
        return {}

    feature_cols = list(encoded)
    X = pd.DataFrame(encoded).to_numpy()

    y_raw = sub[target_col]
    if task == "classification":
        # Missing targets were already dropped above, so no fill is needed.
        y = LabelEncoder().fit_transform(y_raw.astype(str))
    else:
        y = y_raw.values

    try:
        mi_scores = mi_fn(X, y, discrete_features=discrete_mask, random_state=0)
    except Exception as e:
        # Deliberate best-effort: a failed estimate must not abort the
        # surrounding analysis, so log it and report "no result".
        _log.debug("Mutual information computation failed: %s", e)
        return {}

    scores = {col: float(score) for col, score in zip(feature_cols, mi_scores)}
    # Sort descending by MI score
    scores = dict(sorted(scores.items(), key=lambda kv: kv[1], reverse=True))

    return {
        "target": target_col,
        "task": task,
        "scores": scores,
    }
afac225..d87a14b 100644 --- a/hashprep/summaries/variables.py +++ b/hashprep/summaries/variables.py @@ -10,6 +10,32 @@ _SUMMARY = DEFAULT_CONFIG.summaries _ST = DEFAULT_CONFIG.statistical_tests +_MI = DEFAULT_CONFIG.mutual_info + + +def _shannon_entropy(series: pd.Series, bins: int | None = None) -> dict | None: + """ + Compute Shannon entropy (bits) for a series. + - Categorical/text: uses value-count probabilities directly. + - Numeric: discretises into `bins` equal-width bins first. + Returns a dict with 'entropy_bits' and 'normalized_entropy' (0–1), + or None when there are fewer than 2 distinct values. + """ + if series.empty: + return None + if bins is not None: + # Discretise numeric series into bins + try: + series = pd.cut(series, bins=bins, labels=False, duplicates="drop") + except Exception: + return None + probs = series.dropna().value_counts(normalize=True) + if len(probs) < 2: + return None + entropy_bits = float(-np.sum(probs * np.log2(probs))) + max_entropy = float(np.log2(len(probs))) + normalized = entropy_bits / max_entropy if max_entropy > 0 else 0.0 + return {"entropy_bits": entropy_bits, "normalized_entropy": normalized} def get_monotonicity(series: pd.Series) -> str: @@ -159,6 +185,7 @@ def _summarize_numeric(df, col): "common_values": common_values, "extreme_values": extremes, "normality": normality, + "entropy": _shannon_entropy(finite, bins=_MI.entropy_bins), } return stats @@ -341,6 +368,7 @@ def _summarize_categorical(df, col): }, "words": text_summary["words"], "characters": text_summary["characters"], + "entropy": _shannon_entropy(series), } return stats diff --git a/tests/test_mutual_info.py b/tests/test_mutual_info.py new file mode 100644 index 0000000..0aa0e1c --- /dev/null +++ b/tests/test_mutual_info.py @@ -0,0 +1,275 @@ +"""Tests for mutual information, entropy, and the low_mutual_information check.""" + +import numpy as np +import pandas as pd +import pytest + +from hashprep import DatasetAnalyzer +from 
hashprep.checks.mutual_info import _check_low_mutual_information +from hashprep.summaries.mutual_info import summarize_mutual_information +from hashprep.summaries.variables import _summarize_categorical, _summarize_numeric +from hashprep.utils.type_inference import infer_types + +rng = np.random.default_rng(0) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class _FakeAnalyzer: + def __init__(self, df, target_col=None): + self.df = df + self.target_col = target_col + self.column_types = infer_types(df) + + +# --------------------------------------------------------------------------- +# Shannon entropy in variable summaries +# --------------------------------------------------------------------------- + + +class TestEntropyInSummaries: + def test_categorical_has_entropy(self): + df = pd.DataFrame({"cat": ["A", "B", "C", "D"] * 25}) + result = _summarize_categorical(df, "cat") + assert "entropy" in result + assert result["entropy"] is not None + + def test_uniform_categorical_max_entropy(self): + # 4 equally likely classes → entropy = log2(4) = 2 bits + df = pd.DataFrame({"cat": ["A", "B", "C", "D"] * 50}) + ent = _summarize_categorical(df, "cat")["entropy"] + assert abs(ent["entropy_bits"] - 2.0) < 0.01 + assert abs(ent["normalized_entropy"] - 1.0) < 0.01 + + def test_constant_categorical_entropy_none(self): + df = pd.DataFrame({"cat": ["A"] * 50}) + ent = _summarize_categorical(df, "cat")["entropy"] + assert ent is None + + def test_skewed_categorical_lower_entropy(self): + # 90% A, 10% B → lower entropy than uniform + df = pd.DataFrame({"cat": ["A"] * 90 + ["B"] * 10}) + ent_skewed = _summarize_categorical(df, "cat")["entropy"]["entropy_bits"] + df_uniform = pd.DataFrame({"cat": ["A"] * 50 + ["B"] * 50}) + ent_uniform = _summarize_categorical(df_uniform, "cat")["entropy"]["entropy_bits"] + assert ent_skewed < ent_uniform + + def 
test_numeric_has_entropy(self): + df = pd.DataFrame({"x": rng.standard_normal(200)}) + result = _summarize_numeric(df, "x") + assert "entropy" in result + assert result["entropy"] is not None + + def test_numeric_entropy_fields(self): + df = pd.DataFrame({"x": rng.uniform(0, 1, 200)}) + ent = _summarize_numeric(df, "x")["entropy"] + assert "entropy_bits" in ent + assert "normalized_entropy" in ent + assert 0.0 <= ent["normalized_entropy"] <= 1.0 + + def test_numeric_constant_entropy_none(self): + df = pd.DataFrame({"x": [5.0] * 50}) + result = _summarize_numeric(df, "x") + # constant column: normality is None; entropy is also None (only 1 bin) + assert result["entropy"] is None + + +# --------------------------------------------------------------------------- +# summarize_mutual_information +# --------------------------------------------------------------------------- + + +class TestSummarizeMutualInformation: + def _classification_df(self, n=200): + x_signal = rng.standard_normal(n) + x_noise = rng.standard_normal(n) + y = (x_signal > 0).astype(int) + return pd.DataFrame({"signal": x_signal, "noise": x_noise, "target": y}) + + def _regression_df(self, n=200): + x_signal = rng.standard_normal(n) + x_noise = rng.standard_normal(n) + y = x_signal * 3 + rng.standard_normal(n) * 0.1 + return pd.DataFrame({"signal": x_signal, "noise": x_noise, "target": y}) + + def test_returns_dict_with_scores(self): + df = self._classification_df() + types = infer_types(df) + result = summarize_mutual_information(df, "target", types) + assert "scores" in result + assert "signal" in result["scores"] + assert "noise" in result["scores"] + + def test_classification_task_detected(self): + df = self._classification_df() + types = infer_types(df) + result = summarize_mutual_information(df, "target", types) + assert result["task"] == "classification" + + def test_regression_task_detected(self): + df = self._regression_df() + types = infer_types(df) + result = summarize_mutual_information(df, 
"target", types) + assert result["task"] == "regression" + + def test_signal_higher_mi_than_noise(self): + df = self._classification_df() + types = infer_types(df) + scores = summarize_mutual_information(df, "target", types)["scores"] + assert scores["signal"] > scores["noise"] + + def test_scores_sorted_descending(self): + df = self._classification_df() + types = infer_types(df) + scores = summarize_mutual_information(df, "target", types)["scores"] + vals = list(scores.values()) + assert vals == sorted(vals, reverse=True) + + def test_scores_non_negative(self): + df = self._classification_df() + types = infer_types(df) + scores = summarize_mutual_information(df, "target", types)["scores"] + assert all(v >= 0 for v in scores.values()) + + def test_missing_target_col_returns_empty(self): + df = pd.DataFrame({"x": rng.standard_normal(100)}) + types = infer_types(df) + assert summarize_mutual_information(df, "nonexistent", types) == {} + + def test_too_few_samples_returns_empty(self): + df = pd.DataFrame({"x": [1.0, 2.0, 3.0], "target": [0, 1, 0]}) + types = infer_types(df) + assert summarize_mutual_information(df, "target", types) == {} + + def test_categorical_features_included(self): + df = pd.DataFrame( + { + "cat": ["A", "B"] * 100, + "num": rng.standard_normal(200), + "target": rng.integers(0, 2, 200), + } + ) + types = infer_types(df) + result = summarize_mutual_information(df, "target", types) + assert "cat" in result.get("scores", {}) + + def test_high_cardinality_cat_excluded(self): + # 300 unique categories → should be excluded from MI + df = pd.DataFrame( + { + "high_card": [f"cat_{i}" for i in range(300)], + "num": rng.standard_normal(300), + "target": rng.integers(0, 2, 300), + } + ) + types = infer_types(df) + result = summarize_mutual_information(df, "target", types) + assert "high_card" not in result.get("scores", {}) + + +# --------------------------------------------------------------------------- +# low_mutual_information check +# 
--------------------------------------------------------------------------- + + +class TestLowMutualInformationCheck: + def test_no_target_returns_empty(self): + df = pd.DataFrame({"x": rng.standard_normal(100)}) + issues = _check_low_mutual_information(_FakeAnalyzer(df, target_col=None)) + assert issues == [] + + def test_noise_feature_flagged(self): + # Independent seed + n=2000 so the KNN estimator reliably gives noise ≈ 0 + _rng = np.random.default_rng(123) + n = 2000 + x_signal = _rng.standard_normal(n) + noise = _rng.standard_normal(n) + target = (x_signal > 0).astype(int) + df = pd.DataFrame({"signal": x_signal, "noise": noise, "target": target}) + issues = _check_low_mutual_information(_FakeAnalyzer(df, target_col="target")) + flagged = [i.column for i in issues] + assert "noise" in flagged + + def test_strong_signal_not_flagged(self): + x = rng.standard_normal(300) + target = (x > 0).astype(int) + df = pd.DataFrame({"signal": x, "target": target}) + issues = _check_low_mutual_information(_FakeAnalyzer(df, target_col="target")) + flagged = [i.column for i in issues] + assert "signal" not in flagged + + def test_issue_fields_correct(self): + noise = rng.standard_normal(300) + target = rng.integers(0, 2, 300) + df = pd.DataFrame({"noise": noise, "target": target}) + issues = _check_low_mutual_information(_FakeAnalyzer(df, target_col="target")) + if issues: + issue = issues[0] + assert issue.category == "low_mutual_information" + assert issue.severity == "warning" + assert "MI=" in issue.description + + def test_target_col_not_flagged_against_itself(self): + df = pd.DataFrame({"x": rng.standard_normal(100), "target": rng.integers(0, 2, 100)}) + issues = _check_low_mutual_information(_FakeAnalyzer(df, target_col="target")) + assert all(i.column != "target" for i in issues) + + +# --------------------------------------------------------------------------- +# Integration: DatasetAnalyzer end-to-end +# 
--------------------------------------------------------------------------- + + +class TestMutualInfoIntegration: + def test_mi_summary_present_when_target_set(self): + x = rng.standard_normal(200) + df = pd.DataFrame({"x": x, "target": (x > 0).astype(int)}) + analyzer = DatasetAnalyzer(df, target_col="target", auto_sample=False) + summary = analyzer.analyze() + assert "mutual_information" in summary["summaries"] + mi = summary["summaries"]["mutual_information"] + assert "scores" in mi + assert "x" in mi["scores"] + + def test_mi_summary_absent_when_no_target(self): + df = pd.DataFrame({"x": rng.standard_normal(100)}) + analyzer = DatasetAnalyzer(df, auto_sample=False) + summary = analyzer.analyze() + assert "mutual_information" not in summary["summaries"] + + def test_low_mi_check_runs_in_full_analysis(self): + # Independent seed so this test is not sensitive to rng state from prior tests + _rng = np.random.default_rng(999) + n = 2000 + signal = _rng.standard_normal(n) + noise = _rng.standard_normal(n) + target = (signal > 0).astype(int) + df = pd.DataFrame({"signal": signal, "noise": noise, "target": target}) + analyzer = DatasetAnalyzer( + df, target_col="target", selected_checks=["low_mutual_information"], auto_sample=False + ) + summary = analyzer.analyze() + categories = [i["category"] for i in summary["issues"]] + assert "low_mutual_information" in categories + + def test_entropy_in_variable_summary(self): + df = pd.DataFrame({"cat": ["A", "B", "C"] * 50, "num": rng.standard_normal(150)}) + analyzer = DatasetAnalyzer(df, auto_sample=False) + summary = analyzer.analyze() + cat_var = summary["summaries"]["variables"]["cat"] + num_var = summary["summaries"]["variables"]["num"] + assert "entropy" in cat_var + assert "entropy" in num_var + + def test_low_mi_in_all_checks(self): + assert "low_mutual_information" in DatasetAnalyzer.ALL_CHECKS + + @pytest.mark.parametrize("check", ["low_mutual_information"]) + def test_check_selectable(self, check): + df = 
pd.DataFrame({"x": rng.standard_normal(100), "target": rng.integers(0, 2, 100)}) + analyzer = DatasetAnalyzer(df, target_col="target", selected_checks=[check], auto_sample=False) + summary = analyzer.analyze() + assert "issues" in summary