# Source code for stormlog.query_cli

"""Command-line interface for local Stormlog artifact queries."""

from __future__ import annotations

import argparse
import csv
import json
import sys
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any

from . import query as query_api


@dataclass(frozen=True)
class _OutputMode:
    """Immutable record of the output-format flags parsed from the command line.

    ``main`` fills each flag from the parsed arguments, treating a flag the
    chosen subcommand does not define as ``False``. ``_emit_rows`` checks
    ``json`` first, then ``csv``, and renders a table otherwise, so ``table``
    only records that the default format was requested explicitly.
    """

    json: bool = False  # serialize rows as an indented JSON array
    csv: bool = False  # write rows through csv.DictWriter
    table: bool = False  # aligned, human-readable columns (also the fallback)


def main(argv: Sequence[str] | None = None) -> int:
    """Run the `stormlog query` CLI.

    Args:
        argv: Command-line arguments, excluding the program name. ``None``
            lets argparse read ``sys.argv[1:]``.

    Returns:
        The process exit status:

        - ``0`` on success, or when no subcommand is given (help is printed).
        - ``1`` when opening or querying the artifacts fails, or when stdout
          is closed early by the reader.
        - ``2`` when ``--csv`` is requested for a query that does not support it.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)
    if args.command is None:
        parser.print_help()
        return 0

    # Subcommands that do not register a given output flag lack the
    # attribute entirely, so every flag is read defensively.
    output_mode = _OutputMode(
        json=bool(getattr(args, "json", False)),
        csv=bool(getattr(args, "csv", False)),
        table=bool(getattr(args, "table", False)),
    )
    try:
        # Open the store inside the ``try`` so that missing or unreadable
        # artifact paths are reported as a one-line error with status 1
        # rather than escaping as an uncaught traceback.
        store = query_api.open(args.paths)
        if args.command == "sessions":
            rows = [
                row.as_dict()
                for row in store.list_sessions(
                    query_api.SessionFilter(
                        session_id=args.session_id,
                        status=args.status,
                        job_id=args.job_id,
                        rank=args.rank,
                        world_size=args.world_size,
                        has_oom_bundle=args.has_oom_bundle,
                        source_kind=args.source_kind,
                    )
                )
            ]
            _emit_rows(rows, output_mode, _SESSION_COLUMNS)
        elif args.command == "events":
            rows = [
                row.as_dict()
                for row in store.query_events(
                    query_api.EventFilter(
                        session_id=args.session_id,
                        event_type=args.event_type,
                        rank=args.rank,
                        collector=args.collector,
                        status=args.status,
                        time_start_ns=args.time_start_ns,
                        time_end_ns=args.time_end_ns,
                        has_alert=args.has_alert,
                        collector_health_status=args.collector_health_status,
                        backend=args.backend,
                        limit=args.limit,
                    )
                )
            ]
            _emit_rows(rows, output_mode, _EVENT_COLUMNS)
        elif args.command == "ooms":
            rows = [
                row.as_dict()
                for row in store.list_oom_bundles(
                    query_api.OOMBundleFilter(
                        session_id=args.session_id,
                        backend=args.backend,
                        reason=args.reason,
                        created_after=args.created_after,
                        created_before=args.created_before,
                    )
                )
            ]
            _emit_rows(rows, output_mode, _OOM_COLUMNS)
        elif args.command == "issues":
            # Defensive: the parser does not register --csv for this command.
            if output_mode.csv:
                print(
                    "Error: --csv is not supported for issue queries", file=sys.stderr
                )
                return 2
            rows = [
                row.as_dict()
                for row in store.list_issues(
                    query_api.IssueFilter(
                        fingerprint_id=args.fingerprint_id,
                        kind=args.kind,
                        state=args.state,
                        severity=args.severity,
                        session_id=args.session_id,
                    )
                )
            ]
            _emit_rows(rows, output_mode, _ISSUE_COLUMNS)
        elif args.command == "summary":
            # Defensive: the parser does not register --csv for this command.
            if output_mode.csv:
                print(
                    "Error: --csv is not supported for summary queries",
                    file=sys.stderr,
                )
                return 2
            rows = [
                row.as_dict()
                for row in store.summarize(args.metric, group_by=args.group_by)
            ]
            _emit_rows(rows, output_mode, _SUMMARY_COLUMNS)
    except BrokenPipeError:
        # The downstream reader (e.g. ``| head``) closed the pipe.
        _silence_stdout()
        return 1
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    return 0


def _silence_stdout() -> None:
    """Point the stdout file descriptor at ``os.devnull``, best effort.

    Called after a ``BrokenPipeError``. Without it, the interpreter's final
    flush of ``sys.stdout`` fails again and prints an "Exception ignored"
    message. Streams without a usable file descriptor are left as they are.
    """
    import os

    try:
        devnull = os.open(os.devnull, os.O_WRONLY)
    except OSError:
        return
    try:
        os.dup2(devnull, sys.stdout.fileno())
    except (OSError, ValueError, AttributeError):
        # Deliberately best effort: e.g. stdout replaced by an in-memory
        # stream has no real descriptor to redirect.
        pass
    finally:
        os.close(devnull)
def _build_parser() -> argparse.ArgumentParser:
    """Build the ``stormlog query`` parser and its subcommands.

    Every subcommand takes one or more positional artifact paths and a
    mutually exclusive group of output flags. ``issues`` and ``summary`` do
    not accept ``--csv``. The chosen subcommand name is stored in
    ``args.command``, which is ``None`` when no subcommand is given.
    """
    parser = argparse.ArgumentParser(
        prog="stormlog query",
        description="Query local Stormlog telemetry sessions and artifact bundles.",
    )
    subparsers = parser.add_subparsers(dest="command", help="Query targets")

    sessions = subparsers.add_parser("sessions", help="List artifact sessions")
    _add_paths(sessions)
    _add_output_flags(sessions, include_csv=True)
    # ``--session`` is declared explicitly, matching the other subcommands,
    # instead of relying on argparse prefix abbreviation.
    sessions.add_argument("--session-id", "--session", dest="session_id")
    sessions.add_argument("--status")
    sessions.add_argument("--job-id")
    sessions.add_argument("--rank", type=int)
    sessions.add_argument("--world-size", type=int)
    sessions.add_argument("--source-kind")
    sessions.add_argument(
        "--has-oom-bundle",
        action=argparse.BooleanOptionalAction,
        default=None,
        help="Filter sessions by linked OOM bundle presence.",
    )

    events = subparsers.add_parser("events", help="Query telemetry events")
    _add_paths(events)
    _add_output_flags(events, include_csv=True)
    events.add_argument("--session-id", "--session", dest="session_id")
    events.add_argument("--event-type")
    events.add_argument("--rank", type=int)
    events.add_argument("--collector")
    events.add_argument("--status")
    events.add_argument("--time-start-ns", type=int)
    events.add_argument("--time-end-ns", type=int)
    events.add_argument(
        "--has-alert",
        action=argparse.BooleanOptionalAction,
        default=None,
        help="Filter warning, critical, error, or severity-marked events.",
    )
    events.add_argument("--collector-health-status")
    events.add_argument("--backend")
    events.add_argument("--limit", type=int)

    ooms = subparsers.add_parser("ooms", help="List OOM dump bundles")
    _add_paths(ooms)
    _add_output_flags(ooms, include_csv=True)
    ooms.add_argument("--session-id", "--session", dest="session_id")
    ooms.add_argument("--backend")
    ooms.add_argument("--reason")
    ooms.add_argument("--created-after")
    ooms.add_argument("--created-before")

    issues = subparsers.add_parser("issues", help="List grouped recurring issues")
    _add_paths(issues)
    _add_output_flags(issues, include_csv=False)
    issues.add_argument("--fingerprint-id")
    issues.add_argument(
        "--kind",
        choices=[
            "oom",
            "collector_degradation",
            "alert",
            "hidden_memory_anomaly",
        ],
    )
    issues.add_argument(
        "--state",
        choices=["open", "resolved", "ignored", "regressed"],
    )
    issues.add_argument("--severity")
    issues.add_argument("--session-id", "--session", dest="session_id")

    summary = subparsers.add_parser("summary", help="Run built-in summaries")
    _add_paths(summary)
    _add_output_flags(summary, include_csv=False)
    summary.add_argument(
        "--metric",
        required=True,
        choices=[
            "session_count_by_status",
            "peak_allocator_allocated_bytes",
            "peak_allocator_reserved_bytes",
            "peak_device_used_bytes",
            "alert_count",
            "collector_degradation_transitions",
            "interrupted_sessions_with_oom_bundles",
            "hidden_memory_gap_growth",
        ],
    )
    summary.add_argument(
        "--group-by",
        choices=["session", "session-rank", "rank", "status"],
        default=None,
    )
    return parser


def _add_paths(parser: argparse.ArgumentParser) -> None:
    """Register the required positional artifact paths (one or more)."""
    parser.add_argument("paths", nargs="+", help="Artifact paths to query")


def _add_output_flags(
    parser: argparse.ArgumentParser,
    *,
    include_csv: bool,
) -> None:
    """Register the mutually exclusive output-format flags on ``parser``.

    Args:
        parser: Subcommand parser to extend.
        include_csv: Whether ``--csv`` is offered. When false, the parsed
            namespace has no ``csv`` attribute at all.
    """
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--json", action="store_true", help="Emit JSON rows")
    group.add_argument("--table", action="store_true", help="Emit a readable table")
    if include_csv:
        group.add_argument("--csv", action="store_true", help="Emit CSV rows")


def _emit_rows(
    rows: Sequence[Mapping[str, Any]],
    output_mode: _OutputMode,
    preferred_columns: Sequence[str],
) -> None:
    """Print ``rows`` to stdout as JSON, CSV, or (by default) a table.

    Args:
        rows: Row mappings to print.
        output_mode: Selected output flags. ``json`` wins over ``csv``, and a
            table is used when neither is set.
        preferred_columns: Column order for CSV/table output. Columns not
            present in any row are dropped, and extra keys are appended.
    """
    if output_mode.json:
        # JSON keeps each row's own keys, so no column layout is needed.
        print(json.dumps(rows, indent=2, sort_keys=True, default=str))
        return
    columns = _columns_for_rows(rows, preferred_columns)
    if output_mode.csv:
        _emit_csv(rows, columns)
    else:
        print(_format_table(rows, columns))


def _columns_for_rows(
    rows: Sequence[Mapping[str, Any]],
    preferred_columns: Sequence[str],
) -> list[str]:
    """Return the column order for ``rows``.

    Preferred columns that appear in at least one row come first, in the
    preferred order, followed by any remaining keys in sorted order. When no
    row has any key (including when ``rows`` is empty), the full preferred
    list is returned so the CSV header is still meaningful.
    """
    discovered = {key for row in rows for key in row}
    columns = [column for column in preferred_columns if column in discovered]
    columns.extend(sorted(discovered - set(columns)))
    return columns or list(preferred_columns)


def _emit_csv(rows: Sequence[Mapping[str, Any]], columns: Sequence[str]) -> None:
    """Write a CSV header plus one record per row to stdout.

    Missing keys become empty cells. Nested values are JSON-encoded by
    ``_csv_value``.
    """
    writer = csv.DictWriter(sys.stdout, fieldnames=list(columns), extrasaction="ignore")
    writer.writeheader()
    for row in rows:
        writer.writerow({key: _csv_value(row.get(key)) for key in columns})


def _format_table(
    rows: Sequence[Mapping[str, Any]],
    columns: Sequence[str],
) -> str:
    """Render ``rows`` as a left-aligned text table.

    The table has a title-cased header line, a dashed separator, and one
    line per row. Trailing spaces are stripped from every line. Returns
    ``"No rows."`` when ``rows`` is empty.
    """
    if not rows:
        return "No rows."
    labels = [_label(column) for column in columns]
    # Format every cell once; both the width pass and the body reuse it.
    cells = [[_table_value(row.get(column)) for column in columns] for row in rows]
    widths = [
        max(len(label), *(len(line[index]) for line in cells))
        for index, label in enumerate(labels)
    ]

    def render(values: Sequence[str]) -> str:
        return " ".join(
            value.ljust(width) for value, width in zip(values, widths)
        ).rstrip()

    header = render(labels)
    separator = " ".join("-" * width for width in widths).rstrip()
    body = [render(line) for line in cells]
    return "\n".join([header, separator, *body])


# Control characters that would break a table row onto several lines or
# misalign columns are shown as their escape sequences instead.
_TABLE_ESCAPES = str.maketrans({"\n": "\\n", "\r": "\\r", "\t": "\\t"})


def _table_value(value: Any) -> str:
    """Format one table cell.

    ``None`` is shown as ``-``. Dicts, lists and tuples are shown as compact
    JSON, matching JSON output mode. Any other value goes through ``str``
    with newlines and tabs escaped.
    """
    if value is None:
        return "-"
    if isinstance(value, (dict, list, tuple)):
        # json.dumps without indent never emits raw newlines.
        return json.dumps(value, sort_keys=True, default=str)
    return str(value).translate(_TABLE_ESCAPES)


def _csv_value(value: Any) -> Any:
    """Return a CSV-safe cell value.

    Dicts, lists and tuples are JSON-encoded. Other values are passed through
    for the csv module to stringify and quote.
    """
    if isinstance(value, (dict, list, tuple)):
        return json.dumps(value, sort_keys=True, default=str)
    return value


def _label(column: str) -> str:
    """Turn a snake_case column key into a title-cased table header."""
    return column.replace("_", " ").title()


# Preferred column order per query type; used by _emit_rows for CSV/table output.
_SESSION_COLUMNS = (
    "session_id",
    "status",
    "started_at_ns",
    "ended_at_ns",
    "host",
    "pid",
    "job_id",
    "rank",
    "local_rank",
    "world_size",
    "source_kind",
    "source_path",
    "source_count",
    "warning_count",
    "event_count",
    "oom_bundle_count",
)
_EVENT_COLUMNS = (
    "session_id",
    "timestamp_ns",
    "event_type",
    "rank",
    "local_rank",
    "world_size",
    "collector",
    "allocator_allocated_bytes",
    "allocator_reserved_bytes",
    "device_used_bytes",
    "source_kind",
    "source_path",
    "session_status",
)
_OOM_COLUMNS = (
    "bundle_path",
    "created_at_utc",
    "backend",
    "reason",
    "event_count",
    "session_id",
    "session_status",
    "exception_type",
    "exception_module",
)
_ISSUE_COLUMNS = (
    "fingerprint_id",
    "kind",
    "state",
    "severity",
    "title",
    "hit_count",
    "first_seen_ns",
    "last_seen_ns",
    "affected_sessions",
    "representative_evidence",
)
_SUMMARY_COLUMNS = (
    "metric",
    "group_by",
    "session_id",
    "rank",
    "status",
    "value",
    "timestamp_ns",
    "first_gap_bytes",
    "latest_gap_bytes",
    "peak_gap_bytes",
    "sample_count",
)

__all__ = ["main"]