From a3b4a2a9dd8e5a4b762746086267a599ab985f2f Mon Sep 17 00:00:00 2001 From: Tony Mongkolsmai Date: Wed, 15 Apr 2026 14:44:57 -0400 Subject: [PATCH] Add telemetry plugin system for sampling profiler Introduce a pluggable telemetry framework that allows external data sources (e.g. GPU metrics) to be collected alongside CPU sampling profiles. - New --plugin CLI flag (repeatable) wired through attach, run, live-attach, live-run, and replay code paths - TelemetryHelperManager lifecycle (start/poll/stop) integrated into SampleProfiler sampling loop - Binary format gains sidecar chunk support via TelemetryChunkWriter for writing plugin events, and replay_sidecar_to_sink for reading them back - Live TUI gets a telemetry panel widget with keybindings for toggling (g), cycling plugins (n), and cycling display modes (m) - New Lib/profiling/sampling/telemetry/ package with plugin registry, helper subprocess manager, chunk I/O, and an NVIDIA GPU plugin Made-with: Cursor --- Lib/profiling/sampling/binary_collector.py | 22 ++ Lib/profiling/sampling/binary_reader.py | 5 +- Lib/profiling/sampling/cli.py | 41 ++- .../sampling/live_collector/__init__.py | 4 + .../sampling/live_collector/collector.py | 65 ++++- .../sampling/live_collector/constants.py | 3 +- .../sampling/live_collector/widgets.py | 38 ++- Lib/profiling/sampling/sample.py | 45 +++- Lib/profiling/sampling/telemetry/README.md | 245 ++++++++++++++++++ Lib/profiling/sampling/telemetry/__init__.py | 18 ++ Lib/profiling/sampling/telemetry/__main__.py | 4 + Lib/profiling/sampling/telemetry/chunks.py | 56 ++++ Lib/profiling/sampling/telemetry/helper.py | 45 ++++ .../sampling/telemetry/interfaces.py | 16 ++ Lib/profiling/sampling/telemetry/manager.py | 71 +++++ .../sampling/telemetry/plugin_registry.py | 67 +++++ .../sampling/telemetry/plugins/__init__.py | 1 + .../sampling/telemetry/plugins/nvidia_gpu.py | 134 ++++++++++ .../plugins/nvidia_gpu_aggregator.py | 63 +++++ Lib/profiling/sampling/telemetry/replay.py | 31 +++ 20 
files changed, 967 insertions(+), 7 deletions(-) create mode 100644 Lib/profiling/sampling/telemetry/README.md create mode 100644 Lib/profiling/sampling/telemetry/__init__.py create mode 100644 Lib/profiling/sampling/telemetry/__main__.py create mode 100644 Lib/profiling/sampling/telemetry/chunks.py create mode 100644 Lib/profiling/sampling/telemetry/helper.py create mode 100644 Lib/profiling/sampling/telemetry/interfaces.py create mode 100644 Lib/profiling/sampling/telemetry/manager.py create mode 100644 Lib/profiling/sampling/telemetry/plugin_registry.py create mode 100644 Lib/profiling/sampling/telemetry/plugins/__init__.py create mode 100644 Lib/profiling/sampling/telemetry/plugins/nvidia_gpu.py create mode 100644 Lib/profiling/sampling/telemetry/plugins/nvidia_gpu_aggregator.py create mode 100644 Lib/profiling/sampling/telemetry/replay.py diff --git a/Lib/profiling/sampling/binary_collector.py b/Lib/profiling/sampling/binary_collector.py index 64afe632fae175..8cf72e848b9fd8 100644 --- a/Lib/profiling/sampling/binary_collector.py +++ b/Lib/profiling/sampling/binary_collector.py @@ -5,6 +5,7 @@ import _remote_debugging from .collector import Collector +from .telemetry.chunks import TelemetryChunkWriter # Compression type constants (must match binary_io.h) COMPRESSION_NONE = 0 @@ -67,6 +68,21 @@ def __init__(self, filename, sample_interval_usec, *, skip_idle=False, self._writer = _remote_debugging.BinaryWriter( filename, sample_interval_usec, start_time_us, compression=compression_type ) + self._telemetry_chunk_writer = None + + def set_plugin_enabled(self, plugin_id): + if plugin_id and self._telemetry_chunk_writer is None: + self._telemetry_chunk_writer = TelemetryChunkWriter(self.filename) + + def set_plugin_metadata(self, plugin_id, metadata): + if self._telemetry_chunk_writer is None: + return + self._telemetry_chunk_writer.write_record(plugin_id, "metadata", metadata) + + def collect_plugin_event(self, plugin_id, event_type, payload): + if 
self._telemetry_chunk_writer is None: + return + self._telemetry_chunk_writer.write_record(plugin_id, event_type, payload) def collect(self, stack_frames, timestamp_us=None): """Collect profiling data from stack frames. @@ -94,6 +110,8 @@ def export(self, filename=None): filename: Ignored (binary files are written incrementally) """ self._writer.finalize() + if self._telemetry_chunk_writer is not None: + self._telemetry_chunk_writer.close() @property def total_samples(self): @@ -115,6 +133,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit - finalize unless there was an error.""" if exc_type is None: self._writer.finalize() + if self._telemetry_chunk_writer is not None: + self._telemetry_chunk_writer.close() else: self._writer.close() + if self._telemetry_chunk_writer is not None: + self._telemetry_chunk_writer.close() return False diff --git a/Lib/profiling/sampling/binary_reader.py b/Lib/profiling/sampling/binary_reader.py index a11be3652597a6..7507a3969b9d6d 100644 --- a/Lib/profiling/sampling/binary_reader.py +++ b/Lib/profiling/sampling/binary_reader.py @@ -5,6 +5,7 @@ from .gecko_collector import GeckoCollector from .stack_collector import FlamegraphCollector, CollapsedStackCollector from .pstats_collector import PstatsCollector +from .telemetry import replay_sidecar_to_sink class BinaryReader: @@ -70,7 +71,9 @@ def replay_samples(self, collector, progress_callback=None): """ if self._reader is None: raise RuntimeError("Reader not open. 
Use as context manager.") - return self._reader.replay(collector, progress_callback) + replayed = self._reader.replay(collector, progress_callback) + replay_sidecar_to_sink(self.filename, collector) + return replayed @property def sample_count(self): diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py index c0aa3ae024a120..da9822d05cd950 100644 --- a/Lib/profiling/sampling/cli.py +++ b/Lib/profiling/sampling/cli.py @@ -21,6 +21,7 @@ from .gecko_collector import GeckoCollector from .binary_collector import BinaryCollector from .binary_reader import BinaryReader +from .telemetry.plugin_registry import resolve_helper_config from .constants import ( MICROSECONDS_PER_SECOND, PROFILING_MODE_ALL, @@ -159,6 +160,11 @@ def _build_child_profiler_args(args): if mode != "wall": child_args.extend(["--mode", mode]) + # Generic telemetry plugin options + child_plugins = getattr(args, "plugins", None) or [] + for plugin_id in child_plugins: + child_args.extend(["--plugin", plugin_id]) + # Format options (skip pstats as it's the default) if args.format != "pstats": child_args.append(f"--{args.format}") @@ -416,8 +422,13 @@ def _add_sampling_options(parser): "Uses thread_suspend on macOS and ptrace on Linux. 
Adds overhead but ensures memory " "reads are from a frozen state.", ) - - + sampling_group.add_argument( + "--plugin", + dest="plugins", + action="append", + default=[], + help="Enable a telemetry plugin (repeatable)", + ) def _add_mode_options(parser): """Add mode options to a parser.""" mode_group = parser.add_argument_group("Mode options") @@ -789,6 +800,10 @@ def _validate_args(args, parser): if getattr(args, 'command', None) == "replay": return + # Deduplicate while preserving order + if getattr(args, "plugins", None): + args.plugins = list(dict.fromkeys(args.plugins)) + # Warn about blocking mode with aggressive sampling intervals if args.blocking and args.sample_interval_usec < 100: print( @@ -859,6 +874,10 @@ def _validate_args(args, parser): ) return + # Non-live GPU integration currently supports binary capture only. + if getattr(args, "plugins", None) and not args.live and getattr(args, "format", "pstats") != "binary": + parser.error("Telemetry plugins currently require --binary for non-live mode.") + # Validate gecko mode doesn't use non-wall mode if args.format == "gecko" and getattr(args, 'mode', 'wall') != "wall": parser.error( @@ -1067,6 +1086,7 @@ def _handle_attach(args): compression=getattr(args, 'compression', 'auto'), diff_baseline=args.diff_baseline ) + telemetry_plugins = _build_telemetry_plugins(args) with _get_child_monitor_context(args, args.pid): collector = sample( @@ -1081,6 +1101,7 @@ def _handle_attach(args): gc=args.gc, opcodes=args.opcodes, blocking=args.blocking, + telemetry_plugins=telemetry_plugins, ) _handle_output(collector, args, args.pid, mode) @@ -1146,6 +1167,7 @@ def _handle_run(args): compression=getattr(args, 'compression', 'auto'), diff_baseline=args.diff_baseline ) + telemetry_plugins = _build_telemetry_plugins(args) with _get_child_monitor_context(args, process.pid): try: @@ -1161,6 +1183,7 @@ def _handle_run(args): gc=args.gc, opcodes=args.opcodes, blocking=args.blocking, + telemetry_plugins=telemetry_plugins, ) 
_handle_output(collector, args, process.pid, mode) finally: @@ -1192,7 +1215,9 @@ def _handle_live_attach(args, pid): mode=mode, opcodes=args.opcodes, async_aware=args.async_mode if args.async_aware else None, + telemetry_plugin_ids=getattr(args, "plugins", []), ) + telemetry_plugins = _build_telemetry_plugins(args) # Sample in live mode sample_live( @@ -1207,6 +1232,7 @@ def _handle_live_attach(args, pid): gc=args.gc, opcodes=args.opcodes, blocking=args.blocking, + telemetry_plugins=telemetry_plugins, ) @@ -1239,7 +1265,9 @@ def _handle_live_run(args): mode=mode, opcodes=args.opcodes, async_aware=args.async_mode if args.async_aware else None, + telemetry_plugin_ids=getattr(args, "plugins", []), ) + telemetry_plugins = _build_telemetry_plugins(args) # Profile the subprocess in live mode try: @@ -1255,6 +1283,7 @@ def _handle_live_run(args): gc=args.gc, opcodes=args.opcodes, blocking=args.blocking, + telemetry_plugins=telemetry_plugins, ) finally: # Clean up the subprocess and get any error output @@ -1294,5 +1323,13 @@ def _handle_replay(args): sys.exit(f"Error: {exc}") +def _build_telemetry_plugins(args): + plugins = [] + for plugin_id in getattr(args, "plugins", []) or []: + config = resolve_helper_config(plugin_id) + plugins.append({"id": plugin_id, "config": config}) + return plugins or None + + if __name__ == "__main__": main() diff --git a/Lib/profiling/sampling/live_collector/__init__.py b/Lib/profiling/sampling/live_collector/__init__.py index 59d50955e52959..0703364fb34fa7 100644 --- a/Lib/profiling/sampling/live_collector/__init__.py +++ b/Lib/profiling/sampling/live_collector/__init__.py @@ -110,6 +110,7 @@ TableWidget, FooterWidget, HelpWidget, + TelemetryPanelWidget, ) from .constants import ( MICROSECONDS_PER_SECOND, @@ -137,6 +138,7 @@ MAX_EFFICIENCY_BAR_WIDTH, MIN_SAMPLE_RATE_FOR_SCALING, FINISHED_BANNER_EXTRA_LINES, + TELEMETRY_PANEL_HEIGHT, COLOR_PAIR_HEADER_BG, COLOR_PAIR_CYAN, COLOR_PAIR_YELLOW, @@ -162,6 +164,7 @@ "TableWidget", "FooterWidget", 
"HelpWidget", + "TelemetryPanelWidget", # Constants "MICROSECONDS_PER_SECOND", "DISPLAY_UPDATE_HZ", @@ -188,6 +191,7 @@ "MAX_EFFICIENCY_BAR_WIDTH", "MIN_SAMPLE_RATE_FOR_SCALING", "FINISHED_BANNER_EXTRA_LINES", + "TELEMETRY_PANEL_HEIGHT", "COLOR_PAIR_HEADER_BG", "COLOR_PAIR_CYAN", "COLOR_PAIR_YELLOW", diff --git a/Lib/profiling/sampling/live_collector/collector.py b/Lib/profiling/sampling/live_collector/collector.py index c03df4075277cd..77c705b6b158a3 100644 --- a/Lib/profiling/sampling/live_collector/collector.py +++ b/Lib/profiling/sampling/live_collector/collector.py @@ -43,10 +43,12 @@ COLOR_PAIR_MAGENTA, COLOR_PAIR_RED, COLOR_PAIR_SORTED_HEADER, + TELEMETRY_PANEL_HEIGHT, ) from .display import CursesDisplay -from .widgets import HeaderWidget, TableWidget, FooterWidget, HelpWidget, OpcodePanel +from .widgets import HeaderWidget, TableWidget, FooterWidget, HelpWidget, OpcodePanel, TelemetryPanelWidget from .trend_tracker import TrendTracker +from ..telemetry.plugin_registry import create_live_plugin @dataclass @@ -118,6 +120,7 @@ def __init__( mode=None, opcodes=False, async_aware=None, + telemetry_plugin_ids=None, ): """ Initialize the live stats collector. 
@@ -176,6 +179,13 @@ def __init__( # Opcode statistics: {location: {opcode: count}} self.opcode_stats = collections.defaultdict(lambda: collections.defaultdict(int)) self.show_opcodes = opcodes # Show opcode panel when --opcodes flag is passed + self.show_telemetry_panel = bool(telemetry_plugin_ids) + self.telemetry_plugins = {} # plugin_id -> plugin runtime + self.telemetry_plugin_order = [] + self.current_telemetry_plugin_idx = 0 + self.current_telemetry_mode_idx = 0 + for plugin_id in telemetry_plugin_ids or []: + self.set_plugin_enabled(plugin_id) self.selected_row = 0 # Currently selected row in table for opcode view self.scroll_offset = 0 # Scroll offset for table when in opcode mode @@ -206,6 +216,7 @@ def __init__( self.footer_widget = None self.help_widget = None self.opcode_panel = None + self.telemetry_panel = None # Color mode self._can_colorize = _colorize.can_colorize() @@ -431,12 +442,14 @@ def _prepare_display_data(self, height): extra_header_lines = ( FINISHED_BANNER_EXTRA_LINES if self.finished else 0 ) + extra_telemetry_lines = TELEMETRY_PANEL_HEIGHT if self.show_telemetry_panel else 0 max_stats_lines = max( 0, height - HEADER_LINES - extra_header_lines - FOOTER_LINES + - extra_telemetry_lines - SAFETY_MARGIN, ) stats_list = stats_list[:max_stats_lines] @@ -455,6 +468,7 @@ def _initialize_widgets(self, colors): self.footer_widget = FooterWidget(self.display, colors, self) self.help_widget = HelpWidget(self.display, colors) self.opcode_panel = OpcodePanel(self.display, colors, self) + self.telemetry_panel = TelemetryPanelWidget(self.display, colors, self) def _render_display_sections( self, height, width, elapsed, stats_list, colors @@ -480,6 +494,10 @@ def _render_display_sections( line = self.opcode_panel.render( line, width, height=height, stats_list=stats_list ) + if self.show_telemetry_panel: + line = self.telemetry_panel.render( + line, width, height=height + ) except curses.error: pass @@ -1003,6 +1021,12 @@ def _handle_input(self): # 
Toggle trend colors on/off if self._trend_tracker is not None: self._trend_tracker.toggle() + elif ch == ord("g") or ch == ord("G"): + self.show_telemetry_panel = not self.show_telemetry_panel + elif ch == ord("m") or ch == ord("M"): + self._cycle_telemetry_mode() + elif ch == ord("n") or ch == ord("N"): + self._cycle_telemetry_plugin() elif ch == ord("j") or ch == ord("J"): # Move selection down in opcode mode (with scrolling) @@ -1109,3 +1133,42 @@ def export(self, filename): "Export to file is not supported in live mode. " "Use the live TUI to view statistics in real-time." ) + + def set_plugin_enabled(self, plugin_id): + if not plugin_id or plugin_id in self.telemetry_plugins: + return + plugin = create_live_plugin(plugin_id) + if plugin is None: + return + self.telemetry_plugins[plugin_id] = plugin + self.telemetry_plugin_order.append(plugin_id) + + def collect_plugin_event(self, plugin_id, event_type, payload): + plugin = self.telemetry_plugins.get(plugin_id) + if plugin is None: + self.set_plugin_enabled(plugin_id) + plugin = self.telemetry_plugins.get(plugin_id) + if plugin is not None: + plugin.ingest(event_type, payload) + + def _get_current_telemetry_plugin(self): + if not self.telemetry_plugin_order: + return None, None + idx = self.current_telemetry_plugin_idx % len(self.telemetry_plugin_order) + plugin_id = self.telemetry_plugin_order[idx] + return plugin_id, self.telemetry_plugins.get(plugin_id) + + def _cycle_telemetry_mode(self): + _, plugin = self._get_current_telemetry_plugin() + if plugin is None: + return + modes = list(getattr(plugin, "panel_modes", ("default",))) + if not modes: + return + self.current_telemetry_mode_idx = (self.current_telemetry_mode_idx + 1) % len(modes) + + def _cycle_telemetry_plugin(self): + if not self.telemetry_plugin_order: + return + self.current_telemetry_plugin_idx = (self.current_telemetry_plugin_idx + 1) % len(self.telemetry_plugin_order) + self.current_telemetry_mode_idx = 0 diff --git 
a/Lib/profiling/sampling/live_collector/constants.py b/Lib/profiling/sampling/live_collector/constants.py index bb45006553a67b..8926d5908ab756 100644 --- a/Lib/profiling/sampling/live_collector/constants.py +++ b/Lib/profiling/sampling/live_collector/constants.py @@ -45,8 +45,9 @@ # Finished banner display FINISHED_BANNER_EXTRA_LINES = 3 # Blank line + banner + blank line -# Opcode panel display +# Panel display OPCODE_PANEL_HEIGHT = 12 # Height reserved for opcode statistics panel +TELEMETRY_PANEL_HEIGHT = 10 # Height reserved for telemetry plugin panel # Color pair IDs COLOR_PAIR_SAMPLES = 1 diff --git a/Lib/profiling/sampling/live_collector/widgets.py b/Lib/profiling/sampling/live_collector/widgets.py index 86d2649f875e62..05c67454770cca 100644 --- a/Lib/profiling/sampling/live_collector/widgets.py +++ b/Lib/profiling/sampling/live_collector/widgets.py @@ -21,6 +21,7 @@ FOOTER_LINES, FINISHED_BANNER_EXTRA_LINES, OPCODE_PANEL_HEIGHT, + TELEMETRY_PANEL_HEIGHT, ) from ..constants import ( THREAD_STATUS_HAS_GIL, @@ -917,7 +918,7 @@ def render(self, line, width, **kwargs): if self.collector.finished: footer = f"{status_str}" else: - footer = f"{status_str}Sort: {sort_display} | 't':mode 'x':trends ←→:thread 'h':help 'q':quit" + footer = f"{status_str}Sort: {sort_display} | 't':mode 'x':trends 'g':panel 'n':plugin 'm':mode ←→:thread 'h':help 'q':quit" self.add_str( line, 0, @@ -966,6 +967,9 @@ def render(self, line, width, **kwargs): (" S - Cycle through sort modes (backward)", A_NORMAL), (" t - Toggle view mode (ALL / per-thread)", A_NORMAL), (" x - Toggle trend colors (on/off)", A_NORMAL), + (" g - Toggle telemetry panel", A_NORMAL), + (" n - Cycle telemetry plugin", A_NORMAL), + (" m - Cycle telemetry plugin mode", A_NORMAL), (" j/k or ↑/↓ - Select next/previous function (--opcodes)", A_NORMAL), (" ← / → - Cycle through threads", A_NORMAL), (" + - Faster display refresh rate", A_NORMAL), @@ -1088,3 +1092,35 @@ def render(self, line, width, **kwargs): line += 1 
return line + + +class TelemetryPanelWidget(Widget): + """Widget for displaying current telemetry plugin panel.""" + + def __init__(self, display, colors, collector): + super().__init__(display, colors) + self.collector = collector + + def render(self, line, width, **kwargs): + A_BOLD = self.display.get_attr("A_BOLD") + A_DIM = self.display.get_attr("A_DIM") + A_NORMAL = self.display.get_attr("A_NORMAL") + plugin_id, plugin = self.collector._get_current_telemetry_plugin() + if plugin is None: + self.add_str(line, 0, "No telemetry plugins active.", A_DIM) + return line + 1 + + modes = list(getattr(plugin, "panel_modes", ("default",))) + mode_idx = self.collector.current_telemetry_mode_idx % len(modes) + mode = modes[mode_idx] + title = f"─── Telemetry [{plugin_id}] ({mode.upper()}) " + title += "─" * max(0, width - len(title) - 1) + self.add_str(line, 0, title[: width - 1], self.colors.get("magenta", A_BOLD) | A_BOLD) + line += 1 + + for panel_line in plugin.render_lines(mode, width)[: TELEMETRY_PANEL_HEIGHT - 1]: + attr = A_DIM if not panel_line else (A_BOLD if str(panel_line).endswith(":") else A_NORMAL) + self.add_str(line, 0, str(panel_line)[: width - 1], attr) + line += 1 + + return line diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py index 6a76bbeeb24ee3..7b4e0f9861c10a 100644 --- a/Lib/profiling/sampling/sample.py +++ b/Lib/profiling/sampling/sample.py @@ -13,6 +13,10 @@ from .heatmap_collector import HeatmapCollector from .gecko_collector import GeckoCollector from .binary_collector import BinaryCollector +try: + from .telemetry.manager import TelemetryHelperManager +except ImportError: + TelemetryHelperManager = None @contextlib.contextmanager @@ -48,7 +52,7 @@ def _pause_threads(unwinder, blocking): MIN_SAMPLES_FOR_TUI = 200 class SampleProfiler: - def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False, 
blocking=False): + def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False, blocking=False, telemetry_plugins=None): self.pid = pid self.sample_interval_usec = sample_interval_usec self.all_threads = all_threads @@ -63,6 +67,15 @@ def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MOD self.sample_intervals = deque(maxlen=100) self.total_samples = 0 self.realtime_stats = False + self.telemetry_managers = [] + if telemetry_plugins and TelemetryHelperManager is not None: + for plugin in telemetry_plugins: + plugin_id = plugin.get("id") + plugin_config = plugin.get("config", {}) + if plugin_id: + self.telemetry_managers.append( + TelemetryHelperManager(plugin_id, plugin_config) + ) def _new_unwinder(self, native, gc, opcodes, skip_non_matching_threads): kwargs = {} @@ -93,6 +106,11 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): last_sample_time = start_time realtime_update_interval = 1.0 # Update every second last_realtime_update = start_time + for mgr in self.telemetry_managers: + mgr.start() + if hasattr(collector, "set_plugin_enabled"): + collector.set_plugin_enabled(mgr.plugin_id) + try: while duration_sec is None or running_time_sec < duration_sec: # Check if live collector wants to stop @@ -114,6 +132,9 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): else: stack_frames = self.unwinder.get_stack_trace() collector.collect(stack_frames) + + if self.telemetry_managers: + self._poll_telemetry_records(collector) except ProcessLookupError as e: running_time_sec = current_time - start_time break @@ -151,6 +172,12 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): interrupted = True running_time_sec = time.perf_counter() - start_time print("Interrupted by user.") + finally: + if self.telemetry_managers: + # Final drain before shutdown. 
+ self._poll_telemetry_records(collector) + for mgr in self.telemetry_managers: + mgr.stop() # Clear real-time stats line if it was being displayed if self.realtime_stats and len(self.sample_intervals) > 0: @@ -187,6 +214,16 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): f"({fmt((expected_samples - num_samples) / expected_samples * 100, 2)}%)" ) + def _poll_telemetry_records(self, collector): + for mgr in self.telemetry_managers: + records = mgr.poll() + for event in records: + plugin_id = event.get("plugin", mgr.plugin_id) + event_type = event.get("event_type") + payload = event.get("payload", {}) + if hasattr(collector, "collect_plugin_event"): + collector.collect_plugin_event(plugin_id, event_type, payload) + def _print_realtime_stats(self): """Print real-time sampling statistics.""" if len(self.sample_intervals) < 2: @@ -384,6 +421,7 @@ def sample( gc=True, opcodes=False, blocking=False, + telemetry_plugins=None, ): """Sample a process using the provided collector. @@ -427,6 +465,7 @@ def sample( skip_non_matching_threads=skip_non_matching_threads, collect_stats=realtime_stats, blocking=blocking, + telemetry_plugins=telemetry_plugins, ) profiler.realtime_stats = realtime_stats @@ -449,6 +488,7 @@ def sample_live( gc=True, opcodes=False, blocking=False, + telemetry_plugins=None, ): """Sample a process in live/interactive mode with curses TUI. 
@@ -496,6 +536,7 @@ def sample_live( skip_non_matching_threads=skip_non_matching_threads, collect_stats=realtime_stats, blocking=blocking, + telemetry_plugins=telemetry_plugins, ) profiler.realtime_stats = realtime_stats @@ -531,3 +572,5 @@ def curses_wrapper_func(stdscr): print(f"Only {collector.successful_samples} sample(s) collected (minimum {MIN_SAMPLES_FOR_TUI} required for TUI) - process {pid} exited too quickly.", file=sys.stderr) return collector + + diff --git a/Lib/profiling/sampling/telemetry/README.md b/Lib/profiling/sampling/telemetry/README.md new file mode 100644 index 00000000000000..8fa7ea06f738f3 --- /dev/null +++ b/Lib/profiling/sampling/telemetry/README.md @@ -0,0 +1,245 @@ +# Telemetry Plugin Architecture (`profiling.sampling`) + +This document describes the telemetry plugin system used by `profiling.sampling`, +including live rendering, helper-process collection, binary sidecar persistence, +and replay. + +It is intended for contributors adding or modifying telemetry plugins (for +example GPU, CPU, runtime counters, system metrics). + +--- + +## Goals + +- Keep core sampling (`SampleProfiler`) independent from vendor/provider logic. +- Collect telemetry out-of-process via helper subprocesses. +- Persist plugin events alongside binary profile output. +- Replay telemetry into collectors in a plugin-agnostic way. +- Support multiple plugins in a single profiling run. +- Allow adding new plugins by dropping modules into `telemetry/plugins/`. + +--- + +## High-Level Flow + +1. CLI enables one or more plugins via `--plugin`. +2. CLI builds telemetry plugin entries and resolves helper config. +3. `SampleProfiler` creates one `TelemetryHelperManager` per plugin. +4. Each manager launches `python -m profiling.sampling.telemetry --plugin ...`. +5. Helper emits JSON lines (`plugin`, `event_type`, `payload`) over stdout. +6. Profiler polls all managers and forwards events to collector via: + - `collect_plugin_event(plugin_id, event_type, payload)` +7. 
In binary mode, `BinaryCollector` writes telemetry events to + `<binary_filename>.telemetrychunks`. +8. During replay, `replay_sidecar_to_sink()` reads sidecar records and routes + events back into the sink collector. +9. In live mode, `LiveStatsCollector` instantiates live plugin adapters and + renders plugin panels in the TUI. + +--- + +## Key Modules + +- `telemetry/plugin_registry.py` + - Dynamic plugin discovery from `telemetry/plugins/`. + - Dispatches to plugin module hooks for: + - live plugin creation + - helper config resolution + - helper execution + +- `telemetry/helper.py` + - Generic helper-process entrypoint. + - Parses args, invokes registry dispatch, and emits JSON events. + +- `telemetry/manager.py` + - One manager per plugin helper process. + - Starts subprocess, polls non-blocking stdout lines, stops process. + +- `telemetry/chunks.py` + - Sidecar writer/reader (`.telemetrychunks`). + - Line-delimited JSON records with magic header. + +- `telemetry/replay.py` + - Replays sidecar data into sinks that implement plugin event API. + +- `telemetry/interfaces.py` + - Defines `LiveTelemetryPlugin` interface for TUI rendering adapters. + +- `telemetry/plugins/` + - Plugin modules (auto-discovered). + - Current example: `nvidia_gpu.py` (scaffold). + +--- + +## Dynamic Plugin Discovery Contract + +`plugin_registry` scans `telemetry/plugins/` modules and registers those that +declare `PLUGIN_ID`. + +A plugin module should define: + +- `PLUGIN_ID: str` +- `create_live_plugin() -> LiveTelemetryPlugin | None` +- `resolve_helper_config(config: dict | None) -> dict` +- `run_helper(config: dict, emit: callable) -> None` + +Only `PLUGIN_ID` is required for discovery; missing hooks degrade gracefully: + +- Missing `create_live_plugin`: plugin not available in live panel. +- Missing `resolve_helper_config`: config passes through unchanged. +- Missing `run_helper`: helper emits metadata error note. 
+ +--- + +## Event Format + +Helper emits line-delimited JSON objects: + +```json +{ + "plugin": "<plugin_id>", + "event_type": "<event_type>", + "payload": { "...": "plugin-defined" } +} +``` + +Notes: +- `event_type` semantics are plugin-specific. +- `payload` schema is plugin-defined and versioned by plugin behavior. + +--- + +## Multi-Plugin Behavior + +Multi-plugin runs are supported end-to-end: + +- One helper subprocess per plugin. +- One shared sidecar writer that multiplexes all plugin records. +- Records are demultiplexed by `plugin` field during replay. +- Live UI keeps plugin runtimes in a map and can cycle plugin panels (`n` key). + +This is expected and intentional; a single sidecar stream stores all plugins. + +--- + +## CLI Integration + +- Enable plugins with repeatable `--plugin`: + - `--plugin nvidia_gpu` + - `--plugin nvidia_gpu --plugin another_plugin` + +- CLI does not hardcode provider details. It asks the registry to resolve helper + config for each plugin via `resolve_helper_config(plugin_id)`. + +### Plugin-specific CLI subflags + +Plugins can define their own flags for plugin-specific controls, such as: + +- `--nvidia-device` +- `--nvidia-pm` / `--no-nvidia-pm` +- `--nvidia-pc` / `--no-nvidia-pc` + +Recommended ownership model: + +- Plugin module defines: + - `register_cli_flags(parser)` to add plugin-specific args. + - `extract_cli_config(args)` to convert parsed args to plugin config overrides. +- CLI remains generic: + - discovers plugins + - asks enabled plugins to register flags + - merges defaults from `resolve_helper_config(plugin_id)` with plugin overrides + +Status: this hook pattern is documented and recommended, but not yet implemented +in the current CLI wiring. + +Current validation: +- In non-live mode, telemetry plugins require `--binary` output. + +--- + +## Live UI Integration + +`LiveStatsCollector` handles generic plugin rendering: + +- `set_plugin_enabled(plugin_id)` creates plugin runtime from registry. 
+- `collect_plugin_event(...)` routes incoming events to plugin `ingest(...)`. +- `TelemetryPanelWidget` renders current plugin/mode lines. +- Panel controls: + - `g`: toggle telemetry panel + - `n`: cycle plugin + - `m`: cycle plugin mode + +--- + +## Binary Persistence and Replay + +### Sidecar path + +For `profile.bin`, sidecar is: + +- `profile.bin.telemetrychunks` + +### Sidecar record + +Each line after magic header `TCHUNK1` is a compact JSON record with: +- `plugin` +- `event_type` +- `payload` + +### Replay + +`replay_sidecar_to_sink(binary_filename, sink)`: +- Reads sidecar records. +- Calls `sink.set_plugin_enabled(plugin_id)` once per seen plugin (if present). +- Calls `sink.collect_plugin_event(plugin_id, event_type, payload)` for each event. + +--- + +## Adding a New Plugin + +1. Create module in `telemetry/plugins/`, e.g. `my_plugin.py`. +2. Add: + - `PLUGIN_ID = "my_plugin"` + - `create_live_plugin()` + - `resolve_helper_config(config)` + - `run_helper(config, emit)` + - optional: `register_cli_flags(parser)` + - optional: `extract_cli_config(args)` +3. Implement a live adapter class extending `LiveTelemetryPlugin`: + - `ingest(event_type, payload)` + - `render_lines(mode, width)` +4. Enable from CLI: + - `--plugin my_plugin` + +No `plugin_registry.py` edits are required when contract is followed. + +--- + +## Current NVIDIA Plugin Status + +`telemetry/plugins/nvidia_gpu.py` is currently a scaffold: + +- Emits synthetic PM/PC events. +- Includes helper config defaults (`provider/device/pm/pc`). +- Live panel supports `pc` and `pm` modes. + +It is designed as a placeholder for real NVIDIA integration (for example CUPTI / +NVPerf / Nsight-backed collection). + +--- + +## Known Constraints and Follow-Ups + +- Helper protocol is JSON lines over stdout (simple, portable, not binary-fast). +- Sidecar payloads are untyped at framework level (plugin-owned schema). +- No plugin capability negotiation yet (beyond graceful missing-hook handling). 
# First line of every sidecar file; readers ignore files that start with
# anything else.
_MAGIC = "TCHUNK1"


def sidecar_path_for_binary(binary_filename):
    """Return the telemetry sidecar path that accompanies *binary_filename*."""
    return f"{binary_filename}.telemetrychunks"


class TelemetryChunkWriter:
    """Write plugin telemetry records to the sidecar of a binary profile.

    The sidecar is a UTF-8 text file: the magic header line followed by one
    compact JSON object per line with the keys ``plugin``, ``event_type``
    and ``payload``.  The file is created (truncating any previous sidecar)
    as soon as the writer is constructed.

    The writer can be used as a context manager so the file is closed even
    when the code producing the records raises.
    """

    def __init__(self, binary_filename):
        self.sidecar_path = sidecar_path_for_binary(binary_filename)
        self._fp = open(self.sidecar_path, "w", encoding="utf-8")
        try:
            self._fp.write(_MAGIC + "\n")
        except BaseException:
            # Do not leak the handle when the header cannot be written.
            self._fp.close()
            self._fp = None
            raise

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def write_record(self, plugin_id, event_type, payload):
        """Append one record line to the sidecar.

        Raises:
            ValueError: if the writer has already been closed.
            TypeError: if *payload* is not JSON serializable (raised by
                ``json.dumps`` before anything is written).
        """
        if self._fp is None:
            # Same exception type file objects use for I/O on a closed file,
            # instead of an AttributeError on None.
            raise ValueError("write to closed telemetry sidecar")
        record = {
            "plugin": plugin_id,
            "event_type": event_type,
            "payload": payload,
        }
        self._fp.write(json.dumps(record, separators=(",", ":")) + "\n")

    def close(self):
        """Close the sidecar file; calling it more than once is harmless."""
        if self._fp is not None:
            self._fp.close()
            self._fp = None


class TelemetryChunkReader:
    """Read plugin telemetry records back from a binary profile's sidecar."""

    def __init__(self, binary_filename):
        self.sidecar_path = sidecar_path_for_binary(binary_filename)

    def exists(self):
        """Return True when the sidecar file is present on disk."""
        return os.path.exists(self.sidecar_path)

    def iter_records(self):
        """Yield each well-formed record (a dict) stored in the sidecar.

        Yields nothing when the sidecar is missing or does not start with
        the magic header.  Blank lines, lines that are not valid JSON and
        JSON values that are not objects are skipped, so consumers can
        rely on every yielded record supporting ``.get()``.
        """
        try:
            # Open directly instead of checking exists() first: the file may
            # disappear between the check and the open.
            fp = open(self.sidecar_path, "r", encoding="utf-8")
        except FileNotFoundError:
            return
        with fp:
            if fp.readline().rstrip("\n") != _MAGIC:
                return
            for line in fp:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if isinstance(record, dict):
                    yield record
class TelemetryHelperManager:
    """Single-plugin helper process manager.

    Runs ``sys.executable -m profiling.sampling.telemetry`` for one plugin
    and converts the JSON lines the helper prints on stdout into Python
    objects returned from :meth:`poll`.
    """

    # Maximum number of bytes taken from the helper pipe per ready event.
    _READ_SIZE = 65536

    def __init__(self, plugin_id, plugin_config=None):
        self.plugin_id = plugin_id
        self.plugin_config = plugin_config or {}
        self.process = None
        self.selector = selectors.DefaultSelector()
        self._started = False
        # Bytes received after the last newline: an incomplete line that is
        # finished by a later read.
        self._pending = b""

    def start(self):
        """Spawn the helper process; does nothing if already started."""
        if self._started:
            return
        args = [
            sys.executable,
            "-m",
            "profiling.sampling.telemetry",
            "--plugin",
            self.plugin_id,
            "--config-json",
            json.dumps(self.plugin_config, separators=(",", ":")),
        ]
        self._pending = b""
        # The pipe is opened unbuffered and in binary mode because poll()
        # reads the file descriptor directly.  A buffered text wrapper can
        # hold complete lines that select() knows nothing about, which
        # delays events until the helper writes again.
        self.process = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
            bufsize=0,
        )
        if self.process.stdout is not None:
            self.selector.register(self.process.stdout, selectors.EVENT_READ)
        self._started = True

    def poll(self):
        """Return the events that are available without blocking.

        Every complete line currently readable from the helper is parsed as
        JSON and appended to the result; blank and malformed lines are
        skipped.  A trailing partial line is kept until the rest arrives.
        Returns an empty list when the manager is not running.
        """
        import os  # local import: only needed for the raw descriptor read

        if not self._started or self.process is None:
            return []
        if not self.selector.get_map():
            # The pipe reached EOF earlier and was unregistered.
            return []
        events = []
        for key, _ in self.selector.select(timeout=0):
            # The descriptor is reported ready, so this read returns at once
            # with data or with b"" at EOF.
            chunk = os.read(key.fd, self._READ_SIZE)
            if not chunk:
                # Helper closed stdout; stop watching it, otherwise it would
                # be reported as readable on every poll.
                self.selector.unregister(key.fileobj)
                continue
            *lines, self._pending = (self._pending + chunk).split(b"\n")
            for raw in lines:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    events.append(json.loads(raw))
                except ValueError:
                    # Covers json.JSONDecodeError and undecodable bytes.
                    continue
        return events

    def stop(self):
        """Terminate the helper and release the pipe; safe to call twice."""
        process, self.process = self.process, None
        self._started = False
        self._pending = b""
        if process is None:
            return
        try:
            if process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=1.0)
                except subprocess.TimeoutExpired:
                    process.kill()
                    # Reap the killed child so it does not stay a zombie.
                    process.wait()
        finally:
            stdout = process.stdout
            if stdout is not None:
                try:
                    self.selector.unregister(stdout)
                except (KeyError, ValueError):
                    # Already unregistered at EOF, or no longer a valid file.
                    pass
                stdout.close()
# Cache of plugin id -> plugin module, filled on first lookup.
_PLUGIN_MODULES = None


def _discover_plugin_modules():
    """Import the modules of the plugins package and index them by id.

    Modules whose name starts with an underscore are ignored, as are
    modules that do not define a truthy ``PLUGIN_ID``.  A module that fails
    to import is reported with a ``RuntimeWarning`` and skipped, so one
    broken plugin does not make every other plugin unavailable.
    """
    import warnings  # local import: only needed on the failure path

    modules = {}
    for module_info in pkgutil.iter_modules(plugins_pkg.__path__):
        name = module_info.name
        if name.startswith("_"):
            continue
        qualified_name = f"{plugins_pkg.__name__}.{name}"
        try:
            module = importlib.import_module(qualified_name)
        except ImportError as exc:
            warnings.warn(
                f"telemetry plugin module {qualified_name!r} "
                f"could not be imported: {exc}",
                RuntimeWarning,
                stacklevel=2,
            )
            continue
        plugin_id = getattr(module, "PLUGIN_ID", None)
        if plugin_id:
            modules[plugin_id] = module
    return modules


def _get_plugin_modules():
    """Return the plugin id -> module mapping, discovering it once."""
    global _PLUGIN_MODULES
    if _PLUGIN_MODULES is None:
        _PLUGIN_MODULES = _discover_plugin_modules()
    return _PLUGIN_MODULES


def _get_plugin_module(plugin_id):
    """Return the module registered for *plugin_id*, or None if unknown."""
    return _get_plugin_modules().get(plugin_id)


def create_live_plugin(plugin_id):
    """Return a new live plugin object for *plugin_id*.

    Returns None when the plugin is unknown or its module does not expose a
    callable ``create_live_plugin``.
    """
    module = _get_plugin_module(plugin_id)
    if module is None:
        return None
    factory = getattr(module, "create_live_plugin", None)
    if callable(factory):
        return factory()
    return None


def resolve_helper_config(plugin_id, config=None):
    """Return the helper configuration to use for *plugin_id*.

    Delegates to the plugin module's ``resolve_helper_config(config)`` when
    it has one; otherwise returns a shallow dict copy of *config* (an empty
    dict when *config* is None or empty).
    """
    module = _get_plugin_module(plugin_id)
    if module is None:
        return dict(config or {})
    resolver = getattr(module, "resolve_helper_config", None)
    if callable(resolver):
        return resolver(config)
    return dict(config or {})


def run_helper_plugin(plugin_id, config, emit):
    """Run the helper-side entry point of *plugin_id*.

    Calls the plugin module's ``run_helper(config, emit)``.  When the
    plugin is unknown or has no ``run_helper``, a single ``metadata`` event
    carrying an explanatory note is emitted and the function returns.
    """
    module = _get_plugin_module(plugin_id)
    if module is None:
        emit(plugin_id, "metadata", {"note": "unknown plugin"})
        return
    runner = getattr(module, "run_helper", None)
    if callable(runner):
        runner(config, emit)
        return
    emit(plugin_id, "metadata", {"note": "plugin missing run_helper"})
class NvidiaGpuLivePlugin(LiveTelemetryPlugin):
    """Live-view adapter that feeds events into an NvidiaGpuAggregator.

    ``pm`` mode renders a table of metric statistics; any other mode
    renders the top kernels and top stall reasons.
    """

    plugin_id = "nvidia_gpu"
    panel_modes = ("pc", "pm")

    # Layout of the rendered tables.
    _PM_NAME_WIDTH = 30
    _PM_MAX_ROWS = 6
    _PC_NAME_WIDTH = 28
    _PC_MAX_ROWS = 3

    def __init__(self):
        self.agg = NvidiaGpuAggregator()

    def ingest(self, event_type, payload):
        """Forward one event to the aggregator.

        Only ``metadata``, ``pm`` and ``pc`` events are used; every other
        event type is ignored.
        """
        if event_type == "metadata":
            self.agg.set_metadata(payload)
        elif event_type == "pm":
            self.agg.add_pm(payload)
        elif event_type == "pc":
            self.agg.add_pc(payload)

    def render_lines(self, mode, width):
        """Return the panel text for *mode* as a list of strings.

        When *width* is a positive integer every line is clipped to that
        many characters; otherwise lines are returned unclipped.
        """
        snapshot = self.agg.snapshot()
        if mode == "pm":
            lines = self._pm_lines(snapshot.get("pm_summary", {}))
        else:
            lines = self._pc_lines(snapshot)
        if isinstance(width, int) and width > 0:
            lines = [line[:width] for line in lines]
        return lines

    def _pm_lines(self, summary):
        """Render the metric table: a header plus up to six sorted rows."""
        name_width = self._PM_NAME_WIDTH
        # The header is built with the same column widths as the rows so
        # the two cannot drift apart.
        lines = [f"{'Metric':<{name_width}} {'avg':>7} {'min':>7} {'max':>7}"]
        if not summary:
            lines.append("No PM metrics yet.")
            return lines
        for name, stats in sorted(summary.items())[: self._PM_MAX_ROWS]:
            # Truncate long names so the numeric columns stay aligned.
            label = str(name)[:name_width]
            lines.append(
                f"{label:<{name_width}} {stats['avg']:>7.1f} "
                f"{stats['min']:>7.1f} {stats['max']:>7.1f}"
            )
        return lines

    def _pc_lines(self, snapshot):
        """Render the top-kernel and top-stall sections."""
        lines = ["Top kernels:"]
        lines.extend(
            self._ranked_lines(
                snapshot.get("top_kernels", []), " No PC samples yet."
            )
        )
        lines.append("Top stall reasons:")
        lines.extend(
            self._ranked_lines(
                snapshot.get("top_stalls", []), " No stall reason data yet."
            )
        )
        return lines

    def _ranked_lines(self, pairs, empty_message):
        """Format up to three (name, count) pairs, or the empty message."""
        if not pairs:
            return [empty_message]
        name_width = self._PC_NAME_WIDTH
        rendered = []
        for name, count in pairs[: self._PC_MAX_ROWS]:
            label = str(name)[:name_width]
            rendered.append(f" {label:<{name_width}} {count:>8}")
        return rendered


def resolve_nvidia_gpu_helper_config(config=None):
    """Return the helper configuration with defaults filled in.

    The defaults are provider ``"nvidia"``, device ``"0"`` and both ``pm``
    and ``pc`` enabled; entries in *config* override them.
    """
    resolved = {
        "provider": "nvidia",
        "device": "0",
        "pm": True,
        "pc": True,
    }
    if config:
        resolved.update(dict(config))
    return resolved
class NvidiaGpuAggregator:
    """In-memory aggregator for NVIDIA PM/PC telemetry.

    Keeps the most recent ``window_size`` raw PM and PC samples, and
    running totals over every sample seen: average/minimum/maximum per PM
    metric, and sample counts per kernel name and per stall reason.
    """

    def __init__(self, *, window_size=1024):
        self.window_size = window_size
        self.pm_samples = deque(maxlen=window_size)
        self.pc_samples = deque(maxlen=window_size)
        self.metadata = {}

        # metric name -> [count, total, minimum, maximum].  Running values
        # are kept instead of every observation so memory use and snapshot
        # cost do not grow with the length of the session.
        self._pm_stats = {}
        self._kernel_counter = Counter()
        self._stall_counter = Counter()

    def set_metadata(self, metadata):
        """Merge *metadata* into the stored metadata; falsy input is ignored."""
        if metadata:
            self.metadata.update(metadata)

    def add_pm(self, sample):
        """Record one PM sample.

        The sample is always appended to ``pm_samples``.  Statistics are
        updated from its ``"metrics"`` mapping; values that cannot be
        converted to float are skipped, and a sample or ``"metrics"`` entry
        that is not a dict contributes no statistics.
        """
        self.pm_samples.append(sample)
        metrics = sample.get("metrics") if isinstance(sample, dict) else None
        if not isinstance(metrics, dict):
            return
        for name, value in metrics.items():
            try:
                v = float(value)
            except (TypeError, ValueError):
                continue
            stats = self._pm_stats.get(name)
            if stats is None:
                self._pm_stats[name] = [1, v, v, v]
                continue
            stats[0] += 1
            stats[1] += v
            if v < stats[2]:
                stats[2] = v
            if v > stats[3]:
                stats[3] = v

    def add_pc(self, sample):
        """Record one PC sample.

        The sample is always appended to ``pc_samples``.  Its
        ``"sample_count"`` (1 when missing, zero or not convertible to int)
        is added to the counters of its ``"kernel_name"`` and
        ``"stall_reason"`` when those are present and truthy.
        """
        self.pc_samples.append(sample)
        if not isinstance(sample, dict):
            return
        try:
            count = int(sample.get("sample_count", 1) or 1)
        except (TypeError, ValueError, OverflowError):
            # Tolerate malformed counts the same way add_pm tolerates
            # malformed metric values.
            count = 1
        kernel = sample.get("kernel_name")
        stall = sample.get("stall_reason")
        if kernel:
            self._kernel_counter[kernel] += count
        if stall:
            self._stall_counter[stall] += count

    def snapshot(self):
        """Return a dict describing the current aggregate state.

        Keys: ``metadata`` (copy), ``pm_count`` and ``pc_count`` (sizes of
        the sample windows), ``pm_summary`` (metric name -> avg/min/max),
        and ``top_kernels`` / ``top_stalls`` (up to six ``(name, count)``
        pairs, most frequent first).
        """
        pm_summary = {
            name: {"avg": total / count, "min": low, "max": high}
            for name, (count, total, low, high) in self._pm_stats.items()
        }
        return {
            "metadata": dict(self.metadata),
            "pm_count": len(self.pm_samples),
            "pc_count": len(self.pc_samples),
            "pm_summary": pm_summary,
            "top_kernels": self._kernel_counter.most_common(6),
            "top_stalls": self._stall_counter.most_common(6),
        }
+ + Supported sink contract: + - `collect_plugin_event(plugin_id, event_type, payload)` + """ + reader = TelemetryChunkReader(binary_filename) + if not reader.exists(): + return + + seen_plugins = set() + for record in reader.iter_records(): + plugin_id = record.get("plugin") + event_type = record.get("event_type") + payload = record.get("payload", {}) + if not plugin_id or not event_type: + continue + + if plugin_id not in seen_plugins and hasattr(sink, "set_plugin_enabled"): + sink.set_plugin_enabled(plugin_id) + seen_plugins.add(plugin_id) + + if hasattr(sink, "collect_plugin_event"): + sink.collect_plugin_event(plugin_id, event_type, payload)