Observatory Pro -- v2.5.8

Available since v1.3.0, current in v2.5.8 -- Six major upgrades that turn the Training Observatory from a passive log dump into an active diagnostic engine.


What's New

Feature | Problem Solved | Impact
Adaptive Logging | EXPERT logs everything → 77k entries | 80-90% log reduction
Health Warnings | "All clear" despite 58% dead neurons | Catches problems automatically
Epoch Summaries | No statistical view per epoch | Mean/std/min/max per metric
Tiered Storage | One flat JSON file | 3 focused files: basic/health/debug
Visual Dashboard | Raw JSON only | Interactive HTML charts
Training Fingerprint | Can't reproduce runs | Full environment capture

1. Smart / Adaptive Logging

The AdaptiveLogger wraps a standard TrainingLogger and only escalates to EXPERT detail when something looks suspicious. In normal operation it stays at BASIC level, reducing log size by 80-90%.

Anomaly Triggers

Trigger | Default Threshold | What Happens
Dead neurons (zeros %) | 50% | Escalate + emit warning
Gradient spike | 5× rolling average | Escalate + emit warning
Vanishing gradient | L2 < 1e-7 | Escalate + emit danger
Exploding gradient | L2 > 100 | Escalate + emit danger
Loss spike | +50% between batches | Escalate + emit warning
NaN / Inf anywhere | Any | Escalate + emit critical
Weight stagnation | Δ < 1e-6 for 5 batches | Escalate + emit warning
Activation saturation | > 40% | Escalate + emit warning
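
To make the "5× rolling average" trigger concrete, here is a minimal standalone sketch of the idea. It is not the library's code: the helper name is_gradient_spike and the min_history parameter are illustrative assumptions, and only the standard library is used.

```python
from collections import deque
from statistics import fmean


def is_gradient_spike(history, norm, spike_factor=5.0, min_history=3):
    """Return True when `norm` exceeds spike_factor x the rolling mean.

    `history` is a bounded deque of earlier norms. The current value is
    appended either way, so the window keeps moving.
    """
    spiked = False
    if len(history) >= min_history:  # wait for a few samples before judging
        rolling_mean = fmean(history)
        spiked = rolling_mean > 0 and norm > rolling_mean * spike_factor
    history.append(norm)
    return spiked


history = deque(maxlen=20)  # window size chosen for illustration
flags = [is_gradient_spike(history, g) for g in [0.05, 0.04, 0.06, 0.05, 0.40]]
print(flags)  # [False, False, False, False, True]
```

The last value (0.40) is eight times the rolling mean of 0.05, so it is the only one flagged.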

Usage

from neurogebra.logging.adaptive import AdaptiveLogger, AnomalyConfig
from neurogebra.logging.logger import TrainingLogger, LogLevel
import numpy as np

# Create a base logger at EXPERT level
base_logger = TrainingLogger(level=LogLevel.EXPERT)

# Wrap it in the adaptive logger
adaptive = AdaptiveLogger(base_logger, config=AnomalyConfig(
    zeros_pct_threshold=50.0,      # trigger on >50% dead neurons
    gradient_spike_factor=5.0,     # trigger on 5× gradient spike
    escalation_cooldown=10,        # stay escalated for 10 events
))

# Use adaptive as a drop-in replacement
adaptive.on_train_start(total_epochs=20)
adaptive.on_epoch_start(0)

# Illustrative inputs (placeholders; substitute real layer outputs)
normal_activations = np.random.uniform(0.1, 0.9, size=(32, 64))  # no zeros, not saturated
dead_activations = np.zeros((32, 64))                            # every unit outputs 0

# This won't produce EXPERT events (normal data):
adaptive.on_layer_forward(0, "dense_0", output_data=normal_activations)

# This WILL produce EXPERT events (all zeros → dead neurons):
adaptive.on_layer_forward(0, "dense_0", output_data=dead_activations)

# Check what anomalies were detected
print(adaptive.get_anomaly_summary())

Customising Thresholds

config = AnomalyConfig(
    zeros_pct_threshold=30.0,          # more sensitive dead neuron detection
    gradient_spike_factor=3.0,         # more sensitive spike detection
    loss_spike_pct=30.0,               # trigger on 30% loss increase
    weight_stagnation_window=10,       # look at 10 consecutive updates
    escalation_cooldown=20,            # stay in detail mode longer
)
adaptive = AdaptiveLogger(base_logger, config=config)
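
As a worked example of what lowering loss_spike_pct changes, the sketch below computes the percentage increase between two consecutive batch losses and compares it against the default (50%) and the stricter value (30%). The helper name loss_increase_pct is an illustrative assumption, not part of the library.

```python
def loss_increase_pct(prev_loss, new_loss):
    """Percentage increase of new_loss relative to prev_loss."""
    return (new_loss - prev_loss) / prev_loss * 100


prev_loss, new_loss = 0.50, 0.68
pct = loss_increase_pct(prev_loss, new_loss)  # roughly 36%

trips_default = pct > 50.0  # default threshold: no escalation
trips_strict = pct > 30.0   # stricter threshold: escalation
print(trips_default, trips_strict)  # False True
```

A jump from 0.50 to 0.68 is ignored at the default setting but flagged once the threshold is lowered to 30%.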

2. Automated Health Warnings

The AutoHealthWarnings engine runs threshold-based rules on every batch and epoch, emitting structured HealthWarning objects with human-readable diagnoses and actionable advice.

Rules

Rule | Condition | Severity | Message
dead_relu | zeros_pct > 50% | warning | "Possible dying ReLU in dense_0"
gradient_spike | norm > 5× rolling avg | warning | "Possible exploding gradient"
vanishing_gradient | norm < 1e-7 | danger | "Vanishing gradient in dense_0"
exploding_gradient | norm > 100 | danger | "Exploding gradient in dense_0"
overfitting | val_loss / train_loss > 1.3 | warning | "Possible overfitting"
loss_stagnation | Δloss < 1e-4 for N epochs | warning | "Loss stagnant"
weight_stagnation | Δweight < 1e-6 for N batches | warning | "Optimizer may have stagnated"
nan_inf_loss | NaN or Inf in loss | critical | "NaN/Inf detected in loss!"
loss_divergence | loss ×3 over N batches | danger | "Loss diverging"
activation_saturation | saturation > 40% | warning | "Activations saturated"
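
The overfitting rule can be illustrated with a small standalone sketch. It assumes that "patience" means the ratio must stay above the threshold for that many consecutive epochs. The engine's exact semantics are not shown on this page, so treat the function overfitting_epochs as a hypothetical illustration only.

```python
def overfitting_epochs(train_losses, val_losses, ratio=1.3, patience=3):
    """Return the epochs at which val/train has exceeded `ratio`
    for at least `patience` consecutive epochs."""
    flagged = []
    streak = 0
    for epoch, (train, val) in enumerate(zip(train_losses, val_losses)):
        if train > 0 and val / train > ratio:
            streak += 1
        else:
            streak = 0  # the ratio recovered, so start counting again
        if streak >= patience:
            flagged.append(epoch)
    return flagged


train = [0.90, 0.70, 0.50, 0.40, 0.30, 0.25]
val = [0.95, 0.80, 0.70, 0.65, 0.60, 0.58]
flagged = overfitting_epochs(train, val)
print(flagged)  # [4, 5]
```

The ratio first exceeds 1.3 at epoch 2, so under this reading the third consecutive violation, and the first warning, arrives at epoch 4.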

Usage

from neurogebra.logging.health_warnings import AutoHealthWarnings, WarningConfig

warnings_engine = AutoHealthWarnings(config=WarningConfig(
    dead_relu_zeros_pct=50.0,
    overfit_patience=3,
    overfit_ratio=1.3,
))

# Call during training
for epoch in range(epochs):
    for batch_idx, (X_batch, y_batch) in enumerate(batches):
        # ... forward/backward ...

        # Check batch-level health
        batch_alerts = warnings_engine.check_batch(
            epoch=epoch,
            batch=batch_idx,
            loss=current_loss,
            gradient_norms={"dense_0": 0.05, "dense_1": 0.03},
            activation_stats={"dense_0": {"zeros_pct": 62.0, "activation_type": "relu"}},
        )
        for alert in batch_alerts:
            print(f"  ⚠️ [{alert.severity}] {alert.message}")

    # Check epoch-level health
    epoch_alerts = warnings_engine.check_epoch(
        epoch=epoch,
        train_loss=train_loss,
        val_loss=val_loss,
    )

# Get summary
print(warnings_engine.get_summary())

Each HealthWarning contains:

HealthWarning(
    rule_name="dead_relu",
    severity="warning",
    message="Possible dying ReLU in 'dense_0' (62.0% zeros)",
    diagnosis="Neurons producing zero outputs will receive zero gradients and never recover.",
    recommendations=[
        "Use LeakyReLU(negative_slope=0.01) instead of ReLU",
        "Lower the learning rate",
        "Use He initialisation",
    ],
    layer_name="dense_0",
    epoch=5, batch=10,
)

3. Log Summarization Per Epoch

The EpochSummarizer aggregates batch-level statistics and produces mean, std, min, max across all batches in each epoch.
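
The aggregation itself is simple. The sketch below shows one way to compute the four statistics for a single metric using only the standard library. It assumes a population standard deviation, which may differ from what EpochSummarizer uses internally, and the function name summarize is illustrative.

```python
from statistics import fmean, pstdev


def summarize(values):
    """Mean, population std, min and max of one metric over an epoch."""
    return {
        "mean": fmean(values),
        "std": pstdev(values),
        "min": min(values),
        "max": max(values),
    }


batch_losses = [0.40, 0.30, 0.35, 0.35]  # one value per batch
stats = summarize(batch_losses)
print(stats)
```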

Usage

from neurogebra.logging.epoch_summary import EpochSummarizer

summarizer = EpochSummarizer()

for epoch in range(epochs):
    for batch_idx in range(num_batches):
        summarizer.record_batch(
            epoch=epoch,
            metrics={"loss": batch_loss, "accuracy": batch_acc},
            gradient_norms={"dense_0": grad_norm_0, "dense_1": grad_norm_1},
        )

    summary = summarizer.finalize_epoch(epoch)
    print(summary.format_text())

Output

══ Epoch 5 Summary (32 batches) ══
  Metrics:
    loss                  mean=0.342100  std=0.015200  min=0.310000  max=0.380000
    accuracy              mean=0.891200  std=0.008500  min=0.870000  max=0.910000
  Gradient Norms:
    dense_0               mean=5.23e-02  std=1.12e-02  min=3.10e-02  max=8.40e-02
    dense_1               mean=2.10e-02  std=5.30e-03  min=1.20e-02  max=3.50e-02

Programmatic Access

# Get structured data
d = summary.to_dict()
print(d["metrics"]["loss"]["mean"])   # 0.3421
print(d["metrics"]["loss"]["std"])    # 0.0152

# Get all epoch summaries
all_summaries = summarizer.get_all_summaries()

4. Tiered Storage / Streaming

Instead of one massive JSON file, TieredStorage writes three separate NDJSON (newline-delimited JSON) files:

File | Contains | When Written
basic.log | Epoch metrics, train start/end | Every epoch
health.log | Warnings, anomalies, health checks | On each alert (immediate)
debug.log | Full EXPERT-level detail | Only when needed
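
One plausible way to picture the split is a routing function that looks at each event and picks a tier. The rules below are assumptions made for illustration. They rely on the level, severity and event_type fields visible in the NDJSON sample later in this section, plus a hypothetical "health_check" event type. The actual routing logic in TieredStorage is not shown here.

```python
def pick_tier(event):
    """Choose a tier name for one event dict (illustrative rules only)."""
    is_alert = event.get("severity", "info") != "info"
    if is_alert or event.get("event_type") == "health_check":
        return "health"  # anything that signals a problem
    if event.get("level") == "BASIC":
        return "basic"   # routine lifecycle and epoch metrics
    return "debug"       # everything else is detailed output


events = [
    {"event_type": "epoch_end", "level": "BASIC", "severity": "info"},
    {"event_type": "health_check", "level": "BASIC", "severity": "warning"},
    {"event_type": "layer_forward", "level": "EXPERT", "severity": "info"},
]
tiers = [pick_tier(e) for e in events]
print(tiers)  # ['basic', 'health', 'debug']
```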

Usage

from neurogebra.logging.tiered_storage import TieredStorage
from neurogebra.logging.logger import TrainingLogger, LogLevel

storage = TieredStorage(
    base_dir="./training_logs",
    write_debug=True,       # set False in production to save I/O
    buffer_size=50,          # flush every 50 events
)

logger = TrainingLogger(level=LogLevel.EXPERT)
logger.add_backend(storage)

# ... train as normal ...

storage.flush()    # final flush
storage.close()    # cleanup

# Check what was written
print(storage.summary())
# {'basic': {'events': 42, 'size_bytes': 8192},
#  'health': {'events': 3, 'size_bytes': 1024},
#  'debug': {'events': 12500, 'size_bytes': 2097152},
#  'total_events': 12545}

Reading Logs

# Read a specific tier programmatically
basic_events = storage.read_basic()
health_events = storage.read_health()

# Or from command line:
# grep "overfitting" training_logs/health.log
# grep "dense_0" training_logs/debug.log

NDJSON Format

Each line is a self-contained JSON object — easy to stream, grep, and parse:

{"event_type":"epoch_end","level":"BASIC","timestamp":1740000000.0,"epoch":0,"severity":"info","message":"Epoch 1 done","data":{"metrics":{"loss":0.85,"accuracy":0.72}}}
{"event_type":"epoch_end","level":"BASIC","timestamp":1740000001.5,"epoch":1,"severity":"info","message":"Epoch 2 done","data":{"metrics":{"loss":0.63,"accuracy":0.81}}}
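
Because every line is an independent JSON document, the files can also be read without the library. The following sketch parses NDJSON from any text stream using only the standard json module. The helper name iter_ndjson is illustrative, and the sample uses an in-memory stream in place of a real log file.

```python
import io
import json


def iter_ndjson(stream):
    """Yield one dict per non-empty line, skipping lines that fail to parse."""
    for line_no, line in enumerate(stream, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            # e.g. a truncated final line after an interrupted run
            print(f"skipping malformed line {line_no}")


sample = io.StringIO(
    '{"event_type":"epoch_end","epoch":0,"data":{"metrics":{"loss":0.85}}}\n'
    '{"event_type":"epoch_end","epoch":1,"data":{"metrics":{"loss":0.63}}}\n'
)
losses = [
    event["data"]["metrics"]["loss"]
    for event in iter_ndjson(sample)
    if event["event_type"] == "epoch_end"
]
print(losses)  # [0.85, 0.63]
```

To read an actual log, replace the StringIO object with an open file handle.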

5. Visual Dashboard

The DashboardExporter generates a self-contained interactive HTML dashboard with Chart.js charts.

Charts Included

  • 📉 Loss curves (train + validation)
  • 📈 Accuracy curves (train + validation)
  • ⏱️ Epoch timing bar chart
  • 📊 Raw batch-level loss curve
  • 🩺 Health diagnostics timeline

Usage

from neurogebra.logging.dashboard import DashboardExporter
from neurogebra.logging.logger import TrainingLogger, LogLevel

dashboard = DashboardExporter(path="training_logs/dashboard.html")
logger = TrainingLogger(level=LogLevel.EXPERT)
logger.add_backend(dashboard)

# ... train as normal ...

dashboard.save()  # generates the interactive HTML file
# Open training_logs/dashboard.html in any browser

TensorBoard Integration

from neurogebra.logging.dashboard import TensorBoardBridge

tb = TensorBoardBridge(log_dir="./tb_logs")
if tb.available:
    logger.add_backend(tb)
    # ... after training ...
    tb.close()
    # Then: tensorboard --logdir=./tb_logs

Weights & Biases Integration

from neurogebra.logging.dashboard import WandBBridge

wandb_bridge = WandBBridge(
    project="my_experiment",
    run_name="experiment_001",
    config={"lr": 0.01, "epochs": 50},
)
if wandb_bridge.available:
    logger.add_backend(wandb_bridge)
    # ... after training ...
    wandb_bridge.close()

6. Training Fingerprint / Reproducibility Block

The TrainingFingerprint captures everything needed to reproduce a training run:

What It Captures

Category | Fields
Seeds | random_seed, numpy_seed
Dataset | SHA-256 hash, shape, dtype, sample count
Versions | Neurogebra, Python, NumPy, SciPy, SymPy, Rich
Hardware | CPU model, core count, RAM, GPU (if available)
OS | System, release, machine architecture
Model | Architecture hash, full model info dict
Hyperparameters | All training hyperparameters
Git | Commit hash, branch name, dirty status
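
To show what a dataset hash buys you, here is a minimal sketch of content hashing with SHA-256 from the standard library. It hashes a list of float rows rather than a NumPy array, and both the byte layout and the helper name hash_rows are assumptions for illustration. The library's own hashing scheme is not documented on this page.

```python
import hashlib
import struct


def hash_rows(rows):
    """SHA-256 over the shape plus the little-endian float64 bytes of each row."""
    digest = hashlib.sha256()
    n_cols = len(rows[0]) if rows else 0
    digest.update(f"{len(rows)}x{n_cols}".encode())  # include the shape
    for row in rows:
        digest.update(struct.pack(f"<{len(row)}d", *row))
    return digest.hexdigest()


h_original = hash_rows([[0.0, 1.0], [2.0, 3.0]])
h_same = hash_rows([[0.0, 1.0], [2.0, 3.0]])
h_changed = hash_rows([[0.0, 1.0], [2.0, 3.5]])
print(h_original == h_same, h_original == h_changed)  # True False
```

Identical data gives an identical digest, while a single changed value produces a different one. That property is what lets a stored fingerprint detect that two runs used different data.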

Usage

from neurogebra.logging.fingerprint import TrainingFingerprint
import numpy as np

fingerprint = TrainingFingerprint.capture(
    model_info={"name": "my_model", "layers": [...]},
    hyperparameters={"lr": 0.01, "batch_size": 32, "epochs": 50},
    dataset=X_train,        # auto-hashed
    random_seed=42,
)

# Pretty-print
print(fingerprint.format_text())

Output

╔══ Training Fingerprint ══╗
  Run ID:       a1b2c3d4e5f6
  Timestamp:    2026-02-27 14:30:00
  Seed:         42
  Dataset Hash: 8f14e45fceea167a
  Dataset:      (10000, 784) (float64)
  Neurogebra:   1.3.0
  Python:       3.11.5
  NumPy:        1.26.0
  CPU:          AMD64 Family (8 cores)
  RAM:          16.0 GB
  GPU:          NVIDIA GeForce RTX 3060
  OS:           Windows 10
  Git:          main@a1b2c3d4 (dirty)
  Model Hash:   f47ac10b58cc
  Hyperparams:  {'lr': 0.01, 'batch_size': 32, 'epochs': 50}
╚═════════════════════════╝

Serialisation

# Save to JSON
import json
with open("fingerprint.json", "w") as f:
    json.dump(fingerprint.to_dict(), f, indent=2)

# Load back
with open("fingerprint.json") as f:
    fp2 = TrainingFingerprint.from_dict(json.load(f))
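
Once fingerprints are plain dictionaries, two runs can be compared directly. The sketch below walks two nested dicts and reports the keys that differ. The function name diff_fingerprints and the example keys are hypothetical and do not necessarily match the output of to_dict().

```python
def diff_fingerprints(a, b, prefix=""):
    """Return {dotted.key: (value_in_a, value_in_b)} for every difference."""
    diffs = {}
    for key in sorted(set(a) | set(b)):
        path = f"{prefix}{key}"
        left, right = a.get(key), b.get(key)
        if isinstance(left, dict) and isinstance(right, dict):
            # recurse into nested sections such as versions or hyperparameters
            diffs.update(diff_fingerprints(left, right, prefix=f"{path}."))
        elif left != right:
            diffs[path] = (left, right)
    return diffs


run_a = {"random_seed": 42, "versions": {"numpy": "1.26.0"}, "hyperparameters": {"lr": 0.01}}
run_b = {"random_seed": 42, "versions": {"numpy": "1.26.4"}, "hyperparameters": {"lr": 0.01}}
changes = diff_fingerprints(run_a, run_b)
print(changes)  # {'versions.numpy': ('1.26.0', '1.26.4')}
```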

Full Integration Example

Using all v1.3.0 features together:

from neurogebra.builders.model_builder import ModelBuilder
from neurogebra.logging.adaptive import AdaptiveLogger, AnomalyConfig
from neurogebra.logging.health_warnings import AutoHealthWarnings
from neurogebra.logging.epoch_summary import EpochSummarizer
from neurogebra.logging.tiered_storage import TieredStorage
from neurogebra.logging.dashboard import DashboardExporter
from neurogebra.logging.fingerprint import TrainingFingerprint
from neurogebra.logging.logger import TrainingLogger, LogLevel
import numpy as np

# 1. Build model
builder = ModelBuilder()
model = builder.Sequential([
    builder.Dense(64, activation="relu"),
    builder.Dense(32, activation="tanh"),
    builder.Dense(1, activation="sigmoid"),
], name="my_model")

# 2. Create logging pipeline
base_logger = TrainingLogger(level=LogLevel.EXPERT)
adaptive = AdaptiveLogger(base_logger)              # Smart filtering
storage = TieredStorage(base_dir="./logs")           # Tiered files
dashboard = DashboardExporter(path="./logs/dash.html")  # Visual dashboard
base_logger.add_backend(storage)
base_logger.add_backend(dashboard)

warnings = AutoHealthWarnings()                      # Auto health rules
summarizer = EpochSummarizer()                       # Epoch aggregation

# 3. Capture fingerprint
fp = TrainingFingerprint.capture(
    model_info={"name": "my_model", "layers": 3},
    hyperparameters={"lr": 0.01, "batch_size": 32, "epochs": 20},
    dataset=X_train,
    random_seed=42,
)
print(fp.format_text())

# 4. Train with full diagnostics
adaptive.on_train_start(total_epochs=20, model_info=fp.model_info)
for epoch in range(20):
    adaptive.on_epoch_start(epoch)
    for batch in range(num_batches):
        # ... training step ...
        summarizer.record_batch(epoch=epoch, metrics={"loss": loss})
        warnings.check_batch(loss=loss, epoch=epoch, batch=batch)

    summary = summarizer.finalize_epoch(epoch)
    print(summary.format_text())
    warnings.check_epoch(epoch=epoch, train_loss=train_loss, val_loss=val_loss)
    adaptive.on_epoch_end(epoch, metrics={"loss": train_loss})

adaptive.on_train_end()

# 5. Save everything
storage.close()
dashboard.save()
print(f"Anomalies detected: {adaptive.get_anomaly_summary()['total_anomalies']}")
print(f"Health warnings: {warnings.get_summary()['total_warnings']}")

API Reference

AdaptiveLogger

neurogebra.logging.adaptive.AdaptiveLogger

Wraps a TrainingLogger and filters events adaptively.

In normal mode only BASIC-level events are emitted. When an anomaly is detected the logger temporarily escalates to EXPERT for escalation_cooldown events, so the user gets the full picture around the anomaly without drowning in noise the rest of the time.

The underlying TrainingLogger must be created with level=LogLevel.EXPERT (or higher) so it can emit the detailed events when the adaptive logger un-mutes them.
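
The escalation behaviour described above boils down to a small countdown. The sketch below is a simplified model of it, not the class itself. The name EscalationGate is hypothetical, and the tick-then-flag order follows the source listing below.

```python
class EscalationGate:
    """Simplified model: an anomaly opens the gate for `cooldown` ticks."""

    def __init__(self, cooldown=10):
        self.cooldown = cooldown
        self.remaining = 0

    @property
    def escalated(self):
        return self.remaining > 0

    def observe(self, is_anomaly):
        """Return True if this event should be emitted at full detail."""
        if self.remaining > 0:
            self.remaining -= 1            # tick the countdown first
        if is_anomaly:
            self.remaining = self.cooldown  # an anomaly restarts the window
        return is_anomaly or self.escalated


gate = EscalationGate(cooldown=3)
emitted = [gate.observe(a) for a in [True, False, False, False, False]]
print(emitted)  # [True, True, True, False, False]
```

In this model, a cooldown of 3 emits the anomalous event plus the next two gated events. The tick is applied before the escalation check, so the third tick closes the gate.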

Source code in neurogebra/logging/adaptive.py
class AdaptiveLogger:
    """
    Wraps a :class:`TrainingLogger` and filters events adaptively.

    In **normal** mode only BASIC-level events are emitted.
    When an anomaly is detected the logger temporarily escalates to EXPERT
    for ``escalation_cooldown`` events, so the user gets the full picture
    around the anomaly without drowning in noise the rest of the time.

    The underlying ``TrainingLogger`` must be created with
    ``level=LogLevel.EXPERT`` (or higher) so it *can* emit the detailed
    events when the adaptive logger un-mutes them.
    """

    def __init__(
        self,
        base_logger: TrainingLogger,
        config: Optional[AnomalyConfig] = None,
    ):
        self._base = base_logger
        self.config = config or AnomalyConfig()

        # Ensure the base logger will accept EXPERT events
        if self._base.level < LogLevel.EXPERT:
            self._base.level = LogLevel.EXPERT

        # Rolling state
        self._gradient_norms: Dict[str, Deque[float]] = {}
        self._last_batch_loss: Optional[float] = None
        self._weight_deltas: Dict[str, Deque[float]] = {}
        self._anomalies: List[AnomalyRecord] = []

        # Escalation bookkeeping
        self._escalated = False
        self._escalation_counter = 0

        # Shadow level: the level we *pretend* the logger is at
        self._effective_level = LogLevel.BASIC

    # ------------------------------------------------------------------
    # Public API — mirrors TrainingLogger
    # ------------------------------------------------------------------

    @property
    def anomalies(self) -> List[AnomalyRecord]:
        """Return all detected anomalies so far."""
        return list(self._anomalies)

    @property
    def is_escalated(self) -> bool:
        return self._escalated

    # Delegate attribute access to the base logger for anything not overridden
    def __getattr__(self, name: str):
        return getattr(self._base, name)

    # -- train lifecycle --------------------------------------------------

    def on_train_start(self, **kwargs) -> None:
        self._base.on_train_start(**kwargs)

    def on_train_end(self, **kwargs) -> None:
        self._base.on_train_end(**kwargs)

    def on_epoch_start(self, epoch: int, **kwargs) -> None:
        self._base.on_epoch_start(epoch, **kwargs)

    def on_epoch_end(self, epoch: int, **kwargs) -> None:
        self._base.on_epoch_end(epoch, **kwargs)

    def on_batch_start(self, batch: int, **kwargs) -> None:
        self._base.on_batch_start(batch, **kwargs)

    def on_batch_end(self, batch: int, **kwargs) -> None:
        loss = kwargs.get("loss")
        if loss is not None:
            self._check_loss_spike(loss, kwargs.get("epoch"), batch)
            self._last_batch_loss = loss
        self._base.on_batch_end(batch, **kwargs)

    # -- layer-level (gated) -----------------------------------------------

    def on_layer_forward(self, layer_index: int, layer_name: str, **kwargs) -> None:
        """Only emit EXPERT-level layer_forward when escalated or anomalous."""
        anomaly = self._check_forward_anomaly(layer_name, kwargs)
        if anomaly or self._escalated:
            self._base.on_layer_forward(layer_index, layer_name, **kwargs)
        # else: silently skip

    def on_layer_backward(self, layer_index: int, layer_name: str, **kwargs) -> None:
        anomaly = self._check_backward_anomaly(layer_name, kwargs)
        if anomaly or self._escalated:
            self._base.on_layer_backward(layer_index, layer_name, **kwargs)

    def on_gradient_computed(self, param_name: str, gradient: float, **kwargs) -> None:
        anomaly = self._check_gradient_anomaly(param_name, gradient)
        if anomaly or self._escalated:
            self._base.on_gradient_computed(param_name, gradient, **kwargs)

    def on_weight_updated(self, param_name: str, old_value: float,
                          new_value: float, **kwargs) -> None:
        anomaly = self._check_weight_stagnation(param_name, old_value, new_value)
        if anomaly or self._escalated:
            self._base.on_weight_updated(param_name, old_value, new_value, **kwargs)

    def on_health_check(self, *args, **kwargs) -> None:
        self._base.on_health_check(*args, **kwargs)

    # ------------------------------------------------------------------
    # Anomaly detection helpers
    # ------------------------------------------------------------------

    def _flag_anomaly(self, record: AnomalyRecord) -> None:
        """Register an anomaly and enter escalated mode."""
        self._anomalies.append(record)
        self._escalated = True
        self._escalation_counter = self.config.escalation_cooldown

        # Also emit a health-check event
        self._base.on_health_check(
            check_name=f"adaptive_{record.anomaly_type}",
            severity=record.severity,
            message=record.message,
            recommendations=[],
            anomaly_data=record.data,
        )

    def _tick_escalation(self) -> None:
        """Count down the escalation cooldown after each gated event."""
        if self._escalated:
            self._escalation_counter -= 1
            if self._escalation_counter <= 0:
                self._escalated = False

    # -- forward checks ---------------------------------------------------

    def _check_forward_anomaly(self, layer_name: str, kwargs: Dict) -> bool:
        self._tick_escalation()

        output_data = kwargs.get("output_data")
        if output_data is not None:
            arr = np.asarray(output_data, dtype=np.float64)

            # NaN / Inf
            if np.any(np.isnan(arr)) or np.any(np.isinf(arr)):
                self._flag_anomaly(AnomalyRecord(
                    anomaly_type="nan_inf_activation",
                    layer_name=layer_name,
                    severity="critical",
                    message=f"NaN/Inf detected in activations of '{layer_name}'",
                    data={"nan_count": int(np.sum(np.isnan(arr))),
                          "inf_count": int(np.sum(np.isinf(arr)))},
                ))
                return True

            # Dead neurons
            flat = arr.ravel()
            zeros_pct = float(np.sum(flat == 0) / max(flat.size, 1) * 100)
            if zeros_pct > self.config.zeros_pct_threshold:
                self._flag_anomaly(AnomalyRecord(
                    anomaly_type="dead_neurons",
                    layer_name=layer_name,
                    severity="warning",
                    message=(f"{zeros_pct:.1f}% zeros in '{layer_name}' "
                             f"— possible dying ReLU"),
                    data={"zeros_pct": zeros_pct},
                ))
                return True

            # Saturation (sigmoid/tanh activations mostly ∈ (0,1) or (-1,1))
            if flat.size > 0:
                sat_low = float(np.sum(np.abs(flat) < 0.01) / flat.size * 100)
                sat_high = float(np.sum(np.abs(flat) > 0.99) / flat.size * 100)
                sat_total = sat_low + sat_high
                if sat_total > self.config.saturation_threshold:
                    self._flag_anomaly(AnomalyRecord(
                        anomaly_type="activation_saturation",
                        layer_name=layer_name,
                        severity="warning",
                        message=(f"{sat_total:.1f}% activations saturated "
                                 f"in '{layer_name}'"),
                        data={"saturation_pct": sat_total},
                    ))
                    return True

        return False

    # -- backward checks --------------------------------------------------

    def _check_backward_anomaly(self, layer_name: str, kwargs: Dict) -> bool:
        self._tick_escalation()
        grad_output = kwargs.get("grad_output")
        if grad_output is not None:
            arr = np.asarray(grad_output, dtype=np.float64)
            if np.any(np.isnan(arr)) or np.any(np.isinf(arr)):
                self._flag_anomaly(AnomalyRecord(
                    anomaly_type="nan_inf_gradient",
                    layer_name=layer_name,
                    severity="critical",
                    message=f"NaN/Inf in gradient output of '{layer_name}'",
                ))
                return True
        return False

    # -- gradient norm checks ---------------------------------------------

    def _check_gradient_anomaly(self, param_name: str, gradient: float) -> bool:
        self._tick_escalation()
        g = abs(gradient)

        # Absolute thresholds
        if g < self.config.gradient_vanish_threshold:
            self._flag_anomaly(AnomalyRecord(
                anomaly_type="vanishing_gradient",
                layer_name=param_name,
                severity="danger",
                message=f"Vanishing gradient for '{param_name}' (|g|={g:.2e})",
                data={"gradient": gradient},
            ))
            return True

        if g > self.config.gradient_explode_threshold:
            self._flag_anomaly(AnomalyRecord(
                anomaly_type="exploding_gradient",
                layer_name=param_name,
                severity="danger",
                message=f"Exploding gradient for '{param_name}' (|g|={g:.2e})",
                data={"gradient": gradient},
            ))
            return True

        # Spike detection
        buf = self._gradient_norms.setdefault(
            param_name, deque(maxlen=self.config.gradient_rolling_window)
        )
        if len(buf) >= 3:
            rolling_mean = float(np.mean(buf))
            if rolling_mean > 0 and g > rolling_mean * self.config.gradient_spike_factor:
                self._flag_anomaly(AnomalyRecord(
                    anomaly_type="gradient_spike",
                    layer_name=param_name,
                    severity="warning",
                    message=(f"Gradient spike in '{param_name}': "
                             f"{g:.2e} vs rolling avg {rolling_mean:.2e}"),
                    data={"gradient": gradient, "rolling_mean": rolling_mean},
                ))
                buf.append(g)
                return True
        buf.append(g)
        return False

    # -- loss spike -------------------------------------------------------

    def _check_loss_spike(self, loss: float, epoch: Optional[int],
                          batch: int) -> bool:
        if self._last_batch_loss is not None and self._last_batch_loss > 0:
            pct_increase = (loss - self._last_batch_loss) / self._last_batch_loss * 100
            if pct_increase > self.config.loss_spike_pct:
                self._flag_anomaly(AnomalyRecord(
                    anomaly_type="loss_spike",
                    epoch=epoch,
                    batch=batch,
                    severity="warning",
                    message=(f"Loss spiked by {pct_increase:.1f}% "
                             f"({self._last_batch_loss:.4f} → {loss:.4f})"),
                    data={"prev_loss": self._last_batch_loss, "new_loss": loss,
                          "pct_increase": pct_increase},
                ))
                return True
        return False

    # -- weight stagnation ------------------------------------------------

    def _check_weight_stagnation(self, param_name: str, old: float,
                                 new: float) -> bool:
        self._tick_escalation()
        delta = abs(new - old)
        buf = self._weight_deltas.setdefault(
            param_name, deque(maxlen=self.config.weight_stagnation_window)
        )
        buf.append(delta)
        if len(buf) >= self.config.weight_stagnation_window:
            if all(d < self.config.weight_stagnation_threshold for d in buf):
                self._flag_anomaly(AnomalyRecord(
                    anomaly_type="weight_stagnation",
                    layer_name=param_name,
                    severity="warning",
                    message=(f"Weight '{param_name}' stagnant for "
                             f"{self.config.weight_stagnation_window} updates "
                             f"(max Δ={max(buf):.2e})"),
                    data={"max_delta": float(max(buf))},
                ))
                return True
        return False

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------

    def get_anomaly_summary(self) -> Dict[str, Any]:
        """Return a structured summary of all detected anomalies."""
        by_type: Dict[str, int] = {}
        by_severity: Dict[str, int] = {}
        for a in self._anomalies:
            by_type[a.anomaly_type] = by_type.get(a.anomaly_type, 0) + 1
            by_severity[a.severity] = by_severity.get(a.severity, 0) + 1
        return {
            "total_anomalies": len(self._anomalies),
            "by_type": by_type,
            "by_severity": by_severity,
            "anomalies": [
                {
                    "type": a.anomaly_type,
                    "severity": a.severity,
                    "message": a.message,
                    "layer": a.layer_name,
                    "epoch": a.epoch,
                    "batch": a.batch,
                    "timestamp": a.timestamp,
                }
                for a in self._anomalies
            ],
        }

    def reset(self) -> None:
        """Clear all anomaly state and go back to BASIC mode."""
        self._anomalies.clear()
        self._gradient_norms.clear()
        self._weight_deltas.clear()
        self._last_batch_loss = None
        self._escalated = False
        self._escalation_counter = 0

Attributes

anomalies property

Return all detected anomalies so far.

Functions

on_layer_forward(layer_index, layer_name, **kwargs)

Only emit EXPERT-level layer_forward when escalated or anomalous.

Source code in neurogebra/logging/adaptive.py
def on_layer_forward(self, layer_index: int, layer_name: str, **kwargs) -> None:
    """Only emit EXPERT-level layer_forward when escalated or anomalous."""
    anomaly = self._check_forward_anomaly(layer_name, kwargs)
    if anomaly or self._escalated:
        self._base.on_layer_forward(layer_index, layer_name, **kwargs)

get_anomaly_summary()

Return a structured summary of all detected anomalies.

Source code in neurogebra/logging/adaptive.py
def get_anomaly_summary(self) -> Dict[str, Any]:
    """Return a structured summary of all detected anomalies."""
    by_type: Dict[str, int] = {}
    by_severity: Dict[str, int] = {}
    for a in self._anomalies:
        by_type[a.anomaly_type] = by_type.get(a.anomaly_type, 0) + 1
        by_severity[a.severity] = by_severity.get(a.severity, 0) + 1
    return {
        "total_anomalies": len(self._anomalies),
        "by_type": by_type,
        "by_severity": by_severity,
        "anomalies": [
            {
                "type": a.anomaly_type,
                "severity": a.severity,
                "message": a.message,
                "layer": a.layer_name,
                "epoch": a.epoch,
                "batch": a.batch,
                "timestamp": a.timestamp,
            }
            for a in self._anomalies
        ],
    }

reset()

Clear all anomaly state and go back to BASIC mode.

Source code in neurogebra/logging/adaptive.py
def reset(self) -> None:
    """Clear all anomaly state and go back to BASIC mode."""
    self._anomalies.clear()
    self._gradient_norms.clear()
    self._weight_deltas.clear()
    self._last_batch_loss = None
    self._escalated = False
    self._escalation_counter = 0

AnomalyConfig

neurogebra.logging.adaptive.AnomalyConfig dataclass

Thresholds that trigger escalation from BASIC → EXPERT logging.

Source code in neurogebra/logging/adaptive.py
@dataclass
class AnomalyConfig:
    """Thresholds that trigger escalation from BASIC → EXPERT logging."""

    # Dead neuron / zero activation threshold (percent)
    zeros_pct_threshold: float = 50.0

    # Gradient spike: current norm > rolling_mean × spike_factor
    gradient_spike_factor: float = 5.0
    gradient_rolling_window: int = 20

    # Gradient absolute thresholds
    gradient_vanish_threshold: float = 1e-7
    gradient_explode_threshold: float = 100.0

    # Loss spike between consecutive batches (percent increase)
    loss_spike_pct: float = 50.0

    # Activation saturation threshold (percent)
    saturation_threshold: float = 40.0

    # Weight delta near-zero (consecutive batches)
    weight_stagnation_threshold: float = 1e-6
    weight_stagnation_window: int = 5

    # How many events to keep in "escalated" mode after an anomaly
    escalation_cooldown: int = 10

AutoHealthWarnings

neurogebra.logging.health_warnings.AutoHealthWarnings

Stateful warning engine that tracks training metrics over time and fires threshold-based rules automatically.

Attach to a training loop and call check_batch / check_epoch each iteration. Accumulated warnings are accessible via the warnings attribute.

Source code in neurogebra/logging/health_warnings.py
class AutoHealthWarnings:
    """
    Stateful warning engine that tracks training metrics over time
    and fires threshold-based rules automatically.

    Attach to a training loop and call :meth:`check_batch` /
    :meth:`check_epoch` each iteration.  Accumulated warnings are
    accessible via :attr:`warnings`.
    """

    def __init__(self, config: Optional[WarningConfig] = None):
        self.config = config or WarningConfig()

        # Rolling state
        self._gradient_norms: Dict[str, Deque[float]] = {}
        self._weight_deltas: Dict[str, Deque[float]] = {}
        self._train_losses: List[float] = []
        self._val_losses: List[float] = []
        self._batch_losses: Deque[float] = deque(maxlen=100)

        # Collected warnings
        self._warnings: List[HealthWarning] = []

        # Dedup: avoid spamming the same warning every batch
        self._fired_rules: Dict[str, float] = {}  # rule_key → last-fired timestamp
        self._dedup_interval = 30.0  # seconds

    @property
    def warnings(self) -> List[HealthWarning]:
        return list(self._warnings)

    # ------------------------------------------------------------------
    # Per-batch check
    # ------------------------------------------------------------------

    def check_batch(
        self,
        *,
        epoch: Optional[int] = None,
        batch: Optional[int] = None,
        loss: Optional[float] = None,
        gradient_norms: Optional[Dict[str, float]] = None,
        weight_stats: Optional[Dict[str, Dict[str, Any]]] = None,
        activation_stats: Optional[Dict[str, Dict[str, Any]]] = None,
        weight_deltas: Optional[Dict[str, float]] = None,
    ) -> List[HealthWarning]:
        """Run all batch-level rules and return new warnings."""
        new: List[HealthWarning] = []

        if loss is not None:
            self._batch_losses.append(loss)
            new.extend(self._check_nan_inf_loss(loss, epoch, batch))
            new.extend(self._check_loss_divergence(epoch, batch))

        if gradient_norms:
            new.extend(self._check_gradients(gradient_norms, epoch, batch))

        if activation_stats:
            new.extend(self._check_activations(activation_stats, epoch, batch))

        if weight_stats:
            new.extend(self._check_dead_weights(weight_stats, epoch, batch))

        if weight_deltas:
            new.extend(self._check_weight_stagnation(weight_deltas, epoch, batch))

        self._warnings.extend(new)
        return new

    # ------------------------------------------------------------------
    # Per-epoch check
    # ------------------------------------------------------------------

    def check_epoch(
        self,
        *,
        epoch: int,
        train_loss: Optional[float] = None,
        val_loss: Optional[float] = None,
        train_acc: Optional[float] = None,
        val_acc: Optional[float] = None,
        gradient_norms: Optional[Dict[str, float]] = None,
        weight_stats: Optional[Dict[str, Dict[str, Any]]] = None,
        activation_stats: Optional[Dict[str, Dict[str, Any]]] = None,
    ) -> List[HealthWarning]:
        """Run all epoch-level rules and return new warnings."""
        new: List[HealthWarning] = []

        if train_loss is not None:
            self._train_losses.append(train_loss)
        if val_loss is not None:
            self._val_losses.append(val_loss)

        # Overfitting check
        new.extend(self._check_overfitting(epoch))

        # Loss stagnation
        new.extend(self._check_loss_stagnation(epoch))

        # Gradient checks (epoch-level too)
        if gradient_norms:
            new.extend(self._check_gradients(gradient_norms, epoch, None))

        # Activation / weight checks
        if activation_stats:
            new.extend(self._check_activations(activation_stats, epoch, None))
        if weight_stats:
            new.extend(self._check_dead_weights(weight_stats, epoch, None))

        self._warnings.extend(new)
        return new

    # ------------------------------------------------------------------
    # Rule implementations
    # ------------------------------------------------------------------

    def _should_fire(self, rule_key: str) -> bool:
        """De-duplicate: don't fire the same rule twice within the interval."""
        now = time.time()
        last = self._fired_rules.get(rule_key)
        if last is not None and (now - last) < self._dedup_interval:
            return False
        self._fired_rules[rule_key] = now
        return True

    # -- NaN / Inf --------------------------------------------------------

    def _check_nan_inf_loss(self, loss: float, epoch, batch) -> List[HealthWarning]:
        if not (np.isnan(loss) or np.isinf(loss)):
            return []
        key = "nan_inf_loss"
        if not self._should_fire(key):
            return []
        return [HealthWarning(
            rule_name="nan_inf_loss",
            severity="critical",
            message="NaN/Inf detected in loss!",
            diagnosis=(
                "Numerical instability has corrupted the loss. "
                "Training should be stopped immediately."
            ),
            recommendations=[
                "Lower the learning rate (current may be too high)",
                "Add gradient clipping (max_norm=1.0)",
                "Check input data for NaN/Inf values",
                "Use a more numerically stable loss function",
            ],
            epoch=epoch, batch=batch,
            data={"loss": float(loss) if np.isfinite(loss) else str(loss)},
        )]

    # -- Loss divergence --------------------------------------------------

    def _check_loss_divergence(self, epoch, batch) -> List[HealthWarning]:
        w = self.config.loss_divergence_window
        if len(self._batch_losses) < w:
            return []
        recent = list(self._batch_losses)[-w:]
        if recent[-1] > recent[0] * self.config.lr_too_high_loss_factor:
            key = "loss_divergence"
            if not self._should_fire(key):
                return []
            return [HealthWarning(
                rule_name="loss_divergence",
                severity="danger",
                message=f"Loss diverging over last {w} batches",
                diagnosis=(
                    "The loss is increasing rapidly, indicating training instability."
                ),
                recommendations=[
                    "Immediately lower the learning rate",
                    "Add gradient clipping",
                    "Check data preprocessing (normalise inputs)",
                ],
                epoch=epoch, batch=batch,
                data={"recent_losses": recent},
            )]
        return []

    # -- Gradient checks --------------------------------------------------

    def _check_gradients(self, gradient_norms: Dict[str, float],
                         epoch, batch) -> List[HealthWarning]:
        alerts: List[HealthWarning] = []
        cfg = self.config

        for layer, norm in gradient_norms.items():
            # NaN/Inf
            if np.isnan(norm) or np.isinf(norm):
                key = f"gradient_nan_{layer}"
                if self._should_fire(key):
                    alerts.append(HealthWarning(
                        rule_name="gradient_nan_inf",
                        severity="critical",
                        message=f"NaN/Inf gradient in '{layer}'",
                        diagnosis="Gradient corruption prevents learning.",
                        recommendations=[
                            "Lower the learning rate",
                            "Add gradient clipping (max_norm=1.0)",
                            "Use batch normalisation before this layer",
                        ],
                        layer_name=layer, epoch=epoch, batch=batch,
                    ))
                continue

            # Vanishing
            if norm < cfg.gradient_vanish_thresh:
                key = f"gradient_vanish_{layer}"
                if self._should_fire(key):
                    alerts.append(HealthWarning(
                        rule_name="vanishing_gradient",
                        severity="danger",
                        message=f"Vanishing gradient in '{layer}' (norm={norm:.2e})",
                        diagnosis="Gradients too small — this layer is effectively frozen.",
                        recommendations=[
                            "Switch to ReLU or LeakyReLU activation",
                            "Use batch normalisation",
                            "Try skip connections (ResNet-style)",
                        ],
                        layer_name=layer, epoch=epoch, batch=batch,
                        data={"norm": norm},
                    ))

            # Exploding
            if norm > cfg.gradient_explode_thresh:
                key = f"gradient_explode_{layer}"
                if self._should_fire(key):
                    alerts.append(HealthWarning(
                        rule_name="exploding_gradient",
                        severity="danger",
                        message=f"Exploding gradient in '{layer}' (norm={norm:.2e})",
                        diagnosis="Excessively large gradients cause unstable weight updates.",
                        recommendations=[
                            "Add gradient clipping (max_norm=1.0)",
                            "Lower the learning rate",
                            "Use batch normalisation",
                        ],
                        layer_name=layer, epoch=epoch, batch=batch,
                        data={"norm": norm},
                    ))

            # Spike
            buf = self._gradient_norms.setdefault(
                layer, deque(maxlen=cfg.gradient_rolling_window))
            if len(buf) >= 3:
                rolling_mean = float(np.mean(buf))
                if rolling_mean > 0 and norm > rolling_mean * cfg.gradient_spike_factor:
                    key = f"gradient_spike_{layer}"
                    if self._should_fire(key):
                        alerts.append(HealthWarning(
                            rule_name="gradient_spike",
                            severity="warning",
                            message=(f"Possible exploding gradient in '{layer}': "
                                     f"norm {norm:.2e} vs rolling avg {rolling_mean:.2e}"),
                            diagnosis="A sudden gradient spike may indicate instability.",
                            recommendations=[
                                "Add gradient clipping",
                                "Reduce learning rate temporarily",
                                "Check for outlier data in the current batch",
                            ],
                            layer_name=layer, epoch=epoch, batch=batch,
                            data={"norm": norm, "rolling_mean": rolling_mean},
                        ))
            buf.append(norm)

        return alerts

    # -- Activation checks ------------------------------------------------

    def _check_activations(self, activation_stats: Dict[str, Dict],
                           epoch, batch) -> List[HealthWarning]:
        alerts: List[HealthWarning] = []
        for layer, stats in activation_stats.items():
            zeros_pct = stats.get("zeros_pct", 0)
            act_type = stats.get("activation_type", "")

            # Dead ReLU
            if act_type in ("relu", "leaky_relu") and zeros_pct > self.config.dead_relu_zeros_pct:
                key = f"dead_relu_{layer}"
                if self._should_fire(key):
                    alerts.append(HealthWarning(
                        rule_name="dead_relu",
                        severity="warning",
                        message=f"Possible dying ReLU in '{layer}' ({zeros_pct:.1f}% zeros)",
                        diagnosis=(
                            "Neurons producing zero outputs will receive zero gradients "
                            "and never recover."
                        ),
                        recommendations=[
                            "Use LeakyReLU(negative_slope=0.01) instead of ReLU",
                            "Lower the learning rate",
                            "Use He initialisation",
                        ],
                        layer_name=layer, epoch=epoch, batch=batch,
                        data={"zeros_pct": zeros_pct},
                    ))

            # Saturation
            sat_pct = stats.get("saturation_pct", 0)
            if sat_pct > self.config.saturation_pct_thresh:
                key = f"saturation_{layer}"
                if self._should_fire(key):
                    alerts.append(HealthWarning(
                        rule_name="activation_saturation",
                        severity="warning",
                        message=f"{sat_pct:.1f}% activations saturated in '{layer}'",
                        diagnosis="Saturated activations produce near-zero gradients.",
                        recommendations=[
                            "Switch to ReLU or GELU activation",
                            "Normalise inputs to the layer",
                            "Use batch normalisation",
                        ],
                        layer_name=layer, epoch=epoch, batch=batch,
                        data={"saturation_pct": sat_pct},
                    ))
        return alerts

    # -- Weight checks ----------------------------------------------------

    def _check_dead_weights(self, weight_stats: Dict[str, Dict],
                            epoch, batch) -> List[HealthWarning]:
        alerts: List[HealthWarning] = []
        for layer, stats in weight_stats.items():
            zeros_pct = stats.get("zeros_pct", 0)
            if zeros_pct > self.config.dead_relu_zeros_pct:
                key = f"dead_weights_{layer}"
                if self._should_fire(key):
                    alerts.append(HealthWarning(
                        rule_name="dead_weights",
                        severity="warning",
                        message=f"{zeros_pct:.1f}% dead neurons in '{layer}'",
                        diagnosis="Most weights near zero — layer contributes nothing.",
                        recommendations=[
                            "Switch to LeakyReLU or ELU",
                            "Use a different weight initialisation",
                            "Lower the learning rate",
                        ],
                        layer_name=layer, epoch=epoch, batch=batch,
                        data={"zeros_pct": zeros_pct},
                    ))
        return alerts

    # -- Weight stagnation ------------------------------------------------

    def _check_weight_stagnation(self, weight_deltas: Dict[str, float],
                                 epoch, batch) -> List[HealthWarning]:
        alerts: List[HealthWarning] = []
        cfg = self.config
        for param, delta in weight_deltas.items():
            buf = self._weight_deltas.setdefault(
                param, deque(maxlen=cfg.weight_stagnation_window))
            buf.append(delta)
            if len(buf) >= cfg.weight_stagnation_window:
                if all(d < cfg.weight_stagnation_eps for d in buf):
                    key = f"weight_stagnation_{param}"
                    if self._should_fire(key):
                        alerts.append(HealthWarning(
                            rule_name="weight_stagnation",
                            severity="warning",
                            message=(f"Optimizer may have stagnated for '{param}' "
                                     f"({cfg.weight_stagnation_window} batches, "
                                     f"max Δ={max(buf):.2e})"),
                            diagnosis=(
                                "Weight updates are near-zero for several consecutive "
                                "batches, suggesting the optimizer has plateaued."
                            ),
                            recommendations=[
                                "Reduce learning rate and use a scheduler",
                                "Try a different optimizer (switch SGD↔Adam)",
                                "Check that gradients are flowing to this parameter",
                            ],
                            layer_name=param, epoch=epoch, batch=batch,
                            data={"max_delta": float(max(buf))},
                        ))
        return alerts

    # -- Overfitting ------------------------------------------------------

    def _check_overfitting(self, epoch: int) -> List[HealthWarning]:
        p = self.config.overfit_patience
        if len(self._train_losses) < p or len(self._val_losses) < p:
            return []
        recent_train = float(np.mean(self._train_losses[-p:]))
        recent_val = float(np.mean(self._val_losses[-p:]))
        if recent_train < 1e-12:
            return []
        ratio = recent_val / max(recent_train, 1e-12)
        if ratio > self.config.overfit_ratio:
            key = "overfitting"
            if not self._should_fire(key):
                return []
            return [HealthWarning(
                rule_name="overfitting",
                severity="warning",
                message=f"Possible overfitting (val/train loss ratio = {ratio:.2f})",
                diagnosis=(
                    "Validation loss is diverging from training loss, "
                    "indicating the model is memorising rather than learning."
                ),
                recommendations=[
                    "Add Dropout layers (rate=0.2-0.5)",
                    "Use L2 regularization (weight_decay=1e-4)",
                    "Get more training data or use data augmentation",
                    "Reduce model complexity (fewer layers / neurons)",
                ],
                epoch=epoch,
                data={"ratio": ratio, "train": recent_train, "val": recent_val},
            )]
        return []

    # -- Loss stagnation --------------------------------------------------

    def _check_loss_stagnation(self, epoch: int) -> List[HealthWarning]:
        w = self.config.loss_stagnation_window
        if len(self._train_losses) < w:
            return []
        recent = self._train_losses[-w:]
        delta = abs(recent[-1] - recent[0])
        if delta < self.config.loss_stagnation_eps:
            key = "loss_stagnation"
            if not self._should_fire(key):
                return []
            return [HealthWarning(
                rule_name="loss_stagnation",
                severity="warning",
                message=f"Loss stagnant for {w} epochs (Δ={delta:.2e})",
                diagnosis="Training progress has plateaued.",
                recommendations=[
                    "Reduce learning rate (try lr × 0.1)",
                    "Use learning rate scheduling (e.g., cosine annealing)",
                    "Try a different optimizer (switch SGD↔Adam)",
                ],
                epoch=epoch,
                data={"delta": delta, "window": w},
            )]
        return []

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------

    def get_summary(self) -> Dict[str, Any]:
        """Return a structured summary of all warnings fired."""
        by_rule: Dict[str, int] = {}
        by_severity: Dict[str, int] = {}
        for w in self._warnings:
            by_rule[w.rule_name] = by_rule.get(w.rule_name, 0) + 1
            by_severity[w.severity] = by_severity.get(w.severity, 0) + 1
        return {
            "total_warnings": len(self._warnings),
            "by_rule": by_rule,
            "by_severity": by_severity,
            "warnings": [
                {
                    "rule": w.rule_name,
                    "severity": w.severity,
                    "message": w.message,
                    "layer": w.layer_name,
                    "epoch": w.epoch,
                    "batch": w.batch,
                }
                for w in self._warnings
            ],
        }

    def reset(self) -> None:
        """Clear all state."""
        self._warnings.clear()
        self._gradient_norms.clear()
        self._weight_deltas.clear()
        self._train_losses.clear()
        self._val_losses.clear()
        self._batch_losses.clear()
        self._fired_rules.clear()

Functions

check_batch(*, epoch=None, batch=None, loss=None, gradient_norms=None, weight_stats=None, activation_stats=None, weight_deltas=None)

Run all batch-level rules and return new warnings.

Source code in neurogebra/logging/health_warnings.py
def check_batch(
    self,
    *,
    epoch: Optional[int] = None,
    batch: Optional[int] = None,
    loss: Optional[float] = None,
    gradient_norms: Optional[Dict[str, float]] = None,
    weight_stats: Optional[Dict[str, Dict[str, Any]]] = None,
    activation_stats: Optional[Dict[str, Dict[str, Any]]] = None,
    weight_deltas: Optional[Dict[str, float]] = None,
) -> List[HealthWarning]:
    """Run all batch-level rules and return new warnings."""
    new: List[HealthWarning] = []

    if loss is not None:
        self._batch_losses.append(loss)
        new.extend(self._check_nan_inf_loss(loss, epoch, batch))
        new.extend(self._check_loss_divergence(epoch, batch))

    if gradient_norms:
        new.extend(self._check_gradients(gradient_norms, epoch, batch))

    if activation_stats:
        new.extend(self._check_activations(activation_stats, epoch, batch))

    if weight_stats:
        new.extend(self._check_dead_weights(weight_stats, epoch, batch))

    if weight_deltas:
        new.extend(self._check_weight_stagnation(weight_deltas, epoch, batch))

    self._warnings.extend(new)
    return new
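
check_batch takes plain dictionaries keyed by layer name. The rules in the class source read the keys "zeros_pct", "saturation_pct", and "activation_type" from each activation_stats entry, but how those values are produced upstream is not shown here. The sketch below is one possible way to build such inputs from raw values, using only the standard library. The helper names and the saturation definition (absolute value at or above 0.99) are assumptions for illustration.

```python
import math
from typing import Dict, List


def l2_norm(values: List[float]) -> float:
    """L2 norm of a flat list of gradient values."""
    return math.sqrt(sum(v * v for v in values))


def activation_summary(values: List[float], activation_type: str,
                       saturation_level: float = 0.99) -> Dict[str, object]:
    """Build one activation_stats entry with the keys the health checks read."""
    n = len(values)
    if n == 0:
        return {"zeros_pct": 0.0, "saturation_pct": 0.0,
                "activation_type": activation_type}
    zeros = sum(1 for v in values if v == 0.0)
    # Assumption: "saturated" means |value| at or beyond saturation_level.
    saturated = sum(1 for v in values if abs(v) >= saturation_level)
    return {
        "zeros_pct": 100.0 * zeros / n,
        "saturation_pct": 100.0 * saturated / n,
        "activation_type": activation_type,
    }


# Hypothetical per-layer values gathered during one batch.
gradient_norms = {"dense_1": l2_norm([3.0, 4.0])}
activation_stats = {"dense_1": activation_summary([0.0, 0.0, 0.0, 0.5], "relu")}
```

Dictionaries shaped like these can be passed as the gradient_norms and activation_stats keyword arguments. With the default dead_relu_zeros_pct of 50.0, the 75% zeros in this example would exceed the dead ReLU threshold.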

check_epoch(*, epoch, train_loss=None, val_loss=None, train_acc=None, val_acc=None, gradient_norms=None, weight_stats=None, activation_stats=None)

Run all epoch-level rules and return new warnings.

Source code in neurogebra/logging/health_warnings.py
def check_epoch(
    self,
    *,
    epoch: int,
    train_loss: Optional[float] = None,
    val_loss: Optional[float] = None,
    train_acc: Optional[float] = None,
    val_acc: Optional[float] = None,
    gradient_norms: Optional[Dict[str, float]] = None,
    weight_stats: Optional[Dict[str, Dict[str, Any]]] = None,
    activation_stats: Optional[Dict[str, Dict[str, Any]]] = None,
) -> List[HealthWarning]:
    """Run all epoch-level rules and return new warnings."""
    new: List[HealthWarning] = []

    if train_loss is not None:
        self._train_losses.append(train_loss)
    if val_loss is not None:
        self._val_losses.append(val_loss)

    # Overfitting check
    new.extend(self._check_overfitting(epoch))

    # Loss stagnation
    new.extend(self._check_loss_stagnation(epoch))

    # Gradient checks (epoch-level too)
    if gradient_norms:
        new.extend(self._check_gradients(gradient_norms, epoch, None))

    # Activation / weight checks
    if activation_stats:
        new.extend(self._check_activations(activation_stats, epoch, None))
    if weight_stats:
        new.extend(self._check_dead_weights(weight_stats, epoch, None))

    self._warnings.extend(new)
    return new
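
The two epoch-only rules are small enough to restate as standalone functions. The sketch below mirrors the arithmetic of _check_overfitting and _check_loss_stagnation from the class source, using the WarningConfig defaults (patience 3, window 5, eps 1e-4). The function names are hypothetical, and the de-duplication step is left out.

```python
from statistics import fmean
from typing import List, Optional


def overfit_ratio(train_losses: List[float], val_losses: List[float],
                  patience: int = 3) -> Optional[float]:
    """Mean val loss / mean train loss over the last `patience` epochs, or None."""
    if len(train_losses) < patience or len(val_losses) < patience:
        return None  # not enough history yet
    recent_train = fmean(train_losses[-patience:])
    recent_val = fmean(val_losses[-patience:])
    if recent_train < 1e-12:
        return None  # avoid dividing by a (near-)zero training loss
    return recent_val / recent_train


def is_stagnant(train_losses: List[float], window: int = 5,
                eps: float = 1e-4) -> bool:
    """True when the first and last loss in the window differ by less than eps."""
    if len(train_losses) < window:
        return False
    recent = train_losses[-window:]
    return abs(recent[-1] - recent[0]) < eps


ratio = overfit_ratio([0.5, 0.4, 0.3], [0.6, 0.6, 0.6])
```

Here the mean training loss is 0.4 and the mean validation loss is 0.6, so the ratio is about 1.5, above the default overfit_ratio of 1.3. Note that the stagnation rule compares only the endpoints of the window, so a loss that oscillates but returns to its starting value also counts as stagnant.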

get_summary()

Return a structured summary of all warnings fired.

Source code in neurogebra/logging/health_warnings.py
def get_summary(self) -> Dict[str, Any]:
    """Return a structured summary of all warnings fired."""
    by_rule: Dict[str, int] = {}
    by_severity: Dict[str, int] = {}
    for w in self._warnings:
        by_rule[w.rule_name] = by_rule.get(w.rule_name, 0) + 1
        by_severity[w.severity] = by_severity.get(w.severity, 0) + 1
    return {
        "total_warnings": len(self._warnings),
        "by_rule": by_rule,
        "by_severity": by_severity,
        "warnings": [
            {
                "rule": w.rule_name,
                "severity": w.severity,
                "message": w.message,
                "layer": w.layer_name,
                "epoch": w.epoch,
                "batch": w.batch,
            }
            for w in self._warnings
        ],
    }
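
The returned dictionary is plain data, so it can be inspected without importing any library types. The sketch below works on a hand-written dictionary shaped like the one get_summary builds. The sample values and the helper names should_abort and most_severe_first are made up for illustration; the severity names are the three used in the class source.

```python
from typing import Any, Dict, List

# Hypothetical summary, shaped like the dict get_summary() returns.
summary: Dict[str, Any] = {
    "total_warnings": 3,
    "by_rule": {"dead_relu": 2, "nan_inf_loss": 1},
    "by_severity": {"warning": 2, "critical": 1},
    "warnings": [
        {"rule": "dead_relu", "severity": "warning", "message": "...",
         "layer": "dense_1", "epoch": 0, "batch": 12},
        {"rule": "nan_inf_loss", "severity": "critical", "message": "...",
         "layer": None, "epoch": 1, "batch": 3},
        {"rule": "dead_relu", "severity": "warning", "message": "...",
         "layer": "dense_2", "epoch": 1, "batch": 40},
    ],
}

# Lower rank sorts first; unknown severities sort last.
SEVERITY_RANK = {"critical": 0, "danger": 1, "warning": 2}


def should_abort(summary: Dict[str, Any]) -> bool:
    """True if at least one critical warning was recorded."""
    return summary["by_severity"].get("critical", 0) > 0


def most_severe_first(summary: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Warnings sorted by severity, keeping the original order within a level."""
    return sorted(
        summary["warnings"],
        key=lambda w: SEVERITY_RANK.get(w["severity"], len(SEVERITY_RANK)),
    )
```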

reset()

Clear all state.

Source code in neurogebra/logging/health_warnings.py
def reset(self) -> None:
    """Clear all state."""
    self._warnings.clear()
    self._gradient_norms.clear()
    self._weight_deltas.clear()
    self._train_losses.clear()
    self._val_losses.clear()
    self._batch_losses.clear()
    self._fired_rules.clear()
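
Because reset also clears _fired_rules, a rule that fired recently can fire again right after a reset. The sketch below shows the interval-based de-duplication pattern used by _should_fire in isolation. It differs from the source in two labeled ways: the class name RuleDeduper is hypothetical, and the clock is injectable and defaults to time.monotonic rather than time.time, which makes the behavior easy to test.

```python
import time
from typing import Callable, Dict


class RuleDeduper:
    """Suppress repeats of the same rule key within `interval` seconds."""

    def __init__(self, interval: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.interval = interval
        self._clock = clock
        self._last_fired: Dict[str, float] = {}  # rule_key -> last-fired time

    def should_fire(self, rule_key: str) -> bool:
        now = self._clock()
        last = self._last_fired.get(rule_key)
        if last is not None and (now - last) < self.interval:
            return False  # fired too recently, stay quiet
        self._last_fired[rule_key] = now
        return True

    def reset(self) -> None:
        """Forget all firing times so every rule may fire again."""
        self._last_fired.clear()
```

Since the interval is measured in wall-clock seconds rather than batches, a condition that persists through a fast training loop produces at most one warning per rule key every 30 seconds by default.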

WarningConfig

neurogebra.logging.health_warnings.WarningConfig dataclass

Configurable thresholds for the automated health warning system.

Source code in neurogebra/logging/health_warnings.py
@dataclass
class WarningConfig:
    """Configurable thresholds for the automated health warning system."""

    # Dead ReLU / zero activation
    dead_relu_zeros_pct: float = 50.0

    # Gradient norms
    gradient_vanish_thresh: float = 1e-7
    gradient_explode_thresh: float = 100.0
    gradient_spike_factor: float = 5.0
    gradient_rolling_window: int = 20

    # Overfitting
    overfit_patience: int = 3
    overfit_ratio: float = 1.3          # val_loss / train_loss

    # Stagnation
    weight_stagnation_eps: float = 1e-6
    weight_stagnation_window: int = 5
    loss_stagnation_eps: float = 1e-4
    loss_stagnation_window: int = 5

    # Divergence
    loss_divergence_window: int = 3

    # Activation saturation
    saturation_pct_thresh: float = 40.0

    # Learning rate heuristic
    lr_too_high_loss_factor: float = 3.0
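
To make gradient_spike_factor and gradient_rolling_window concrete, the sketch below replays the spike rule from _check_gradients over a list of norms for a single layer. The function name find_spikes is hypothetical. The minimum history of 3 samples matches the constant in the class source, and the other defaults match WarningConfig.

```python
from collections import deque
from statistics import fmean
from typing import Deque, List


def find_spikes(norms: List[float], spike_factor: float = 5.0,
                window: int = 20, min_history: int = 3) -> List[int]:
    """Return indices whose norm exceeds spike_factor x the rolling mean of earlier norms."""
    history: Deque[float] = deque(maxlen=window)  # oldest entries drop off automatically
    spikes: List[int] = []
    for i, norm in enumerate(norms):
        if len(history) >= min_history:
            rolling_mean = fmean(history)
            if rolling_mean > 0 and norm > rolling_mean * spike_factor:
                spikes.append(i)
        # The current norm joins the history only after it has been checked,
        # so a spike raises the baseline for the following batches.
        history.append(norm)
    return spikes
```

For example, find_spikes([1.0, 1.0, 1.0, 6.0, 1.0]) flags index 3 only: the rolling mean there is 1.0 and 6.0 exceeds 5 times that. A norm of exactly 5.0 would not be flagged, because the comparison is strict.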

EpochSummarizer

neurogebra.logging.epoch_summary.EpochSummarizer

Accumulates batch-level data and produces per-epoch statistical summaries.

Call record_batch for every batch, then finalize_epoch at the end of the epoch to get an EpochSummary.

Source code in neurogebra/logging/epoch_summary.py
class EpochSummarizer:
    """
    Accumulates batch-level data and produces per-epoch statistical summaries.

    Call :meth:`record_batch` for every batch, then :meth:`finalize_epoch`
    at the end of the epoch to get an :class:`EpochSummary`.
    """

    def __init__(self):
        # {epoch: {metric_name: [values]}}
        self._metric_buffers: Dict[int, Dict[str, List[float]]] = defaultdict(lambda: defaultdict(list))
        # {epoch: {layer: [norm_values]}}
        self._gradient_buffers: Dict[int, Dict[str, List[float]]] = defaultdict(lambda: defaultdict(list))
        # {epoch: {layer: {stat_name: [values]}}}
        self._weight_buffers: Dict[int, Dict[str, Dict[str, List[float]]]] = defaultdict(
            lambda: defaultdict(lambda: defaultdict(list))
        )
        self._activation_buffers: Dict[int, Dict[str, Dict[str, List[float]]]] = defaultdict(
            lambda: defaultdict(lambda: defaultdict(list))
        )
        self._batch_counts: Dict[int, int] = defaultdict(int)
        self._summaries: List[EpochSummary] = []

    @property
    def summaries(self) -> List[EpochSummary]:
        return list(self._summaries)

    def record_batch(
        self,
        epoch: int,
        *,
        metrics: Optional[Dict[str, float]] = None,
        gradient_norms: Optional[Dict[str, float]] = None,
        weight_stats: Optional[Dict[str, Dict[str, float]]] = None,
        activation_stats: Optional[Dict[str, Dict[str, float]]] = None,
    ) -> None:
        """Buffer one batch of data for the given epoch."""
        self._batch_counts[epoch] += 1

        if metrics:
            buf = self._metric_buffers[epoch]
            for key, val in metrics.items():
                if isinstance(val, (int, float)) and np.isfinite(val):
                    buf[key].append(float(val))

        if gradient_norms:
            buf = self._gradient_buffers[epoch]
            for layer, norm in gradient_norms.items():
                if np.isfinite(norm):
                    buf[layer].append(float(norm))

        if weight_stats:
            buf = self._weight_buffers[epoch]
            for layer, stats in weight_stats.items():
                for key, val in stats.items():
                    if isinstance(val, (int, float)) and np.isfinite(val):
                        buf[layer][key].append(float(val))

        if activation_stats:
            buf = self._activation_buffers[epoch]
            for layer, stats in activation_stats.items():
                for key, val in stats.items():
                    if isinstance(val, (int, float)) and np.isfinite(val):
                        buf[layer][key].append(float(val))

    def finalize_epoch(self, epoch: int) -> EpochSummary:
        """
        Compute and return the statistical summary for *epoch*.

        Automatically clears batch buffers for that epoch.
        """
        n_batches = self._batch_counts.get(epoch, 0)

        # Metrics
        metric_stats: Dict[str, EpochStats] = {}
        for name, vals in self._metric_buffers.get(epoch, {}).items():
            if vals:
                metric_stats[name] = _compute_stats(name, vals)

        # Gradient norms
        grad_stats: Dict[str, EpochStats] = {}
        for layer, vals in self._gradient_buffers.get(epoch, {}).items():
            if vals:
                grad_stats[layer] = _compute_stats(layer, vals)

        # Weight summaries
        weight_sums: Dict[str, Dict[str, EpochStats]] = {}
        for layer, keys in self._weight_buffers.get(epoch, {}).items():
            weight_sums[layer] = {}
            for key, vals in keys.items():
                if vals:
                    weight_sums[layer][key] = _compute_stats(key, vals)

        # Activation summaries
        act_sums: Dict[str, Dict[str, EpochStats]] = {}
        for layer, keys in self._activation_buffers.get(epoch, {}).items():
            act_sums[layer] = {}
            for key, vals in keys.items():
                if vals:
                    act_sums[layer][key] = _compute_stats(key, vals)

        summary = EpochSummary(
            epoch=epoch,
            num_batches=n_batches,
            metrics=metric_stats,
            gradient_norms=grad_stats,
            weight_summaries=weight_sums,
            activation_summaries=act_sums,
        )
        self._summaries.append(summary)

        # Cleanup
        self._metric_buffers.pop(epoch, None)
        self._gradient_buffers.pop(epoch, None)
        self._weight_buffers.pop(epoch, None)
        self._activation_buffers.pop(epoch, None)
        self._batch_counts.pop(epoch, None)

        return summary

    def get_all_summaries(self) -> List[Dict[str, Any]]:
        """Return all epoch summaries as dicts."""
        return [s.to_dict() for s in self._summaries]

    def reset(self) -> None:
        """Clear all state."""
        self._metric_buffers.clear()
        self._gradient_buffers.clear()
        self._weight_buffers.clear()
        self._activation_buffers.clear()
        self._batch_counts.clear()
        self._summaries.clear()

Functions

record_batch(epoch, *, metrics=None, gradient_norms=None, weight_stats=None, activation_stats=None)

Buffer one batch of data for the given epoch.

Source code in neurogebra/logging/epoch_summary.py
def record_batch(
    self,
    epoch: int,
    *,
    metrics: Optional[Dict[str, float]] = None,
    gradient_norms: Optional[Dict[str, float]] = None,
    weight_stats: Optional[Dict[str, Dict[str, float]]] = None,
    activation_stats: Optional[Dict[str, Dict[str, float]]] = None,
) -> None:
    """Buffer one batch of data for the given epoch."""
    self._batch_counts[epoch] += 1

    if metrics:
        buf = self._metric_buffers[epoch]
        for key, val in metrics.items():
            if isinstance(val, (int, float)) and np.isfinite(val):
                buf[key].append(float(val))

    if gradient_norms:
        buf = self._gradient_buffers[epoch]
        for layer, norm in gradient_norms.items():
            if np.isfinite(norm):
                buf[layer].append(float(norm))

    if weight_stats:
        buf = self._weight_buffers[epoch]
        for layer, stats in weight_stats.items():
            for key, val in stats.items():
                if isinstance(val, (int, float)) and np.isfinite(val):
                    buf[layer][key].append(float(val))

    if activation_stats:
        buf = self._activation_buffers[epoch]
        for layer, stats in activation_stats.items():
            for key, val in stats.items():
                if isinstance(val, (int, float)) and np.isfinite(val):
                    buf[layer][key].append(float(val))
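
The filtering rule in record_batch can be tried in isolation. The sketch below is not the library class: buffer_metrics is a hypothetical helper, the nested defaultdict layout is an assumption inferred from how the buffers are indexed, and math.isfinite stands in for np.isfinite so the example needs only the standard library.

```python
import math
from collections import defaultdict
from typing import Dict, List, Optional


def buffer_metrics(
    buffers: Dict[int, Dict[str, List[float]]],
    epoch: int,
    metrics: Optional[Dict[str, object]],
) -> None:
    """Append finite numeric values to the per-epoch, per-metric buffer."""
    if not metrics:
        return
    buf = buffers[epoch]
    for key, val in metrics.items():
        # Same guard order as record_batch: numeric type first, then finiteness.
        if isinstance(val, (int, float)) and math.isfinite(val):
            buf[key].append(float(val))


# Assumed layout: two-level defaultdict, so unseen epochs and names start empty.
buffers: Dict[int, Dict[str, List[float]]] = defaultdict(lambda: defaultdict(list))

buffer_metrics(buffers, 0, {"loss": 0.9, "accuracy": 0.5})
buffer_metrics(buffers, 0, {"loss": float("nan"), "accuracy": 0.6})  # NaN loss is dropped
buffer_metrics(buffers, 0, {"loss": 0.7, "note": "skipped"})         # non-numeric is dropped

print(dict(buffers[0]))
```

Because NaN and Inf values never reach the buffers, the epoch statistics describe only the finite batches. A run that produced NaN losses can therefore still show a clean mean, so the per-batch health warnings remain the place to look for those.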

finalize_epoch(epoch)

Compute and return the statistical summary for the given epoch.

Automatically clears batch buffers for that epoch.

Source code in neurogebra/logging/epoch_summary.py
def finalize_epoch(self, epoch: int) -> EpochSummary:
    """
    Compute and return the statistical summary for *epoch*.

    Automatically clears batch buffers for that epoch.
    """
    n_batches = self._batch_counts.get(epoch, 0)

    # Metrics
    metric_stats: Dict[str, EpochStats] = {}
    for name, vals in self._metric_buffers.get(epoch, {}).items():
        if vals:
            metric_stats[name] = _compute_stats(name, vals)

    # Gradient norms
    grad_stats: Dict[str, EpochStats] = {}
    for layer, vals in self._gradient_buffers.get(epoch, {}).items():
        if vals:
            grad_stats[layer] = _compute_stats(layer, vals)

    # Weight summaries
    weight_sums: Dict[str, Dict[str, EpochStats]] = {}
    for layer, keys in self._weight_buffers.get(epoch, {}).items():
        weight_sums[layer] = {}
        for key, vals in keys.items():
            if vals:
                weight_sums[layer][key] = _compute_stats(key, vals)

    # Activation summaries
    act_sums: Dict[str, Dict[str, EpochStats]] = {}
    for layer, keys in self._activation_buffers.get(epoch, {}).items():
        act_sums[layer] = {}
        for key, vals in keys.items():
            if vals:
                act_sums[layer][key] = _compute_stats(key, vals)

    summary = EpochSummary(
        epoch=epoch,
        num_batches=n_batches,
        metrics=metric_stats,
        gradient_norms=grad_stats,
        weight_summaries=weight_sums,
        activation_summaries=act_sums,
    )
    self._summaries.append(summary)

    # Cleanup
    self._metric_buffers.pop(epoch, None)
    self._gradient_buffers.pop(epoch, None)
    self._weight_buffers.pop(epoch, None)
    self._activation_buffers.pop(epoch, None)
    self._batch_counts.pop(epoch, None)

    return summary
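
finalize_epoch delegates the actual reduction to _compute_stats, whose body is not shown in this section. As a rough sketch of what such a reduction might look like, assuming it yields the mean/std/min/max view mentioned in the feature overview: StatsSketch and compute_stats_sketch are hypothetical names, and the use of population standard deviation is a guess rather than the library's documented choice.

```python
import statistics
from dataclasses import dataclass
from typing import List


@dataclass
class StatsSketch:
    """Hypothetical stand-in for EpochStats; the real fields are not shown here."""
    name: str
    mean: float
    std: float
    min: float
    max: float
    count: int


def compute_stats_sketch(name: str, values: List[float]) -> StatsSketch:
    """Reduce one buffered series to summary statistics."""
    return StatsSketch(
        name=name,
        mean=statistics.fmean(values),
        std=statistics.pstdev(values),  # population std; the library's choice is unknown
        min=min(values),
        max=max(values),
        count=len(values),
    )


losses = [0.9, 0.7, 0.5]
stats = compute_stats_sketch("loss", losses)
print(stats)
```

Note that finalize_epoch only calls the reducer for non-empty buffers (the `if vals:` guard), so a reducer of this shape never sees an empty list.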

get_all_summaries()

Return all epoch summaries as dicts.

Source code in neurogebra/logging/epoch_summary.py
def get_all_summaries(self) -> List[Dict[str, Any]]:
    """Return all epoch summaries as dicts."""
    return [s.to_dict() for s in self._summaries]

reset()

Clear all state.

Source code in neurogebra/logging/epoch_summary.py
def reset(self) -> None:
    """Clear all state."""
    self._metric_buffers.clear()
    self._gradient_buffers.clear()
    self._weight_buffers.clear()
    self._activation_buffers.clear()
    self._batch_counts.clear()
    self._summaries.clear()

TieredStorage

neurogebra.logging.tiered_storage.TieredStorage

Backend for TrainingLogger that writes events into three separate NDJSON files based on their tier.

Attributes:

Name Description
basic_path Path to basic.log.
health_path Path to health.log.
debug_path Path to debug.log.

Source code in neurogebra/logging/tiered_storage.py
class TieredStorage:
    """
    Backend for :class:`TrainingLogger` that writes events into three
    separate NDJSON files based on their tier.

    Attributes:
        basic_path: Path to ``basic.log``.
        health_path: Path to ``health.log``.
        debug_path: Path to ``debug.log``.
    """

    def __init__(
        self,
        base_dir: str = "./training_logs",
        basic_filename: str = "basic.log",
        health_filename: str = "health.log",
        debug_filename: str = "debug.log",
        write_debug: bool = True,
        buffer_size: int = 50,
    ):
        """
        Args:
            base_dir: Directory for log files.
            basic_filename: Name of the epoch-metrics log file.
            health_filename: Name of the health/warnings log file.
            debug_filename: Name of the debug-level log file.
            write_debug: Whether to write debug-tier events at all.
                         Set to ``False`` in production to save I/O.
            buffer_size: Number of events to buffer before flushing to disk.
        """
        self.base_dir = base_dir
        self.basic_path = os.path.join(base_dir, basic_filename)
        self.health_path = os.path.join(base_dir, health_filename)
        self.debug_path = os.path.join(base_dir, debug_filename)
        self.write_debug = write_debug

        self._buffer_size = buffer_size
        self._basic_buffer: List[str] = []
        self._health_buffer: List[str] = []
        self._debug_buffer: List[str] = []

        self._opened = False
        self._basic_fh = None
        self._health_fh = None
        self._debug_fh = None

        # Stats
        self.basic_count = 0
        self.health_count = 0
        self.debug_count = 0

    # ------------------------------------------------------------------
    # Backend interface (called by TrainingLogger._emit)
    # ------------------------------------------------------------------

    def handle_event(self, event: LogEvent) -> None:
        """Classify and route the event to the appropriate tier."""
        record = self._serialise(event)
        line = json.dumps(record, default=str)

        tier = self._classify(event)

        if tier == "basic":
            self._basic_buffer.append(line)
            self.basic_count += 1
            if len(self._basic_buffer) >= self._buffer_size:
                self._flush_buffer("basic")

        elif tier == "health":
            self._health_buffer.append(line)
            self.health_count += 1
            if len(self._health_buffer) >= self._buffer_size:
                self._flush_buffer("health")

        else:  # debug
            if self.write_debug:
                self._debug_buffer.append(line)
                self.debug_count += 1
                if len(self._debug_buffer) >= self._buffer_size:
                    self._flush_buffer("debug")

    # ------------------------------------------------------------------
    # Specific event handlers for named dispatch
    # ------------------------------------------------------------------

    def handle_train_start(self, event: LogEvent) -> None:
        self.handle_event(event)

    def handle_train_end(self, event: LogEvent) -> None:
        self.handle_event(event)
        self.flush()

    def handle_epoch_start(self, event: LogEvent) -> None:
        self.handle_event(event)

    def handle_epoch_end(self, event: LogEvent) -> None:
        self.handle_event(event)
        # Flush basic tier at end of each epoch
        self._flush_buffer("basic")

    def handle_health_check(self, event: LogEvent) -> None:
        self.handle_event(event)
        # Health events are flushed immediately (important)
        self._flush_buffer("health")

    # ------------------------------------------------------------------
    # Classification
    # ------------------------------------------------------------------

    @staticmethod
    def _classify(event: LogEvent) -> str:
        """Return 'basic', 'health', or 'debug'."""
        if event.event_type in _HEALTH_EVENTS:
            return "health"
        if event.severity in _HEALTH_SEVERITIES:
            return "health"
        if event.event_type in _BASIC_EVENTS:
            return "basic"
        return "debug"

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    @staticmethod
    def _serialise(event: LogEvent) -> Dict[str, Any]:
        return {
            "event_type": event.event_type,
            "level": event.level.name,
            "timestamp": event.timestamp,
            "epoch": event.epoch,
            "batch": event.batch,
            "layer_name": event.layer_name,
            "layer_index": event.layer_index,
            "severity": event.severity,
            "message": event.message,
            "data": _safe(event.data),
        }

    # ------------------------------------------------------------------
    # File I/O
    # ------------------------------------------------------------------

    def _ensure_dir(self) -> None:
        if not self._opened:
            os.makedirs(self.base_dir, exist_ok=True)
            self._opened = True

    def _flush_buffer(self, tier: str) -> None:
        buf_attr = f"_{tier}_buffer"
        path_attr = f"{tier}_path"
        buf: List[str] = getattr(self, buf_attr)
        if not buf:
            return
        self._ensure_dir()
        path = getattr(self, path_attr)
        with open(path, "a", encoding="utf-8") as f:
            for line in buf:
                f.write(line + "\n")
        buf.clear()

    def flush(self) -> None:
        """Flush all buffered events to disk."""
        self._flush_buffer("basic")
        self._flush_buffer("health")
        if self.write_debug:
            self._flush_buffer("debug")

    def close(self) -> None:
        """Flush and release resources."""
        self.flush()

    # ------------------------------------------------------------------
    # Reading helpers
    # ------------------------------------------------------------------

    def read_basic(self) -> List[Dict[str, Any]]:
        """Read all basic-tier events from disk."""
        return self._read_ndjson(self.basic_path)

    def read_health(self) -> List[Dict[str, Any]]:
        """Read all health-tier events from disk."""
        return self._read_ndjson(self.health_path)

    def read_debug(self) -> List[Dict[str, Any]]:
        """Read all debug-tier events from disk."""
        return self._read_ndjson(self.debug_path)

    @staticmethod
    def _read_ndjson(path: str) -> List[Dict[str, Any]]:
        if not os.path.exists(path):
            return []
        events = []
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    try:
                        events.append(json.loads(line))
                    except json.JSONDecodeError:
                        continue
        return events

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------

    def summary(self) -> Dict[str, Any]:
        """Return file-size and event-count statistics."""
        def _size(path):
            try:
                return os.path.getsize(path)
            except OSError:
                return 0

        return {
            "basic": {"events": self.basic_count, "size_bytes": _size(self.basic_path)},
            "health": {"events": self.health_count, "size_bytes": _size(self.health_path)},
            "debug": {"events": self.debug_count, "size_bytes": _size(self.debug_path)},
            "total_events": self.basic_count + self.health_count + self.debug_count,
        }

Functions

__init__(base_dir='./training_logs', basic_filename='basic.log', health_filename='health.log', debug_filename='debug.log', write_debug=True, buffer_size=50)

Parameters:

Name Type Description Default
base_dir str Directory for log files. './training_logs'
basic_filename str Name of the epoch-metrics log file. 'basic.log'
health_filename str Name of the health/warnings log file. 'health.log'
debug_filename str Name of the debug-level log file. 'debug.log'
write_debug bool Whether to write debug-tier events at all. Set to False in production to save I/O. True
buffer_size int Number of events to buffer before flushing to disk. 50

Source code in neurogebra/logging/tiered_storage.py
def __init__(
    self,
    base_dir: str = "./training_logs",
    basic_filename: str = "basic.log",
    health_filename: str = "health.log",
    debug_filename: str = "debug.log",
    write_debug: bool = True,
    buffer_size: int = 50,
):
    """
    Args:
        base_dir: Directory for log files.
        basic_filename: Name of the epoch-metrics log file.
        health_filename: Name of the health/warnings log file.
        debug_filename: Name of the debug-level log file.
        write_debug: Whether to write debug-tier events at all.
                     Set to ``False`` in production to save I/O.
        buffer_size: Number of events to buffer before flushing to disk.
    """
    self.base_dir = base_dir
    self.basic_path = os.path.join(base_dir, basic_filename)
    self.health_path = os.path.join(base_dir, health_filename)
    self.debug_path = os.path.join(base_dir, debug_filename)
    self.write_debug = write_debug

    self._buffer_size = buffer_size
    self._basic_buffer: List[str] = []
    self._health_buffer: List[str] = []
    self._debug_buffer: List[str] = []

    self._opened = False
    self._basic_fh = None
    self._health_fh = None
    self._debug_fh = None

    # Stats
    self.basic_count = 0
    self.health_count = 0
    self.debug_count = 0

handle_event(event)

Classify and route the event to the appropriate tier.

Source code in neurogebra/logging/tiered_storage.py
def handle_event(self, event: LogEvent) -> None:
    """Classify and route the event to the appropriate tier."""
    record = self._serialise(event)
    line = json.dumps(record, default=str)

    tier = self._classify(event)

    if tier == "basic":
        self._basic_buffer.append(line)
        self.basic_count += 1
        if len(self._basic_buffer) >= self._buffer_size:
            self._flush_buffer("basic")

    elif tier == "health":
        self._health_buffer.append(line)
        self.health_count += 1
        if len(self._health_buffer) >= self._buffer_size:
            self._flush_buffer("health")

    else:  # debug
        if self.write_debug:
            self._debug_buffer.append(line)
            self.debug_count += 1
            if len(self._debug_buffer) >= self._buffer_size:
                self._flush_buffer("debug")
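
The tier an event lands in depends on the order of checks in _classify: event type against the health set, then severity, then event type against the basic set, with debug as the fallback. The sketch below reproduces that order on plain strings. The set contents are hypothetical, since _HEALTH_EVENTS, _HEALTH_SEVERITIES and _BASIC_EVENTS are not shown in this section.

```python
from typing import Optional

# Hypothetical contents; the real module-level constants are not shown here.
HEALTH_EVENTS = {"health_check"}
HEALTH_SEVERITIES = {"warning", "danger", "critical"}
BASIC_EVENTS = {"train_start", "train_end", "epoch_start", "epoch_end"}


def classify(event_type: str, severity: Optional[str]) -> str:
    """Mirror the check order of TieredStorage._classify."""
    if event_type in HEALTH_EVENTS:
        return "health"
    if severity in HEALTH_SEVERITIES:
        return "health"
    if event_type in BASIC_EVENTS:
        return "basic"
    return "debug"


print(classify("epoch_end", "info"))      # basic
print(classify("epoch_end", "warning"))   # health: severity is checked before the basic set
print(classify("layer_forward", None))    # debug
```

The practical consequence is that an otherwise basic event carrying a health-level severity is written to the health file only, not to both files.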

flush()

Flush all buffered events to disk.

Source code in neurogebra/logging/tiered_storage.py
def flush(self) -> None:
    """Flush all buffered events to disk."""
    self._flush_buffer("basic")
    self._flush_buffer("health")
    if self.write_debug:
        self._flush_buffer("debug")
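
The buffer-then-append behaviour behind flush() can be illustrated with a single-tier writer. BufferedNdjsonWriter and count_lines are hypothetical names used only for this sketch; it follows the same pattern as _flush_buffer (open in append mode, write one JSON line per event, clear the buffer) but is not the library class.

```python
import json
import os
import tempfile
from typing import Dict, List


class BufferedNdjsonWriter:
    """Hypothetical single-tier writer illustrating the buffer-then-append pattern."""

    def __init__(self, path: str, buffer_size: int = 50) -> None:
        self.path = path
        self.buffer_size = buffer_size
        self._buffer: List[str] = []

    def write(self, record: Dict[str, object]) -> None:
        self._buffer.append(json.dumps(record, default=str))
        if len(self._buffer) >= self.buffer_size:
            self.flush()

    def flush(self) -> None:
        if not self._buffer:
            return
        os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)
        with open(self.path, "a", encoding="utf-8") as f:
            for line in self._buffer:
                f.write(line + "\n")
        self._buffer.clear()


def count_lines(path: str) -> int:
    if not os.path.exists(path):
        return 0
    with open(path, "r", encoding="utf-8") as f:
        return sum(1 for _ in f)


with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "basic.log")
    writer = BufferedNdjsonWriter(log_path, buffer_size=3)
    writer.write({"event_type": "epoch_end", "epoch": 0})
    writer.write({"event_type": "epoch_end", "epoch": 1})
    lines_before_flush = count_lines(log_path)  # events are still only in memory
    writer.flush()
    lines_after_flush = count_lines(log_path)

print(lines_before_flush, lines_after_flush)
```

Events that are buffered but not yet flushed exist only in memory, which is why handle_train_end calls flush() and why health events routed through handle_health_check are flushed immediately. If training can be interrupted, calling flush() or close() from a finally block is a reasonable precaution.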

close()

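Flush all buffered events to disk. Files are opened only for the duration of each flush, so there are no open handles left to release.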

Source code in neurogebra/logging/tiered_storage.py
def close(self) -> None:
    """Flush and release resources."""
    self.flush()

read_basic()

Read all basic-tier events from disk.

Source code in neurogebra/logging/tiered_storage.py
def read_basic(self) -> List[Dict[str, Any]]:
    """Read all basic-tier events from disk."""
    return self._read_ndjson(self.basic_path)

read_health()

Read all health-tier events from disk.

Source code in neurogebra/logging/tiered_storage.py
def read_health(self) -> List[Dict[str, Any]]:
    """Read all health-tier events from disk."""
    return self._read_ndjson(self.health_path)

read_debug()

Read all debug-tier events from disk.

Source code in neurogebra/logging/tiered_storage.py
def read_debug(self) -> List[Dict[str, Any]]:
    """Read all debug-tier events from disk."""
    return self._read_ndjson(self.debug_path)
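
All three read helpers share the same tolerant NDJSON parsing: a missing file yields an empty list, and blank or malformed lines are skipped rather than raising. The standalone sketch below mirrors that logic so it can be run without the library; read_ndjson here is a local copy for illustration, and the sample records are made up.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


def read_ndjson(path: str) -> List[Dict[str, Any]]:
    """Parse one JSON object per line, skipping blank or malformed lines."""
    if not os.path.exists(path):
        return []
    events = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                events.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # e.g. a line truncated by an interrupted write
    return events


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "health.log")
    with open(path, "w", encoding="utf-8") as f:
        f.write('{"severity": "warning", "message": "dead neurons"}\n')
        f.write("\n")
        f.write('{"severity": "critical", "mess')  # truncated record
        f.write("\n")
        f.write('{"severity": "info", "message": "all clear"}\n')
    events = read_ndjson(path)
    missing = read_ndjson(os.path.join(tmp, "nope.log"))

# Typical follow-up: keep only the events that need attention.
urgent = [e for e in events if e["severity"] in ("warning", "danger", "critical")]
print(len(events), len(urgent), missing)
```

Since malformed lines are dropped silently, the number of events read back can be lower than the counts reported by summary().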

summary()

Return file-size and event-count statistics.

Source code in neurogebra/logging/tiered_storage.py
def summary(self) -> Dict[str, Any]:
    """Return file-size and event-count statistics."""
    def _size(path):
        try:
            return os.path.getsize(path)
        except OSError:
            return 0

    return {
        "basic": {"events": self.basic_count, "size_bytes": _size(self.basic_path)},
        "health": {"events": self.health_count, "size_bytes": _size(self.health_path)},
        "debug": {"events": self.debug_count, "size_bytes": _size(self.debug_path)},
        "total_events": self.basic_count + self.health_count + self.debug_count,
    }

DashboardExporter

neurogebra.logging.dashboard.DashboardExporter

Advanced HTML dashboard backend.

Collects metrics during training and generates a self-contained interactive HTML file with Chart.js visualisations.

Source code in neurogebra/logging/dashboard.py
class DashboardExporter:
    """
    Advanced HTML dashboard backend.

    Collects metrics during training and generates a self-contained
    interactive HTML file with Chart.js visualisations.
    """

    def __init__(self, path: str = "training_logs/dashboard.html"):
        self.path = path
        self._events: List[LogEvent] = []
        self._epoch_metrics: List[Dict] = []
        self._gradient_data: Dict[str, List[float]] = {}  # layer → [norms per epoch]
        self._weight_data: Dict[str, List[Dict]] = {}     # layer → [stats per epoch]
        self._health_events: List[Dict] = []
        self._train_info: Dict[str, Any] = {}
        self._batch_losses: List[float] = []

    # ------------------------------------------------------------------
    # Backend interface
    # ------------------------------------------------------------------

    def handle_event(self, event: LogEvent) -> None:
        self._events.append(event)

    def handle_train_start(self, event: LogEvent) -> None:
        self._train_info = event.data
        self._events.append(event)

    def handle_train_end(self, event: LogEvent) -> None:
        self._events.append(event)

    def handle_epoch_end(self, event: LogEvent) -> None:
        self._events.append(event)
        metrics = dict(event.data.get("metrics", {}))
        metrics["epoch"] = event.epoch
        metrics["epoch_time"] = event.data.get("epoch_time", 0)
        self._epoch_metrics.append(metrics)

    def handle_batch_end(self, event: LogEvent) -> None:
        self._events.append(event)
        loss = event.data.get("loss")
        if loss is not None:
            self._batch_losses.append(float(loss))

    def handle_health_check(self, event: LogEvent) -> None:
        self._events.append(event)
        self._health_events.append({
            "epoch": event.epoch,
            "severity": event.severity,
            "message": event.message,
            "check": event.data.get("check", ""),
            "recommendations": event.data.get("recommendations", []),
            "timestamp": event.timestamp,
        })

    def handle_layer_forward(self, event: LogEvent) -> None:
        self._events.append(event)

    def handle_layer_backward(self, event: LogEvent) -> None:
        self._events.append(event)
        grad_stats = event.data.get("grad_weights_stats")
        if grad_stats and event.layer_name:
            norms = self._gradient_data.setdefault(event.layer_name, [])
            norms.append(grad_stats.get("norm_l2", 0))

    def handle_weight_updated(self, event: LogEvent) -> None:
        self._events.append(event)

    # ------------------------------------------------------------------
    # Save
    # ------------------------------------------------------------------

    def save(self) -> str:
        """Generate and save the HTML dashboard. Returns the file path."""
        os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)

        epochs_list = list(range(1, len(self._epoch_metrics) + 1))
        losses = [m.get("loss", 0) for m in self._epoch_metrics]
        val_losses = [m.get("val_loss", 0) for m in self._epoch_metrics]
        accs = [m.get("accuracy", 0) for m in self._epoch_metrics]
        val_accs = [m.get("val_accuracy", 0) for m in self._epoch_metrics]
        epoch_times = [m.get("epoch_time", 0) for m in self._epoch_metrics]

        # Gradient data for heatmap
        grad_layers = list(self._gradient_data.keys())
        grad_matrix = [self._gradient_data.get(l, []) for l in grad_layers]

        # Health timeline
        health_rows = ""
        for h in self._health_events:
            sev = h["severity"]
            colour = {
                "danger": "#e74c3c", "warning": "#f39c12",
                "critical": "#c0392b", "success": "#2ecc71",
                "info": "#3498db",
            }.get(sev, "#95a5a6")
            recs = h.get("recommendations", [])
            rec_html = "<ul>" + "".join(f"<li>{r}</li>" for r in recs) + "</ul>" if recs else ""
            health_rows += (
                f'<tr style="border-left:4px solid {colour}">'
                f'<td>E{h.get("epoch", "?")}</td>'
                f"<td><span class='badge' style='background:{colour}'>{sev.upper()}</span></td>"
                f"<td>{h['message']}</td>"
                f"<td>{rec_html}</td></tr>\n"
            )

        # Model info
        model_info = self._train_info.get("model_info", {})
        n_epochs = self._train_info.get("total_epochs", len(epochs_list))
        batch_size = self._train_info.get("batch_size", "?")

        html = _DASHBOARD_TEMPLATE.format(
            timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
            n_epochs=n_epochs,
            batch_size=batch_size,
            final_loss=f"{losses[-1]:.6f}" if losses else "—",
            final_acc=f"{accs[-1]:.4f}" if accs else "—",
            final_val_loss=f"{val_losses[-1]:.6f}" if val_losses else "—",
            final_val_acc=f"{val_accs[-1]:.4f}" if val_accs else "—",
            total_events=len(self._events),
            n_warnings=sum(1 for h in self._health_events if h["severity"] in ("warning", "danger", "critical")),
            epochs=json.dumps(epochs_list),
            losses=json.dumps(losses),
            val_losses=json.dumps(val_losses),
            accs=json.dumps(accs),
            val_accs=json.dumps(val_accs),
            epoch_times=json.dumps(epoch_times),
            batch_losses=json.dumps(self._batch_losses[:2000]),  # cap for perf
            grad_layers=json.dumps(grad_layers),
            grad_matrix=json.dumps(grad_matrix),
            health_rows=health_rows,
            model_info=json.dumps(model_info, indent=2, default=str),
        )

        with open(self.path, "w", encoding="utf-8") as f:
            f.write(html)
        return self.path

Functions

save()

Generate and save the HTML dashboard. Returns the file path.

Source code in neurogebra/logging/dashboard.py
def save(self) -> str:
    """Generate and save the HTML dashboard. Returns the file path."""
    os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True)

    epochs_list = list(range(1, len(self._epoch_metrics) + 1))
    losses = [m.get("loss", 0) for m in self._epoch_metrics]
    val_losses = [m.get("val_loss", 0) for m in self._epoch_metrics]
    accs = [m.get("accuracy", 0) for m in self._epoch_metrics]
    val_accs = [m.get("val_accuracy", 0) for m in self._epoch_metrics]
    epoch_times = [m.get("epoch_time", 0) for m in self._epoch_metrics]

    # Gradient data for heatmap
    grad_layers = list(self._gradient_data.keys())
    grad_matrix = [self._gradient_data.get(l, []) for l in grad_layers]

    # Health timeline
    health_rows = ""
    for h in self._health_events:
        sev = h["severity"]
        colour = {
            "danger": "#e74c3c", "warning": "#f39c12",
            "critical": "#c0392b", "success": "#2ecc71",
            "info": "#3498db",
        }.get(sev, "#95a5a6")
        recs = h.get("recommendations", [])
        rec_html = "<ul>" + "".join(f"<li>{r}</li>" for r in recs) + "</ul>" if recs else ""
        health_rows += (
            f'<tr style="border-left:4px solid {colour}">'
            f'<td>E{h.get("epoch", "?")}</td>'
            f"<td><span class='badge' style='background:{colour}'>{sev.upper()}</span></td>"
            f"<td>{h['message']}</td>"
            f"<td>{rec_html}</td></tr>\n"
        )

    # Model info
    model_info = self._train_info.get("model_info", {})
    n_epochs = self._train_info.get("total_epochs", len(epochs_list))
    batch_size = self._train_info.get("batch_size", "?")

    html = _DASHBOARD_TEMPLATE.format(
        timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
        n_epochs=n_epochs,
        batch_size=batch_size,
        final_loss=f"{losses[-1]:.6f}" if losses else "—",
        final_acc=f"{accs[-1]:.4f}" if accs else "—",
        final_val_loss=f"{val_losses[-1]:.6f}" if val_losses else "—",
        final_val_acc=f"{val_accs[-1]:.4f}" if val_accs else "—",
        total_events=len(self._events),
        n_warnings=sum(1 for h in self._health_events if h["severity"] in ("warning", "danger", "critical")),
        epochs=json.dumps(epochs_list),
        losses=json.dumps(losses),
        val_losses=json.dumps(val_losses),
        accs=json.dumps(accs),
        val_accs=json.dumps(val_accs),
        epoch_times=json.dumps(epoch_times),
        batch_losses=json.dumps(self._batch_losses[:2000]),  # cap for perf
        grad_layers=json.dumps(grad_layers),
        grad_matrix=json.dumps(grad_matrix),
        health_rows=health_rows,
        model_info=json.dumps(model_info, indent=2, default=str),
    )

    with open(self.path, "w", encoding="utf-8") as f:
        f.write(html)
    return self.path
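
save() interpolates each health message and recommendation into the table markup as-is. If those strings can contain characters such as < or &, escaping them first keeps the generated page well-formed. The following is a sketch of one way to do that with the standard library; build_health_row is a hypothetical helper, not part of DashboardExporter, and defaulting a missing severity to "info" is an assumption.

```python
import html
from typing import Any, Dict

SEVERITY_COLOURS = {
    "danger": "#e74c3c", "warning": "#f39c12",
    "critical": "#c0392b", "success": "#2ecc71",
    "info": "#3498db",
}


def build_health_row(event: Dict[str, Any]) -> str:
    """Render one health event as a table row with user text HTML-escaped."""
    sev = str(event.get("severity") or "info")  # assumption: treat missing severity as info
    colour = SEVERITY_COLOURS.get(sev, "#95a5a6")
    recs = event.get("recommendations") or []
    rec_html = ""
    if recs:
        items = "".join(f"<li>{html.escape(str(r))}</li>" for r in recs)
        rec_html = f"<ul>{items}</ul>"
    epoch = html.escape(str(event.get("epoch", "?")))
    message = html.escape(str(event.get("message", "")))
    return (
        f'<tr style="border-left:4px solid {colour}">'
        f"<td>E{epoch}</td>"
        f"<td><span class='badge' style='background:{colour}'>{html.escape(sev.upper())}</span></td>"
        f"<td>{message}</td>"
        f"<td>{rec_html}</td></tr>\n"
    )


row = build_health_row({
    "epoch": 2,
    "severity": "warning",
    "message": "loss < 0 & rising",
    "recommendations": ["use <b>clip</b>"],
})
print(row)
```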

TensorBoardBridge

neurogebra.logging.dashboard.TensorBoardBridge

Write Training Observatory events to TensorBoard.

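Requires tensorboard to be installed (pip install neurogebra[logging]). The writer is imported from torch.utils.tensorboard, so PyTorch must be importable as well. If the import fails, the bridge silently becomes a no-op and available returns False.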

Source code in neurogebra/logging/dashboard.py
class TensorBoardBridge:
    """
    Write Training Observatory events to TensorBoard.

    Requires ``tensorboard`` to be installed
    (``pip install neurogebra[logging]``).
    """

    def __init__(self, log_dir: str = "./tb_logs"):
        self.log_dir = log_dir
        self._writer = None
        self._step = 0
        try:
            from torch.utils.tensorboard import SummaryWriter
            self._writer = SummaryWriter(log_dir=log_dir)
        except ImportError:
            pass  # TensorBoard not available

    @property
    def available(self) -> bool:
        return self._writer is not None

    def handle_event(self, event: LogEvent) -> None:
        if not self._writer:
            return
        if event.event_type == "epoch_end":
            metrics = event.data.get("metrics", {})
            epoch = event.epoch or self._step
            for key, val in metrics.items():
                if isinstance(val, (int, float)):
                    self._writer.add_scalar(f"metrics/{key}", val, epoch)
            self._step += 1

    def handle_epoch_end(self, event: LogEvent) -> None:
        self.handle_event(event)

    def handle_health_check(self, event: LogEvent) -> None:
        if not self._writer:
            return
        self._writer.add_text(
            "health_checks",
            f"**[{event.severity.upper()}]** {event.message}",
            self._step,
        )

    def close(self) -> None:
        if self._writer:
            self._writer.close()
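
Both bridges in this module follow the same optional-dependency pattern: attempt the import in the constructor, swallow ImportError, and expose an available property so callers can tell whether events are actually being forwarded. A minimal sketch of that pattern, using a hypothetical OptionalBackend class and importlib so it can run without tensorboard or wandb installed:

```python
import importlib
from typing import Any, Optional


class OptionalBackend:
    """Hypothetical bridge that degrades to a no-op when its dependency is missing."""

    def __init__(self, module_name: str) -> None:
        self._module: Optional[Any] = None
        try:
            self._module = importlib.import_module(module_name)
        except ImportError:
            pass  # dependency not installed; stay disabled

    @property
    def available(self) -> bool:
        return self._module is not None

    def handle_event(self, payload: dict) -> bool:
        """Return True if the event was forwarded, False if it was dropped."""
        if not self.available:
            return False
        # A real bridge would call into self._module here.
        return True


present = OptionalBackend("json")                            # stdlib, always importable
missing = OptionalBackend("module_that_does_not_exist_xyz")  # simulates a missing extra
print(present.available, missing.available)
```

Because a missing dependency produces no error, checking available right after constructing a bridge is the only way to notice early that nothing will be logged.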

WandBBridge

neurogebra.logging.dashboard.WandBBridge

Log Training Observatory events to Weights & Biases.

Requires wandb to be installed (pip install neurogebra[logging]). If wandb cannot be imported, the bridge becomes a no-op and available returns False.

Source code in neurogebra/logging/dashboard.py
class WandBBridge:
    """
    Log Training Observatory events to Weights & Biases.

    Requires ``wandb`` to be installed (``pip install neurogebra[logging]``).
    """

    def __init__(self, project: str = "neurogebra", run_name: Optional[str] = None,
                 config: Optional[Dict] = None):
        self._run = None
        try:
            import wandb
            self._run = wandb.init(
                project=project,
                name=run_name,
                config=config or {},
                reinit=True,
            )
        except ImportError:
            pass

    @property
    def available(self) -> bool:
        return self._run is not None

    def handle_event(self, event: LogEvent) -> None:
        if not self._run:
            return
        import wandb
        if event.event_type == "epoch_end":
            metrics = event.data.get("metrics", {})
            wandb.log({k: v for k, v in metrics.items() if isinstance(v, (int, float))},
                      step=event.epoch)

    def handle_epoch_end(self, event: LogEvent) -> None:
        self.handle_event(event)

    def handle_health_check(self, event: LogEvent) -> None:
        if not self._run:
            return
        import wandb
        wandb.alert(
            title=f"Health: {event.severity.upper()}",
            text=event.message,
            level=wandb.AlertLevel.WARN if event.severity == "warning" else wandb.AlertLevel.ERROR,
        )

    def close(self) -> None:
        if self._run:
            import wandb
            wandb.finish()
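
Two small rules in WandBBridge are easy to miss when reading the source: only int and float metric values are forwarded, and every severity other than "warning" is sent as an error-level alert. The sketch below restates both rules as plain functions so they can be checked without wandb; numeric_metrics and alert_level are hypothetical names, and the returned strings stand in for wandb.AlertLevel members.

```python
from typing import Any, Dict


def numeric_metrics(metrics: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only values that pass the isinstance(v, (int, float)) filter."""
    return {k: v for k, v in metrics.items() if isinstance(v, (int, float))}


def alert_level(severity: str) -> str:
    """Mirror the bridge's mapping: 'warning' -> WARN, anything else -> ERROR."""
    return "WARN" if severity == "warning" else "ERROR"


sample = {"loss": 0.5, "epoch": 3, "tag": "baseline", "converged": True}
print(numeric_metrics(sample))  # bool passes the filter because bool subclasses int
print([alert_level(s) for s in ("warning", "danger", "critical", "info")])
```

Under this mapping an "info" or "success" health check would also raise an error-level alert, so it may be worth filtering severities before they reach the bridge.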

TrainingFingerprint

neurogebra.logging.fingerprint.TrainingFingerprint dataclass

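Reproducibility block for a training run. The dataclass is not declared frozen (capture() assigns its fields after construction), so treat an instance as immutable by convention once it has been captured.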
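
The two content hashes that capture() records can be reproduced with hashlib alone. This sketch assumes the same truncation lengths as the source (16 hex characters of SHA-256 for the dataset, 12 of MD5 for the model description); dataset_hash and architecture_hash are hypothetical helper names, and raw bytes stand in for ndarray.tobytes() so that NumPy is not required.

```python
import hashlib
from typing import Any, Dict


def dataset_hash(raw: bytes) -> str:
    """First 16 hex chars of SHA-256 over the raw dataset bytes."""
    return hashlib.sha256(raw).hexdigest()[:16]


def architecture_hash(model_info: Dict[str, Any]) -> str:
    """First 12 hex chars of MD5 over the sorted (key, value) items."""
    return hashlib.md5(str(sorted(model_info.items())).encode()).hexdigest()[:12]


a = architecture_hash({"layers": 3, "activation": "relu"})
b = architecture_hash({"activation": "relu", "layers": 3})  # same content, different key order
c = architecture_hash({"layers": 4, "activation": "relu"})

print(a == b, a == c)
print(len(dataset_hash(b"\x00\x01\x02")), len(a))
```

Sorting the items makes the architecture hash independent of dict key order. The dataset hash covers the raw bytes only, so two arrays with identical bytes but different shapes hash the same; the fingerprint stores dataset_shape and dataset_dtype separately for that reason.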

Source code in neurogebra/logging/fingerprint.py
@dataclass
class TrainingFingerprint:
    """Immutable reproducibility block for a training run."""

    # Identifiers
    run_id: str = ""
    timestamp: str = ""
    timestamp_unix: float = 0.0

    # Seeds
    random_seed: Optional[int] = None
    numpy_seed: Optional[int] = None

    # Dataset
    dataset_hash: Optional[str] = None
    dataset_shape: Optional[tuple] = None
    dataset_dtype: Optional[str] = None
    dataset_samples: Optional[int] = None

    # Versions
    neurogebra_version: str = ""
    python_version: str = ""
    numpy_version: str = ""
    dependency_versions: Dict[str, str] = field(default_factory=dict)

    # Hardware
    cpu: str = ""
    cpu_count: int = 0
    ram_gb: float = 0.0
    gpu: Optional[str] = None
    os_info: str = ""
    machine: str = ""

    # Model
    model_architecture_hash: Optional[str] = None
    model_info: Dict[str, Any] = field(default_factory=dict)

    # Hyperparameters
    hyperparameters: Dict[str, Any] = field(default_factory=dict)

    # Git
    git_commit: Optional[str] = None
    git_branch: Optional[str] = None
    git_dirty: Optional[bool] = None

    # ------------------------------------------------------------------
    # Factory
    # ------------------------------------------------------------------

    @classmethod
    def capture(
        cls,
        *,
        model_info: Optional[Dict[str, Any]] = None,
        hyperparameters: Optional[Dict[str, Any]] = None,
        dataset: Optional[Union[np.ndarray, str]] = None,
        random_seed: Optional[int] = None,
    ) -> "TrainingFingerprint":
        """
        Capture the current environment and return a fingerprint.

        Args:
            model_info: Dict describing the model architecture.
            hyperparameters: Training hyperparameters dict.
            dataset: Either a numpy array (will be hashed) or a
                     pre-computed hash string.
            random_seed: The seed used for reproducibility.
        """
        fp = cls()
        fp.timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        fp.timestamp_unix = time.time()
        fp.run_id = hashlib.md5(str(fp.timestamp_unix).encode()).hexdigest()[:12]

        # Seeds
        fp.random_seed = random_seed
        fp.numpy_seed = random_seed  # typically same

        # Dataset
        if isinstance(dataset, np.ndarray):
            fp.dataset_hash = hashlib.sha256(dataset.tobytes()).hexdigest()[:16]
            fp.dataset_shape = dataset.shape
            fp.dataset_dtype = str(dataset.dtype)
            # 0-d arrays have an empty shape, so shape[0] would raise IndexError
            fp.dataset_samples = dataset.shape[0] if dataset.ndim else None
        elif isinstance(dataset, str):
            fp.dataset_hash = dataset

        # Versions
        fp.neurogebra_version = _get_neurogebra_version()
        fp.python_version = platform.python_version()
        fp.numpy_version = np.__version__
        fp.dependency_versions = _get_dependency_versions()

        # Hardware
        fp.cpu = platform.processor() or platform.machine()
        fp.cpu_count = os.cpu_count() or 0
        fp.ram_gb = _get_ram_gb()
        fp.gpu = _detect_gpu()
        fp.os_info = f"{platform.system()} {platform.release()}"
        fp.machine = platform.machine()

        # Model
        fp.model_info = model_info or {}
        if model_info:
            fp.model_architecture_hash = hashlib.md5(
                str(sorted(model_info.items())).encode()
            ).hexdigest()[:12]

        # Hyperparameters
        fp.hyperparameters = hyperparameters or {}

        # Git
        fp.git_commit, fp.git_branch, fp.git_dirty = _get_git_info()

        return fp

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable dict."""
        return {
            "run_id": self.run_id,
            "timestamp": self.timestamp,
            "timestamp_unix": self.timestamp_unix,
            "seeds": {
                "random_seed": self.random_seed,
                "numpy_seed": self.numpy_seed,
            },
            "dataset": {
                "hash": self.dataset_hash,
                "shape": list(self.dataset_shape) if self.dataset_shape else None,
                "dtype": self.dataset_dtype,
                "samples": self.dataset_samples,
            },
            "versions": {
                "neurogebra": self.neurogebra_version,
                "python": self.python_version,
                "numpy": self.numpy_version,
                **self.dependency_versions,
            },
            "hardware": {
                "cpu": self.cpu,
                "cpu_count": self.cpu_count,
                "ram_gb": round(self.ram_gb, 2),
                "gpu": self.gpu,
                "os": self.os_info,
                "machine": self.machine,
            },
            "model": {
                "architecture_hash": self.model_architecture_hash,
                "info": self.model_info,
            },
            "hyperparameters": self.hyperparameters,
            "git": {
                "commit": self.git_commit,
                "branch": self.git_branch,
                "dirty": self.git_dirty,
            },
        }

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "TrainingFingerprint":
        """Reconstruct from a dict (e.g. loaded from JSON)."""
        fp = cls()
        fp.run_id = d.get("run_id", "")
        fp.timestamp = d.get("timestamp", "")
        fp.timestamp_unix = d.get("timestamp_unix", 0.0)

        seeds = d.get("seeds", {})
        fp.random_seed = seeds.get("random_seed")
        fp.numpy_seed = seeds.get("numpy_seed")

        ds = d.get("dataset", {})
        fp.dataset_hash = ds.get("hash")
        fp.dataset_shape = tuple(ds["shape"]) if ds.get("shape") else None
        fp.dataset_dtype = ds.get("dtype")
        fp.dataset_samples = ds.get("samples")

        vers = d.get("versions", {})
        fp.neurogebra_version = vers.get("neurogebra", "")
        fp.python_version = vers.get("python", "")
        fp.numpy_version = vers.get("numpy", "")
        fp.dependency_versions = {
            k: v for k, v in vers.items()
            if k not in ("neurogebra", "python", "numpy")
        }

        hw = d.get("hardware", {})
        fp.cpu = hw.get("cpu", "")
        fp.cpu_count = hw.get("cpu_count", 0)
        fp.ram_gb = hw.get("ram_gb", 0.0)
        fp.gpu = hw.get("gpu")
        fp.os_info = hw.get("os", "")
        fp.machine = hw.get("machine", "")

        model = d.get("model", {})
        fp.model_architecture_hash = model.get("architecture_hash")
        fp.model_info = model.get("info", {})

        fp.hyperparameters = d.get("hyperparameters", {})

        git = d.get("git", {})
        fp.git_commit = git.get("commit")
        fp.git_branch = git.get("branch")
        fp.git_dirty = git.get("dirty")

        return fp

    def format_text(self) -> str:
        """Human-readable fingerprint summary."""
        lines = [
            "╔══ Training Fingerprint ══╗",
            f"  Run ID:       {self.run_id}",
            f"  Timestamp:    {self.timestamp}",
            f"  Seed:         {self.random_seed}",
        ]
        if self.dataset_hash:
            lines.append(f"  Dataset Hash: {self.dataset_hash}")
        if self.dataset_shape:
            lines.append(f"  Dataset:      {self.dataset_shape} ({self.dataset_dtype})")
        lines.extend([
            f"  Neurogebra:   {self.neurogebra_version}",
            f"  Python:       {self.python_version}",
            f"  NumPy:        {self.numpy_version}",
            f"  CPU:          {self.cpu} ({self.cpu_count} cores)",
            f"  RAM:          {self.ram_gb:.1f} GB",
            f"  GPU:          {self.gpu or 'None'}",
            f"  OS:           {self.os_info}",
        ])
        if self.git_commit:
            dirty = " (dirty)" if self.git_dirty else ""
            lines.append(f"  Git:          {self.git_branch}@{self.git_commit[:8]}{dirty}")
        if self.model_architecture_hash:
            lines.append(f"  Model Hash:   {self.model_architecture_hash}")
        if self.hyperparameters:
            lines.append(f"  Hyperparams:  {self.hyperparameters}")
        lines.append("╚═════════════════════════╝")
        return "\n".join(lines)

Functions

capture(*, model_info=None, hyperparameters=None, dataset=None, random_seed=None) classmethod

Capture the current environment and return a fingerprint.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_info` | `Optional[Dict[str, Any]]` | Dict describing the model architecture. | `None` |
| `hyperparameters` | `Optional[Dict[str, Any]]` | Training hyperparameters dict. | `None` |
| `dataset` | `Optional[Union[ndarray, str]]` | Either a numpy array (will be hashed) or a pre-computed hash string. | `None` |
| `random_seed` | `Optional[int]` | The seed used for reproducibility. | `None` |

Source code in neurogebra/logging/fingerprint.py
@classmethod
def capture(
    cls,
    *,
    model_info: Optional[Dict[str, Any]] = None,
    hyperparameters: Optional[Dict[str, Any]] = None,
    dataset: Optional[Union[np.ndarray, str]] = None,
    random_seed: Optional[int] = None,
) -> "TrainingFingerprint":
    """
    Capture the current environment and return a fingerprint.

    Args:
        model_info: Dict describing the model architecture.
        hyperparameters: Training hyperparameters dict.
        dataset: Either a numpy array (will be hashed) or a
                 pre-computed hash string.
        random_seed: The seed used for reproducibility.
    """
    fp = cls()
    fp.timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    fp.timestamp_unix = time.time()
    fp.run_id = hashlib.md5(str(fp.timestamp_unix).encode()).hexdigest()[:12]

    # Seeds
    fp.random_seed = random_seed
    fp.numpy_seed = random_seed  # typically same

    # Dataset
    if isinstance(dataset, np.ndarray):
        fp.dataset_hash = hashlib.sha256(dataset.tobytes()).hexdigest()[:16]
        fp.dataset_shape = dataset.shape
        fp.dataset_dtype = str(dataset.dtype)
        # 0-d arrays have an empty shape, so shape[0] would raise IndexError
        fp.dataset_samples = dataset.shape[0] if dataset.ndim else None
    elif isinstance(dataset, str):
        fp.dataset_hash = dataset

    # Versions
    fp.neurogebra_version = _get_neurogebra_version()
    fp.python_version = platform.python_version()
    fp.numpy_version = np.__version__
    fp.dependency_versions = _get_dependency_versions()

    # Hardware
    fp.cpu = platform.processor() or platform.machine()
    fp.cpu_count = os.cpu_count() or 0
    fp.ram_gb = _get_ram_gb()
    fp.gpu = _detect_gpu()
    fp.os_info = f"{platform.system()} {platform.release()}"
    fp.machine = platform.machine()

    # Model
    fp.model_info = model_info or {}
    if model_info:
        fp.model_architecture_hash = hashlib.md5(
            str(sorted(model_info.items())).encode()
        ).hexdigest()[:12]

    # Hyperparameters
    fp.hyperparameters = hyperparameters or {}

    # Git
    fp.git_commit, fp.git_branch, fp.git_dirty = _get_git_info()

    return fp
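
The two hashing steps in `capture` can be tried in isolation. The following is a minimal sketch using only the standard library. The helper names `short_dataset_hash` and `short_architecture_hash` are hypothetical and not part of neurogebra, and raw `bytes` stand in for the result of `dataset.tobytes()`.

```python
import hashlib
from typing import Any, Dict


def short_dataset_hash(raw: bytes) -> str:
    """SHA-256 of the raw bytes, truncated to 16 hex characters."""
    return hashlib.sha256(raw).hexdigest()[:16]


def short_architecture_hash(model_info: Dict[str, Any]) -> str:
    """MD5 of the key-sorted items, truncated to 12 hex characters."""
    return hashlib.md5(str(sorted(model_info.items())).encode()).hexdigest()[:12]


# Hypothetical inputs, for illustration only.
data_hash = short_dataset_hash(b"\x00\x01\x02\x03")
arch_a = short_architecture_hash({"layers": 3, "activation": "relu"})
arch_b = short_architecture_hash({"activation": "relu", "layers": 3})

# Sorting the items makes the architecture hash independent of key order.
print(len(data_hash), len(arch_a), arch_a == arch_b)  # → 16 12 True
```

Because the dataset hash covers only the raw bytes, two arrays with identical bytes but different shapes or dtypes would share a `dataset_hash`; the separately recorded `dataset_shape` and `dataset_dtype` are what tell them apart.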

to_dict()

Return a JSON-serialisable dict.

Source code in neurogebra/logging/fingerprint.py
def to_dict(self) -> Dict[str, Any]:
    """Return a JSON-serialisable dict."""
    return {
        "run_id": self.run_id,
        "timestamp": self.timestamp,
        "timestamp_unix": self.timestamp_unix,
        "seeds": {
            "random_seed": self.random_seed,
            "numpy_seed": self.numpy_seed,
        },
        "dataset": {
            "hash": self.dataset_hash,
            "shape": list(self.dataset_shape) if self.dataset_shape else None,
            "dtype": self.dataset_dtype,
            "samples": self.dataset_samples,
        },
        "versions": {
            "neurogebra": self.neurogebra_version,
            "python": self.python_version,
            "numpy": self.numpy_version,
            **self.dependency_versions,
        },
        "hardware": {
            "cpu": self.cpu,
            "cpu_count": self.cpu_count,
            "ram_gb": round(self.ram_gb, 2),
            "gpu": self.gpu,
            "os": self.os_info,
            "machine": self.machine,
        },
        "model": {
            "architecture_hash": self.model_architecture_hash,
            "info": self.model_info,
        },
        "hyperparameters": self.hyperparameters,
        "git": {
            "commit": self.git_commit,
            "branch": self.git_branch,
            "dirty": self.git_dirty,
        },
    }
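
Since the returned dict holds only plain values, it can go straight through the `json` module. The sketch below uses a hypothetical, hand-written fragment shaped like the `to_dict()` output (the values are made up) to show why the shape is exported as a list and what a reader has to do to get the tuple back.

```python
import json

# Hypothetical fragment shaped like part of the to_dict() output.
payload = {
    "run_id": "abc123def456",
    "dataset": {
        "hash": "0123456789abcdef",
        "shape": [100, 4],
        "dtype": "float32",
        "samples": 100,
    },
}

text = json.dumps(payload, indent=2)  # what would be written to disk
restored = json.loads(text)           # what would be read back

# JSON has no tuple type, so the shape comes back as a list;
# from_dict() applies the same tuple() conversion shown here.
ds = restored["dataset"]
shape = tuple(ds["shape"]) if ds.get("shape") else None

print(restored == payload, shape)  # → True (100, 4)
```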

from_dict(d) classmethod

Reconstruct from a dict (e.g. loaded from JSON).

Source code in neurogebra/logging/fingerprint.py
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "TrainingFingerprint":
    """Reconstruct from a dict (e.g. loaded from JSON)."""
    fp = cls()
    fp.run_id = d.get("run_id", "")
    fp.timestamp = d.get("timestamp", "")
    fp.timestamp_unix = d.get("timestamp_unix", 0.0)

    seeds = d.get("seeds", {})
    fp.random_seed = seeds.get("random_seed")
    fp.numpy_seed = seeds.get("numpy_seed")

    ds = d.get("dataset", {})
    fp.dataset_hash = ds.get("hash")
    fp.dataset_shape = tuple(ds["shape"]) if ds.get("shape") else None
    fp.dataset_dtype = ds.get("dtype")
    fp.dataset_samples = ds.get("samples")

    vers = d.get("versions", {})
    fp.neurogebra_version = vers.get("neurogebra", "")
    fp.python_version = vers.get("python", "")
    fp.numpy_version = vers.get("numpy", "")
    fp.dependency_versions = {
        k: v for k, v in vers.items()
        if k not in ("neurogebra", "python", "numpy")
    }

    hw = d.get("hardware", {})
    fp.cpu = hw.get("cpu", "")
    fp.cpu_count = hw.get("cpu_count", 0)
    fp.ram_gb = hw.get("ram_gb", 0.0)
    fp.gpu = hw.get("gpu")
    fp.os_info = hw.get("os", "")
    fp.machine = hw.get("machine", "")

    model = d.get("model", {})
    fp.model_architecture_hash = model.get("architecture_hash")
    fp.model_info = model.get("info", {})

    fp.hyperparameters = d.get("hyperparameters", {})

    git = d.get("git", {})
    fp.git_commit = git.get("commit")
    fp.git_branch = git.get("branch")
    fp.git_dirty = git.get("dirty")

    return fp
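
`to_dict()` merges `dependency_versions` into the same `versions` mapping as the three core entries, and `from_dict()` separates them again by key name. Here is a minimal sketch of that split, assuming a hypothetical helper `split_versions` and made-up version strings:

```python
from typing import Dict, Tuple

CORE_KEYS = ("neurogebra", "python", "numpy")


def split_versions(vers: Dict[str, str]) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Separate the core version entries from the remaining dependencies."""
    core = {k: vers.get(k, "") for k in CORE_KEYS}
    deps = {k: v for k, v in vers.items() if k not in CORE_KEYS}
    return core, deps


# Made-up version strings, for illustration only.
flat = {"neurogebra": "2.5.8", "python": "3.11.4", "numpy": "1.26.0", "scipy": "1.11.1"}
core, deps = split_versions(flat)

print(core["numpy"], deps)  # → 1.26.0 {'scipy': '1.11.1'}
```

One consequence of this layout: a dependency recorded under the key `neurogebra`, `python`, or `numpy` would overwrite the core entry on export, because `to_dict()` unpacks the dependency mapping last.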

format_text()

Human-readable fingerprint summary.

Source code in neurogebra/logging/fingerprint.py
def format_text(self) -> str:
    """Human-readable fingerprint summary."""
    lines = [
        "╔══ Training Fingerprint ══╗",
        f"  Run ID:       {self.run_id}",
        f"  Timestamp:    {self.timestamp}",
        f"  Seed:         {self.random_seed}",
    ]
    if self.dataset_hash:
        lines.append(f"  Dataset Hash: {self.dataset_hash}")
    if self.dataset_shape:
        lines.append(f"  Dataset:      {self.dataset_shape} ({self.dataset_dtype})")
    lines.extend([
        f"  Neurogebra:   {self.neurogebra_version}",
        f"  Python:       {self.python_version}",
        f"  NumPy:        {self.numpy_version}",
        f"  CPU:          {self.cpu} ({self.cpu_count} cores)",
        f"  RAM:          {self.ram_gb:.1f} GB",
        f"  GPU:          {self.gpu or 'None'}",
        f"  OS:           {self.os_info}",
    ])
    if self.git_commit:
        dirty = " (dirty)" if self.git_dirty else ""
        lines.append(f"  Git:          {self.git_branch}@{self.git_commit[:8]}{dirty}")
    if self.model_architecture_hash:
        lines.append(f"  Model Hash:   {self.model_architecture_hash}")
    if self.hyperparameters:
        lines.append(f"  Hyperparams:  {self.hyperparameters}")
    lines.append("╚═════════════════════════╝")
    return "\n".join(lines)
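
When two runs disagree, comparing their exported dicts field by field is one way to find what changed. The sketch below is not part of neurogebra: `diff_fingerprints` is a hypothetical helper that walks two dicts shaped like `to_dict()` output, and the sample values are made up.

```python
from typing import Any, Dict, List, Tuple


def diff_fingerprints(
    a: Dict[str, Any], b: Dict[str, Any], prefix: str = ""
) -> List[Tuple[str, Any, Any]]:
    """Return (dotted_key, value_in_a, value_in_b) for every differing leaf."""
    diffs: List[Tuple[str, Any, Any]] = []
    for key in sorted(set(a) | set(b)):
        path = f"{prefix}{key}"
        va, vb = a.get(key), b.get(key)
        if isinstance(va, dict) and isinstance(vb, dict):
            # Recurse into nested sections such as "seeds" or "git".
            diffs.extend(diff_fingerprints(va, vb, prefix=f"{path}."))
        elif va != vb:
            diffs.append((path, va, vb))
    return diffs


# Made-up fragments of two exported fingerprints.
run_a = {"seeds": {"random_seed": 42}, "git": {"commit": "abc1234", "dirty": False}}
run_b = {"seeds": {"random_seed": 42}, "git": {"commit": "abc1234", "dirty": True}}

print(diff_fingerprints(run_a, run_b))  # → [('git.dirty', False, True)]
```

With real fingerprints, `run_id`, `timestamp`, and `timestamp_unix` differ on every run and would normally be filtered out before comparing.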