RunScanner

class stable_pretraining.web.scan.RunScanner(root: Path, poll_interval: float = 1.0)

Bases: object

Polls a directory tree for changes to run sidecar and metrics files and fans out the resulting events to subscribers.
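A minimal sketch of the poll-and-fan-out pattern, assuming changes are detected by comparing file modification times between two directory walks. The watched suffixes, the event dict shape, and the names snapshot, diff and FanOut are hypothetical illustrations, not part of stable_pretraining.

```python
from __future__ import annotations

import queue
from pathlib import Path

# Hypothetical: which files count as sidecars/metrics is an assumption here.
WATCHED_SUFFIXES = (".json", ".csv")


def snapshot(root: Path) -> dict[str, float]:
    """Map every watched file under root to its modification time."""
    mtimes = {}
    for path in root.rglob("*"):
        if path.is_file() and path.suffix in WATCHED_SUFFIXES:
            mtimes[str(path)] = path.stat().st_mtime
    return mtimes


def diff(old: dict[str, float], new: dict[str, float]) -> list[dict]:
    """Compare two snapshots and describe what changed between polls."""
    events = []
    for p, mtime in new.items():
        if p not in old:
            events.append({"type": "added", "path": p})
        elif old[p] != mtime:
            events.append({"type": "modified", "path": p})
    for p in old.keys() - new.keys():
        events.append({"type": "removed", "path": p})
    return events


class FanOut:
    """Deliver every event to each subscriber's own queue."""

    def __init__(self) -> None:
        self.subscribers: list[queue.Queue] = []

    def subscribe(self) -> queue.Queue:
        q: queue.Queue = queue.Queue()
        self.subscribers.append(q)
        return q

    def publish(self, events: list[dict]) -> None:
        for q in self.subscribers:
            for event in events:
                q.put(event)
```

Each subscriber (for example, one SSE connection) gets its own queue, so a slow consumer does not block delivery to the others.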

log_content(run_id: str, stream_id: str, max_bytes: int = 4194304) → bytes | None

Read the log identified by stream_id, returning at most its last max_bytes bytes.

Returns None if the run / stream is unknown. Truncates from the front so the most recent output is preserved when the file exceeds the cap.
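Truncating from the front amounts to seeking to size - max_bytes before reading. A sketch of that step, where read_tail is a hypothetical helper rather than the library's own code:

```python
from __future__ import annotations

import os
from pathlib import Path

DEFAULT_MAX_BYTES = 4 * 1024 * 1024  # 4194304, the documented default cap


def read_tail(path: Path, max_bytes: int = DEFAULT_MAX_BYTES) -> bytes | None:
    """Return at most the last max_bytes of path, or None if it is missing."""
    try:
        with path.open("rb") as f:
            size = f.seek(0, os.SEEK_END)  # seek() returns the new absolute offset
            # Skip the oldest bytes so the most recent output is kept.
            f.seek(max(0, size - max_bytes))
            return f.read()
    except FileNotFoundError:
        return None
```

Seeking avoids reading the discarded prefix at all, which matters for multi-gigabyte logs on network filesystems.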

logs_index(run_id: str) → dict | None

Discover .out / .err / .log files for a run.

Search order:

  1. Anything inside {run_dir}/ matching *.out / *.err.

  2. Files in hp.output_dir (Hydra often points training logs here).

  3. submitit layout (common for spt + slurm):

    {output_dir}/../{sweep_id}_{task_id}/.submitit/{sweep_id}_{task_id}_{rank}_log.{out,err}

    sweep_id comes from the sweep:N tag and task_id from hp.slurm.task_id (or the trailing _<task> part of run_id). Multiple ranks are returned individually so DDP runs get a per-rank selector.

Returns a dict {"streams": [{name, kind, rank, size, stream_id}, ...]} or None if the run is unknown.
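The search order above can be sketched as follows. This assumes run_dir, output_dir, sweep_id and task_id have already been looked up for the run; discover_logs, task_id_from and the hash-based stream_id scheme are hypothetical, and the real method may derive stream ids differently.

```python
from __future__ import annotations

import hashlib
import re
from pathlib import Path

# Matches the "{rank}_log.{out,err}" tail of a submitit log file name.
RANK_RE = re.compile(r"_(\d+)_log\.(out|err)$")


def task_id_from(run_id: str, hp_task_id: str | None) -> str:
    """Prefer hp.slurm.task_id; fall back to the trailing _<task> of run_id."""
    return hp_task_id if hp_task_id is not None else run_id.rsplit("_", 1)[-1]


def discover_logs(run_dir: Path, output_dir: Path | None,
                  sweep_id: str | None, task_id: str | None) -> dict:
    seen: set[Path] = set()
    streams: list[dict] = []

    def add(path: Path, rank: int | None = None) -> None:
        key = path.resolve()
        if path.is_file() and key not in seen:  # de-duplicate overlapping dirs
            seen.add(key)
            streams.append({
                "name": path.name,
                "kind": path.suffix.lstrip("."),
                "rank": rank,
                "size": path.stat().st_size,
                # Hypothetical id scheme: short hash of the absolute path.
                "stream_id": hashlib.sha1(str(key).encode()).hexdigest()[:12],
            })

    # 1. *.out / *.err inside the run directory (searched recursively here).
    for pattern in ("*.out", "*.err"):
        for p in sorted(run_dir.rglob(pattern)):
            add(p)
    if output_dir is not None:
        # 2. Log files written directly into hp.output_dir.
        for pattern in ("*.out", "*.err", "*.log"):
            for p in sorted(output_dir.glob(pattern)):
                add(p)
        # 3. submitit layout: one .out/.err pair per rank.
        if sweep_id is not None and task_id is not None:
            job = f"{sweep_id}_{task_id}"
            sub = output_dir.parent / job / ".submitit"
            for p in sorted(sub.glob(f"{job}_*_log.*")):
                m = RANK_RE.search(p.name)
                if m:
                    add(p, rank=int(m.group(1)))
    return {"streams": streams}
```

Returning every rank as its own entry, rather than merging files, is what lets a UI offer a per-rank selector for DDP runs.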

media_file_path(run_id: str, rel_path: str) → Path | None

Safely resolve rel_path to a media file belonging to run_id, blocking .. traversal.

The resolved file must live under {run_dir}/media/ and must actually exist as a file. Otherwise the method returns None and the caller should respond with a 404.
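One common way to implement this kind of check is to resolve the joined path, which collapses .. segments and follows symlinks, and then test containment against the resolved media root. A sketch assuming Python 3.9+ for Path.is_relative_to; safe_media_path is a hypothetical name:

```python
from __future__ import annotations

from pathlib import Path


def safe_media_path(run_dir: Path, rel_path: str) -> Path | None:
    """Resolve rel_path under {run_dir}/media/, rejecting escapes and non-files."""
    media_root = (run_dir / "media").resolve()
    # resolve() normalises ".." and follows symlinks, so the containment
    # check below sees where the path really points.
    candidate = (media_root / rel_path).resolve()
    if not candidate.is_relative_to(media_root):
        return None  # "..", absolute paths, or symlinks escaping media/
    if not candidate.is_file():
        return None  # missing, or a directory
    return candidate
```

Checking after resolve() rather than scanning rel_path for ".." also rejects absolute paths, since joining an absolute path onto media_root discards media_root entirely.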

media_json(run_id: str) → dict | None

Return the media events for a run by parsing media.jsonl.

Returns None if the run is unknown. Returns {"events": []} if there is no media yet (empty/missing file). Each event has at least step, tag, type, path; videos may also have fps and format.
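A sketch of the parsing step, assuming media.jsonl sits directly in the run directory (its location is not stated above) and that malformed lines, such as a half-written final line, should be skipped rather than fail the request. read_media_events is hypothetical:

```python
from __future__ import annotations

import json
from pathlib import Path

REQUIRED = {"step", "tag", "type", "path"}  # documented minimum fields


def read_media_events(run_dir: Path) -> dict:
    """Parse {run_dir}/media.jsonl; a missing or empty file yields no events."""
    path = run_dir / "media.jsonl"  # assumed location
    events: list[dict] = []
    if not path.is_file():
        return {"events": events}
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue  # assumption: skip a partially written line
            if isinstance(record, dict) and REQUIRED <= record.keys():
                events.append(record)  # extra keys (fps, format) pass through
    return {"events": events}
```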

metrics_json_bytes(run_id: str) → bytes | None

Return the metrics response as pre-serialised, NaN-safe JSON bytes.

The first call materialises the dict (reusing the structured cache via metrics_json()) and serialises it once; subsequent calls return the cached bytes directly, so the HTTP layer only has to copy them into the response (effectively a memcpy).
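The byte-caching idea can be sketched as below. It assumes "NaN-safe" means replacing NaN and infinities with JSON null, since standard JSON has no NaN literal; MetricsBytesCache, nan_safe and the load_metrics callback are hypothetical.

```python
from __future__ import annotations

import json
import math
from typing import Any, Callable


def nan_safe(value: Any) -> Any:
    """Recursively replace NaN and +/-inf floats with None (JSON null)."""
    if isinstance(value, float) and not math.isfinite(value):
        return None
    if isinstance(value, dict):
        return {k: nan_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [nan_safe(v) for v in value]
    return value


class MetricsBytesCache:
    """Serialise each run's metrics dict once and reuse the bytes afterwards."""

    def __init__(self, load_metrics: Callable[[str], dict | None]) -> None:
        self._load_metrics = load_metrics  # hypothetical: run_id -> dict | None
        self._bytes: dict[str, bytes] = {}

    def get(self, run_id: str) -> bytes | None:
        cached = self._bytes.get(run_id)
        if cached is not None:
            return cached  # fast path: no dict walk, no json.dumps
        metrics = self._load_metrics(run_id)
        if metrics is None:
            return None
        # allow_nan=False makes json.dumps raise if a NaN slipped through,
        # instead of emitting the non-standard token NaN.
        data = json.dumps(nan_safe(metrics), allow_nan=False,
                          separators=(",", ":")).encode("utf-8")
        self._bytes[run_id] = data
        return data
```

Invalidating the cached bytes when the metrics CSV changes is left out of this sketch.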

metrics_stream(run_id: str)

Yield metrics for run_id in chunks as the CSV is parsed.

Each yielded value is a dict with shape:

{"chunk": int, "metrics": {<name>: {"step":[...], "epoch":[...], "y":[...]}}}

and the final yielded value is {"done": true}. If the metrics are already cached we emit them as a single chunk followed by done, so callers don't need a separate fast path. If the run is unknown we yield None (the HTTP layer turns this into a 404).

While streaming we accumulate the parsed structure in memory and publish it to self._metrics_cache on completion, so successive reads of the same run hit the warm cache.
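A sketch of the chunked generator and its cache hand-off, assuming a wide CSV with step and epoch columns plus one column per metric, where an empty cell means that metric was not logged on that row. stream_metrics, the sources mapping, the plain-dict cache and chunk_rows are hypothetical stand-ins for the scanner's internal state:

```python
from __future__ import annotations

import csv
from collections.abc import Iterator
from pathlib import Path


def _int_or_none(cell: str | None) -> int | None:
    # Empty or missing cells become None; "3.0"-style values are tolerated.
    return int(float(cell)) if cell else None


def stream_metrics(run_id: str, sources: dict[str, Path], cache: dict[str, dict],
                   chunk_rows: int = 1000) -> Iterator[dict | None]:
    """Yield {"chunk", "metrics"} dicts while parsing, then {"done": True}."""
    if run_id in cache:
        # Warm cache: same protocol, just a single chunk.
        yield {"chunk": 0, "metrics": cache[run_id]}
        yield {"done": True}
        return
    if run_id not in sources:
        yield None  # unknown run; an HTTP layer would map this to 404
        return

    full: dict[str, dict[str, list]] = {}     # everything parsed so far
    pending: dict[str, dict[str, list]] = {}  # rows not yet sent to the client
    chunk = 0
    with sources[run_id].open(newline="") as f:
        for i, row in enumerate(csv.DictReader(f), start=1):
            step = _int_or_none(row.get("step"))
            epoch = _int_or_none(row.get("epoch"))
            for name, cell in row.items():
                if name is None or name in ("step", "epoch") or not cell:
                    continue  # index columns, or metric absent on this row
                y = float(cell)
                for target in (full, pending):
                    series = target.setdefault(name, {"step": [], "epoch": [], "y": []})
                    series["step"].append(step)
                    series["epoch"].append(epoch)
                    series["y"].append(y)
            if i % chunk_rows == 0 and pending:
                yield {"chunk": chunk, "metrics": pending}
                chunk += 1
                pending = {}
    if pending:
        yield {"chunk": chunk, "metrics": pending}
    cache[run_id] = full  # publish only after a complete parse
    yield {"done": True}
```

Because the cache is written only after the last row, a client that disconnects mid-stream leaves the cache cold and the next request re-parses the file.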

progress_json() → dict

Snapshot of the current scan progress (used by the UI loading bar).

start() → None

Start the background scanner thread.

Returns immediately. The first scan runs in the background so that callers (e.g. the HTTP server) become responsive before a potentially slow NFS walk over thousands of sidecars completes. Clients learn about discovered runs via the SSE /api/stream channel.
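The "return immediately, scan in the background" behaviour can be sketched with a daemon thread and a threading.Event used as an interruptible sleep. BackgroundPoller and its stop() method are hypothetical; the documented API only exposes start().

```python
from __future__ import annotations

import threading
from typing import Callable


class BackgroundPoller:
    """Run scan() once right away, then every poll_interval seconds, off-thread."""

    def __init__(self, scan: Callable[[], None], poll_interval: float = 1.0) -> None:
        self._scan = scan
        self._interval = poll_interval
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        """Spawn the worker and return without waiting for the first scan."""
        if self._thread is not None:
            return  # already running
        self._thread = threading.Thread(target=self._run, name="run-scanner", daemon=True)
        self._thread.start()

    def stop(self, timeout: float | None = None) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout)

    def _run(self) -> None:
        while not self._stop.is_set():
            self._scan()  # the first, possibly slow, scan runs here, not in start()
            # Event.wait doubles as a sleep that stop() can cut short.
            self._stop.wait(self._interval)
```

daemon=True lets the server process exit without joining the scanner thread, at the cost of possibly interrupting a scan mid-walk.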