"""Local query API for Stormlog artifact directories and telemetry files."""
from __future__ import annotations
import builtins
import csv
import json
from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Literal, cast
from .gap_analysis import analyze_hidden_memory_gaps
from .issues import (
ISSUE_STATE_OPEN,
IssueEvidenceLink,
IssueFingerprint,
IssueKind,
IssueState,
StormlogIssue,
categorize_alert_context,
normalize_issue_state,
normalize_text_dimension,
normalized_error_stem,
)
from .phases import (
PhaseReplayIndex,
phase_attribution_to_payload,
summarize_phase_attribution,
)
from .session import (
SESSION_STATUS_INTERRUPTED,
SessionSummary,
infer_session_summary_from_events,
session_summary_from_dict,
session_summary_to_dict,
stable_legacy_session_id,
)
from .telemetry import (
LoadedTelemetrySession,
TelemetryEvent,
TelemetryEventV2,
load_telemetry_sessions,
telemetry_event_from_record,
telemetry_event_to_dict,
)
from .telemetry_sink import (
MANIFEST_FILENAME,
SEGMENT_SUFFIX,
TelemetrySinkManifest,
read_telemetry_sink_manifest,
resolve_telemetry_sink_segment_paths,
)
from .utils import format_bytes
# Kinds of catalog source. Discovery in this module assigns "sink",
# "telemetry_json", "telemetry_jsonl", "telemetry_csv" and "diagnose_bundle";
# "oom_bundle" is only checked for (event queries skip it).
SourceKind = Literal[
"sink",
"telemetry_json",
"telemetry_jsonl",
"telemetry_csv",
"diagnose_bundle",
"oom_bundle",
]
# Metric names accepted by ``QueryStore.summarize``.
SummaryMetric = Literal[
"session_count_by_status",
"peak_allocator_allocated_bytes",
"peak_allocator_reserved_bytes",
"peak_device_used_bytes",
"alert_count",
"collector_degradation_transitions",
"interrupted_sessions_with_oom_bundles",
"hidden_memory_gap_growth",
]
# Grouping modes for event-based summaries; ``summarize`` defaults to "session".
SummaryGroupBy = Literal["session", "session-rank", "rank", "status"]
# Event types that ``_is_alert_event`` treats as alerts on their own.
_ALERT_EVENT_TYPES = frozenset({"warning", "critical", "error"})
# Event types counted by the "collector_degradation_transitions" summary.
_COLLECTOR_TRANSITION_TYPES = frozenset({"collector_degraded", "collector_recovered"})
# Substrings of a lower-cased file name that mark a loose file as a telemetry
# candidate during directory discovery.
_TELEMETRY_FILE_NAME_PARTS = ("event", "events", "track", "telemetry")
# Normalized ``collector_health_status`` values that count as degradation.
_COLLECTOR_DEGRADED_STATUSES = frozenset({"degraded", "unhealthy"})
# NOTE(review): not referenced in the visible part of this file; presumably
# passed to the hidden-memory gap analysis -- confirm against the consumer.
_GAP_ANALYSIS_THRESHOLDS = {
"gap_ratio_threshold": 0.05,
"gap_spike_zscore": 2.0,
"gap_drift_r_squared": 0.6,
"gap_fragmentation_ratio": 0.3,
}
# Empty here. NOTE(review): the name suggests remediation hints keyed by gap
# classification -- confirm where (or whether) it gets populated.
_GAP_REMEDIATION_BY_CLASSIFICATION: Mapping[str, list[str]] = {}
# Severity ordering table; "critical" and "error" share a rank.
# NOTE(review): presumably a lower number means more severe -- the consumer is
# outside the visible chunk, so confirm.
_SEVERITY_RANK = {"critical": 0, "error": 0, "warning": 1, "info": 2}
@dataclass(frozen=True)
class CatalogWarning:
    """A recoverable problem recorded while discovering or loading artifacts."""

    path: str  # location the warning refers to, already stringified
    message: str  # human-readable description of the problem

    def as_dict(self) -> dict[str, Any]:
        """Return the warning as a plain ``path``/``message`` dictionary."""
        return dict(path=self.path, message=self.message)
@dataclass(frozen=True)
class CatalogSource:
    """A single discovered location that can be queried for artifact data."""

    path: Path  # file, or the directory that owns the manifest
    source_kind: SourceKind
    event_paths: tuple[Path, ...] = ()  # files holding the events, when known
    manifest_path: Path | None = None  # set for manifest-backed sources

    def as_dict(self) -> dict[str, Any]:
        """Return a JSON-friendly view with every path rendered as a string."""
        manifest = None if self.manifest_path is None else str(self.manifest_path)
        return {
            "path": str(self.path),
            "source_kind": self.source_kind,
            "event_paths": list(map(str, self.event_paths)),
            "manifest_path": manifest,
        }
@dataclass(frozen=True)
class CatalogOOMBundle:
    """An OOM dump bundle found through its manifest while cataloging."""

    bundle_path: Path
    manifest_path: Path
    created_at_utc: str | None
    backend: str | None
    reason: str | None
    event_count: int | None
    session_id: str | None
    session_status: str | None
    exception_type: str | None = None
    exception_module: str | None = None

    def as_dict(self) -> dict[str, Any]:
        """Return a JSON-friendly view; the two paths are rendered as strings."""
        row: dict[str, Any] = {
            "bundle_path": str(self.bundle_path),
            "manifest_path": str(self.manifest_path),
        }
        passthrough = (
            "created_at_utc",
            "backend",
            "reason",
            "event_count",
            "session_id",
            "session_status",
            "exception_type",
            "exception_module",
        )
        for name in passthrough:
            row[name] = getattr(self, name)
        return row
@dataclass(frozen=True)
class SessionFilter:
    """Optional exact-match criteria for session rows.

    A field left as ``None`` places no constraint on the result.
    """

    session_id: str | None = None
    status: str | None = None
    job_id: str | None = None
    rank: int | None = None
    world_size: int | None = None
    # True keeps sessions with at least one OOM bundle, False those with none.
    has_oom_bundle: bool | None = None
    source_kind: str | None = None
@dataclass(frozen=True)
class EventFilter:
    """Optional criteria for canonical telemetry event rows.

    A field left as ``None`` places no constraint on the result.
    """

    session_id: str | None = None
    event_type: str | None = None
    rank: int | None = None
    collector: str | None = None
    status: str | None = None  # compared with the owning session's status
    time_start_ns: int | None = None  # inclusive lower bound on timestamp_ns
    time_end_ns: int | None = None  # inclusive upper bound on timestamp_ns
    has_alert: bool | None = None
    collector_health_status: str | None = None  # matched against metadata
    backend: str | None = None  # matched against metadata
    limit: int | None = None  # slice applied after the rows are sorted
@dataclass(frozen=True)
class OOMBundleFilter:
    """Optional criteria for OOM bundle rows.

    A field left as ``None`` places no constraint on the result.
    """

    session_id: str | None = None
    backend: str | None = None
    reason: str | None = None
    # Inclusive bounds given as ISO-8601 text; when a bound is set, bundles
    # without a parseable creation time are filtered out.
    created_after: str | None = None
    created_before: str | None = None
@dataclass(frozen=True)
class IssueFilter:
    """Optional criteria for grouped issue rows.

    A field left as ``None`` places no constraint on the result.
    """

    fingerprint_id: str | None = None
    kind: IssueKind | None = None
    state: IssueState | None = None
    severity: str | None = None
    session_id: str | None = None
@dataclass(frozen=True)
class SessionRow:
    """One query result describing a loaded or manifest-backed session."""

    session_id: str
    status: str
    started_at_ns: int
    ended_at_ns: int | None
    host: str
    pid: int
    job_id: str | None
    rank: int
    local_rank: int
    world_size: int
    source: str
    source_path: str
    source_kind: str
    source_count: int
    warning_count: int
    event_count: int | None  # None when the source carries no event data
    oom_bundle_count: int

    def as_dict(self) -> dict[str, Any]:
        """Return every column as a plain dictionary, in declaration order."""
        column_order = (
            "session_id",
            "status",
            "started_at_ns",
            "ended_at_ns",
            "host",
            "pid",
            "job_id",
            "rank",
            "local_rank",
            "world_size",
            "source",
            "source_path",
            "source_kind",
            "source_count",
            "warning_count",
            "event_count",
            "oom_bundle_count",
        )
        return {column: getattr(self, column) for column in column_order}
@dataclass(frozen=True)
class EventRow:
    """A canonical telemetry event paired with where it was loaded from."""

    event: TelemetryEvent
    source_path: str
    source_kind: str
    session_status: str

    def as_dict(self) -> dict[str, Any]:
        """Return the serialized event with the provenance columns added."""
        payload = telemetry_event_to_dict(self.event)
        payload.update(
            source_path=self.source_path,
            source_kind=self.source_kind,
            session_status=self.session_status,
        )
        return payload
@dataclass(frozen=True)
class OOMBundleRow:
    """One query result describing a single OOM bundle manifest."""

    bundle_path: str
    created_at_utc: str | None
    backend: str | None
    reason: str | None
    event_count: int | None
    session_id: str | None
    session_status: str | None
    exception_type: str | None
    exception_module: str | None

    def as_dict(self) -> dict[str, Any]:
        """Return every column as a plain dictionary, in declaration order."""
        column_order = (
            "bundle_path",
            "created_at_utc",
            "backend",
            "reason",
            "event_count",
            "session_id",
            "session_status",
            "exception_type",
            "exception_module",
        )
        return {column: getattr(self, column) for column in column_order}
@dataclass(frozen=True)
class SummaryRow:
    """One result row produced by a built-in summary query."""

    metric: str
    group_by: str
    value: int | float | str | None
    session_id: str | None = None
    rank: int | None = None
    status: str | None = None
    details: Mapping[str, Any] = field(default_factory=dict)

    def as_dict(self) -> dict[str, Any]:
        """Return the fixed columns merged with ``details``.

        Keys in ``details`` are merged last, so they overwrite a fixed column
        of the same name.
        """
        payload: dict[str, Any] = dict(
            metric=self.metric,
            group_by=self.group_by,
            session_id=self.session_id,
            rank=self.rank,
            status=self.status,
            value=self.value,
        )
        payload.update(self.details)
        return payload
class ArtifactCatalog:
    """Manifest-first catalog of local Stormlog artifacts.

    Discovery runs eagerly in ``__init__``. Each input path is classified as
    telemetry sources and/or OOM bundles; anything that cannot be classified
    is recorded in ``warnings`` instead of raising.

    Attributes:
        paths: The input paths, converted to ``Path``.
        sources: Discovered queryable sources, in discovery order.
        oom_bundles: Discovered OOM bundles, in discovery order.
        warnings: Non-fatal problems noticed during discovery (and, later,
            appended by ``QueryStore`` when loading fails).
    """

    def __init__(self, paths: Sequence[str | Path]) -> None:
        """Catalog *paths* (files or directories) immediately."""
        self.paths = tuple(Path(path) for path in paths)
        self.sources: list[CatalogSource] = []
        self.oom_bundles: list[CatalogOOMBundle] = []
        self.warnings: list[CatalogWarning] = []
        # Dedup keys; all three sets hold *resolved* paths so that relative,
        # absolute and symlinked spellings of one location compare equal.
        self._source_keys: set[tuple[Path, SourceKind]] = set()
        self._oom_bundle_paths: set[Path] = set()
        self._covered_event_paths: set[Path] = set()
        self._discover()

    def as_dict(self) -> dict[str, Any]:
        """Return a JSON-friendly snapshot of everything discovered."""
        return {
            "paths": [str(path) for path in self.paths],
            "sources": [source.as_dict() for source in self.sources],
            "oom_bundles": [bundle.as_dict() for bundle in self.oom_bundles],
            "warnings": [warning.as_dict() for warning in self.warnings],
        }

    def _discover(self) -> None:
        """Run discovery over every input path, in the order given."""
        for path in self.paths:
            self._discover_path(path)

    def _discover_path(self, path: Path) -> None:
        """Dispatch one input path to file or directory discovery."""
        if not path.exists():
            self._warn(path, "path does not exist")
        elif path.is_file():
            self._discover_file(path)
        elif path.is_dir():
            self._discover_directory(path)
        else:
            self._warn(path, "path is neither a file nor a directory")

    def _discover_directory(self, directory: Path) -> None:
        """Catalog a directory: root manifest, nested manifests, loose files."""
        manifest_path = directory / MANIFEST_FILENAME
        manifest_payload = _read_json_object(manifest_path)
        if manifest_payload is not None:
            if _is_oom_manifest(manifest_payload):
                # The directory itself is an OOM bundle; nothing below it is
                # scanned.
                self._add_oom_bundle(directory, manifest_path, manifest_payload)
                return
            # Unlike nested manifests, the root manifest is tested against
            # both shapes independently (kept as in the original logic).
            if _is_diagnose_manifest(manifest_payload):
                self._add_diagnose_source(directory, manifest_path)
            if _is_sink_manifest(manifest_payload):
                self._add_sink_source(directory, manifest_path)
        for nested_manifest in sorted(directory.rglob(MANIFEST_FILENAME)):
            if nested_manifest == manifest_path:
                continue
            payload = _read_json_object(nested_manifest)
            if payload is None:
                continue
            # Unrecognized nested manifests are skipped without a warning.
            self._register_manifest(nested_manifest, payload)
        for candidate in self._discover_candidate_files(directory):
            # Sink segments are already reachable through their sink source.
            if candidate.resolve() in self._covered_event_paths:
                continue
            self._discover_file(candidate)

    def _discover_file(self, path: Path) -> None:
        """Catalog one file, either a manifest or a flat telemetry file."""
        if path.name == MANIFEST_FILENAME:
            payload = _read_json_object(path)
            if payload is None:
                self._warn(path, "manifest is not a JSON object")
            elif not self._register_manifest(path, payload):
                self._warn(path, "unrecognized manifest shape")
            return
        suffix = path.suffix.lower()
        kind: SourceKind
        if suffix == SEGMENT_SUFFIX:
            kind = "telemetry_jsonl"
        elif suffix == ".json":
            kind = "telemetry_json"
        elif suffix == ".csv":
            kind = "telemetry_csv"
        else:
            # Other extensions are ignored without a warning.
            return
        self._add_source(
            CatalogSource(path=path, source_kind=kind, event_paths=(path,))
        )

    def _register_manifest(
        self,
        manifest_path: Path,
        payload: Mapping[str, Any],
    ) -> bool:
        """Classify one manifest and record what it describes.

        The shapes are tested in the order OOM, diagnose, sink and the first
        match wins. Returns ``False`` when no shape matches.
        """
        owner = manifest_path.parent
        if _is_oom_manifest(payload):
            self._add_oom_bundle(owner, manifest_path, payload)
        elif _is_diagnose_manifest(payload):
            self._add_diagnose_source(owner, manifest_path)
        elif _is_sink_manifest(payload):
            self._add_sink_source(owner, manifest_path)
        else:
            return False
        return True

    def _add_diagnose_source(self, directory: Path, manifest_path: Path) -> None:
        """Record *directory* as a diagnose bundle source."""
        self._add_source(
            CatalogSource(
                path=directory,
                source_kind="diagnose_bundle",
                manifest_path=manifest_path,
            )
        )

    def _add_sink_source(self, directory: Path, manifest_path: Path) -> None:
        """Record *directory* as a sink source and mark its segments covered."""
        segment_paths = tuple(resolve_telemetry_sink_segment_paths(directory))
        self._covered_event_paths.update(
            Path(segment).resolve() for segment in segment_paths
        )
        self._add_source(
            CatalogSource(
                path=directory,
                source_kind="sink",
                event_paths=segment_paths,
                manifest_path=manifest_path,
            )
        )

    def _discover_candidate_files(self, directory: Path) -> list[Path]:
        """Return sorted loose files under *directory* that look like telemetry.

        Manifests and files inside an already discovered OOM bundle are
        excluded. Files with the segment suffix always qualify; other matches
        qualify only when their lower-cased name contains one of
        ``_TELEMETRY_FILE_NAME_PARTS``.
        """
        candidates: set[Path] = set()
        for pattern in ("*.json", "*.jsonl", "*.csv"):
            for path in directory.rglob(pattern):
                if not path.is_file() or path.name == MANIFEST_FILENAME:
                    continue
                if self._is_inside_discovered_bundle(path):
                    continue
                if path.suffix.lower() == SEGMENT_SUFFIX:
                    candidates.add(path)
                    continue
                lowered = path.name.lower()
                if any(part in lowered for part in _TELEMETRY_FILE_NAME_PARTS):
                    candidates.add(path)
        return sorted(candidates)

    def _is_inside_discovered_bundle(self, path: Path) -> bool:
        """Return whether *path* lies below an already discovered OOM bundle."""
        # Bundle paths are stored resolved, so the candidate must be resolved
        # too; otherwise relative or symlinked inputs never match.
        return not self._oom_bundle_paths.isdisjoint(path.resolve().parents)

    def _add_source(self, source: CatalogSource) -> None:
        """Append *source* unless the same resolved path and kind is known."""
        key = (source.path.resolve(), source.source_kind)
        if key in self._source_keys:
            return
        self._source_keys.add(key)
        self.sources.append(source)

    def _add_oom_bundle(
        self,
        bundle_path: Path,
        manifest_path: Path,
        payload: Mapping[str, Any],
    ) -> None:
        """Record an OOM bundle once, enriched from its ``metadata.json``."""
        resolved = bundle_path.resolve()
        if resolved in self._oom_bundle_paths:
            return
        self._oom_bundle_paths.add(resolved)
        # metadata.json is optional; a missing or invalid file yields no
        # exception details.
        metadata = _read_json_object(bundle_path / "metadata.json") or {}
        self.oom_bundles.append(
            CatalogOOMBundle(
                bundle_path=bundle_path,
                manifest_path=manifest_path,
                created_at_utc=_string_or_none(payload.get("created_at_utc")),
                backend=_string_or_none(payload.get("backend")),
                reason=_string_or_none(payload.get("reason")),
                event_count=_int_or_none(payload.get("event_count")),
                session_id=_string_or_none(payload.get("session_id")),
                session_status=_string_or_none(payload.get("session_status")),
                exception_type=_string_or_none(metadata.get("exception_type")),
                exception_module=_string_or_none(metadata.get("exception_module")),
            )
        )

    def _warn(self, path: Path, message: str) -> None:
        """Record a non-fatal warning about *path*."""
        self.warnings.append(CatalogWarning(path=str(path), message=message))
class QueryStore:
    """Reusable in-process query surface over a catalog of Stormlog artifacts.

    Loaded sessions are cached per source, so repeated queries do not re-read
    the same files.
    """

    def __init__(self, catalog: ArtifactCatalog) -> None:
        """Wrap *catalog*; nothing is loaded until a query needs it."""
        self.catalog = catalog
        self._loaded_sessions_by_source: dict[
            tuple[Path, SourceKind], list[LoadedTelemetrySession]
        ] = {}

    def list_sessions(self, filters: SessionFilter | None = None) -> list[SessionRow]:
        """Return matching session rows, newest ``started_at_ns`` first.

        Rows come from manifest metadata where available and from loaded flat
        files otherwise; duplicates of one (session, path, kind) are dropped.
        """
        active = filters or SessionFilter()
        oom_counts = _count_oom_bundles_by_session(self.catalog.oom_bundles)
        matched: list[SessionRow] = []
        seen_keys: set[tuple[str, str, str]] = set()
        for source in self.catalog.sources:
            for candidate in self._session_rows_for_source(source, oom_counts):
                identity = (
                    candidate.session_id,
                    candidate.source_path,
                    candidate.source_kind,
                )
                if identity in seen_keys:
                    continue
                seen_keys.add(identity)
                if _session_matches(candidate, active):
                    matched.append(candidate)
        return sorted(
            matched,
            key=lambda item: (item.started_at_ns, item.session_id),
            reverse=True,
        )

    def query_events(self, filters: EventFilter | None = None) -> list[EventRow]:
        """Return matching event rows ordered by timestamp, then session id.

        ``filters.limit`` is applied as a slice after sorting.
        """
        active = filters or EventFilter()
        kinds_without_events = {"diagnose_bundle", "oom_bundle"}
        matched: list[EventRow] = []
        for source in self.catalog.sources:
            if source.source_kind in kinds_without_events:
                continue
            for loaded in self._load_sessions_for_source(source):
                summary = loaded.summary
                # Session-level criteria are checked once per session.
                if (
                    active.session_id is not None
                    and summary.session_id != active.session_id
                ):
                    continue
                if active.status is not None and summary.status != active.status:
                    continue
                candidates = (
                    EventRow(
                        event=event,
                        source_path=_event_source_path(loaded, source),
                        source_kind=source.source_kind,
                        session_status=summary.status,
                    )
                    for event in loaded.events
                )
                matched.extend(
                    candidate
                    for candidate in candidates
                    if _event_matches(candidate, active)
                )
        matched.sort(key=lambda item: (item.event.timestamp_ns, item.event.session_id))
        return matched if active.limit is None else matched[: active.limit]

    def list_oom_bundles(
        self,
        filters: OOMBundleFilter | None = None,
    ) -> list[OOMBundleRow]:
        """Return matching OOM bundle rows ordered by creation time, then path.

        A bundle without its own session status falls back to the status
        recorded for its session in a sink or diagnose manifest.
        """
        active = filters or OOMBundleFilter()
        status_lookup = self._manifest_session_status_by_id()
        matched: list[OOMBundleRow] = []
        for bundle in self.catalog.oom_bundles:
            status = bundle.session_status
            if not status:
                status = (
                    None
                    if bundle.session_id is None
                    else status_lookup.get(bundle.session_id)
                )
            candidate = OOMBundleRow(
                bundle_path=str(bundle.bundle_path),
                created_at_utc=bundle.created_at_utc,
                backend=bundle.backend,
                reason=bundle.reason,
                event_count=bundle.event_count,
                session_id=bundle.session_id,
                session_status=status,
                exception_type=bundle.exception_type,
                exception_module=bundle.exception_module,
            )
            if _oom_matches(candidate, active):
                matched.append(candidate)
        matched.sort(key=lambda item: (item.created_at_utc or "", item.bundle_path))
        return matched

    def list_issues(
        self,
        filters: IssueFilter | None = None,
        *,
        state_overrides: Mapping[str, str] | None = None,
    ) -> list[StormlogIssue]:
        """Return grouped issues derived from the discovered artifacts.

        Evidence is gathered from OOM bundles, then telemetry events, then the
        per-session hidden-memory analysis. ``state_overrides`` maps a
        fingerprint id to a state; issues without an override are open.
        """
        active = filters or IssueFilter()
        overrides = state_overrides or {}
        state_by_fingerprint = {
            fingerprint_id: normalize_issue_state(raw_state)
            for fingerprint_id, raw_state in overrides.items()
        }
        accumulator: dict[str, _IssueAccumulator] = {}
        for oom_row in self.list_oom_bundles():
            _accumulate_oom_bundle_issue(accumulator, oom_row)
        for event_row in self.query_events(EventFilter()):
            event = event_row.event
            # An event feeds at most one category; the first match wins.
            if _is_oom_event(event):
                _accumulate_oom_event_issue(accumulator, event_row)
            elif _is_collector_degradation_event(event):
                _accumulate_collector_issue(accumulator, event_row)
            elif _is_alert_event(event):
                _accumulate_alert_issue(accumulator, event_row)
        for source in self.catalog.sources:
            for loaded in self._load_sessions_for_source(source):
                _accumulate_hidden_memory_issues(accumulator, loaded, source)
        issues: list[StormlogIssue] = []
        for item in accumulator.values():
            state = state_by_fingerprint.get(
                item.fingerprint.fingerprint_id,
                ISSUE_STATE_OPEN,
            )
            issues.append(item.to_issue(state=state))
        issues = [issue for issue in issues if _issue_matches(issue, active)]
        issues.sort(key=_issue_sort_key)
        return issues

    def _manifest_session_status_by_id(self) -> dict[str, str]:
        """Map session id to status using sink and diagnose manifests.

        The first status seen for a session id is kept.
        """
        statuses: dict[str, str] = {}
        for source in self.catalog.sources:
            kind = source.source_kind
            if kind == "sink":
                manifest = read_telemetry_sink_manifest(source.path)
                if manifest is not None:
                    for summary in manifest.sessions:
                        statuses.setdefault(summary.session_id, summary.status)
            elif kind == "diagnose_bundle":
                diagnose_summary = _diagnose_session_summary(source.manifest_path)
                if diagnose_summary is not None:
                    statuses.setdefault(
                        diagnose_summary.session_id,
                        diagnose_summary.status,
                    )
        return statuses

    def summarize(
        self,
        metric: SummaryMetric,
        *,
        group_by: SummaryGroupBy | None = None,
    ) -> list[SummaryRow]:
        """Run one built-in summary query.

        ``group_by`` defaults to ``"session"`` and is not used by the two
        session-level metrics.

        Raises:
            ValueError: If *metric* is not a supported metric name.
        """
        if metric == "session_count_by_status":
            return self._summarize_session_count_by_status()
        if metric == "interrupted_sessions_with_oom_bundles":
            return self._summarize_interrupted_sessions_with_oom_bundles()
        grouping: SummaryGroupBy = group_by or "session"
        events = self.query_events(EventFilter())
        peak_fields = {
            "peak_allocator_allocated_bytes": "allocator_allocated_bytes",
            "peak_allocator_reserved_bytes": "allocator_reserved_bytes",
            "peak_device_used_bytes": "device_used_bytes",
        }
        peak_field = peak_fields.get(metric)
        if peak_field is not None:
            return _summarize_peak(events, metric, peak_field, grouping)
        if metric == "alert_count":
            alerts = [item for item in events if _is_alert_event(item.event)]
            return _summarize_count(alerts, metric, grouping)
        if metric == "collector_degradation_transitions":
            transitions = [
                item
                for item in events
                if item.event.event_type in _COLLECTOR_TRANSITION_TYPES
            ]
            return _summarize_count(transitions, metric, grouping)
        if metric == "hidden_memory_gap_growth":
            return _summarize_hidden_memory_gap_growth(events, grouping)
        raise ValueError(f"unsupported summary metric: {metric}")

    def _session_rows_for_source(
        self,
        source: CatalogSource,
        oom_counts: Mapping[str, int],
    ) -> list[SessionRow]:
        """Build the session rows contributed by one source.

        A sink with sessions in its manifest and a diagnose bundle are
        answered from manifest data alone; everything else (including a sink
        whose manifest lists no sessions) is loaded.
        """
        kind = source.source_kind
        if kind == "sink":
            manifest = read_telemetry_sink_manifest(source.path)
            if manifest is not None and manifest.sessions:
                return [
                    _session_row_from_manifest(
                        summary=summary,
                        manifest=manifest,
                        source=source,
                        oom_count=oom_counts.get(summary.session_id, 0),
                    )
                    for summary in manifest.sessions
                ]
        if kind == "diagnose_bundle":
            summary = _diagnose_session_summary(source.manifest_path)
            if summary is None:
                return []
            return [
                _session_row_from_summary(
                    summary=summary,
                    source=source,
                    source_count=1,
                    warning_count=0,
                    event_count=None,
                    oom_count=oom_counts.get(summary.session_id, 0),
                )
            ]
        return [
            _session_row_from_summary(
                summary=loaded.summary,
                source=source,
                source_count=len(loaded.sources_loaded) or 1,
                warning_count=len(loaded.warnings),
                event_count=len(loaded.events),
                oom_count=oom_counts.get(loaded.summary.session_id, 0),
            )
            for loaded in self._load_sessions_for_source(source)
        ]

    def _load_sessions_for_source(
        self,
        source: CatalogSource,
    ) -> list[LoadedTelemetrySession]:
        """Load (or fetch from cache) the sessions stored in *source*.

        A failing load is recorded as a catalog warning and cached as an empty
        list, so the same failure is reported only once.
        """
        cache_key = (source.path.resolve(), source.source_kind)
        cached = self._loaded_sessions_by_source.get(cache_key)
        if cached is not None:
            return cached
        kind = source.source_kind
        try:
            if kind == "telemetry_csv":
                sessions = _load_csv_sessions(source.path)
            elif kind in {"sink", "telemetry_json", "telemetry_jsonl"}:
                sessions = load_telemetry_sessions(source.path, permissive_legacy=True)
            else:
                sessions = []
        except Exception as exc:
            self.catalog.warnings.append(
                CatalogWarning(path=str(source.path), message=f"load failed: {exc}")
            )
            sessions = []
        self._loaded_sessions_by_source[cache_key] = sessions
        return sessions

    def _summarize_session_count_by_status(self) -> list[SummaryRow]:
        """Count sessions per status, one row per status in sorted order."""
        counts: dict[str, int] = {}
        for session in self.list_sessions(SessionFilter()):
            counts[session.status] = counts.get(session.status, 0) + 1
        return [
            SummaryRow(
                metric="session_count_by_status",
                group_by="status",
                status=status,
                value=counts[status],
            )
            for status in sorted(counts)
        ]

    def _summarize_interrupted_sessions_with_oom_bundles(self) -> list[SummaryRow]:
        """Return one row per interrupted session that has OOM bundles."""
        oom_counts = _count_oom_bundles_by_session(self.catalog.oom_bundles)
        interrupted = self.list_sessions(
            SessionFilter(
                status=SESSION_STATUS_INTERRUPTED,
                has_oom_bundle=True,
            )
        )
        return [
            SummaryRow(
                metric="interrupted_sessions_with_oom_bundles",
                group_by="session",
                session_id=session.session_id,
                status=session.status,
                value=oom_counts.get(session.session_id, 0),
            )
            for session in interrupted
        ]
def open(paths: Sequence[str | Path]) -> QueryStore:
    """Catalog *paths* and return a store for querying them in-process."""
    catalog = ArtifactCatalog(paths)
    return QueryStore(catalog)
def _read_json_object(path: Path) -> dict[str, Any] | None:
    """Read *path* as UTF-8 JSON and return it only if it is an object.

    Returns ``None`` when the path is not a regular file, when reading or
    decoding fails for any reason, or when the top-level value is not a
    mapping.
    """
    if not path.is_file():
        return None
    try:
        text = path.read_text(encoding="utf-8")
        decoded = json.loads(text)
    except Exception:
        return None
    if isinstance(decoded, Mapping):
        return dict(decoded)
    return None
def _is_sink_manifest(payload: Mapping[str, Any]) -> bool:
    """Return whether *payload* looks like a telemetry sink manifest.

    Either the sink format marker or a ``segments`` key is enough.
    """
    if payload.get("format") == "stormlog.append_only_telemetry_sink":
        return True
    return "segments" in payload
def _is_oom_manifest(payload: Mapping[str, Any]) -> bool:
    """Return whether *payload* has every key of an OOM bundle manifest."""
    required_keys = ("bundle_name", "reason", "backend", "event_count")
    return all(key in payload for key in required_keys)
def _is_diagnose_manifest(payload: Mapping[str, Any]) -> bool:
    """Return whether *payload* has every key of a diagnose bundle manifest."""
    for required_key in ("command_line", "files", "risk_detected", "session_id"):
        if required_key not in payload:
            return False
    return True
def _diagnose_session_summary(manifest_path: Path | None) -> SessionSummary | None:
    """Derive a session summary from a diagnose bundle manifest.

    The embedded ``session`` object is preferred. When it is absent or cannot
    be parsed, a placeholder summary is synthesized from the top-level
    ``session_id`` and ``session_status``. Returns ``None`` when the manifest
    is missing or unreadable, has no session id, or the placeholder is
    rejected too.
    """
    payload = None if manifest_path is None else _read_json_object(manifest_path)
    if payload is None:
        return None
    embedded = payload.get("session")
    if isinstance(embedded, Mapping):
        try:
            return session_summary_from_dict(embedded)
        except Exception:
            # Deliberate best effort: fall back to the placeholder below.
            pass
    session_id = _string_or_none(payload.get("session_id"))
    if session_id is None:
        return None
    placeholder = {
        "session_id": session_id,
        "status": payload.get("session_status", "incomplete"),
        "started_at_ns": 0,
        "ended_at_ns": None,
        "host": "unknown",
        "pid": -1,
        "job_id": None,
        "rank": 0,
        "local_rank": 0,
        "world_size": 1,
        "source": "stormlog.diagnose",
    }
    try:
        return session_summary_from_dict(placeholder)
    except Exception:
        return None
def _session_row_from_manifest(
    *,
    summary: SessionSummary,
    manifest: TelemetrySinkManifest,
    source: CatalogSource,
    oom_count: int,
) -> SessionRow:
    """Build a session row from sink manifest data alone.

    Segment and event counts are taken from the manifest segments that belong
    to the session; the warning count is always zero because nothing is
    loaded.
    """
    segment_total = 0
    event_total = 0
    for segment in manifest.segments:
        if segment.session_id == summary.session_id:
            segment_total += 1
            event_total += segment.event_count
    return _session_row_from_summary(
        summary=summary,
        source=source,
        source_count=segment_total,
        warning_count=0,
        event_count=event_total,
        oom_count=oom_count,
    )
def _session_row_from_summary(
    *,
    summary: SessionSummary,
    source: CatalogSource,
    source_count: int,
    warning_count: int,
    event_count: int | None,
    oom_count: int,
) -> SessionRow:
    """Combine a session summary with provenance and counts into a row.

    The summary is serialized first and each column is coerced to its row
    type, so the row always holds plain ``str``/``int`` values.
    """
    payload = session_summary_to_dict(summary)
    ended_raw = payload.get("ended_at_ns")
    ended_at_ns = None if ended_raw is None else int(payload["ended_at_ns"])
    return SessionRow(
        session_id=str(payload["session_id"]),
        status=str(payload["status"]),
        started_at_ns=int(payload["started_at_ns"]),
        ended_at_ns=ended_at_ns,
        host=str(payload["host"]),
        pid=int(payload["pid"]),
        job_id=_string_or_none(payload.get("job_id")),
        rank=int(payload["rank"]),
        local_rank=int(payload["local_rank"]),
        world_size=int(payload["world_size"]),
        source=str(payload["source"]),
        source_path=str(source.path),
        source_kind=source.source_kind,
        source_count=source_count,
        warning_count=warning_count,
        event_count=event_count,
        oom_bundle_count=oom_count,
    )
def _load_csv_sessions(path: Path) -> list[LoadedTelemetrySession]:
    """Load a telemetry CSV file and group its events into sessions.

    Rows that cannot be converted into an event are skipped and reported as
    warnings; they never abort the load. Events without a session id fall
    back to a stable id derived from the resolved file path.

    Args:
        path: CSV file with a header row.

    Returns:
        One loaded session per distinct session id, in order of first
        appearance, each with its events sorted by ``timestamp_ns``. Every
        session carries its own copy of all parse warnings for the file. The
        result is empty when no row produced an event.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    resolved_path = str(path.resolve())
    default_session_id = stable_legacy_session_id(resolved_path, "csv")
    parse_warnings: list[str] = []
    grouped: dict[str, list[TelemetryEvent]] = defaultdict(list)
    # "utf-8-sig" also reads plain UTF-8 and strips a leading BOM, which would
    # otherwise become part of the first column name.
    with builtins.open(path, "r", encoding="utf-8-sig", newline="") as handle:
        reader = csv.DictReader(handle)
        for row in reader:
            try:
                event = telemetry_event_from_record(
                    _normalize_csv_record(row),
                    permissive_legacy=True,
                    default_collector="legacy.csv",
                    default_sampling_interval_ms=0,
                    default_session_id=default_session_id,
                )
            except Exception as exc:
                # reader.line_num stays correct when blank lines were skipped
                # or a quoted field spans several lines.
                parse_warnings.append(
                    f"CSV parse error {path}:{reader.line_num}: {exc}"
                )
            else:
                grouped[event.session_id].append(event)
    sessions: list[LoadedTelemetrySession] = []
    for session_id, session_events in grouped.items():
        session_events.sort(key=lambda event: event.timestamp_ns)
        summary = infer_session_summary_from_events(
            session_id=session_id,
            events=session_events,
            source=f"artifact:{path.name or path.resolve()}",
        )
        sessions.append(
            LoadedTelemetrySession(
                summary=summary,
                events=session_events,
                sources_loaded=[resolved_path],
                # A private copy: sessions must not share one mutable list.
                warnings=list(parse_warnings),
            )
        )
    return sessions
# Columns whose text is converted to ``int`` by ``_normalize_csv_record``.
_CSV_INT_FIELDS = frozenset(
    {
        "schema_version",
        "timestamp_ns",
        "sampling_interval_ms",
        "pid",
        "rank",
        "local_rank",
        "world_size",
        "device_id",
        "allocator_allocated_bytes",
        "allocator_reserved_bytes",
        "allocator_active_bytes",
        "allocator_inactive_bytes",
        "allocator_change_bytes",
        "device_used_bytes",
        "device_free_bytes",
        "device_total_bytes",
        "memory_allocated",
        "memory_reserved",
        "memory_change",
        "total_memory",
    }
)
# Columns whose text is converted to ``float`` by ``_normalize_csv_record``.
_CSV_FLOAT_FIELDS = frozenset({"timestamp"})


def _normalize_csv_record(row: Mapping[str, str]) -> dict[str, Any]:
    """Convert one ``csv.DictReader`` row into typed record values.

    String values are stripped first. Empty and missing (``None``) values
    become ``None``; ``metadata`` is decoded as a JSON object (anything else,
    including invalid JSON, becomes ``{}``); known integer and float columns
    are converted; every other value passes through unchanged.

    Args:
        row: Mapping of column name to raw text. ``csv.DictReader`` supplies
            ``None`` for columns missing from a short row.

    Returns:
        A new dictionary with the same keys and normalized values.

    Raises:
        ValueError: If a non-empty numeric column holds non-numeric text.
        OverflowError: If an integer column holds an infinite float value.
    """
    normalized: dict[str, Any] = {}
    for key, raw_value in row.items():
        value: Any = raw_value.strip() if isinstance(raw_value, str) else raw_value
        if value is None or value == "":
            # A missing value must not reach int()/float(); one absent column
            # would otherwise discard the whole row.
            normalized[key] = None
        elif key == "metadata" and isinstance(value, str):
            try:
                parsed = json.loads(value)
            except json.JSONDecodeError:
                parsed = {}
            normalized[key] = parsed if isinstance(parsed, dict) else {}
        elif key in _CSV_INT_FIELDS:
            text_value = str(value).strip()
            try:
                normalized[key] = int(text_value)
            except ValueError:
                # Accept float spellings such as "2.0" or "1e3".
                normalized[key] = int(float(text_value))
        elif key in _CSV_FLOAT_FIELDS:
            normalized[key] = float(value)
        else:
            normalized[key] = value
    return normalized
def _count_oom_bundles_by_session(
    bundles: Iterable[CatalogOOMBundle],
) -> dict[str, int]:
    """Count OOM bundles per session id, ignoring bundles without one."""
    tally: dict[str, int] = defaultdict(int)
    for session_id in (bundle.session_id for bundle in bundles):
        if session_id is None:
            continue
        tally[session_id] += 1
    return tally
def _session_matches(row: SessionRow, filters: SessionFilter) -> bool:
    """Return whether *row* satisfies every criterion set on *filters*."""
    exact_checks = (
        (filters.session_id, row.session_id),
        (filters.status, row.status),
        (filters.job_id, row.job_id),
        (filters.rank, row.rank),
        (filters.world_size, row.world_size),
        (filters.source_kind, row.source_kind),
    )
    for wanted, actual in exact_checks:
        if wanted is not None and actual != wanted:
            return False
    if filters.has_oom_bundle is not None:
        has_bundle = row.oom_bundle_count > 0
        if has_bundle != filters.has_oom_bundle:
            return False
    return True
def _event_matches(row: EventRow, filters: EventFilter) -> bool:
    """Return whether the event in *row* satisfies the event-level criteria.

    ``session_id``, ``status`` and ``limit`` are not evaluated here. Time
    bounds are inclusive.
    """
    event = row.event
    field_checks = (
        (filters.event_type, event.event_type),
        (filters.rank, event.rank),
        (filters.collector, event.collector),
    )
    for wanted, actual in field_checks:
        if wanted is not None and actual != wanted:
            return False
    earliest, latest = filters.time_start_ns, filters.time_end_ns
    if earliest is not None and event.timestamp_ns < earliest:
        return False
    if latest is not None and event.timestamp_ns > latest:
        return False
    if filters.has_alert is not None:
        if _is_alert_event(event) != filters.has_alert:
            return False
    metadata = event.metadata
    metadata_checks = (
        (filters.collector_health_status, "collector_health_status"),
        (filters.backend, "backend"),
    )
    for wanted, key in metadata_checks:
        if wanted is not None and metadata.get(key) != wanted:
            return False
    return True
def _oom_matches(row: OOMBundleRow, filters: OOMBundleFilter) -> bool:
    """Return whether *row* satisfies every criterion set on *filters*.

    Time bounds are inclusive. When a bound is set, a row whose creation time
    is missing or unparseable does not match. A bound that cannot be parsed
    is treated as not set.
    """
    exact_checks = (
        (filters.session_id, row.session_id),
        (filters.backend, row.backend),
        (filters.reason, row.reason),
    )
    for wanted, actual in exact_checks:
        if wanted is not None and actual != wanted:
            return False
    created = _parse_datetime(row.created_at_utc)
    earliest = _parse_datetime(filters.created_after)
    latest = _parse_datetime(filters.created_before)
    too_early = earliest is not None and (created is None or created < earliest)
    too_late = latest is not None and (created is None or created > latest)
    return not (too_early or too_late)
def _parse_datetime(value: str | None) -> datetime | None:
    """Parse ISO-8601 text into an aware UTC datetime.

    A trailing ``Z`` is accepted as UTC and a value without an offset is
    taken to be UTC. Returns ``None`` for empty or unparseable input.
    """
    if not value:
        return None
    text = value.strip()
    # Rewritten so that fromisoformat() accepts "Z" on older Python versions.
    iso_text = f"{text[:-1]}+00:00" if text.endswith("Z") else text
    try:
        moment = datetime.fromisoformat(iso_text)
    except ValueError:
        return None
    if moment.tzinfo is not None:
        return moment.astimezone(timezone.utc)
    return moment.replace(tzinfo=timezone.utc)
def _datetime_to_ns(value: datetime | None) -> int | None:
    """Convert a datetime to integer nanoseconds since the Unix epoch.

    The conversion uses integer arithmetic only. Going through
    ``timestamp() * 1e9`` cannot represent present-day nanosecond values
    exactly in a float and yields results that are off by up to a few hundred
    nanoseconds.

    Args:
        value: The datetime to convert. A naive value is interpreted as
            system local time, matching ``datetime.timestamp()``.

    Returns:
        Nanoseconds since 1970-01-01T00:00:00Z, or ``None`` if *value* is
        ``None``.
    """
    if value is None:
        return None
    if value.utcoffset() is None:
        value = value.astimezone(timezone.utc)
    elapsed = value - datetime(1970, 1, 1, tzinfo=timezone.utc)
    # timedelta keeps seconds and microseconds non-negative, so this also
    # holds for moments before the epoch.
    whole_seconds = elapsed.days * 86_400 + elapsed.seconds
    return whole_seconds * 1_000_000_000 + elapsed.microseconds * 1_000
def _is_alert_event(event: TelemetryEvent) -> bool:
    """Return whether *event* is an alert.

    An event is an alert when its type is one of the alert event types or
    when its metadata declares a warning, critical or error severity.
    """
    is_alert_type = event.event_type in _ALERT_EVENT_TYPES
    return is_alert_type or event.metadata.get("severity") in {
        "warning",
        "critical",
        "error",
    }
def _is_oom_event(event: TelemetryEvent) -> bool:
    """Return whether *event* is an error event carrying OOM metadata.

    Presence of an ``oom_reason`` or ``oom_dump_path`` metadata key is
    enough; the values are not inspected.
    """
    if event.event_type != "error":
        return False
    metadata = event.metadata
    return "oom_reason" in metadata or "oom_dump_path" in metadata
def _is_collector_degradation_event(event: TelemetryEvent) -> bool:
    """Return whether *event* reports a degraded collector.

    True for the ``collector_degraded`` event type, or when the normalized
    ``collector_health_status`` metadata is one of the degraded statuses.
    """
    health_status = normalize_text_dimension(
        event.metadata.get("collector_health_status")
    )
    if event.event_type == "collector_degraded":
        return True
    return health_status in _COLLECTOR_DEGRADED_STATUSES
def _event_severity(event: TelemetryEvent) -> str:
    """Return the severity label for *event*.

    A non-blank ``severity`` string in the metadata wins (after
    normalization). Otherwise the event type decides: ``critical`` and
    ``error`` map to ``"critical"``, ``warning`` to ``"warning"``, and
    anything else to ``"info"``.
    """
    declared = event.metadata.get("severity")
    if isinstance(declared, str) and declared.strip():
        return normalize_text_dimension(declared)
    severity_by_type = {
        "critical": "critical",
        "error": "critical",
        "warning": "warning",
    }
    return severity_by_type.get(event.event_type, "info")
def _event_backend(event: TelemetryEvent) -> str:
    """Return the normalized ``backend`` metadata value of *event*."""
    raw_backend = event.metadata.get("backend")
    return normalize_text_dimension(raw_backend)
def _event_source_path(loaded: LoadedTelemetrySession, source: CatalogSource) -> str:
    """Return the path to report as the origin of a session's events.

    The first loaded source wins; the catalog source path is the fallback
    when the session lists none.
    """
    loaded_from = loaded.sources_loaded
    return loaded_from[0] if loaded_from else str(source.path)
def _summary_group_key(
    row: EventRow,
    group_by: SummaryGroupBy,
) -> tuple[str | None, int | None, str | None]:
    """Return the ``(session_id, rank, status)`` grouping key for *row*.

    Only the components relevant to *group_by* are filled in; the rest are
    ``None``. Any unrecognized mode groups by session.
    """
    if group_by == "status":
        return None, None, row.session_status
    event = row.event
    if group_by == "rank":
        return None, event.rank, None
    rank = event.rank if group_by == "session-rank" else None
    return event.session_id, rank, None
@dataclass
class _IssueAccumulator:
    """Mutable collector for all evidence sharing one issue fingerprint."""

    fingerprint: IssueFingerprint
    title: str
    severity: str
    details: dict[str, Any] = field(default_factory=dict)
    evidence: list[IssueEvidenceLink] = field(default_factory=list)
    affected_sessions: set[str] = field(default_factory=set)
    first_seen_ns: int | None = None
    last_seen_ns: int | None = None

    def add(
        self,
        *,
        evidence: IssueEvidenceLink,
        seen_ns: int | None,
        session_id: str | None,
        severity: str | None = None,
        details: Mapping[str, Any] | None = None,
    ) -> None:
        """Add one evidence hit to this accumulator.

        Args:
            evidence: Link to the artifact that produced the hit.
            seen_ns: Time of the hit; ``None`` leaves the first/last seen
                range untouched.
            session_id: Session the hit belongs to, if known.
            severity: Severity of the hit; combined with the current severity
                through ``_max_severity``.
            details: Extra details; later hits overwrite earlier keys.
        """
        self.evidence.append(evidence)
        if session_id is not None:
            self.affected_sessions.add(session_id)
        if seen_ns is not None:
            if self.first_seen_ns is None or seen_ns < self.first_seen_ns:
                self.first_seen_ns = seen_ns
            if self.last_seen_ns is None or seen_ns > self.last_seen_ns:
                self.last_seen_ns = seen_ns
        if severity is not None:
            self.severity = _max_severity(self.severity, severity)
        if details:
            self.details.update(details)

    def to_issue(self, *, state: IssueState) -> StormlogIssue:
        """Build an immutable issue row from the accumulated hits.

        The first recorded hit serves as the representative evidence.

        Raises:
            IndexError: If no evidence has been added yet.
        """
        representative = self.evidence[0]
        return StormlogIssue(
            fingerprint=self.fingerprint,
            title=self.title,
            state=state,
            severity=self.severity,
            hit_count=len(self.evidence),
            first_seen_ns=self.first_seen_ns,
            last_seen_ns=self.last_seen_ns,
            # Sorted so the output does not depend on set iteration order,
            # which changes between runs under string hash randomization.
            affected_sessions=tuple(sorted(self.affected_sessions)),
            representative_evidence=representative,
            evidence=tuple(self.evidence),
            details=dict(self.details),
        )
def _accumulate_issue(
    accumulator: dict[str, _IssueAccumulator],
    *,
    fingerprint: IssueFingerprint,
    title: str,
    severity: str,
    evidence: IssueEvidenceLink,
    seen_ns: int | None,
    session_id: str | None,
    details: Mapping[str, Any] | None = None,
) -> None:
    """Fold one evidence hit into the accumulator entry for its fingerprint.

    The entry is keyed by ``fingerprint.fingerprint_id`` and is created, using
    this hit's title, severity and details, the first time that id is seen.
    The hit is then recorded on the entry through ``_IssueAccumulator.add``.
    """
    key = fingerprint.fingerprint_id
    bucket = accumulator.get(key)
    if bucket is None:
        bucket = accumulator[key] = _IssueAccumulator(
            fingerprint=fingerprint,
            title=title,
            severity=severity,
            details=dict(details) if details else {},
        )
    bucket.add(
        evidence=evidence,
        seen_ns=seen_ns,
        session_id=session_id,
        severity=severity,
        details=details,
    )
def _accumulate_oom_bundle_issue(
    accumulator: dict[str, _IssueAccumulator],
    row: OOMBundleRow,
) -> None:
    """Record an OOM bundle row as a hit on an ``oom`` issue.

    The fingerprint is built from the bundle's backend and reason.  The
    bundle's ``created_at_utc`` value, parsed and converted to nanoseconds,
    serves as the hit timestamp.
    """
    backend = row.backend
    reason = row.reason
    bundle_path = row.bundle_path
    oom_fingerprint = IssueFingerprint(
        kind="oom",
        dimensions={"backend": backend, "reason": reason},
    )
    created_ns = _datetime_to_ns(_parse_datetime(row.created_at_utc))
    bundle_evidence = IssueEvidenceLink(
        session_id=row.session_id,
        timestamp_ns=created_ns,
        source_path=bundle_path,
        source_kind="oom_bundle",
        bundle_path=bundle_path,
        metadata={
            "created_at_utc": row.created_at_utc,
            "event_count": row.event_count,
            "session_status": row.session_status,
        },
    )
    issue_details = {
        "backend": backend,
        "reason": reason,
        "exception_type": row.exception_type,
        "exception_module": row.exception_module,
    }
    _accumulate_issue(
        accumulator,
        fingerprint=oom_fingerprint,
        title="OOM captured by flight recorder",
        severity="critical",
        evidence=bundle_evidence,
        seen_ns=created_ns,
        session_id=row.session_id,
        details=issue_details,
    )
def _accumulate_oom_event_issue(
    accumulator: dict[str, _IssueAccumulator],
    row: EventRow,
) -> None:
    """Record an OOM telemetry event as a hit on an ``oom`` issue.

    The fingerprint is built from the event's backend and its ``oom_reason``
    metadata value.  A non-empty string under ``oom_dump_path`` is attached to
    the evidence as the bundle path.
    """
    event = row.event
    event_metadata = event.metadata
    reason = event_metadata.get("oom_reason")
    dump_path = _string_or_none(event_metadata.get("oom_dump_path"))
    backend = _event_backend(event)
    oom_fingerprint = IssueFingerprint(
        kind="oom",
        dimensions={"backend": backend, "reason": reason},
    )
    event_evidence = IssueEvidenceLink(
        session_id=event.session_id,
        timestamp_ns=event.timestamp_ns,
        rank=event.rank,
        source_path=row.source_path,
        source_kind=row.source_kind,
        event_type=event.event_type,
        bundle_path=dump_path,
        metadata={"context": event.context, "session_status": row.session_status},
    )
    issue_details = {
        "backend": backend,
        "reason": reason,
        "collector": event.collector,
        "device_id": event.device_id,
    }
    _accumulate_issue(
        accumulator,
        fingerprint=oom_fingerprint,
        title="OOM telemetry event",
        severity="critical",
        evidence=event_evidence,
        seen_ns=event.timestamp_ns,
        session_id=event.session_id,
        details=issue_details,
    )
def _accumulate_collector_issue(
    accumulator: dict[str, _IssueAccumulator],
    row: EventRow,
) -> None:
    """Record a collector-degradation event as a hit on its issue.

    The fingerprint combines the collector, backend, normalized health status
    (defaulting to ``"degraded"``), the reported partial fields and the
    normalized stem of the collector's last error.  A health status of
    ``"unhealthy"`` is recorded as ``critical``; anything else as ``warning``.
    """
    event = row.event
    event_metadata = event.metadata
    health_status = normalize_text_dimension(
        event_metadata.get("collector_health_status"),
        default="degraded",
    )
    partial_fields = event_metadata.get("collector_partial_fields")
    # Only real (non-string) sequences count as a list of partial fields.
    is_field_list = isinstance(partial_fields, Sequence) and not isinstance(
        partial_fields, str
    )
    if not is_field_list:
        partial_fields = ()
    last_error = event_metadata.get("collector_last_error")

    def describe() -> dict[str, Any]:
        # Built fresh for each consumer so the fingerprint and the details
        # never share a mutable list.
        return {
            "collector": event.collector,
            "backend": _event_backend(event),
            "health_status": health_status,
            "partial_fields": list(partial_fields),
            "error_stem": normalized_error_stem(last_error),
        }

    degradation_fingerprint = IssueFingerprint(
        kind="collector_degradation",
        dimensions=describe(),
    )
    event_evidence = IssueEvidenceLink(
        session_id=event.session_id,
        timestamp_ns=event.timestamp_ns,
        rank=event.rank,
        source_path=row.source_path,
        source_kind=row.source_kind,
        event_type=event.event_type,
        metadata={
            "collector_consecutive_failures": event_metadata.get(
                "collector_consecutive_failures"
            ),
            "collector_next_retry_epoch_s": event_metadata.get(
                "collector_next_retry_epoch_s"
            ),
            "session_status": row.session_status,
        },
    )
    if health_status == "unhealthy":
        issue_severity = "critical"
    else:
        issue_severity = "warning"
    _accumulate_issue(
        accumulator,
        fingerprint=degradation_fingerprint,
        title="Collector degradation",
        severity=issue_severity,
        evidence=event_evidence,
        seen_ns=event.timestamp_ns,
        session_id=event.session_id,
        details=describe(),
    )
def _accumulate_alert_issue(
    accumulator: dict[str, _IssueAccumulator],
    row: EventRow,
) -> None:
    """Record an alert event as a hit on its ``alert`` issue.

    The fingerprint combines the event type, derived severity, collector,
    backend and the category that ``categorize_alert_context`` assigns to the
    event's context.  The issue title is the category with underscores
    replaced by spaces, prefixed with ``"Alert: "``.
    """
    event = row.event
    severity = _event_severity(event)
    category = categorize_alert_context(event.context)
    alert_fingerprint = IssueFingerprint(
        kind="alert",
        dimensions={
            "event_type": event.event_type,
            "severity": severity,
            "collector": event.collector,
            "backend": _event_backend(event),
            "category": category,
        },
    )
    alert_evidence = IssueEvidenceLink(
        session_id=event.session_id,
        timestamp_ns=event.timestamp_ns,
        rank=event.rank,
        source_path=row.source_path,
        source_kind=row.source_kind,
        event_type=event.event_type,
        metadata={"context": event.context, "session_status": row.session_status},
    )
    readable_category = category.replace("_", " ")
    issue_details = {
        "event_type": event.event_type,
        "collector": event.collector,
        "backend": _event_backend(event),
        "category": category,
    }
    _accumulate_issue(
        accumulator,
        fingerprint=alert_fingerprint,
        title=f"Alert: {readable_category}",
        severity=severity,
        evidence=alert_evidence,
        seen_ns=event.timestamp_ns,
        session_id=event.session_id,
        details=issue_details,
    )
def _accumulate_hidden_memory_issues(
    accumulator: dict[str, _IssueAccumulator],
    loaded: LoadedTelemetrySession,
    source: CatalogSource,
) -> None:
    """Record every hidden-memory gap finding of one session as an issue hit.

    Sessions with fewer than three events are skipped.  Each finding returned
    by ``analyze_hidden_memory_gaps`` is matched to the event sharing its
    evidence timestamp; when no such event exists the collector and backend
    dimensions fall back to ``"unknown"``, the rank to ``None`` and the event
    type to ``"sample"``.
    """
    session_events = loaded.events
    if len(session_events) < 3:
        return
    findings = analyze_hidden_memory_gaps(
        events=cast(Sequence[TelemetryEventV2], session_events),
        thresholds=_GAP_ANALYSIS_THRESHOLDS,
        format_memory=format_bytes,
        remediation_by_classification=_GAP_REMEDIATION_BY_CLASSIFICATION,
        phase_resolver=PhaseReplayIndex.from_events(session_events),
    )
    for finding in findings:
        anchor = _event_at_timestamp(loaded.events, finding.evidence_timestamp_ns)
        phase_label = summarize_phase_attribution(finding.phase_attribution)
        if anchor is None:
            collector = "unknown"
            backend = "unknown"
            rank = None
            event_type = "sample"
        else:
            collector = anchor.collector
            backend = _event_backend(anchor)
            rank = anchor.rank
            event_type = anchor.event_type
        anomaly_fingerprint = IssueFingerprint(
            kind="hidden_memory_anomaly",
            dimensions={
                "classification": finding.classification,
                "severity": finding.severity,
                "phase": phase_label,
                "collector": collector,
                "backend": backend,
            },
        )
        finding_evidence = IssueEvidenceLink(
            session_id=loaded.summary.session_id,
            timestamp_ns=finding.evidence_timestamp_ns,
            rank=rank,
            source_path=_event_source_path(loaded, source),
            source_kind=source.source_kind,
            event_type=event_type,
            metadata={
                "classification": finding.classification,
                "confidence": finding.confidence,
                "phase_attribution": phase_attribution_to_payload(
                    finding.phase_attribution
                ),
            },
        )
        issue_details = {
            "classification": finding.classification,
            "description": finding.description,
            "confidence": finding.confidence,
            "evidence": dict(finding.evidence),
            "phase": phase_label,
        }
        _accumulate_issue(
            accumulator,
            fingerprint=anomaly_fingerprint,
            title=f"Hidden-memory anomaly: {finding.classification}",
            severity=finding.severity,
            evidence=finding_evidence,
            seen_ns=finding.evidence_timestamp_ns,
            session_id=loaded.summary.session_id,
            details=issue_details,
        )
def _event_at_timestamp(
    events: Sequence[TelemetryEvent],
    timestamp_ns: int | None,
) -> TelemetryEvent | None:
    """Return the first event whose timestamp equals *timestamp_ns*.

    ``None`` is returned when *timestamp_ns* is ``None`` or no event matches.
    """
    if timestamp_ns is None:
        return None
    matches = (
        candidate for candidate in events if candidate.timestamp_ns == timestamp_ns
    )
    return next(matches, None)
def _max_severity(first: str, second: str) -> str:
    """Return whichever severity has the lower ``_SEVERITY_RANK`` value.

    Severities missing from ``_SEVERITY_RANK`` are ranked 9.  On a tie *first*
    is returned.
    """
    # min() returns the first of several minimal items, so ties keep `first`.
    return min(first, second, key=lambda level: _SEVERITY_RANK.get(level, 9))
def _issue_matches(issue: StormlogIssue, filters: IssueFilter) -> bool:
    """Return whether *issue* satisfies every criterion set on *filters*.

    A criterion left as ``None`` is ignored.  ``fingerprint_id``, ``kind``,
    ``state`` and ``severity`` must equal the issue's attribute of the same
    name; ``session_id`` must be one of the issue's affected sessions.
    """
    for attribute in ("fingerprint_id", "kind", "state", "severity"):
        wanted = getattr(filters, attribute)
        if wanted is not None and getattr(issue, attribute) != wanted:
            return False
    wanted_session = filters.session_id
    return wanted_session is None or wanted_session in issue.affected_sessions
def _issue_sort_key(issue: StormlogIssue) -> tuple[int, int, str]:
    """Return the sort key for *issue*.

    The key orders by ascending ``_SEVERITY_RANK`` (unknown severities rank
    9), then by descending ``last_seen_ns`` (a missing timestamp counts as
    ``-1``), then by fingerprint id.
    """
    severity_rank = _SEVERITY_RANK.get(issue.severity, 9)
    recency = -1 if issue.last_seen_ns is None else issue.last_seen_ns
    return severity_rank, -recency, issue.fingerprint_id
def _summarize_peak(
    rows: Sequence[EventRow],
    metric: str,
    field_name: str,
    group_by: SummaryGroupBy,
) -> list[SummaryRow]:
    """Return one summary row per group holding the peak of an event field.

    For each group the largest ``int(event.<field_name>)`` is kept, together
    with the timestamp of the first row that reached it.  Output rows are
    ordered by the string form of their group key.
    """
    peaks: dict[tuple[str | None, int | None, str | None], tuple[int, EventRow]] = {}
    for row in rows:
        observed = int(getattr(row.event, field_name))
        group_key = _summary_group_key(row, group_by)
        # Strict comparison: on equal values the earlier row is kept.
        if group_key not in peaks or observed > peaks[group_key][0]:
            peaks[group_key] = (observed, row)
    return [
        SummaryRow(
            metric=metric,
            group_by=group_by,
            session_id=group_key[0],
            rank=group_key[1],
            status=group_key[2],
            value=peaks[group_key][0],
            details={"timestamp_ns": peaks[group_key][1].event.timestamp_ns},
        )
        for group_key in sorted(peaks, key=str)
    ]
def _summarize_count(
    rows: Sequence[EventRow],
    metric: str,
    group_by: SummaryGroupBy,
) -> list[SummaryRow]:
    """Return one summary row per group holding the number of rows in it.

    Output rows are ordered by the string form of their group key.
    """
    tally: dict[tuple[str | None, int | None, str | None], int] = {}
    for row in rows:
        group_key = _summary_group_key(row, group_by)
        tally[group_key] = tally.get(group_key, 0) + 1
    return [
        SummaryRow(
            metric=metric,
            group_by=group_by,
            session_id=group_key[0],
            rank=group_key[1],
            status=group_key[2],
            value=tally[group_key],
        )
        for group_key in sorted(tally, key=str)
    ]
def _summarize_hidden_memory_gap_growth(
    rows: Sequence[EventRow],
    group_by: SummaryGroupBy,
) -> list[SummaryRow]:
    """Return, per group, how much the hidden-memory gap grew over time.

    Only ``sample`` events are considered.  The gap of a sample is
    ``device_used_bytes - allocator_reserved_bytes``.  Within each group the
    samples are ordered by timestamp and the reported value is the latest gap
    minus the first gap.  Output rows are ordered by the string form of their
    group key.
    """
    buckets: dict[tuple[str | None, int | None, str | None], list[EventRow]] = {}
    for row in rows:
        if row.event.event_type == "sample":
            buckets.setdefault(_summary_group_key(row, group_by), []).append(row)
    results: list[SummaryRow] = []
    for group_key in sorted(buckets, key=str):
        # Every bucket holds at least one row, so `gaps` is never empty.
        ordered = sorted(buckets[group_key], key=lambda item: item.event.timestamp_ns)
        gaps = [
            item.event.device_used_bytes - item.event.allocator_reserved_bytes
            for item in ordered
        ]
        first_gap = gaps[0]
        latest_gap = gaps[-1]
        session_id, rank, status = group_key
        results.append(
            SummaryRow(
                metric="hidden_memory_gap_growth",
                group_by=group_by,
                session_id=session_id,
                rank=rank,
                status=status,
                value=latest_gap - first_gap,
                details={
                    "first_gap_bytes": first_gap,
                    "latest_gap_bytes": latest_gap,
                    "peak_gap_bytes": max(gaps),
                    "sample_count": len(gaps),
                },
            )
        )
    return results
def _string_or_none(value: object) -> str | None:
    """Return *value* when it is a non-empty string, otherwise ``None``."""
    if not isinstance(value, str):
        return None
    return value or None
def _int_or_none(value: object) -> int | None:
    """Coerce *value* to ``int``, returning ``None`` when that is not possible.

    Only ``int``, ``float`` and ``str`` inputs are converted; ``bool``,
    ``None`` and every other type yield ``None``.  Floats are truncated
    toward zero by ``int()``.

    Args:
        value: Arbitrary object to convert.

    Returns:
        The integer value, or ``None`` when *value* has an unsupported type,
        is non-numeric text, or is a non-finite float.
    """
    # bool is a subclass of int but is not a meaningful number here.
    if isinstance(value, bool) or not isinstance(value, (int, float, str)):
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # ValueError: non-numeric text or NaN.
        # OverflowError: int(float("inf")) / int(float("-inf")).
        return None
# Names bound by a star-import of this module.
# NOTE: "open" is part of the list, so a star-import rebinds the built-in
# name ``open`` in the importing namespace.
__all__ = [
    "ArtifactCatalog",
    "CatalogOOMBundle",
    "CatalogSource",
    "CatalogWarning",
    "EventFilter",
    "EventRow",
    "IssueFilter",
    "OOMBundleFilter",
    "OOMBundleRow",
    "QueryStore",
    "SessionFilter",
    "SessionRow",
    "SummaryRow",
    "open",
]