"""Phase boundary parsing and replay index helpers."""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Mapping, Sequence

from .runtime import (
    PHASE_ENTER_EVENT,
    PHASE_EXIT_EVENT,
    PHASE_SCOPE_ATTRIBUTES_KEY,
    PHASE_SCOPE_METADATA_KEY,
)
[docs]
@dataclass(frozen=True)
class PhaseBoundaryRecord:
    """One normalized phase enter/exit boundary extracted from a telemetry event.

    Records are immutable and hashable. ``attributes`` is excluded from the
    generated ``__hash__`` because a ``dict`` is unhashable (including it made
    ``hash(record)`` raise ``TypeError``); it still participates in ``==``, so
    equal records always hash equal. ``frozen`` only prevents rebinding fields:
    the ``attributes`` dict itself can still be mutated in place.
    """

    action: str  # "enter" or "exit" when built by parse_phase_boundary
    name: str
    path: tuple[str, ...]  # non-empty string segments when built by parse_phase_boundary
    depth: int
    scope_id: str
    parent_scope_id: str | None
    thread_id: int
    thread_name: str
    sequence: int
    session_id: str
    timestamp_ns: int
    attributes: dict[str, Any] = field(hash=False)
[docs]
@dataclass(frozen=True)
class PhaseSpan:
    """A reconstructed phase interval belonging to one ``(session_id, rank)`` group.

    ``synthetic_end`` is ``True`` when no matching exit boundary was replayed
    and ``end_ns`` was instead filled in from the last timestamp observed for
    the group (see ``PhaseReplayIndex.from_events``).
    """

    session_id: str
    rank: int
    thread_id: int
    thread_name: str
    scope_id: str
    path: tuple[str, ...]
    start_ns: int  # timestamp of the enter boundary
    end_ns: int  # timestamp of the exit boundary, or the synthetic end
    sequence: int  # sequence number carried by the enter boundary
    synthetic_end: bool = False

    @property
    def depth(self) -> int:
        """Nesting depth, defined as the number of segments in ``path``."""
        segments = self.path
        return len(segments)
[docs]
class PhaseReplayIndex:
    """Replay phase boundaries into queryable active spans.

    Spans are stored per ``(session_id, rank)`` group. Each group is kept
    sorted by ``(start_ns, end_ns)``, so point-in-time queries can stop
    scanning a group once they pass the query timestamp.
    """

    def __init__(
        self, intervals_by_group: Mapping[tuple[str, int], Sequence[PhaseSpan]]
    ) -> None:
        """Index pre-built spans.

        Args:
            intervals_by_group: Spans keyed by ``(session_id, rank)``. Each
                group is copied into a tuple sorted by ``(start_ns, end_ns)``.
                The sort is stable, so ties keep their input order. Later
                mutation of the caller's containers does not affect the index.
        """
        self._intervals_by_group = {
            key: tuple(sorted(intervals, key=lambda item: (item.start_ns, item.end_ns)))
            for key, intervals in intervals_by_group.items()
        }

    @classmethod
    def from_events(cls, events: Sequence[Any]) -> "PhaseReplayIndex":
        """Build a replay index from telemetry events.

        Events without a string ``session_id`` and an ``int`` ``timestamp_ns``
        are ignored. ``rank`` defaults to ``0`` when absent. Events whose rank
        cannot be coerced to an int are ignored entirely.

        Every remaining event advances its ``(session_id, rank)`` group's last
        seen timestamp. Phase enter/exit boundaries are replayed in
        ``(session_id, rank, timestamp_ns, sequence, scope_id)`` order on one
        stack per thread. An exit closes a span only when it matches the
        innermost open scope; any other exit is ignored. Scopes still open at
        the end are closed at their group's last seen timestamp and flagged
        ``synthetic_end=True``.
        """
        session_end_by_group, boundaries = cls._collect_boundaries(events)
        intervals_by_group, active_by_thread = cls._replay_boundaries(boundaries)
        cls._close_open_scopes(active_by_thread, session_end_by_group, intervals_by_group)
        return cls(intervals_by_group)

    @staticmethod
    def _collect_boundaries(
        events: Sequence[Any],
    ) -> tuple[dict[tuple[str, int], int], list[tuple[int, PhaseBoundaryRecord]]]:
        """Track each group's latest timestamp and gather parsed boundaries in replay order."""
        session_end_by_group: dict[tuple[str, int], int] = {}
        boundaries: list[tuple[int, PhaseBoundaryRecord]] = []
        for event in events:
            session_id = _event_field(event, "session_id")
            timestamp_ns = _event_field(event, "timestamp_ns")
            if not isinstance(session_id, str) or not isinstance(timestamp_ns, int):
                continue
            rank = _coerce_rank(_event_field(event, "rank", 0))
            if rank is None:
                # Without a rank the event can neither extend a group nor be replayed.
                continue
            group_key = (session_id, rank)
            previous_end = session_end_by_group.get(group_key)
            if previous_end is None or timestamp_ns > previous_end:
                session_end_by_group[group_key] = timestamp_ns
            if _event_field(event, "event_type", "") not in {PHASE_ENTER_EVENT, PHASE_EXIT_EVENT}:
                continue
            scope = parse_phase_boundary(event)
            if scope is not None:
                boundaries.append((rank, scope))
        # Sort with an explicit key: boundary records define no ordering, so
        # they must never be compared directly when every key component ties.
        boundaries.sort(
            key=lambda item: (
                item[1].session_id,
                item[0],
                item[1].timestamp_ns,
                item[1].sequence,
                item[1].scope_id,
            )
        )
        return session_end_by_group, boundaries

    @staticmethod
    def _replay_boundaries(
        boundaries: Sequence[tuple[int, PhaseBoundaryRecord]],
    ) -> tuple[
        dict[tuple[str, int], list[PhaseSpan]],
        dict[tuple[str, int, int], list[PhaseBoundaryRecord]],
    ]:
        """Match enters to exits on per-thread stacks.

        Returns the closed spans per ``(session_id, rank)`` group and the
        scopes still open per ``(session_id, rank, thread_id)``.
        """
        active_by_thread: dict[tuple[str, int, int], list[PhaseBoundaryRecord]] = {}
        intervals_by_group: dict[tuple[str, int], list[PhaseSpan]] = {}
        for rank, scope in boundaries:
            thread_key = (scope.session_id, rank, scope.thread_id)
            stack = active_by_thread.setdefault(thread_key, [])
            if scope.action == "enter":
                stack.append(scope)
                continue
            # Only an exit for the innermost open scope closes a span; any
            # other exit is dropped and the stack is left untouched.
            if not stack or stack[-1].scope_id != scope.scope_id:
                continue
            opened = stack.pop()
            if not stack:
                active_by_thread.pop(thread_key, None)
            intervals_by_group.setdefault((scope.session_id, rank), []).append(
                PhaseSpan(
                    session_id=scope.session_id,
                    rank=rank,
                    thread_id=scope.thread_id,
                    thread_name=scope.thread_name,
                    scope_id=scope.scope_id,
                    path=opened.path,
                    start_ns=opened.timestamp_ns,
                    end_ns=scope.timestamp_ns,
                    sequence=opened.sequence,
                )
            )
        return intervals_by_group, active_by_thread

    @staticmethod
    def _close_open_scopes(
        active_by_thread: Mapping[tuple[str, int, int], Sequence[PhaseBoundaryRecord]],
        session_end_by_group: Mapping[tuple[str, int], int],
        intervals_by_group: dict[tuple[str, int], list[PhaseSpan]],
    ) -> None:
        """Append, in place, a synthetic-end span for every scope that was never exited."""
        for (session_id, rank, _thread_id), stack in active_by_thread.items():
            session_end_ns = session_end_by_group.get((session_id, rank))
            if session_end_ns is None:
                continue
            for scope in stack:
                intervals_by_group.setdefault((session_id, rank), []).append(
                    PhaseSpan(
                        session_id=session_id,
                        rank=rank,
                        thread_id=scope.thread_id,
                        thread_name=scope.thread_name,
                        scope_id=scope.scope_id,
                        path=scope.path,
                        start_ns=scope.timestamp_ns,
                        end_ns=session_end_ns,
                        sequence=scope.sequence,
                        synthetic_end=True,
                    )
                )

    def _select_groups(
        self, session_id: str, rank: int | None
    ) -> list[tuple[PhaseSpan, ...]]:
        """Return the sorted span tuples of every group matching the selection.

        A concrete ``rank`` is a direct dict lookup. ``rank=None`` selects
        every rank of ``session_id``, in group insertion order.
        """
        if rank is not None:
            intervals = self._intervals_by_group.get((session_id, rank))
            return [] if intervals is None else [intervals]
        return [
            intervals
            for (group_session_id, _group_rank), intervals in self._intervals_by_group.items()
            if group_session_id == session_id
        ]

    def spans_for(
        self,
        *,
        session_id: str,
        rank: int | None = None,
    ) -> list[PhaseSpan]:
        """Return all reconstructed spans for one session/rank selection.

        Args:
            session_id: Session whose spans are returned.
            rank: Restrict to one rank. ``None`` returns spans for every rank.

        Returns:
            A new list. Within each group, spans are ordered by
            ``(start_ns, end_ns)``.
        """
        return [
            span
            for intervals in self._select_groups(session_id, rank)
            for span in intervals
        ]

    def active_spans(
        self,
        *,
        timestamp_ns: int,
        session_id: str,
        rank: int | None = None,
    ) -> list[PhaseSpan]:
        """Return active spans at a timestamp before attribution policy is applied.

        A span is active when ``start_ns <= timestamp_ns <= end_ns``; both
        bounds are inclusive.
        """
        matches: list[PhaseSpan] = []
        for intervals in self._select_groups(session_id, rank):
            for interval in intervals:
                if interval.start_ns > timestamp_ns:
                    # Groups are sorted by start_ns: no later span can be active.
                    break
                if timestamp_ns <= interval.end_ns:
                    matches.append(interval)
        return matches

    def resolve(
        self,
        *,
        timestamp_ns: int,
        session_id: str | None,
        rank: int | None = None,
        origin_thread_id: int | None = None,
        origin_phase_scope_id: str | None = None,
    ) -> Any:
        """Resolve one timestamp against the replay index.

        Returns ``None`` when ``session_id`` is ``None``. Otherwise the active
        spans are passed to ``attribute_active_spans`` from ``.policy``, and
        its result is returned unchanged.
        """
        if session_id is None:
            return None
        # Imported lazily; presumably to avoid a circular import with .policy.
        from .policy import attribute_active_spans

        return attribute_active_spans(
            self.active_spans(
                timestamp_ns=timestamp_ns,
                session_id=session_id,
                rank=rank,
            ),
            origin_thread_id=origin_thread_id,
            origin_phase_scope_id=origin_phase_scope_id,
        )

    def resolve_for_event(self, event: Any) -> Any:
        """Resolve one event-like object against the replay index.

        Delegates to ``resolve_phase_for_event`` from ``.policy``.
        """
        from .policy import resolve_phase_for_event

        return resolve_phase_for_event(self, event)
[docs]
def parse_phase_boundary(event: Any) -> PhaseBoundaryRecord | None:
    """Extract one normalized phase payload from an event-like object.

    ``event`` may be a mapping or any object with matching attributes. The
    scope payload is read from ``metadata[PHASE_SCOPE_METADATA_KEY]``.

    Returns ``None`` (never raises for malformed input) when:

    * the event type is not a phase enter/exit event;
    * ``session_id`` is not a ``str`` or ``timestamp_ns`` is not an ``int``;
    * required payload fields are missing or mistyped;
    * the payload's ``action`` disagrees with the event type;
    * the path has no non-empty string segment.

    Normalization applied to the payload:

    * ``path`` may be a list or a tuple. Non-string and empty segments are
      dropped.
    * ``name`` is stripped of surrounding whitespace.
    * ``depth`` is always derived from the normalized path; the payload's own
      ``depth`` is not trusted.
    * Non-mapping attributes become ``{}``.
    """
    event_type = _event_field(event, "event_type", "")
    if event_type not in {PHASE_ENTER_EVENT, PHASE_EXIT_EVENT}:
        return None
    metadata = _event_field(event, "metadata", {})
    if not isinstance(metadata, Mapping):
        return None
    raw_scope = metadata.get(PHASE_SCOPE_METADATA_KEY)
    if not isinstance(raw_scope, Mapping):
        return None
    session_id = _event_field(event, "session_id")
    timestamp_ns = _event_field(event, "timestamp_ns")
    if not isinstance(session_id, str) or not isinstance(timestamp_ns, int):
        return None
    action = raw_scope.get("action")
    name = raw_scope.get("name")
    scope_id = raw_scope.get("scope_id")
    path = raw_scope.get("path")
    thread_name = raw_scope.get("thread_name")
    if (
        not isinstance(action, str)
        or not isinstance(name, str)
        or not name.strip()
        or not isinstance(scope_id, str)
        or not isinstance(thread_name, str)
        or not isinstance(path, (list, tuple))
    ):
        return None
    # The payload's action must agree with the event type that carried it.
    expected_action = "enter" if event_type == PHASE_ENTER_EVENT else "exit"
    if action != expected_action:
        return None
    normalized_path = tuple(
        str(part) for part in path if isinstance(part, str) and part
    )
    if not normalized_path:
        return None
    sequence = raw_scope.get("sequence")
    thread_id = raw_scope.get("thread_id")
    parent_scope_id = raw_scope.get("parent_scope_id")
    if not isinstance(sequence, int) or not isinstance(thread_id, int):
        return None
    if parent_scope_id is not None and not isinstance(parent_scope_id, str):
        return None
    raw_attributes = raw_scope.get(PHASE_SCOPE_ATTRIBUTES_KEY, {})
    # Shallow copy so the record does not alias the event's metadata mapping.
    attributes = dict(raw_attributes) if isinstance(raw_attributes, Mapping) else {}
    return PhaseBoundaryRecord(
        action=action,
        name=name.strip(),
        path=normalized_path,
        # Always a plain int derived from the path; a payload depth of ``True``
        # used to leak through because ``True == 1``.
        depth=len(normalized_path),
        scope_id=scope_id,
        parent_scope_id=parent_scope_id,
        thread_id=thread_id,
        thread_name=thread_name,
        sequence=sequence,
        session_id=session_id,
        timestamp_ns=timestamp_ns,
        attributes=attributes,
    )
[docs]
def is_phase_boundary_event(event: Any) -> bool:
    """Return ``True`` when the event is a structured phase boundary.

    A cheap ``event_type`` check runs first. Only enter/exit events pay for a
    full ``parse_phase_boundary`` call.
    """
    boundary_types = {PHASE_ENTER_EVENT, PHASE_EXIT_EVENT}
    if _event_field(event, "event_type", "") in boundary_types:
        return parse_phase_boundary(event) is not None
    return False
def _event_field(event: Any, field_name: str, default: Any = None) -> Any:
    """Read ``field_name`` from ``event``, falling back to ``default`` when absent.

    Mappings are read with ``.get``; any other object is read with ``getattr``.
    A key that is present with value ``None`` returns ``None``, not ``default``.
    """
    if not isinstance(event, Mapping):
        return getattr(event, field_name, default)
    return event.get(field_name, default)
def _coerce_rank(value: Any) -> int | None:
    """Coerce a raw rank value to ``int``, or return ``None`` if it is unusable.

    ``int`` values (excluding ``bool``) pass through unchanged. Strings are
    stripped and parsed with ``int()``; blank or unparsable strings yield
    ``None``. Every other type yields ``None``.
    """
    # bool subclasses int; reject it instead of silently mapping to rank 0/1.
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    if not isinstance(value, str):
        return None
    text = value.strip()
    if text:
        try:
            return int(text)
        except ValueError:
            pass
    return None
# Public API of this module; underscore-prefixed helpers are intentionally omitted.
__all__ = [
"PhaseBoundaryRecord",
"PhaseReplayIndex",
"PhaseSpan",
"is_phase_boundary_event",
"parse_phase_boundary",
]