# src/bdf/repair.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
# Optional SciPy robust stats (preferred), with graceful fallback
try:
from scipy import stats as sps # type: ignore
except Exception:
sps = None # type: ignore
TIME_COL = "Test Time / s"
DEFAULT_OUTLIER_COLS = ("Voltage / V", "Current / A")
__all__ = ["fix_time", "clean", "CleanReport"]
# -----------------------------
# Reporting
# -----------------------------
@dataclass
class CleanReport:
n_rows_in: int
n_rows_out: int
time_method: str
n_time_resets: int
outlier_method: str
z_thresh: float
per_column_outliers: Dict[str, int]
notes: List[str]
def __str__(self) -> str:
lines = [
f"Rows: {self.n_rows_in} -> {self.n_rows_out}",
f"Time fix: {self.time_method} (resets={self.n_time_resets})",
f"Outliers: {self.outlier_method} (z>{self.z_thresh:g})",
]
if self.per_column_outliers:
lines.append("Per-column outliers: " + ", ".join(f"{k}={v}" for k, v in self.per_column_outliers.items()))
if self.notes:
lines.append("Notes:")
lines += [f" - {n}" for n in self.notes]
return "\n".join(lines)
# -----------------------------
# Time helpers
# -----------------------------
def _compute_eps_from_diffs(diffs: np.ndarray) -> float:
"""Auto epsilon = 0.1 * median(positive diffs), floored at 1e-9."""
pos = diffs[diffs > 0]
med = float(np.nanmedian(pos)) if pos.size else 0.0
return max(1e-9, 0.1 * med)
def _median_positive_dt(ts: np.ndarray) -> float:
diffs = np.diff(ts)
pos = diffs[diffs > 0]
if pos.size == 0:
return 1.0
return float(np.nanmedian(pos))
def _fix_time_between_neighbors(t: pd.Series, eps: float | str = "auto") -> Tuple[pd.Series, int]:
"""
Make time monotonic by placing each non-monotonic block strictly between its
two monotonic neighbors. Keeps all rows and preserves ordering.
For a block starting at i where t[i] < t[i-1]-eps and ending before the
first r where t[r] >= t[i-1]+eps, linearly interpolate times for i..r-1
between t[i-1] and t[r]. If no r exists, use median_dt to synthesize a right neighbor.
"""
ts = pd.to_numeric(t, errors="coerce").to_numpy(dtype="float64")
n = ts.size
if n <= 1:
return pd.Series(ts, index=t.index), 0
diffs = np.diff(ts, prepend=ts[0])
eps_val = _compute_eps_from_diffs(diffs) if eps == "auto" else float(eps)
median_dt = _median_positive_dt(ts)
tc = ts.copy()
i = 1
resets = 0
while i < n:
if tc[i] >= tc[i - 1] - eps_val:
i += 1
continue
# start of non-monotonic block
left_time = tc[i - 1]
j = i
# find first index where we recover past left_time (by eps)
while j < n and ts[j] < left_time + eps_val:
j += 1
block_len = j - i
if block_len <= 0:
i += 1
continue
if j < n:
right_time = ts[j]
span = max(right_time - left_time, median_dt * (block_len + 1))
else:
span = median_dt * (block_len + 1)
right_time = left_time + span
step = span / (block_len + 1)
for k in range(block_len):
tc[i + k] = left_time + step * (k + 1)
resets += 1
i = j
return pd.Series(tc, index=t.index), resets
def _fix_time_sort(df: pd.DataFrame) -> pd.DataFrame:
"""Stable sort by time and drop exact duplicate timestamps (keep first)."""
d = df.sort_values(TIME_COL, kind="mergesort").copy()
d = d.loc[~d[TIME_COL].duplicated(keep="first")].reset_index(drop=True)
return d
# -----------------------------
# Outlier helpers (SciPy-aware)
# -----------------------------
def _window_len_from_seconds(time_s: pd.Series, seconds: float, fallback: int = 41) -> int:
t = pd.to_numeric(time_s, errors="coerce")
if t.notna().sum() > 1:
dt = np.median(np.diff(t.dropna().to_numpy()))
if np.isfinite(dt) and dt > 0:
w = int(round(seconds / dt))
if w % 2 == 0:
w += 1 # prefer odd window
return max(5, w)
return fallback
def _global_mad_z(x: np.ndarray) -> tuple[np.ndarray, float, float]:
"""Robust z via MAD (σ ≈ MAD*1.4826). Returns (z, median, madn)."""
med = float(np.nanmedian(x))
if sps is not None:
madn = float(sps.median_abs_deviation(x, nan_policy="omit", scale="normal"))
else:
mad = float(np.nanmedian(np.abs(x - med)))
madn = 1.4826 * mad
if not np.isfinite(madn) or madn <= 0:
return np.zeros_like(x), med, 0.0
return (x - med) / madn, med, madn
def _global_huber_z(x: np.ndarray, c: float = 1.345) -> tuple[np.ndarray, float, float]:
"""
Robust z via Huber M-estimator (requires SciPy). Returns (z, loc, scale).
If SciPy missing or scale <= 0, returns zeros.
"""
if sps is None or not hasattr(sps, "huber"):
return np.zeros_like(x), float("nan"), 0.0
try:
loc, scale = sps.huber(x, c=c)
except Exception:
return np.zeros_like(x), float("nan"), 0.0
if not np.isfinite(scale) or scale <= 0:
return np.zeros_like(x), loc, 0.0
return (x - loc) / scale, loc, scale
def _local_robust_z(s: pd.Series, *, time_s: pd.Series, seconds: float, z: float) -> pd.Series:
"""
Local robust z using rolling IQR (σ ≈ IQR/1.349).
Flags |z_local| > z within the window.
"""
w = _window_len_from_seconds(time_s, seconds)
x = pd.to_numeric(s, errors="coerce")
med = x.rolling(w, center=True, min_periods=max(3, w // 3)).median()
q1 = x.rolling(w, center=True, min_periods=max(3, w // 3)).quantile(0.25)
q3 = x.rolling(w, center=True, min_periods=max(3, w // 3)).quantile(0.75)
sigma = (q3 - q1) / 1.349
rz = (x - med) / sigma.replace(0, np.nan)
return (rz.abs() > z).fillna(False)
def _hampel_mask(s: pd.Series, *, time_s: pd.Series, seconds: float, k: float = 6.0) -> pd.Series:
"""
Hampel filter: rolling median ± k * MADN.
Flags samples deviating more than k scaled MAD from rolling median.
"""
w = _window_len_from_seconds(time_s, seconds)
x = pd.to_numeric(s, errors="coerce")
med = x.rolling(w, center=True, min_periods=max(3, w // 3)).median()
abs_dev = (x - med).abs()
mad = abs_dev.rolling(w, center=True, min_periods=max(3, w // 3)).median()
madn = 1.4826 * mad
return ((x - med).abs() / madn.replace(0, np.nan) > k).fillna(False)
def _slope_mask(s: pd.Series, *, time_s: pd.Series, z: float = 8.0) -> pd.Series:
"""
Slope gate: robust z on derivative ds/dt using global MAD.
Catches single-sample spikes that might pass level-based gates.
"""
x = pd.to_numeric(s, errors="coerce").to_numpy(dtype="float64")
t = pd.to_numeric(time_s, errors="coerce").to_numpy(dtype="float64")
dx = np.diff(x, prepend=np.nan)
dt = np.diff(t, prepend=np.nan)
with np.errstate(divide="ignore", invalid="ignore"):
deriv = dx / dt
zder, _, madn = _global_mad_z(deriv)
m = (np.abs(zder) > z) if madn > 0 else np.zeros_like(deriv, dtype=bool)
m[~np.isfinite(deriv)] = False
return pd.Series(m, index=s.index)
def _robust_outlier_mask(
s: pd.Series,
*,
z_mad: float = 8.0,
z_huber: float = 6.0,
local_seconds: float | None = 600.0, # default ON (10 min)
local_z: float = 6.0,
hampel_seconds: float | None = 300.0, # default ON (5 min)
hampel_k: float = 6.0,
slope_gate: bool = True,
slope_z: float = 8.0,
method: str = "hybrid", # 'mad' | 'huber' | 'hybrid'
min_n: int = 30,
time_s: pd.Series | None = None,
) -> pd.Series:
"""
Robust outlier mask using (global) MAD & optional Huber, plus neighborhood gates:
- Local rolling IQR z
- Hampel filter
- Slope z on derivative
Combine as: (GLOBAL AND (LOCAL OR HAMPEL)) OR SLOPE.
"""
x = pd.to_numeric(s, errors="coerce").to_numpy(dtype="float64", na_value=np.nan)
valid = np.isfinite(x)
if valid.sum() < min_n:
return pd.Series(False, index=s.index)
xv = x.copy()
xv[~valid] = np.nan
z1, _, madn = _global_mad_z(xv)
if method == "mad":
m_global = (np.abs(z1) > z_mad) if madn > 0 else np.zeros_like(x, dtype=bool)
elif method == "huber":
z2, _, scale = _global_huber_z(xv)
m_global = (np.abs(z2) > z_huber) if scale > 0 else np.zeros_like(x, dtype=bool)
else: # 'hybrid'
z2, _, scale = _global_huber_z(xv)
m1 = (np.abs(z1) > z_mad) if madn > 0 else np.zeros_like(x, dtype=bool)
m2 = (np.abs(z2) > z_huber) if scale > 0 else m1 # fall back to MAD if Huber unavailable
m_global = m1 & m2 # conservative: both must agree
# neighborhood gates
m_neigh = None
if time_s is not None:
m_local = _local_robust_z(s, time_s=time_s, seconds=local_seconds, z=local_z) if local_seconds else None
m_hampel = _hampel_mask(s, time_s=time_s, seconds=hampel_seconds, k=hampel_k) if hampel_seconds else None
if m_local is not None and m_hampel is not None:
m_neigh = m_local | m_hampel
elif m_local is not None:
m_neigh = m_local
elif m_hampel is not None:
m_neigh = m_hampel
m = m_global if m_neigh is None else (m_global & m_neigh)
if slope_gate and time_s is not None:
m = m | _slope_mask(s, time_s=time_s, z=slope_z).values
out = np.zeros_like(x, dtype=bool)
out[:] = m
return pd.Series(out, index=s.index)
def _interp_inplace(y: pd.Series, x: Optional[pd.Series]) -> pd.Series:
if x is not None:
yi = pd.to_numeric(y, errors="coerce").astype("float64")
xi = pd.to_numeric(x, errors="coerce")
return yi.interpolate(method="values", x=xi, limit_direction="both")
return y.interpolate(limit_direction="both")
# -----------------------------
# Public API - simple time repair
# -----------------------------
def fix_time(
df: pd.DataFrame,
*,
method: str = "auto", # 'auto'|'segment'|'sort'|'drop'|'recompute'
time_col: str = TIME_COL,
date_col: str = "Date Time ISO",
eps: float | str = "auto",
inplace: bool = False,
) -> pd.DataFrame:
"""
Repair non-monotonic test time.
Methods:
- 'auto': if Date Time ISO exists & usable, recompute from timestamps; else 'segment'.
- 'segment': preserve order; interpolate within each decreasing block.
- 'sort': stable sort by time ascending; drop exact duplicate timestamps.
- 'drop': drop rows where time decreases by more than 'eps'.
- 'recompute': force recompute from Date Time ISO; raises if no valid timestamps.
"""
g = df if inplace else df.copy()
if time_col not in g.columns:
return g
if method in ("auto", "recompute"):
if date_col in g.columns:
t = pd.to_datetime(g[date_col], errors="coerce")
if t.notna().any():
t0 = t[t.notna()].iloc[0]
g[time_col] = (t - t0).dt.total_seconds()
return g
if method == "recompute":
raise ValueError(f"Cannot recompute from '{date_col}': no valid timestamps.")
if method in ("auto", "segment"):
g[time_col], _ = _fix_time_between_neighbors(g[time_col], eps=eps)
return g
if method == "sort":
g.sort_values(by=[time_col], kind="mergesort", inplace=True)
g.drop_duplicates(subset=[time_col], keep="first", inplace=True)
g.reset_index(drop=True, inplace=True)
return g
if method == "drop":
s = pd.to_numeric(g[time_col], errors="coerce")
d = s.diff().fillna(0.0)
if eps == "auto":
eps = _compute_eps_from_diffs(d.to_numpy())
keep = d >= -float(eps)
keep.iloc[0] = True
g = g.loc[keep].reset_index(drop=True)
return g
raise ValueError(f"Unknown method: {method!r}")
# -----------------------------
# Public API - full cleaner
# -----------------------------
[docs]
def clean(
df: pd.DataFrame,
*,
time_fix: str = "segment", # 'segment' | 'sort' | 'drop' | 'none'
outlier: str = "none", # 'none' | 'drop' | 'clip' | 'interp'
z_thresh: float = 8.0, # used for MAD/global & clip bounds
columns: Optional[List[str]] = None, # columns to outlier-clean
time_eps: float | str = "auto", # threshold for detecting time drops
# robust detection knobs
outlier_detect: str = "hybrid", # 'mad' | 'huber' | 'hybrid'
local_seconds: Optional[float] = 600.0, # local window (sec) for neighborhood z (None to disable)
local_z: float = 6.0,
z_huber: float = 6.0,
hampel_seconds: Optional[float] = 300.0,
hampel_k: float = 6.0,
slope_gate: bool = True,
slope_z: float = 8.0,
) -> Tuple[pd.DataFrame, CleanReport]:
"""
Clean a BDF-normalized DataFrame.
- time_fix:
'segment' -> place non-monotonic blocks between neighbors (keeps rows; default)
'sort' -> stable sort by time; drop duplicate timestamps
'drop' -> drop rows where time decreases beyond 'time_eps'
'none' -> leave time as-is
- outlier (action on flagged rows/values):
'drop' -> drop any row where selected columns are flagged as outliers
'clip' -> winsorize flagged values back to robust bounds
'interp' -> replace flagged values with NaN and linearly interpolate
'none' -> no outlier clean
- outlier_detect (how to flag):
'mad' -> global MAD z-score only
'huber' -> global Huber z-score only (SciPy; falls back to MAD if unavailable)
'hybrid' -> BOTH global MAD and Huber must flag (reduces false positives).
- local_seconds / hampel_seconds / slope_gate:
Neighborhood & derivative gates to catch single-sample spikes and suppress
false positives on slow drifts. Combined as: (GLOBAL AND (LOCAL OR HAMPEL)) OR SLOPE.
"""
if TIME_COL not in df.columns:
raise ValueError(f"Missing '{TIME_COL}'. Did you normalize to BDF?")
notes: List[str] = []
d = df.copy()
n_in = len(d)
cols = [c for c in (columns or DEFAULT_OUTLIER_COLS) if c in d.columns]
# ---- Fix time ----
t_numeric = pd.to_numeric(d[TIME_COL], errors="coerce")
diffs = np.diff(t_numeric.to_numpy(dtype="float64"), prepend=t_numeric.iloc[0])
eps_val = _compute_eps_from_diffs(diffs) if time_eps == "auto" else float(time_eps)
n_resets_detected = int((diffs < -eps_val).sum())
if time_fix == "segment":
d[TIME_COL], n_resets_detected = _fix_time_between_neighbors(d[TIME_COL], eps=time_eps)
time_method_used = "segment"
elif time_fix == "sort":
d = _fix_time_sort(d)
time_method_used = "sort"
n_resets_detected = 0
elif time_fix == "drop":
keep = np.concatenate(([True], diffs[1:] >= -eps_val))
dropped = int((~keep).sum())
if dropped:
notes.append(f"Dropped {dropped} rows due to time decreases.")
d = d.loc[keep].reset_index(drop=True)
time_method_used = "drop"
elif time_fix == "none":
time_method_used = "none"
else:
raise ValueError("time_fix must be one of: 'segment','sort','drop','none'")
# Rebase to start at zero if positive
tmin = pd.to_numeric(d[TIME_COL], errors="coerce").min()
if np.isfinite(tmin) and tmin > 0:
d[TIME_COL] = pd.to_numeric(d[TIME_COL], errors="coerce") - float(tmin)
# ---- Outliers ----
per_col: Dict[str, int] = {}
if outlier != "none" and cols:
masks: Dict[str, pd.Series] = {}
for c in cols:
masks[c] = _robust_outlier_mask(
d[c],
z_mad=z_thresh,
z_huber=z_huber,
local_seconds=local_seconds,
local_z=local_z,
hampel_seconds=hampel_seconds,
hampel_k=hampel_k,
slope_gate=slope_gate,
slope_z=slope_z,
method=outlier_detect,
min_n=30,
time_s=d[TIME_COL],
)
per_col[c] = int(masks[c].sum())
if outlier == "drop":
any_bad = (
np.logical_or.reduce([m.values for m in masks.values()]) if masks else np.zeros(len(d), dtype=bool)
)
d = d.loc[~any_bad].reset_index(drop=True)
notes.append(f"Dropped {int(any_bad.sum())} rows due to outliers in {', '.join(cols)}.")
elif outlier == "clip":
# robust bounds via MAD (SciPy if available), fallback to IQR
for c, _m in masks.items():
s = pd.to_numeric(d[c], errors="coerce")
med = float(np.nanmedian(s))
if sps is not None:
madn = float(sps.median_abs_deviation(s.to_numpy(), nan_policy="omit", scale="normal"))
else:
mad = float(np.nanmedian(np.abs(s - med)))
madn = 1.4826 * mad
if madn and madn > 0:
lo, hi = med - z_thresh * madn, med + z_thresh * madn
else:
# fallback to IQR
q1, q3 = np.nanpercentile(s, [25, 75])
iqr = q3 - q1
if iqr == 0:
continue
sigma = iqr / 1.349
lo, hi = med - z_thresh * sigma, med + z_thresh * sigma
d[c] = s.clip(lo, hi)
notes.append("Clipped outliers to robust bounds (MAD/IQR).")
elif outlier == "interp":
tx = pd.to_numeric(d[TIME_COL], errors="coerce")
for c, m in masks.items():
s = pd.to_numeric(d[c], errors="coerce")
s = s.mask(m, np.nan)
d[c] = _interp_inplace(s, tx)
notes.append("Interpolated outliers linearly over time.")
else:
raise ValueError("outlier must be one of: 'none','drop','clip','interp'")
rep = CleanReport(
n_rows_in=n_in,
n_rows_out=len(d),
time_method=time_method_used,
n_time_resets=n_resets_detected,
outlier_method=outlier,
z_thresh=z_thresh,
per_column_outliers=per_col,
notes=notes,
)
return d, rep