Source code for stormlog._wandb.diagnose

"""Diagnose-bundle W&B export helpers."""

from __future__ import annotations

from pathlib import Path
from typing import Any, Mapping

from .attribution import log_attribution_outputs
from .core import (
    WandbExportConfig,
    log_directory_artifact,
    read_json_if_exists,
    resolve_run,
    session_slug,
    session_summary_fields,
    session_summary_from_manifest,
    update_summary,
)


def export_diagnose_bundle_to_wandb(
    config: WandbExportConfig,
    *,
    command_name: str,
    artifact_dir: str | Path,
) -> None:
    """Push the contents of a single diagnose bundle directory to W&B.

    Nothing happens when ``config.enabled`` is falsy. Otherwise the bundle's
    ``manifest.json`` and ``diagnostic_summary.json`` are read (via
    ``read_json_if_exists``), a run is obtained from ``resolve_run``, and the
    run summary is updated. Depending on the ``log_tables``,
    ``log_artifacts`` and ``log_attribution`` flags of ``config``, a
    suggestions table, a directory artifact and attribution outputs are
    logged as well.

    Args:
        config: Export settings; the ``enabled``, ``log_tables``,
            ``log_artifacts`` and ``log_attribution`` attributes are read
            here, and the object is forwarded to ``resolve_run``.
        command_name: Forwarded unchanged to ``resolve_run``.
        artifact_dir: Path of the diagnose bundle directory.

    Raises:
        FileNotFoundError: If ``artifact_dir`` is missing or is not a
            directory.
    """
    if not config.enabled:
        return

    bundle_root = Path(artifact_dir)
    if not (bundle_root.exists() and bundle_root.is_dir()):
        raise FileNotFoundError(
            f"Diagnose artifact directory not found: {artifact_dir}"
        )

    manifest_data = read_json_if_exists(bundle_root / "manifest.json")
    diagnostics = read_json_if_exists(bundle_root / "diagnostic_summary.json")
    session_info = session_summary_from_manifest(manifest_data)

    wandb_module, active_run, owns_run = resolve_run(
        config,
        command_name=command_name,
        session_summary=session_info,
    )
    try:
        # Later operands win on key collisions, so the bundle-level fields
        # take precedence over session fields, which beat the raw metrics.
        summary_payload = (
            diagnose_metrics(diagnostics, manifest_data)
            | session_summary_fields(session_info)
            | diagnose_summary_fields(bundle_root, manifest_data)
        )
        update_summary(active_run, summary_payload)

        if config.log_tables:
            log_suggestions_table(wandb_module, active_run, diagnostics)

        if config.log_artifacts:
            log_directory_artifact(
                wandb_module,
                active_run,
                artifact_name=f"stormlog-diagnose-{session_slug(session_info)}",
                artifact_type="stormlog-diagnose",
                path=bundle_root,
            )

        if config.log_attribution:
            attribution_fields = log_attribution_outputs(
                wandb_module,
                active_run,
                root=bundle_root,
                session_slug=session_slug(session_info),
                allow_artifact_logging=config.log_artifacts,
            )
            update_summary(active_run, attribution_fields)
    finally:
        # Finish the run only when resolve_run reported it as managed;
        # NOTE(review): presumably that means the run was created here
        # and not supplied by the caller - confirm against resolve_run.
        if owns_run:
            active_run.finish()
def diagnose_metrics(
    diagnostic_summary: Mapping[str, Any] | None,
    manifest: Mapping[str, Any] | None,
) -> dict[str, Any]:
    """Collect ``stormlog_``-prefixed metrics from a diagnose bundle.

    Args:
        diagnostic_summary: Parsed diagnostic summary, or ``None``.
        manifest: Parsed bundle manifest, or ``None``.

    Returns:
        A dict holding the numeric summary values, the boolean entries of
        ``risk_flags`` (as ``stormlog_risk_<name>``) and, when the manifest
        is a mapping, its boolean ``risk_detected`` and integer
        ``exit_code``. Empty when ``diagnostic_summary`` is not a mapping.
    """
    # NOTE(review): manifest-derived values are skipped too when the
    # diagnostic summary is absent - confirm this is intended.
    if not isinstance(diagnostic_summary, Mapping):
        return {}

    numeric_keys = (
        "allocated_bytes",
        "reserved_bytes",
        "peak_bytes",
        "total_bytes",
        "utilization_ratio",
        "fragmentation_ratio",
        "num_ooms",
    )
    collected: dict[str, Any] = {}
    for name in numeric_keys:
        raw = diagnostic_summary.get(name)
        # bool is a subclass of int, so booleans are accepted here as well.
        if isinstance(raw, (int, float)):
            collected[f"stormlog_{name}"] = raw

    flags = diagnostic_summary.get("risk_flags")
    if isinstance(flags, Mapping):
        collected.update(
            (f"stormlog_risk_{flag_name}", flag_state)
            for flag_name, flag_state in flags.items()
            if isinstance(flag_state, bool)
        )

    if isinstance(manifest, Mapping):
        for name, expected_type in (("risk_detected", bool), ("exit_code", int)):
            raw = manifest.get(name)
            if isinstance(raw, expected_type):
                collected[f"stormlog_{name}"] = raw

    return collected


def diagnose_summary_fields(
    artifact_dir: Path,
    manifest: Mapping[str, Any] | None,
) -> dict[str, Any]:
    """Build the bundle-level summary fields.

    Args:
        artifact_dir: Bundle directory; only its final path component is
            recorded.
        manifest: Parsed bundle manifest, or ``None``.

    Returns:
        A dict with ``stormlog_artifact_dir`` plus any of ``risk_detected``,
        ``native_history_enabled`` and ``session_status`` that are present
        in the manifest with a non-``None`` value, each under a
        ``stormlog_`` prefix.
    """
    summary: dict[str, Any] = {"stormlog_artifact_dir": artifact_dir.name}
    if not isinstance(manifest, Mapping):
        return summary

    for name in ("risk_detected", "native_history_enabled", "session_status"):
        found = manifest.get(name)
        if found is not None:
            summary[f"stormlog_{name}"] = found
    return summary


def log_suggestions_table(
    wandb: Any,
    run: Any,
    diagnostic_summary: Mapping[str, Any] | None,
) -> None:
    """Log the diagnostic suggestions as a two-column table.

    Does nothing unless ``diagnostic_summary`` is a mapping whose
    ``suggestions`` entry is a non-empty list.

    Args:
        wandb: Object exposing a ``Table`` constructor.
        run: Object exposing ``log``.
        diagnostic_summary: Parsed diagnostic summary, or ``None``.
    """
    if not isinstance(diagnostic_summary, Mapping):
        return

    suggestions = diagnostic_summary.get("suggestions")
    if not (isinstance(suggestions, list) and suggestions):
        return

    # Rows are numbered from 1; every suggestion is stringified.
    table_rows = [
        [position, str(text)]
        for position, text in enumerate(suggestions, start=1)
    ]
    table = wandb.Table(columns=["index", "suggestion"], data=table_rows)
    run.log({"stormlog_diagnostic_suggestions": table})