"""Append-only telemetry sink with rollover and retention bounds."""
from __future__ import annotations
import json
import os
import threading
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Mapping, TextIO, cast
from .session import (
SESSION_STATUS_COMPLETED,
SESSION_STATUS_INTERRUPTED,
SESSION_STATUS_RUNNING,
SessionSummary,
create_session_summary,
now_ns,
session_summary_from_dict,
session_summary_to_dict,
update_session_summary,
)
# Name of the manifest file kept directly inside the sink root directory.
MANIFEST_FILENAME = "manifest.json"
# Segment files are named SEGMENT_PREFIX + zero-padded index + SEGMENT_SUFFIX,
# and are discovered on disk by globbing for that same prefix/suffix pair.
SEGMENT_PREFIX = "segment-"
SEGMENT_SUFFIX = ".jsonl"
# Schema version stamped into every manifest this module writes. The reader
# only parses the "sessions" ledger for manifests at version 2 or later.
SINK_SCHEMA_VERSION = 2
@dataclass
class TelemetrySinkConfig:
    """Limits that govern flushing, rollover, and retention for the sink.

    Every numeric limit must be strictly positive, and the total retention
    budget may not be smaller than the per-segment rollover size. Violations
    raise ``ValueError`` while the instance is being constructed.
    """

    # Directory that holds the manifest and all segment files.
    root_dir: Path
    # Flush once this many records are buffered ...
    flush_every_events: int = 50
    # ... or once this many seconds have passed since the last flush.
    flush_every_seconds: float = 2.0
    # A segment is closed once it reaches either of these limits.
    rollover_max_bytes: int = 64 * 1024 * 1024
    rollover_max_events: int = 10000
    # Closed segments are pruned while either of these limits is exceeded.
    retention_max_files: int = 8
    retention_max_total_bytes: int = 512 * 1024 * 1024

    def __post_init__(self) -> None:
        """Normalize ``root_dir`` to a ``Path`` and validate every limit."""
        self.root_dir = Path(self.root_dir)

        # Checked in declaration order so the first offending field is the
        # one reported.
        positive_limits = (
            (self.flush_every_events, "flush_every_events must be >= 1"),
            (self.flush_every_seconds, "flush_every_seconds must be > 0"),
            (self.rollover_max_bytes, "rollover_max_bytes must be >= 1"),
            (self.rollover_max_events, "rollover_max_events must be >= 1"),
            (self.retention_max_files, "retention_max_files must be >= 1"),
            (
                self.retention_max_total_bytes,
                "retention_max_total_bytes must be >= 1",
            ),
        )
        for limit, message in positive_limits:
            if limit <= 0:
                raise ValueError(message)

        # The retention budget has to be able to hold one full segment.
        if self.retention_max_total_bytes < self.rollover_max_bytes:
            raise ValueError(
                "retention_max_total_bytes must be >= rollover_max_bytes"
            )
@dataclass
class TelemetrySinkSegment:
    """Manifest bookkeeping entry describing a single JSONL segment file."""

    # File name of the segment, relative to the sink root directory.
    filename: str
    # Number of records the sink has accounted for in this segment.
    event_count: int
    # Size of the segment in bytes as tracked by the sink.
    size_bytes: int
    # True once the segment no longer accepts appended records.
    closed: bool
    # Identifier of the session that wrote the segment, when known.
    session_id: str | None = None
@dataclass
class TelemetrySinkManifest:
    """In-memory form of a sink manifest: header, session ledger, segments."""

    # Schema version read from the manifest file.
    schema_version: int
    # Format identifier string read from the manifest file.
    format: str
    # Session summaries listed in the manifest; empty when none were parsed.
    sessions: list[SessionSummary] = field(default_factory=list)
    # Segment entries listed in the manifest, in manifest order.
    segments: list[TelemetrySinkSegment] = field(default_factory=list)
class AppendOnlyTelemetrySink:
    """Write telemetry records to newline-delimited JSON segments.

    Records are buffered in memory and appended to the current segment file
    under ``config.root_dir``. A manifest file in the same directory tracks
    every segment and session. A segment is closed once it reaches the
    configured event or byte limit, and closed segments are deleted in list
    order (earliest first) while the retention limits are exceeded.

    Public methods acquire an internal lock. Helpers suffixed ``_locked`` are
    called with that lock held, except during construction.
    """

    def __init__(self, config: TelemetrySinkConfig) -> None:
        """Create the root directory if needed and load any on-disk state."""
        self.config = config
        self.root_dir = config.root_dir
        self.root_dir.mkdir(parents=True, exist_ok=True)
        self._manifest_path = self.root_dir / MANIFEST_FILENAME
        self._segments: list[TelemetrySinkSegment] = []
        self._sessions: dict[str, SessionSummary] = {}
        self._active_session_id: str | None = None
        self._next_segment_index = 1
        # Serialized records waiting to be written, one JSON line each.
        self._buffer: list[str] = []
        self._buffered_event_count = 0
        self._handle: TextIO | None = None
        self._lock = threading.Lock()
        self._flush_stop_event = threading.Event()
        self._flush_thread: threading.Thread | None = None
        self._last_flush_monotonic = time.monotonic()
        self._closed = False
        # Counters reported by get_diagnostics().
        self._rollover_count = 0
        self._pruned_segment_count = 0
        self._pruned_bytes = 0
        self._load_existing_state()

    def start_session(self, summary: SessionSummary | None = None) -> SessionSummary:
        """Register the active session for subsequent records.

        If a session is already active it is returned unchanged. Otherwise
        *summary* (or a freshly created summary) is marked running, stored,
        made active, and persisted to the manifest.
        """
        with self._lock:
            if self._active_session_id is not None:
                active = self._sessions.get(self._active_session_id)
                if active is not None:
                    return active
            resolved = summary or create_session_summary(
                source="stormlog.telemetry_sink",
                status=SESSION_STATUS_RUNNING,
            )
            resolved = update_session_summary(
                resolved,
                status=SESSION_STATUS_RUNNING,
                ended_at_ns=None,
            )
            self._sessions[resolved.session_id] = resolved
            self._active_session_id = resolved.session_id
            self._closed = False
            self._write_manifest_locked()
            return resolved

    def current_session(self) -> SessionSummary | None:
        """Return the active session summary, if any."""
        with self._lock:
            if self._active_session_id is None:
                return None
            return self._sessions.get(self._active_session_id)

    def append(self, record: Mapping[str, Any]) -> None:
        """Buffer one record and flush if a flush threshold has been reached.

        Starts the background flush thread on demand and lazily creates a
        session when none is active.

        Raises:
            ValueError: if ``record["session_id"]`` is not a string or does
                not match the active session.
        """
        with self._lock:
            self._ensure_flush_thread_locked()
            self._closed = False
            self._ensure_active_session_locked(record)
            self._buffer.append(json.dumps(dict(record), sort_keys=True) + "\n")
            self._buffered_event_count += 1
            self._flush_locked(force=False)

    def flush(self, force: bool = False) -> None:
        """Flush buffered records; with ``force`` ignore the thresholds."""
        with self._lock:
            self._flush_locked(force=force)

    def close(self, session_status: str = SESSION_STATUS_COMPLETED) -> None:
        """Flush everything, close the current segment and end the session.

        The active session (if any) is stamped with *session_status* and an
        end time, and the manifest is rewritten. The background flush thread
        is stopped even if flushing raises.
        """
        try:
            with self._lock:
                self._flush_locked(force=True)
                if self._handle is not None:
                    self._handle.close()
                    self._handle = None
                current = self._current_segment()
                if current is not None and not current.closed:
                    current.closed = True
                if self._active_session_id is not None:
                    active = self._sessions.get(self._active_session_id)
                    if active is not None:
                        self._sessions[self._active_session_id] = (
                            update_session_summary(
                                active,
                                status=session_status,
                                ended_at_ns=now_ns(),
                            )
                        )
                    self._active_session_id = None
                self._write_manifest_locked()
                self._closed = True
        finally:
            # Joined outside the lock so the flush loop can finish its pass.
            self._stop_flush_thread()

    def get_diagnostics(self) -> dict[str, int]:
        """Return runtime retention and rollover diagnostics."""
        with self._lock:
            return self._diagnostics_locked()

    def _ensure_active_session_locked(self, record: Mapping[str, Any]) -> None:
        """Make sure a session is active, creating one from *record* if needed.

        Raises:
            ValueError: if the record carries a non-string ``session_id`` or
                one that differs from the active session.
        """
        record_session_id = record.get("session_id")
        if record_session_id is not None and not isinstance(record_session_id, str):
            raise ValueError("telemetry sink record session_id must be a string")
        if self._active_session_id is None:
            resolved = create_session_summary(
                source="stormlog.telemetry_sink",
                status=SESSION_STATUS_RUNNING,
                session_id=(
                    record_session_id if isinstance(record_session_id, str) else None
                ),
            )
            self._sessions[resolved.session_id] = resolved
            self._active_session_id = resolved.session_id
            self._write_manifest_locked()
            return
        if (
            isinstance(record_session_id, str)
            and record_session_id != self._active_session_id
        ):
            raise ValueError(
                "telemetry sink record session_id does not match the active session"
            )

    def _flush_locked(self, force: bool) -> None:
        """Write buffered records to the current segment and update state.

        Without ``force`` the flush is skipped unless at least
        ``flush_every_events`` records are buffered or ``flush_every_seconds``
        have elapsed since the last flush. After writing, rollover and
        retention are applied and the manifest is rewritten.
        """
        if not self._buffer:
            return
        now = time.monotonic()
        if not force:
            if self._buffered_event_count < self.config.flush_every_events and (
                now - self._last_flush_monotonic < self.config.flush_every_seconds
            ):
                return
        current = self._ensure_current_segment_locked()
        payload = "".join(self._buffer)
        # Sizes are tracked in encoded bytes, not characters.
        payload_bytes = payload.encode("utf-8")
        handle = self._ensure_handle_locked(current)
        handle.write(payload)
        handle.flush()
        os.fsync(handle.fileno())
        current.event_count += self._buffered_event_count
        current.size_bytes += len(payload_bytes)
        self._buffer.clear()
        self._buffered_event_count = 0
        self._last_flush_monotonic = now
        self._rollover_locked(current)
        self._prune_retention_locked()
        self._write_manifest_locked()

    def _rollover_locked(self, current: TelemetrySinkSegment) -> None:
        """Close *current* once it has reached the event or byte limit."""
        if (
            current.event_count < self.config.rollover_max_events
            and current.size_bytes < self.config.rollover_max_bytes
        ):
            return
        current.closed = True
        self._rollover_count += 1
        if self._handle is not None:
            self._handle.close()
            self._handle = None

    def _prune_retention_locked(self) -> None:
        """Delete closed segments until both retention limits are satisfied.

        Only closed segments are removed; pruning stops early when none is
        left even if a limit is still exceeded.
        """
        while True:
            total_bytes = sum(segment.size_bytes for segment in self._segments)
            over_file_limit = len(self._segments) > self.config.retention_max_files
            over_size_limit = total_bytes > self.config.retention_max_total_bytes
            if not over_file_limit and not over_size_limit:
                return
            removable = next(
                (segment for segment in self._segments if segment.closed),
                None,
            )
            if removable is None:
                return
            path = self.root_dir / removable.filename
            # Unlink directly instead of exists()+unlink(): the file may be
            # removed by something else in between, and a segment that is
            # already gone is exactly the outcome pruning wants.
            try:
                path.unlink()
            except FileNotFoundError:
                pass
            self._segments.remove(removable)
            self._pruned_segment_count += 1
            self._pruned_bytes += removable.size_bytes

    def _current_segment(self) -> TelemetrySinkSegment | None:
        """Return the last segment if it is open and owned by the active session."""
        if not self._segments:
            return None
        current = self._segments[-1]
        if current.closed:
            return None
        if (
            self._active_session_id is not None
            and current.session_id != self._active_session_id
        ):
            return None
        return current

    def _ensure_flush_thread_locked(self) -> None:
        """Start the daemon flush thread unless one is already alive."""
        if self._flush_thread is not None and self._flush_thread.is_alive():
            return
        # A fresh event per thread: the previous one may already be set.
        self._flush_stop_event = threading.Event()
        self._flush_thread = threading.Thread(
            target=self._run_flush_loop,
            name="stormlog-telemetry-sink-flush",
            daemon=True,
        )
        self._flush_thread.start()

    def _stop_flush_thread(self) -> None:
        """Signal the flush thread to stop and wait briefly for it to exit."""
        thread = self._flush_thread
        if thread is None:
            return
        self._flush_stop_event.set()
        thread.join(timeout=self.config.flush_every_seconds + 1.0)
        self._flush_thread = None

    def _run_flush_loop(self) -> None:
        """Run a non-forced flush every ``flush_every_seconds`` until stopped."""
        while not self._flush_stop_event.wait(timeout=self.config.flush_every_seconds):
            with self._lock:
                self._flush_locked(force=False)

    def _ensure_current_segment_locked(self) -> TelemetrySinkSegment:
        """Return the writable segment, registering a new one when needed."""
        current = self._current_segment()
        if current is not None:
            return current
        segment = TelemetrySinkSegment(
            filename=f"{SEGMENT_PREFIX}{self._next_segment_index:06d}{SEGMENT_SUFFIX}",
            event_count=0,
            size_bytes=0,
            closed=False,
            session_id=self._active_session_id,
        )
        self._next_segment_index += 1
        self._segments.append(segment)
        return segment

    def _ensure_handle_locked(self, current: TelemetrySinkSegment) -> TextIO:
        """Return the open append handle for *current*, opening it if needed.

        Before opening, the segment file's tail is repaired and the segment's
        counters are resynchronized with what is on disk.
        """
        if self._handle is None:
            segment_path = self.root_dir / current.filename
            self._recover_segment_tail_locked(segment_path, current)
            self._handle = segment_path.open("a", encoding="utf-8")
        return self._handle

    def _load_existing_state(self) -> None:
        """Rebuild segment and session state from the root directory.

        With a manifest: manifest segments are merged with segment files found
        on disk, sessions still marked running are rewritten as interrupted,
        and segments of interrupted sessions are closed. Without a manifest:
        every discovered segment file is adopted as a closed segment with no
        session, and a manifest is written.
        """
        discovered = _discover_segment_paths(self.root_dir)
        manifest = read_telemetry_sink_manifest(self.root_dir)
        manifest_needs_rewrite = False
        if manifest is not None:
            self._segments = self._merge_segment_state(manifest.segments, discovered)
            self._sessions = {
                summary.session_id: summary for summary in manifest.sessions
            }
            for session_id, summary in list(self._sessions.items()):
                if summary.status == SESSION_STATUS_RUNNING:
                    self._sessions[session_id] = update_session_summary(
                        summary,
                        status=SESSION_STATUS_INTERRUPTED,
                        ended_at_ns=now_ns(),
                    )
                    manifest_needs_rewrite = True
            for segment in self._segments:
                if segment.session_id in self._sessions and (
                    self._sessions[segment.session_id].status
                    == SESSION_STATUS_INTERRUPTED
                ):
                    segment.closed = True
            self._next_segment_index = self._compute_next_segment_index()
            # Manifests written under another schema version are upgraded.
            if manifest.schema_version != SINK_SCHEMA_VERSION:
                manifest_needs_rewrite = True
            if manifest_needs_rewrite:
                self._write_manifest_locked()
            return
        if not discovered:
            return
        for path in discovered:
            self._segments.append(
                TelemetrySinkSegment(
                    filename=path.name,
                    event_count=self._count_records(path),
                    size_bytes=path.stat().st_size,
                    closed=True,
                    session_id=None,
                )
            )
        self._next_segment_index = self._compute_next_segment_index()
        self._write_manifest_locked()

    def _compute_next_segment_index(self) -> int:
        """Return one more than the highest numeric index among known segments."""
        max_index = 0
        for segment in self._segments:
            stem = Path(segment.filename).stem
            if not stem.startswith(SEGMENT_PREFIX):
                continue
            suffix = stem[len(SEGMENT_PREFIX) :]
            if suffix.isdigit():
                max_index = max(max_index, int(suffix))
        return max_index + 1

    def _write_manifest_locked(self) -> None:
        """Serialize sessions and segments to the manifest file.

        The payload is written to a sibling ``.tmp`` file which then replaces
        the manifest, so the manifest path is swapped in a single rename.
        """
        payload = {
            "schema_version": SINK_SCHEMA_VERSION,
            "format": "stormlog.append_only_telemetry_sink",
            "sessions": [
                session_summary_to_dict(summary)
                for summary in sorted(
                    self._sessions.values(),
                    key=lambda session: (session.started_at_ns, session.session_id),
                )
            ],
            "segments": [asdict(segment) for segment in self._segments],
        }
        temp_path = self._manifest_path.with_suffix(".tmp")
        temp_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
        temp_path.replace(self._manifest_path)

    def _diagnostics_locked(self) -> dict[str, int]:
        """Build the diagnostics mapping from the current counters."""
        retained_bytes = sum(segment.size_bytes for segment in self._segments)
        return {
            "rollover_count": self._rollover_count,
            "pruned_segment_count": self._pruned_segment_count,
            "pruned_bytes": self._pruned_bytes,
            "final_retained_files": len(self._segments),
            "final_retained_bytes": retained_bytes,
        }

    @staticmethod
    def _count_records(path: Path) -> int:
        """Count the non-blank lines in the file at *path*."""
        with path.open("r", encoding="utf-8") as handle:
            return sum(1 for line in handle if line.strip())

    def _recover_segment_tail_locked(
        self,
        segment_path: Path,
        current: TelemetrySinkSegment,
    ) -> None:
        """Drop a partial trailing line and resync *current* with the file.

        A file that does not end in a newline is truncated back to its last
        complete line (or emptied if it has none). The segment's size and
        event count are then taken from the file; both are reset to zero when
        the file does not exist.
        """
        if not segment_path.exists():
            current.event_count = 0
            current.size_bytes = 0
            return
        payload = segment_path.read_bytes()
        if payload and not payload.endswith(b"\n"):
            last_newline = payload.rfind(b"\n")
            payload = payload[: last_newline + 1] if last_newline >= 0 else b""
            segment_path.write_bytes(payload)
        current.size_bytes = len(payload)
        current.event_count = self._count_records(segment_path)

    def _merge_segment_state(
        self,
        manifest_segments: list[TelemetrySinkSegment],
        discovered_segments: list[Path],
    ) -> list[TelemetrySinkSegment]:
        """Combine manifest entries with segment files found on disk.

        Returns copies of the manifest entries plus a closed, session-less
        entry for each discovered file the manifest does not mention, ordered
        by file name. Manifest data wins when a name appears in both.
        """
        manifest_by_name = {
            segment.filename: TelemetrySinkSegment(
                filename=segment.filename,
                event_count=segment.event_count,
                size_bytes=segment.size_bytes,
                closed=segment.closed,
                session_id=segment.session_id,
            )
            for segment in manifest_segments
        }
        discovered_by_name = {path.name: path for path in discovered_segments}
        merged_names = sorted(set(manifest_by_name) | set(discovered_by_name))
        merged: list[TelemetrySinkSegment] = []
        for name in merged_names:
            manifest_segment = manifest_by_name.get(name)
            if manifest_segment is not None:
                merged.append(manifest_segment)
                continue
            path = discovered_by_name[name]
            merged.append(
                TelemetrySinkSegment(
                    filename=name,
                    event_count=self._count_records(path),
                    size_bytes=path.stat().st_size,
                    closed=True,
                    session_id=None,
                )
            )
        return merged
def resolve_telemetry_sink_segment_paths(path: str | Path) -> list[Path]:
    """Turn a sink input path into the ordered list of its segment files.

    * A segment file resolves to just itself.
    * A manifest file or a sink directory resolves to the manifest's segments
      that exist on disk, merged with any segment files discovered next to
      them.
    * Anything else (other files, missing paths) resolves to an empty list.
    """
    target = Path(path)
    if target.is_file():
        if target.suffix == SEGMENT_SUFFIX:
            return [target]
        if target.name != MANIFEST_FILENAME:
            return []
        sink_dir = target.parent
        manifest = read_telemetry_sink_manifest(target)
        # An unreadable manifest file still falls through to disk discovery.
        listed = [] if manifest is None else manifest.segments
    elif target.is_dir():
        sink_dir = target
        manifest = read_telemetry_sink_manifest(target)
        if manifest is None:
            return _discover_segment_paths(target)
        listed = manifest.segments
    else:
        return []

    # Keep only manifest entries whose files are actually present.
    present: list[Path] = []
    for entry in listed:
        candidate = sink_dir / entry.filename
        if candidate.exists():
            present.append(candidate)
    return _merge_segment_paths(present, _discover_segment_paths(sink_dir))
def resolve_telemetry_sink_manifest_path(path: str | Path) -> Path | None:
    """Locate the manifest that belongs to *path*, or return ``None``.

    A manifest file resolves to itself. A segment file resolves to the
    manifest in its parent directory, and a directory to the manifest inside
    it; both only when that manifest exists. Any other input yields ``None``.
    """
    target = Path(path)
    if target.is_file():
        if target.name == MANIFEST_FILENAME:
            return target
        if target.suffix != SEGMENT_SUFFIX:
            return None
        sink_dir = target.parent
    elif target.is_dir():
        sink_dir = target
    else:
        return None

    manifest_path = sink_dir / MANIFEST_FILENAME
    if manifest_path.exists():
        return manifest_path
    return None
def _coerce_manifest_int(
value: object,
*,
default: int,
minimum: int | None = None,
) -> int:
try:
coerced = int(cast(Any, value))
except (TypeError, ValueError):
return default
if minimum is not None and coerced < minimum:
return default
return int(coerced)
def _coerce_manifest_entries(value: object) -> list[object]:
return value if isinstance(value, list) else []
def read_telemetry_sink_manifest(path: str | Path) -> TelemetrySinkManifest | None:
    """Read a sink manifest from a sink directory, manifest file, or segment file.

    Parsing is tolerant: malformed fields fall back to defaults and malformed
    session or segment entries are skipped.

    Returns:
        The parsed manifest, or ``None`` when no manifest can be located, it
        cannot be read or decoded, or its top-level JSON value is not an
        object.
    """
    manifest_path = resolve_telemetry_sink_manifest_path(path)
    if manifest_path is None or not manifest_path.exists():
        return None
    try:
        payload = json.loads(manifest_path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberately best-effort: an unreadable manifest is treated the
        # same as a missing one.
        return None
    if not isinstance(payload, Mapping):
        return None

    try:
        schema_version = _coerce_manifest_int(
            payload.get("schema_version", 1),
            default=1,
            minimum=1,
        )
    except OverflowError:
        # A non-finite number (JSON ``Infinity`` / ``1e999``) cannot be
        # converted to int; fall back to the default like any other bad value
        # instead of letting the error escape this tolerant reader.
        schema_version = 1

    fmt_value = payload.get("format", "stormlog.append_only_telemetry_sink")
    fmt = (
        fmt_value
        if isinstance(fmt_value, str) and fmt_value
        else "stormlog.append_only_telemetry_sink"
    )

    # The session ledger is only parsed for schema version 2 and later.
    sessions: list[SessionSummary] = []
    if schema_version >= 2:
        for raw_session in _coerce_manifest_entries(payload.get("sessions", [])):
            if not isinstance(raw_session, Mapping):
                continue
            try:
                sessions.append(session_summary_from_dict(raw_session))
            except Exception:
                # Skip entries the session parser rejects; keep the rest.
                continue

    segments: list[TelemetrySinkSegment] = []
    for raw_segment in _coerce_manifest_entries(payload.get("segments", [])):
        if not isinstance(raw_segment, Mapping):
            continue
        filename = raw_segment.get("filename")
        if not isinstance(filename, str):
            continue
        raw_session_id = raw_segment.get("session_id")
        try:
            segments.append(
                TelemetrySinkSegment(
                    filename=filename,
                    event_count=_coerce_manifest_int(
                        raw_segment.get("event_count", 0),
                        default=0,
                        minimum=0,
                    ),
                    size_bytes=_coerce_manifest_int(
                        raw_segment.get("size_bytes", 0),
                        default=0,
                        minimum=0,
                    ),
                    closed=bool(raw_segment.get("closed", False)),
                    session_id=(
                        raw_session_id if isinstance(raw_session_id, str) else None
                    ),
                )
            )
        except Exception:
            # A segment entry that cannot be built is dropped, not fatal.
            continue

    return TelemetrySinkManifest(
        schema_version=schema_version,
        format=fmt,
        sessions=sessions,
        segments=segments,
    )
def _discover_segment_paths(root_dir: Path) -> list[Path]:
    """List the segment files directly inside *root_dir*, in sorted order."""
    pattern = f"{SEGMENT_PREFIX}*{SEGMENT_SUFFIX}"
    matches = list(root_dir.glob(pattern))
    matches.sort()
    return matches
def _merge_segment_paths(
manifest_segments: list[Path],
discovered_segments: list[Path],
) -> list[Path]:
merged_by_name = {path.name: path for path in discovered_segments}
for path in manifest_segments:
merged_by_name[path.name] = path
return [merged_by_name[name] for name in sorted(merged_by_name)]
# Public API of this module: the names exported by a star-import.
__all__ = [
"AppendOnlyTelemetrySink",
"MANIFEST_FILENAME",
"SEGMENT_PREFIX",
"SEGMENT_SUFFIX",
"SINK_SCHEMA_VERSION",
"TelemetrySinkConfig",
"TelemetrySinkManifest",
"TelemetrySinkSegment",
"read_telemetry_sink_manifest",
"resolve_telemetry_sink_manifest_path",
"resolve_telemetry_sink_segment_paths",
]