Source code for stormlog.gap_analysis

"""Shared hidden-memory gap analysis utilities."""

import logging
from dataclasses import dataclass
from typing import Any, Callable, List, Mapping, Sequence, cast

import numpy as np
from scipy import stats

from .derived_fields import compute_event_fields

try:
    from .phases import PhaseAttribution, PhaseReplayIndex
except ImportError:  # pragma: no cover - phase package may land in another slice
    PhaseAttribution = Any  # type: ignore[assignment,misc]
    PhaseReplayIndex = Any  # type: ignore[assignment,misc]
from .telemetry import TelemetryEventV2

_LOGGER = logging.getLogger(__name__)


[docs] @dataclass class GapFinding: """A classified finding from hidden-memory gap analysis.""" classification: str # 'transient_spike' | 'persistent_drift' | 'fragmentation_like' severity: str # 'info', 'warning', 'critical' confidence: float # 0.0 to 1.0 evidence: dict[str, Any] description: str remediation: List[str] evidence_timestamp_ns: int | None = None phase_attribution: PhaseAttribution | None = None
[docs] def analyze_hidden_memory_gaps( events: Sequence[TelemetryEventV2], thresholds: Mapping[str, float], format_memory: Callable[[int], str], remediation_by_classification: Mapping[str, List[str]], phase_resolver: PhaseReplayIndex | None = None, ) -> List[GapFinding]: """Classify allocator-vs-device hidden memory gaps over time.""" gaps: List[float] = [] normalized: List[float] = [] timestamps_ns: List[int] = [] usable_events: List[TelemetryEventV2] = [] for event in events: if str(event.event_type).strip().lower() != "sample": continue if event.device_total_bytes is None or event.device_total_bytes <= 0: continue gap = event.device_used_bytes - event.allocator_reserved_bytes gaps.append(float(gap)) normalized.append(gap / event.device_total_bytes) timestamps_ns.append(event.timestamp_ns) usable_events.append(event) if len(gaps) < 3: return [] threshold = thresholds["gap_ratio_threshold"] if max(abs(value) for value in normalized) < threshold: return [] _LOGGER.debug( "Running hidden-memory gap classifiers on %d samples", len(usable_events) ) findings: List[GapFinding] = [] findings.extend( _detect_gap_transient_spikes( events=usable_events, gaps=gaps, z_threshold=thresholds["gap_spike_zscore"], remediation=remediation_by_classification.get("transient_spike", []), phase_resolver=phase_resolver, ) ) findings.extend( _detect_gap_persistent_drift( events=usable_events, gaps=gaps, timestamps_ns=timestamps_ns, drift_r_squared_threshold=thresholds["gap_drift_r_squared"], format_memory=format_memory, remediation=remediation_by_classification.get("persistent_drift", []), phase_resolver=phase_resolver, ) ) findings.extend( _detect_gap_fragmentation_pattern( events=usable_events, gaps=gaps, fragmentation_threshold=thresholds["gap_fragmentation_ratio"], remediation=remediation_by_classification.get("fragmentation_like", []), phase_resolver=phase_resolver, ) ) severity_order = {"critical": 0, "warning": 1, "info": 2} findings.sort( key=lambda finding: ( severity_order.get(finding.severity, 9), -finding.confidence, ) ) _LOGGER.debug("Hidden-memory gap analysis produced %d finding(s)", len(findings)) return findings
def _detect_gap_transient_spikes( events: Sequence[TelemetryEventV2], gaps: List[float], z_threshold: float, remediation: List[str], phase_resolver: PhaseReplayIndex | None, ) -> List[GapFinding]: """Detect transient spikes in the gap series using z-score.""" arr = np.asarray(gaps, dtype=float) mean = float(np.mean(arr)) std = float(np.std(arr, ddof=1)) if std == 0 or np.isnan(std): return [] spike_indices = [ index for index, gap_value in enumerate(arr) if (gap_value - mean) / std > z_threshold ] if not spike_indices: return [] max_idx = int(np.argmax(arr)) max_gap = float(arr[max_idx]) max_z = (max_gap - mean) / std phase_event = events[max_idx] severity = "critical" if max_z > 2 * z_threshold else "warning" confidence = min(1.0, max_z / (3 * z_threshold)) return [ GapFinding( classification="transient_spike", severity=severity, confidence=round(confidence, 3), evidence={ "spike_count": len(spike_indices), "max_gap_bytes": max_gap, "max_zscore": round(max_z, 3), "mean_gap_bytes": round(mean, 1), "std_gap_bytes": round(std, 1), }, description=( f"Detected {len(spike_indices)} transient spike(s) in the " f"device-vs-allocator gap (max z-score {max_z:.1f})." ), remediation=list(remediation), evidence_timestamp_ns=phase_event.timestamp_ns, phase_attribution=_resolve_phase_attribution(phase_resolver, phase_event), ) ] def _detect_gap_persistent_drift( events: Sequence[TelemetryEventV2], gaps: List[float], timestamps_ns: List[int], drift_r_squared_threshold: float, format_memory: Callable[[int], str], remediation: List[str], phase_resolver: PhaseReplayIndex | None, ) -> List[GapFinding]: """Detect persistent upward drift in the gap via linear regression.""" if len(gaps) < 5: return [] x = np.asarray(timestamps_ns, dtype=float) x = x - x[0] # relative time y = np.asarray(gaps, dtype=float) slope, intercept, r_value, p_value, std_err = stats.linregress(x, y) r_squared = r_value**2 if slope <= 0 or r_squared < drift_r_squared_threshold: return [] severity = "critical" if r_squared > 0.85 else "warning" confidence = round(min(1.0, r_squared), 3) slope_bytes_per_sec = slope * 1e9 phase_event = events[-1] return [ GapFinding( classification="persistent_drift", severity=severity, confidence=confidence, evidence={ "slope_bytes_per_sec": round(slope_bytes_per_sec, 1), "r_squared": round(r_squared, 4), "p_value": round(p_value, 6), "gap_start_bytes": round(float(y[0]), 1), "gap_end_bytes": round(float(y[-1]), 1), }, description=( f"Device-vs-allocator gap is drifting upward at " f"{format_memory(int(abs(slope_bytes_per_sec)))}/s " f"(R²={r_squared:.2f})." ), remediation=list(remediation), evidence_timestamp_ns=phase_event.timestamp_ns, phase_attribution=_resolve_phase_attribution(phase_resolver, phase_event), ) ] def _detect_gap_fragmentation_pattern( events: Sequence[TelemetryEventV2], gaps: List[float], fragmentation_threshold: float, remediation: List[str], phase_resolver: PhaseReplayIndex | None, ) -> List[GapFinding]: """Detect fragmentation-like behaviour: high reserved-allocated ratio.""" frag_ratios: List[float] = [] frag_events: List[TelemetryEventV2] = [] for event in events: derived = compute_event_fields(event) frag = derived.get("fragmentation_ratio") if frag is not None: frag_ratios.append(frag) frag_events.append(event) if not frag_ratios: return [] avg_frag = float(np.mean(frag_ratios)) max_frag = float(np.max(frag_ratios)) if avg_frag < fragmentation_threshold: return [] severity = "critical" if avg_frag > 0.5 else "warning" confidence = round(min(1.0, avg_frag / 0.6), 3) phase_event = frag_events[int(np.argmax(frag_ratios))] return [ GapFinding( classification="fragmentation_like", severity=severity, confidence=confidence, evidence={ "avg_fragmentation_ratio": round(avg_frag, 4), "max_fragmentation_ratio": round(max_frag, 4), "avg_gap_bytes": round(float(np.mean(gaps)), 1), "sample_count": len(frag_ratios), }, description=( f"Allocator fragmentation averaging {avg_frag:.0%} suggests " f"reserved-but-unused memory is inflating the device footprint." ), remediation=list(remediation), evidence_timestamp_ns=phase_event.timestamp_ns, phase_attribution=_resolve_phase_attribution(phase_resolver, phase_event), ) ] def _resolve_phase_attribution( phase_resolver: PhaseReplayIndex | None, event: TelemetryEventV2, ) -> PhaseAttribution | None: if phase_resolver is None: return None if not hasattr(phase_resolver, "resolve_for_event"): return None return cast("PhaseAttribution | None", phase_resolver.resolve_for_event(event))