clophfit.fitting.utils
======================

.. py:module:: clophfit.fitting.utils

.. autoapi-nested-parse::

   Utility functions for fitting modules.


Functions
---------

.. autoapisummary::

   clophfit.fitting.utils.parse_remove_outliers
   clophfit.fitting.utils.identify_outliers_zscore
   clophfit.fitting.utils.reweight_from_residuals
   clophfit.fitting.utils.flag_trend_outliers
   clophfit.fitting.utils.fit_trendline
   clophfit.fitting.utils.smoothness
   clophfit.fitting.utils.roughness
   clophfit.fitting.utils.outlier_scores_extended
   clophfit.fitting.utils.apply_outlier_mask
   clophfit.fitting.utils.assign_error_model
   clophfit.fitting.utils.fit_rel_error_from_residuals
   clophfit.fitting.utils.fit_noise_model_from_residuals
   clophfit.fitting.utils.fit_gain_and_rel_error_from_residuals


Module Contents
---------------

.. py:function:: parse_remove_outliers(spec)

   Parse outlier specification ``"method:threshold:min_keep"``.

   :param spec: The string to parse.
   :type spec: str

   :returns: A tuple of `method`, `threshold`, `min_keep`.
   :rtype: tuple[str, float, int]

   .. rubric:: Examples

   - ``"zscore:2.5:5"`` -> ``("zscore", 2.5, 5)``
   - ``"method"`` -> ``("method", 2.0, 1)``


.. py:function:: identify_outliers_zscore(residuals, threshold = 2.0)

   Identify outliers using the Z-score method on a 1D array of residuals.

   :param residuals: The residuals to analyze.
   :type residuals: np.ndarray
   :param threshold: The Z-score threshold beyond which a point is considered an outlier.
   :type threshold: float

   :returns: A boolean mask where True indicates an outlier.
   :rtype: ArrayMask


.. py:function:: reweight_from_residuals(ds, residuals)

   Update ``y_errc`` in a Dataset from the mean absolute residuals of each label.

   :param ds: The input dataset.
   :type ds: Dataset
   :param residuals: The combined 1D array of residuals for all labels in the dataset, in the order of ``ds.values()``.
   :type residuals: np.ndarray

   :returns: A new dataset with updated ``y_errc``.
   :rtype: Dataset


.. py:function:: flag_trend_outliers(x, y, threshold = 3.0)

   Flag outliers using robust Theil-Sen regression of y on x.

   A point is flagged if its residual is far from the trendline
   (residual Z-score < -threshold) OR if its x-value is extremely low
   compared to the population (x Z-score < -threshold).

   :param x: The independent variable (e.g., maximum signal, mean).
   :type x: pd.Series
   :param y: The dependent variable (e.g., signal span, std, or dynamic range).
   :type y: pd.Series
   :param threshold: The Z-score threshold for flagging an outlier.
   :type threshold: float

   :returns: A boolean Series of the same length as x, True for outliers.
   :rtype: pd.Series


.. py:function:: fit_trendline(x, y)

   Fit a robust Theil-Sen regression line.

   :param x: The independent variable.
   :type x: pd.Series
   :param y: The dependent variable.
   :type y: pd.Series

   :returns: Slope and intercept.
   :rtype: tuple[float, float]


.. py:function:: smoothness(y)

   Calculate the smoothness of a curve.

   Ratio of the sum of \|consecutive diffs\| to the total span. The value is
   1 for a perfectly monotone curve and greater than 1 for a noisy or
   non-monotone one.

   :param y: The signal array.
   :type y: np.ndarray

   :returns: The smoothness value.
   :rtype: float


.. py:function:: roughness(y)

   Calculate the roughness of a curve.

   Excess path fraction, ``roughness = (consec - span) / consec``, where
   ``consec`` is the sum of \|consecutive diffs\| and ``span`` is the total
   span. The value is 0 for a perfectly monotone curve and 1 for pure noise;
   the measure is flat-safe.

   :param y: The signal array.
   :type y: np.ndarray

   :returns: The roughness value.
   :rtype: float


.. py:function:: outlier_scores_extended(x, y)

   Compute outlier scores for each point using geometric deviation.

   Uses a hybrid approach for edge points:

   - If ``|edge_step| > 2 * |local_step|`` (anomalously large jump): use the full projection deviation.
   - Else if the step goes in the wrong direction (reversal): use the projection deviation.
   - Otherwise (correct direction / plateau approach): score = 0.

   For internal points, a triangle-inequality score is used.

   :param x: x-values (e.g. pH or concentration).
   :type x: np.ndarray
   :param y: Observed y-values.
   :type y: np.ndarray

   :returns: Per-point outlier scores (non-negative; higher = more anomalous).
   :rtype: np.ndarray

   .. rubric:: Examples

   >>> import numpy as np
   >>> x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
   >>> y = np.array([10.0, 8.0, 15.0, 4.0, 2.0])
   >>> scores = outlier_scores_extended(x, y)
   >>> bool(scores[2] > 0.4)
   True


.. py:function:: apply_outlier_mask(ds, threshold = 0.2, min_keep = 3)

   Mask outlier points iteratively in each DataArray of a Dataset.

   Removes the single worst outlier (if above threshold) and recomputes
   scores, repeating until no score exceeds the threshold or until masking
   another point would leave fewer than ``min_keep`` unmasked points.

   :param ds: Dataset to process (deep-copied; input is not modified).
   :type ds: Dataset
   :param threshold: Outlier score above which a point is masked. Default is 0.2.
   :type threshold: float, optional
   :param min_keep: Minimum number of unmasked points to retain. Default is 3.
   :type min_keep: int, optional

   :returns: A new Dataset with outlier points masked.
   :rtype: Dataset


.. py:function:: assign_error_model(ds, sigma_floor = 1.0, gain = None, rel_error = 0.03)

   Assign heteroscedastic weights based on a physical detector noise model.

   Supports two model variants, selected by the value of ``gain``:

   - **Full model** (nonzero ``gain``, or ``gain=None`` for the per-label defaults):
     ``sigma_i = sqrt(floor² + gain * max(y_i, 0) + (rel_error * y_i)²)``
   - **Simplified model** (``gain=0``, proportional-only):
     ``sigma_i = sqrt(floor² + (rel_error * y_i)²)``

   When *gain* is ``None``, a per-label heuristic is used (y1: 1.8, y2: 0.7).
   Pass ``gain=0`` explicitly to use the simplified proportional model.

   :param ds: The dataset to update.
   :type ds: Dataset
   :param sigma_floor: Baseline noise floor. Can be a single value or a per-label dict (e.g. ``{f"y{lbl}": float(np.mean(v)) for lbl, v in tit.bg_noise.items()}``).
   :type sigma_floor: float | ArrayF | dict[str, float | ArrayF]
   :param gain: Poisson shot-noise scaling factor. ``None`` -> per-label defaults (y1: 1.8, y2: 0.7). Pass ``0`` to disable the Poisson term entirely.
   :type gain: float | dict[str, float] | None, optional
   :param rel_error: Proportional error coefficient. Can be a per-label dict when estimated separately per label (e.g. from :func:`fit_rel_error_from_residuals`). Default is 0.03 (3 %).
   :type rel_error: float | dict[str, float], optional

   :returns: A deep copy with physically modelled ``y_errc`` weights.
   :rtype: Dataset

   .. rubric:: Examples

   >>> import numpy as np
   >>> from clophfit.fitting.data_structures import Dataset, DataArray
   >>> y = np.array([100.0, 200.0, 300.0])
   >>> da = DataArray(xc=np.array([1.0, 2.0, 3.0]), yc=y, y_errc=np.ones_like(y))
   >>> ds = Dataset({"y1": da})
   >>> ds_new = assign_error_model(ds, sigma_floor=10.0, gain=0.0, rel_error=0.05)
   >>> np.round(ds_new["y1"].y_errc, 2)
   array([11.18, 14.14, 18.03])


.. py:function:: fit_rel_error_from_residuals(df, sigma_floor)

   Estimate proportional error (alpha) per label via moment estimator.

   Assumes the simplified noise model ``sigma^2 = floor^2 + alpha^2 * y_hat^2``
   (no Poisson gain term). With ``floor`` known from buffer measurements, and
   with the model-predicted values ``y_hat`` used in the denominator to avoid
   noise-in-variables bias, the closed-form moment estimator is:

   .. math::

      \hat{\alpha}^2 = \frac{\overline{r^2} - \sigma_{\text{floor}}^2}{\overline{\hat{y}^2}}

   :param df: DataFrame with columns ``label`` (str), ``resid_raw`` (float), and ``predicted`` (float -- the model-predicted signal at each point). Typically from :func:`clophfit.fitting.residuals.collect_multi_residuals`.
   :type df: pd.DataFrame
   :param sigma_floor: Known read-noise floor per label, e.g. from ``tit.bg_noise``.
   :type sigma_floor: dict[str, float]

   :returns: Per-label proportional error estimate ``alpha`` (non-negative).
   :rtype: dict[str, float]

   .. rubric:: Examples

   >>> import numpy as np, pandas as pd
   >>> rng = np.random.default_rng(0)
   >>> y_pred = np.linspace(50, 500, 200)
   >>> floor, true_alpha = 5.0, 0.02
   >>> sigma = np.sqrt(floor**2 + (true_alpha * y_pred) ** 2)
   >>> resid = sigma * rng.standard_normal(200)
   >>> df = pd.DataFrame({"label": "y1", "resid_raw": resid, "predicted": y_pred})
   >>> alpha = fit_rel_error_from_residuals(df, sigma_floor={"y1": floor})
   >>> round(alpha["y1"], 2)  # should be close to true_alpha=0.02
   0.02


.. py:function:: fit_noise_model_from_residuals(df, rel_error = 0.003)

   Fit per-label noise model parameters from first-pass residuals.

   With ``rel_error`` fixed, the noise equation becomes linear in the two
   unknowns ``sigma_read^2`` and ``gain``. Rearranging gives
   ``r_i^2 - (rel_error * y_i)^2 = sigma_read^2 + gain * y_i``, which is
   fitted by OLS on the adjusted squared residuals.

   :param df: DataFrame with columns ``label``, ``resid_raw``, ``y`` from a first-pass fit.
   :type df: pd.DataFrame
   :param rel_error: Fixed proportional error (default 0.003).
   :type rel_error: float, optional

   :returns: ``(sigma_floor_dict, gain_dict)`` per label (non-negative, clamped).
   :rtype: tuple[dict[str, float], dict[str, float]]


.. py:function:: fit_gain_and_rel_error_from_residuals(df, sigma_floor)

   Fit gain and rel_error per label from residuals with known floor.

   No-intercept OLS on ``adjusted = gain * y + rel_error^2 * y^2``, where
   ``adjusted = r^2 - floor^2``.

   :param df: DataFrame with columns ``label``, ``resid_raw``, ``y``.
   :type df: pd.DataFrame
   :param sigma_floor: Known noise floor per label.
   :type sigma_floor: dict[str, float]

   :returns: ``(gain_dict, rel_error_dict)`` per label (non-negative, clamped).
   :rtype: tuple[dict[str, float], dict[str, float]]
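

The no-intercept OLS described for ``fit_gain_and_rel_error_from_residuals`` can be sketched with NumPy alone. This is a minimal illustration under stated assumptions, not the clophfit source. The helper name ``sketch_gain_and_rel_error`` and its array-based, single-label signature are hypothetical, and setting negative estimates to zero is only an assumed reading of the "non-negative, clamped" note in the return description.

```python
import numpy as np


def sketch_gain_and_rel_error(y, resid, floor):
    """Hypothetical single-label sketch; not the clophfit implementation."""
    y = np.asarray(y, dtype=float)
    resid = np.asarray(resid, dtype=float)
    # Subtract the known floor variance from the squared residuals.
    adjusted = resid**2 - floor**2
    # Two regressors and no intercept column:
    # adjusted ~ gain * y + rel_error^2 * y^2
    design = np.column_stack([y, y**2])
    coef, *_ = np.linalg.lstsq(design, adjusted, rcond=None)
    # Assumed clamping: negative estimates are set to zero.
    gain = max(float(coef[0]), 0.0)
    rel_error = float(np.sqrt(max(float(coef[1]), 0.0)))
    return gain, rel_error


# Noise-free check: use the exact model standard deviation as the "residual",
# so the squared residuals equal the model variance and the fit is exact
# up to floating-point error.
y = np.linspace(50.0, 500.0, 50)
floor, true_gain, true_rel = 5.0, 1.5, 0.03
sigma = np.sqrt(floor**2 + true_gain * y + (true_rel * y) ** 2)
gain_est, rel_est = sketch_gain_and_rel_error(y, sigma, floor)
print(gain_est, rel_est)
```

On this noise-free input the estimates should match ``true_gain`` and ``true_rel`` to within floating-point error. With real residuals the squared values are noisy, so the two coefficients can be strongly correlated and either may hit the zero clamp.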