What You Will Learn
In Guide 08, you built an Isolation Forest and a basic autoencoder to flag anomalous AMI voltage readings. Both methods work well for batch analysis, but they have limitations: the autoencoder gives a reconstruction error with no probabilistic interpretation, and neither method handles streaming data natively. In this guide you will:
- Build a Variational Autoencoder (VAE) that learns a probability distribution over voltage patterns, not just a point encoding
- Use the Evidence Lower Bound (ELBO) to compute principled, probabilistic anomaly scores
- Compare the VAE against the basic autoencoder and Isolation Forest with precision-recall analysis
- Design and implement a streaming sliding window pipeline for real-time anomaly detection
- Add adaptive thresholding that adjusts for seasonal voltage variance
- Simulate real-time operation by replaying one week of AMI data and correlating detections with actual outage events
SP&L Data You Will Use
- timeseries/ami_15min_sample.parquet — 15-minute voltage and consumption readings from 2,400 service points
- weather/hourly_observations.csv — temperature and weather conditions for seasonal context
- outages/outage_events.csv — actual outage events for validation and correlation
Additional Libraries
pip install torch pyarrow
Same dependencies as Guide 08. torch (PyTorch) powers the VAE architecture. All streaming logic uses standard Python and pandas.
Part A: Variational Autoencoder
In Guide 08, the basic autoencoder compressed voltage features into a fixed latent vector and reconstructed them. Anomalies had high reconstruction error. This works, but the latent space is unstructured—there is no guarantee that nearby latent points represent similar voltage patterns, and there is no way to quantify how unlikely a given reading is.
A Variational Autoencoder (VAE) fixes this by learning a probability distribution over the latent space. Instead of encoding each input to a single point, the encoder outputs the parameters of a Gaussian distribution—a mean vector and a variance vector. We then sample from this distribution to decode. This gives us two major advantages:
- Probabilistic anomaly scores: We can compute how likely a reading is under the learned distribution, not just a reconstruction error
- Structured latent space: The KL divergence regularizer forces the latent space to be smooth and continuous, meaning similar voltage patterns cluster together
Intuition: Think of the basic autoencoder as a filing cabinet—it stores each voltage pattern in a specific drawer. The VAE is more like a map: it places each pattern on a continuous landscape where nearby points are similar. An anomaly is a reading that lands far from any populated region on the map, and we can measure exactly how far.
Let's start by loading the same data from Guide 08 and preparing our features.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.metrics import precision_recall_curve, average_precision_score
DATA_DIR = "sisyphean-power-and-light/"
ami = pd.read_parquet(DATA_DIR + "timeseries/ami_15min_sample.parquet")
ami["timestamp"] = pd.to_datetime(ami["timestamp"])
hourly = ami.groupby(["meter_id", ami["timestamp"].dt.floor("h")]).agg(
voltage_mean=("voltage", "mean"),
voltage_std=("voltage", "std"),
voltage_min=("voltage", "min"),
voltage_max=("voltage", "max"),
consumption_kwh=("consumption_kwh", "sum"),
).reset_index()
hourly["voltage_range"] = hourly["voltage_max"] - hourly["voltage_min"]
hourly = hourly.fillna(0)
feature_cols = ["voltage_mean", "voltage_std", "voltage_range",
"voltage_min", "voltage_max", "consumption_kwh"]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(hourly[feature_cols])
print(f"Hourly feature rows: {len(hourly):,}")
print(f"Features: {feature_cols}")
The VAE has three key differences from the basic autoencoder in Guide 08:
- The encoder outputs two vectors: mu (mean) and log_var (log-variance) instead of a single latent vector
- We use the reparameterization trick to sample from the latent distribution while keeping backpropagation working
- The loss function has two terms: reconstruction loss plus KL divergence
The reparameterization trick: We cannot backpropagate through a random sampling operation. The trick is to sample epsilon ~ N(0, 1) and then compute z = mu + sigma * epsilon. The randomness is in epsilon (which has no learnable parameters), so gradients flow through mu and sigma normally. This simple algebraic rearrangement is what makes VAE training possible.
class VoltageVAE(nn.Module):
def __init__(self, input_dim, latent_dim=3):
super().__init__()
self.input_dim = input_dim
self.latent_dim = latent_dim
self.encoder_shared = nn.Sequential(
nn.Linear(input_dim, 32),
nn.ReLU(),
nn.Linear(32, 16),
nn.ReLU(),
)
self.fc_mu = nn.Linear(16, latent_dim)
self.fc_log_var = nn.Linear(16, latent_dim)
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 16),
nn.ReLU(),
nn.Linear(16, 32),
nn.ReLU(),
nn.Linear(32, input_dim),
)
def encode(self, x):
"""Return mu and log_var for the latent distribution."""
h = self.encoder_shared(x)
mu = self.fc_mu(h)
log_var = self.fc_log_var(h)
return mu, log_var
def reparameterize(self, mu, log_var):
"""Sample z using the reparameterization trick."""
std = torch.exp(0.5 * log_var)
eps = torch.randn_like(std)
z = mu + std * eps
return z
def decode(self, z):
return self.decoder(z)
def forward(self, x):
mu, log_var = self.encode(x)
z = self.reparameterize(mu, log_var)
x_recon = self.decode(z)
return x_recon, mu, log_var
def vae_loss(x, x_recon, mu, log_var):
"""
ELBO loss = Reconstruction loss + KL divergence.
Returns total loss and both components separately for monitoring.
"""
recon_loss = F.mse_loss(x_recon, x, reduction="mean")
kl_loss = -0.5 * torch.mean(
torch.sum(1 + log_var - mu.pow(2) - log_var.exp(), dim=1)
)
total_loss = recon_loss + kl_loss
return total_loss, recon_loss, kl_loss
input_dim = len(feature_cols)
vae = VoltageVAE(input_dim, latent_dim=3)
print(vae)
print(f"\nTotal parameters: {sum(p.numel() for p in vae.parameters()):,}")
Why KL divergence? The KL term acts as a regularizer. Without it, the encoder could cheat by mapping each input to a tiny, isolated region of latent space—essentially memorizing the data. The KL term pulls the learned distribution toward a standard normal N(0, I), ensuring the latent space stays smooth and well-organized. Think of it as a "tidiness penalty" that prevents the model from scattering its encoding across unrelated regions.
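For reference, the KL term implemented in vae_loss is the closed-form divergence between the encoder's diagonal Gaussian and the standard normal prior, summed over the d latent dimensions (with log_var standing in for log sigma^2):

D_{\mathrm{KL}}\left(\mathcal{N}(\mu, \operatorname{diag}(\sigma^2)) \,\|\, \mathcal{N}(0, I)\right) = -\frac{1}{2} \sum_{j=1}^{d} \left(1 + \log \sigma_j^2 - \mu_j^2 - \sigma_j^2\right)

This is exactly the expression inside the torch.sum call in vae_loss.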
Following the same approach as Guide 08, we train exclusively on normal data. We first run an Isolation Forest to identify and exclude obvious anomalies from the training set, then train the VAE to learn the distribution of normal voltage patterns.
iso_forest = IsolationForest(n_estimators=200, contamination=0.01, random_state=42)
iso_forest.fit(X_scaled)
iso_labels = iso_forest.predict(X_scaled)
normal_mask = iso_labels == 1
X_normal = X_scaled[normal_mask]
print(f"Training samples (normal only): {len(X_normal):,}")
split = int(len(X_normal) * 0.8)
train_tensor = torch.FloatTensor(X_normal[:split])
val_tensor = torch.FloatTensor(X_normal[split:])
train_loader = DataLoader(
TensorDataset(train_tensor, train_tensor),
batch_size=128, shuffle=True
)
optimizer = torch.optim.Adam(vae.parameters(), lr=0.001)
history = {"total": [], "recon": [], "kl": []}
for epoch in range(80):
vae.train()
epoch_total, epoch_recon, epoch_kl = 0, 0, 0
for batch_x, _ in train_loader:
x_recon, mu, log_var = vae(batch_x)
total, recon, kl = vae_loss(batch_x, x_recon, mu, log_var)
optimizer.zero_grad()
total.backward()
optimizer.step()
epoch_total += total.item()
epoch_recon += recon.item()
epoch_kl += kl.item()
n_batches = len(train_loader)
history["total"].append(epoch_total / n_batches)
history["recon"].append(epoch_recon / n_batches)
history["kl"].append(epoch_kl / n_batches)
if (epoch + 1) % 20 == 0:
print(f"Epoch {epoch+1:>3}/80 "
f"Total: {history['total'][-1]:.4f} "
f"Recon: {history['recon'][-1]:.4f} "
f"KL: {history['kl'][-1]:.4f}")
Epoch 20/80 Total: 0.4821 Recon: 0.3147 KL: 0.1674
Epoch 40/80 Total: 0.3956 Recon: 0.2483 KL: 0.1473
Epoch 60/80 Total: 0.3612 Recon: 0.2201 KL: 0.1411
Epoch 80/80 Total: 0.3498 Recon: 0.2104 KL: 0.1394
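The 20% validation split (val_tensor) created earlier has not been used yet. A minimal sanity check, using the same vae_loss as training, is to confirm that the validation loss tracks the final training loss; a large gap would suggest the model has memorized the training set:

vae.eval()
with torch.no_grad():
    val_recon, val_mu, val_log_var = vae(val_tensor)
    val_total, val_recon_loss, val_kl = vae_loss(val_tensor, val_recon, val_mu, val_log_var)
print(f"Validation  Total: {val_total.item():.4f} "
      f"Recon: {val_recon_loss.item():.4f} KL: {val_kl.item():.4f}")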
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
ax1.plot(history["recon"], color="#5FCCDB", label="Reconstruction Loss")
ax1.plot(history["kl"], color="#D69E2E", label="KL Divergence")
ax1.set_xlabel("Epoch")
ax1.set_ylabel("Loss")
ax1.set_title("VAE Training: Loss Components")
ax1.legend()
ax2.plot(history["total"], color="#2D6A7A", linewidth=2)
ax2.set_xlabel("Epoch")
ax2.set_ylabel("Total ELBO Loss")
ax2.set_title("VAE Training: Total Loss (ELBO)")
plt.tight_layout()
plt.show()
You should see the reconstruction loss decrease steadily while the KL divergence stabilizes after an initial drop. This balance is expected: the model learns to reconstruct well while keeping the latent space organized. If KL collapses to near zero, the model is ignoring the latent structure (a known problem called "posterior collapse").
Watch for posterior collapse: If the KL divergence drops below 0.01 while reconstruction loss remains high, the decoder may be ignoring the latent variable entirely. Remedies include KL annealing (gradually increasing the KL weight from 0 to 1 during training) or using a free-bits strategy. For this guide, the default hyperparameters should avoid this issue.
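If you do encounter collapse, KL annealing is a small change to the training loop. A minimal sketch, written as a drop-in replacement for the loop above (the 20-epoch linear warm-up is an illustrative choice, not a tuned value):

for epoch in range(80):
    # Ramp the KL weight linearly from near 0 to 1 over the first 20 epochs
    kl_weight = min(1.0, (epoch + 1) / 20)
    vae.train()
    for batch_x, _ in train_loader:
        x_recon, mu, log_var = vae(batch_x)
        _, recon, kl = vae_loss(batch_x, x_recon, mu, log_var)
        loss = recon + kl_weight * kl  # annealed ELBO loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()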
The basic autoencoder from Guide 08 uses reconstruction error as its anomaly score. The VAE gives us a richer signal: the Evidence Lower Bound (ELBO), which combines reconstruction probability with the KL divergence. A low ELBO means the data point is unlikely under the learned model: it is both hard to reconstruct and its latent encoding is far from the prior distribution. Strictly speaking, the quantity we compute below is the negative ELBO (reconstruction error plus KL divergence), so higher scores mean more anomalous.
ELBO as anomaly score: For each data point, the ELBO measures the total "surprise" of the model. The reconstruction term asks "can I reproduce this accurately?" and the KL term asks "does this encode to a reasonable location in latent space?" Normal data scores low on both. An anomaly might have a high reconstruction error (unusual voltage pattern), a high KL divergence (encoding far from the normal cluster), or both.
vae.eval()
all_tensor = torch.FloatTensor(X_scaled)
with torch.no_grad():
x_recon, mu, log_var = vae(all_tensor)
recon_error = torch.mean((all_tensor - x_recon) ** 2, dim=1)
kl_per_sample = -0.5 * torch.sum(
1 + log_var - mu.pow(2) - log_var.exp(), dim=1
)
elbo_score = recon_error + kl_per_sample
hourly["vae_recon_error"] = recon_error.numpy()
hourly["vae_kl_div"] = kl_per_sample.numpy()
hourly["vae_elbo_score"] = elbo_score.numpy()
elbo_threshold = hourly["vae_elbo_score"].quantile(0.99)
hourly["vae_anomaly"] = (hourly["vae_elbo_score"] > elbo_threshold).astype(int)
print(f"ELBO threshold (99th pctile): {elbo_threshold:.4f}")
print(f"VAE anomalies detected: {hourly['vae_anomaly'].sum()}")
print(f"\nScore components (anomalies only):")
anomalous = hourly[hourly["vae_anomaly"] == 1]
print(f" Avg reconstruction error: {anomalous['vae_recon_error'].mean():.4f}")
print(f" Avg KL divergence: {anomalous['vae_kl_div'].mean():.4f}")
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
normal_pts = hourly[hourly["vae_anomaly"] == 0]
anomaly_pts = hourly[hourly["vae_anomaly"] == 1]
ax1.scatter(normal_pts["vae_recon_error"], normal_pts["vae_kl_div"],
c="#5FCCDB", s=3, alpha=0.2, label="Normal")
ax1.scatter(anomaly_pts["vae_recon_error"], anomaly_pts["vae_kl_div"],
c="red", s=25, marker="x", label="Anomaly")
ax1.set_xlabel("Reconstruction Error")
ax1.set_ylabel("KL Divergence")
ax1.set_title("VAE Anomaly Score Components")
ax1.legend()
ax2.hist(hourly["vae_elbo_score"], bins=150, color="#5FCCDB", edgecolor="white")
ax2.axvline(x=elbo_threshold, color="red", linestyle="--",
label=f"Threshold ({elbo_threshold:.3f})")
ax2.set_xlabel("ELBO Anomaly Score")
ax2.set_ylabel("Frequency")
ax2.set_title("ELBO Score Distribution")
ax2.set_yscale("log")
ax2.legend()
plt.tight_layout()
plt.show()
In the left plot, notice how anomalies can be anomalous for different reasons: some have high reconstruction error but moderate KL divergence (unusual voltage patterns), while others have high KL divergence (encoding far from normal patterns). The ELBO score captures both failure modes.
To fairly compare all three approaches, we need ground-truth labels that are independent of any model. Using one model's output (e.g., Isolation Forest flags) as pseudo-ground-truth to evaluate that same model would be circular and methodologically invalid. Instead, we inject synthetic anomalies (voltage sags, spikes, and high-variance noise) into a copy of the scaled feature matrix at known indices. This gives us an objective, model-independent ground truth for precision-recall evaluation.
np.random.seed(42)
n_total = len(X_scaled)
y_true = np.zeros(n_total, dtype=int)
n_inject = int(n_total * 0.02)
inject_idx = np.random.choice(n_total, size=n_inject, replace=False)
y_true[inject_idx] = 1
X_eval = X_scaled.copy()
for i, idx in enumerate(inject_idx):
anomaly_type = i % 3
if anomaly_type == 0:
X_eval[idx, 0] -= np.random.uniform(3.0, 5.0)
elif anomaly_type == 1:
X_eval[idx, 0] += np.random.uniform(3.0, 5.0)
else:
X_eval[idx, 1] += np.random.uniform(3.0, 6.0)
X_eval[idx, 2] += np.random.uniform(3.0, 6.0)
eval_tensor = torch.FloatTensor(X_eval)
print(f"Injected {n_inject} synthetic anomalies ({n_inject/n_total*100:.1f}%)")
print(f" Types: {n_inject//3} sags, {n_inject//3} spikes, {n_inject - 2*(n_inject//3)} high-variance")
class BasicAutoencoder(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, 32), nn.ReLU(),
nn.Linear(32, 16), nn.ReLU(),
nn.Linear(16, 3),
)
self.decoder = nn.Sequential(
nn.Linear(3, 16), nn.ReLU(),
nn.Linear(16, 32), nn.ReLU(),
nn.Linear(32, input_dim),
)
def forward(self, x):
return self.decoder(self.encoder(x))
ae = BasicAutoencoder(input_dim)
ae_optimizer = torch.optim.Adam(ae.parameters(), lr=0.001)
ae_criterion = nn.MSELoss()
ae_loader = DataLoader(TensorDataset(train_tensor, train_tensor),
batch_size=128, shuffle=True)
for epoch in range(50):
ae.train()
for bx, by in ae_loader:
loss = ae_criterion(ae(bx), by)
ae_optimizer.zero_grad()
loss.backward()
ae_optimizer.step()
ae.eval()
with torch.no_grad():
ae_recon = ae(eval_tensor)
ae_error = torch.mean((eval_tensor - ae_recon) ** 2, dim=1).numpy()
iso_scores = -iso_forest.decision_function(X_eval)
vae.eval()
with torch.no_grad():
mu, log_var = vae.encode(eval_tensor)
z = vae.reparameterize(mu, log_var)
recon = vae.decode(z)
recon_err = torch.mean((eval_tensor - recon) ** 2, dim=1)
kl_div = -0.5 * torch.sum(1 + log_var - mu**2 - log_var.exp(), dim=1)
vae_elbo_scores = (recon_err + kl_div).numpy()
fig, ax = plt.subplots(figsize=(10, 7))
methods = {
"Isolation Forest": iso_scores,
"Basic Autoencoder": ae_error,
"VAE (ELBO Score)": vae_elbo_scores,
}
colors = ["#718096", "#D69E2E", "#2D6A7A"]
for (name, scores), color in zip(methods.items(), colors):
precision, recall, _ = precision_recall_curve(y_true, scores)
ap = average_precision_score(y_true, scores)
ax.plot(recall, precision, label=f"{name} (AP={ap:.3f})",
color=color, linewidth=2)
ax.set_xlabel("Recall")
ax.set_ylabel("Precision")
ax.set_title("Precision-Recall: Anomaly Detection Methods Compared")
ax.legend(fontsize=12)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
for ax, (name, scores) in zip(axes, methods.items()):
normal_scores = scores[y_true == 0]
anomaly_scores = scores[y_true == 1]
ax.hist(normal_scores, bins=80, alpha=0.6, color="#5FCCDB",
label="Normal", density=True)
ax.hist(anomaly_scores, bins=30, alpha=0.7, color="red",
label="Anomaly", density=True)
ax.set_title(name)
ax.set_xlabel("Anomaly Score")
ax.set_ylabel("Density")
ax.legend()
plt.suptitle("Score Distributions: Normal vs Anomalous Points", fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
The VAE's ELBO score should show better separation between normal and anomalous distributions than the basic autoencoder. The key advantage is that the KL component catches anomalies that have reasonable reconstructions but unusual latent encodings—subtle anomalies that the basic autoencoder misses.
Interpreting average precision (AP): AP summarizes the precision-recall curve into a single number. A perfect detector has AP = 1.0; random guessing gives AP equal to the anomaly rate. Higher AP means the method ranks true anomalies above normal points more consistently. Even a modest AP improvement (e.g., 0.72 to 0.78) can translate to significantly fewer false alarms in production.
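A quick way to verify the random-guessing baseline on our evaluation set is to score it with uninformative noise; the resulting AP should land near the 2% anomaly rate:

rng = np.random.default_rng(0)
random_scores = rng.random(len(y_true))  # scores carrying no information about the labels
print(f"Anomaly rate:    {y_true.mean():.3f}")
print(f"Random-score AP: {average_precision_score(y_true, random_scores):.3f}")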
Part B: Streaming Sliding Window Detection
Everything so far has been batch detection: we load all the data, train a model, and score everything at once. In a real utility control room, AMI data arrives continuously—every 15 minutes from thousands of meters. We need a system that processes data as it arrives, in sliding windows, and flags anomalies in near-real-time.
The pipeline design:
- Ingest: New AMI readings arrive every 15 minutes
- Buffer: Maintain a 1-hour sliding window (4 readings per meter)
- Feature extraction: Compute hourly statistics from the window (same features as Part A)
- Score: Run the features through the trained VAE, compute ELBO anomaly score
- Threshold: Compare the score against an adaptive threshold
- Alert: If the score exceeds the threshold, flag the meter-hour for investigation
Online vs batch detection: Batch detection processes historical data in bulk—great for retrospective analysis. Online (streaming) detection processes each new observation as it arrives—essential for real-time monitoring. The trade-off is that online systems must be fast and memory-efficient, since they cannot reprocess the entire dataset each time. Our sliding window approach strikes a balance: we maintain enough context (1 hour) to compute meaningful features, but discard older data to keep memory bounded.
weather = pd.read_csv(DATA_DIR + "weather/hourly_observations.csv",
parse_dates=["timestamp"])
outages = pd.read_csv(DATA_DIR + "outages/outage_events.csv",
parse_dates=["fault_detected"])
print(f"Weather records: {len(weather):,}")
print(f"Outage events: {len(outages):,}")
print(f"AMI meters: {ami['meter_id'].nunique():,}")
The core of the streaming pipeline is a class that maintains a buffer of recent readings per meter, computes features when the window is full, and scores each window through the trained VAE.
from collections import defaultdict, deque
class StreamingAnomalyDetector:
"""
Sliding window anomaly detector for AMI voltage data.
Maintains a 1-hour buffer per meter and scores with a trained VAE.
"""
def __init__(self, vae_model, scaler, feature_cols,
window_size=4, threshold=None):
"""
Args:
vae_model: trained VoltageVAE instance
scaler: fitted StandardScaler
feature_cols: list of feature column names
window_size: number of readings per window (4 = 1 hour at 15-min intervals)
threshold: fixed ELBO threshold (None = use adaptive)
"""
self.vae = vae_model
self.vae.eval()
self.scaler = scaler
self.feature_cols = feature_cols
self.window_size = window_size
self.fixed_threshold = threshold
self.buffers = defaultdict(
lambda: deque(maxlen=window_size)
)
self.detections = []
self.all_scores = []
def ingest(self, meter_id, timestamp, voltage, consumption_kwh):
"""Process a single AMI reading."""
self.buffers[meter_id].append({
"timestamp": timestamp,
"voltage": voltage,
"consumption_kwh": consumption_kwh,
})
if len(self.buffers[meter_id]) < self.window_size:
return None
features = self._extract_features(meter_id)
score = self._score(features)
self.all_scores.append({
"meter_id": meter_id,
"timestamp": timestamp,
"elbo_score": score,
})
return score
def _extract_features(self, meter_id):
"""Compute hourly statistics from the sliding window."""
readings = list(self.buffers[meter_id])
voltages = np.array([r["voltage"] for r in readings])
consumption = sum(r["consumption_kwh"] for r in readings)
        return {
            "voltage_mean": voltages.mean(),
            "voltage_std": voltages.std(ddof=1),  # ddof=1 matches the sample std pandas used for the training features
            "voltage_range": voltages.max() - voltages.min(),
            "voltage_min": voltages.min(),
            "voltage_max": voltages.max(),
            "consumption_kwh": consumption,
        }
def _score(self, features):
"""Run features through the VAE and return ELBO score."""
        # Build a one-row DataFrame so the scaler sees the feature names it was fit with
        x = pd.DataFrame([[features[c] for c in self.feature_cols]],
                         columns=self.feature_cols)
        x_scaled = self.scaler.transform(x)
x_tensor = torch.FloatTensor(x_scaled)
with torch.no_grad():
x_recon, mu, log_var = self.vae(x_tensor)
recon = F.mse_loss(x_recon, x_tensor, reduction="none").mean(dim=1)
kl = -0.5 * torch.sum(
1 + log_var - mu.pow(2) - log_var.exp(), dim=1
)
score = (recon + kl).item()
return score
detector = StreamingAnomalyDetector(
vae_model=vae,
scaler=scaler,
feature_cols=feature_cols,
window_size=4,
threshold=elbo_threshold,
)
print("Streaming detector initialized.")
print(f" Window size: {detector.window_size} readings (1 hour)")
print(f" Fixed threshold: {elbo_threshold:.4f}")
A fixed threshold works in stable conditions, but voltage patterns change with the seasons. In summer, air conditioning loads cause higher voltage variance—what looks anomalous in January might be perfectly normal in July. An adaptive threshold uses a rolling percentile of recent scores to set the alert level, automatically adjusting to seasonal patterns.
Why not just a fixed threshold? A fixed threshold tuned for winter conditions will flood operators with false positives in summer (when natural variance is higher). Conversely, a threshold tuned for summer will miss genuine anomalies in winter (when variance should be lower). Adaptive thresholding solves both problems by tracking what is "normal for the current season."
Cold start behavior: The AdaptiveThreshold requires min_samples=24 readings (6 hours of 15-minute data) before the adaptive threshold activates. During this cold-start period, the detector falls back to a fixed fallback_threshold (set to the ELBO threshold from the VAE training). This means the system is functional immediately but less precise during the first few hours of operation. When deploying to a new meter, you can warm up the detector using recent historical data to eliminate the cold-start period entirely.
class AdaptiveThreshold:
"""
Rolling percentile threshold that adapts to seasonal patterns.
Maintains a window of recent scores and computes the threshold
as the Nth percentile of that window.
"""
def __init__(self, window_hours=168, percentile=99.0,
min_samples=24, fallback_threshold=1.0):
"""
Args:
window_hours: lookback window for computing percentile (168 = 1 week)
percentile: percentile to use as threshold (99 = flag top 1%)
min_samples: minimum scores before adaptive threshold activates
fallback_threshold: fixed threshold used until min_samples is reached
"""
self.window_size = window_hours
self.percentile = percentile
self.min_samples = min_samples
self.fallback = fallback_threshold
        self.score_history = deque(maxlen=window_hours * 4)  # 4 scores per hour at 15-min cadence
def update_and_check(self, score):
"""Add a score and return (is_anomaly, current_threshold)."""
self.score_history.append(score)
if len(self.score_history) < self.min_samples:
threshold = self.fallback
else:
threshold = np.percentile(
list(self.score_history), self.percentile
)
is_anomaly = score > threshold
return is_anomaly, threshold
class AdaptiveStreamingDetector(StreamingAnomalyDetector):
"""Streaming detector with per-meter adaptive thresholds."""
def __init__(self, vae_model, scaler, feature_cols,
window_size=4, lookback_hours=168, percentile=99.0):
super().__init__(vae_model, scaler, feature_cols, window_size)
self.lookback_hours = lookback_hours
self.percentile = percentile
self.adaptive_thresholds = defaultdict(
lambda: AdaptiveThreshold(
window_hours=lookback_hours,
percentile=percentile,
fallback_threshold=elbo_threshold,
)
)
def ingest(self, meter_id, timestamp, voltage, consumption_kwh):
"""Process a reading with adaptive thresholding."""
score = super().ingest(meter_id, timestamp, voltage, consumption_kwh)
if score is None:
return None
is_anomaly, threshold = self.adaptive_thresholds[meter_id].update_and_check(score)
if is_anomaly:
self.detections.append({
"meter_id": meter_id,
"timestamp": timestamp,
"elbo_score": score,
"threshold": threshold,
})
return {"score": score, "threshold": threshold, "is_anomaly": is_anomaly}
adaptive_detector = AdaptiveStreamingDetector(
vae_model=vae,
scaler=scaler,
feature_cols=feature_cols,
window_size=4,
lookback_hours=168,
percentile=99.0,
)
print("Adaptive streaming detector ready.")
print(f" Lookback window: {adaptive_detector.lookback_hours} hours (1 week)")
print(f" Percentile: {adaptive_detector.percentile}th")
Choosing the lookback window: The 168-hour (1-week) lookback window captures a full weekly cycle of voltage patterns (weekday vs weekend differences). For stronger seasonal adaptation, you could extend this to 720 hours (1 month). The trade-off: longer windows are more stable but slower to adapt; shorter windows respond faster but can be noisy. For most utility applications, one week provides a good balance.
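The cold-start note above suggested warming up the detector on recent history. A minimal sketch, assuming the sample also covers the week before the simulation window: replay that week through ingest, then discard the alerts and scores it generated while keeping the per-meter threshold histories:

warmup_data = ami[(ami["timestamp"] >= "2024-07-01") &
                  (ami["timestamp"] < "2024-07-08")].sort_values("timestamp")
for row in warmup_data.itertuples(index=False):
    adaptive_detector.ingest(row.meter_id, row.timestamp,
                             row.voltage, row.consumption_kwh)
adaptive_detector.detections.clear()   # drop warm-up alerts
adaptive_detector.all_scores.clear()   # drop warm-up scores
print(f"Warm-up complete: {len(warmup_data):,} readings replayed")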
Now we replay one week of AMI data through the streaming pipeline, simulating what would happen if this system were running in SP&L's control room. We then overlay actual outage events to see how our detections correlate with real grid problems.
sim_start = "2024-07-08"
sim_end = "2024-07-15"
sim_data = ami[(ami["timestamp"] >= sim_start) &
(ami["timestamp"] < sim_end)].sort_values("timestamp")
print(f"Simulation period: {sim_start} to {sim_end}")
print(f"Readings to process: {len(sim_data):,}")
print(f"Unique meters: {sim_data['meter_id'].nunique():,}")
results = []
for row in sim_data.itertuples(index=False):
result = adaptive_detector.ingest(
meter_id=row.meter_id,
timestamp=row.timestamp,
voltage=row.voltage,
consumption_kwh=row.consumption_kwh,
)
    if result is not None:
        results.append({
            "meter_id": row.meter_id,    # itertuples yields namedtuples, so use attribute access
            "timestamp": row.timestamp,
            **result,
        })
results_df = pd.DataFrame(results)
detections_df = pd.DataFrame(adaptive_detector.detections)
print(f"\nWindows scored: {len(results_df):,}")
print(f"Anomalies flagged: {len(detections_df):,}")
print(f"Alert rate: {len(detections_df)/len(results_df)*100:.2f}%")
week_outages = outages[
(outages["fault_detected"] >= sim_start) &
(outages["fault_detected"] < sim_end)
]
print(f"Actual outage events this week: {len(week_outages)}")
print(week_outages[["fault_detected", "feeder_id", "cause_code",
"customers_affected"]].to_string(index=False))
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 10), sharex=True)
results_df["hour"] = results_df["timestamp"].dt.floor("h")
hourly_max_score = results_df.groupby("hour")["score"].max()
hourly_avg_score = results_df.groupby("hour")["score"].mean()
ax1.fill_between(hourly_avg_score.index, hourly_avg_score.values,
alpha=0.3, color="#5FCCDB", label="Avg ELBO score")
ax1.plot(hourly_max_score.index, hourly_max_score.values,
color="#2D6A7A", linewidth=0.8, alpha=0.7, label="Max ELBO score")
for _, outage in week_outages.iterrows():
ax1.axvline(x=outage["fault_detected"], color="red", linestyle="--",
alpha=0.8, linewidth=1.5)
ax1.plot([], [], color="red", linestyle="--", label="Outage event")  # empty line as legend proxy
ax1.set_ylabel("ELBO Anomaly Score")
ax1.set_title("Streaming VAE Anomaly Scores — July 8-15, 2024")
ax1.legend(loc="upper right")
if len(detections_df) > 0:
det_hourly = detections_df.groupby(
detections_df["timestamp"].dt.floor("h")
).size()
ax2.bar(det_hourly.index, det_hourly.values,
width=0.04, color="#E53E3E", alpha=0.8, label="Anomaly detections")
for _, outage in week_outages.iterrows():
ax2.axvline(x=outage["fault_detected"], color="red", linestyle="--",
alpha=0.8, linewidth=1.5)
ax2.set_xlabel("Time")
ax2.set_ylabel("Detections per Hour")
ax2.set_title("Anomaly Detection Timeline vs Actual Outages")
ax2.legend(loc="upper right")
plt.tight_layout()
plt.show()
lead_time_hours = 2
if len(detections_df) > 0:
outage_hits = 0
outage_details = []
for _, outage in week_outages.iterrows():
fault_time = outage["fault_detected"]
window_start = fault_time - pd.Timedelta(hours=lead_time_hours)
prior_detections = detections_df[
(detections_df["timestamp"] >= window_start) &
(detections_df["timestamp"] <= fault_time)
]
hit = len(prior_detections) > 0
outage_hits += int(hit)
outage_details.append({
"fault_time": fault_time,
"cause": outage["cause_code"],
"prior_alerts": len(prior_detections),
"detected": hit,
})
correlation_df = pd.DataFrame(outage_details)
hit_rate = outage_hits / len(week_outages) * 100
print(f"\nOutage correlation analysis ({lead_time_hours}h lead time):")
print(f" Outages with prior anomaly detection: {outage_hits}/{len(week_outages)} ({hit_rate:.1f}%)")
print(f"\nDetails:")
print(correlation_df.to_string(index=False))
Look for clusters of anomaly detections preceding outage events. The 2-hour lead time window used here is illustrative—it was chosen for this tutorial to demonstrate the correlation analysis technique, not because there is empirical evidence that voltage anomalies consistently precede outages by exactly 2 hours. In practice, pre-fault anomaly lead times vary widely: thermal overloads may show gradual voltage depression over hours to days, while weather-driven faults may produce only minutes of warning (or none at all). When deploying this in production, you should empirically calibrate the lead time by analyzing multiple windows (e.g., 30 min, 1h, 2h, 4h, 8h) and selecting the window that maximizes the hit rate for your specific failure modes. Not all outages will have precursor anomalies—sudden equipment failures may not show warning signs—but weather-driven and overload events often produce detectable voltage deviations beforehand.
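A minimal sketch of that calibration, reusing detections_df and week_outages from above (the candidate windows are illustrative):

if len(detections_df) > 0:
    for hours in [0.5, 1, 2, 4, 8]:
        hits = 0
        for _, outage in week_outages.iterrows():
            fault_time = outage["fault_detected"]
            prior = detections_df[
                (detections_df["timestamp"] >= fault_time - pd.Timedelta(hours=hours)) &
                (detections_df["timestamp"] <= fault_time)
            ]
            hits += int(len(prior) > 0)
        print(f"Lead time {hours:>4}h: {hits}/{len(week_outages)} outages preceded by an alert")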
For deployment, you need to save both the VAE weights and the scaler that was fit on training data. Without the scaler, new incoming data cannot be normalized consistently.
import joblib
torch.save(vae.state_dict(), "voltage_vae.pt")
joblib.dump(scaler, "voltage_scaler.pkl")
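Loading the artifacts back mirrors the save. Note that load_state_dict needs a model instance with the same architecture, so the VoltageVAE class definition must be importable at deployment time:

vae_loaded = VoltageVAE(input_dim=len(feature_cols), latent_dim=3)
vae_loaded.load_state_dict(torch.load("voltage_vae.pt"))
vae_loaded.eval()  # switch to inference mode before scoring
scaler_loaded = joblib.load("voltage_scaler.pkl")
detector = StreamingAnomalyDetector(vae_loaded, scaler_loaded,
                                    feature_cols, window_size=4)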
Why a 3-dimensional latent space? The 3-dimensional latent space was chosen to match the intrinsic dimensionality of voltage data, which has approximately 3 independent modes of variation: voltage level, variability, and consumption correlation. A larger latent space would dilute the KL divergence term, making it less informative for anomaly detection—anomalous points would not be pushed as far from the prior, reducing the separation between normal and anomalous scores.
What You Learned
- Built a Variational Autoencoder that learns a probability distribution over normal voltage patterns
- Used the ELBO to compute principled anomaly scores with both reconstruction and distributional components
- Compared VAE vs basic autoencoder vs Isolation Forest using precision-recall analysis
- Designed a real-time streaming pipeline with sliding window feature extraction
- Implemented adaptive thresholding that adjusts to seasonal voltage variance
- Simulated real-time operation and correlated detections with actual outage events
Combining with Phase Imbalance Detection
The VAE anomaly scores can be enriched by adding phase imbalance features. If your AMI data includes phase labels (A, B, C), compute the voltage imbalance ratio (V_max - V_min) / V_avg across phases for each transformer and include it as an additional input feature. Phase imbalances that develop gradually will shift the VAE encoding away from the normal cluster, producing early warnings before the imbalance causes equipment damage.
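A sketch of that feature, assuming hypothetical phase and transformer_id columns that the SP&L sample used in this guide may not include:

# Hypothetical columns: 'phase' in {A, B, C} and 'transformer_id' per service point
phase_v = ami.groupby(
    ["transformer_id", ami["timestamp"].dt.floor("h"), "phase"]
)["voltage"].mean().unstack("phase")
imbalance = ((phase_v.max(axis=1) - phase_v.min(axis=1))
             / phase_v.mean(axis=1)).rename("voltage_imbalance_ratio")
imbalance_df = imbalance.reset_index()  # join back to the hourly features as an extra input column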
Ideas to Try Next
- Graph Neural Networks (GNNs) for spatial anomalies: Model the grid topology as a graph and detect anomalies that span multiple meters or feeders—catching problems that no single-meter detector would flag
- Federated learning: Train the VAE across multiple utilities without sharing raw AMI data, preserving customer privacy while improving detection accuracy
- Temporal convolutional VAE: Replace the feedforward encoder with a temporal convolution to capture sequential patterns across multiple windows
- KL annealing: Implement a warm-up schedule that gradually increases the KL weight from 0 to 1 during training, which can improve latent representations
- Multi-meter ensemble: Score each meter independently, then aggregate scores per transformer or feeder to detect systemic anomalies
Key Terms Glossary
- Variational Autoencoder (VAE) — a generative model that learns a probability distribution over the latent space rather than a fixed encoding
- ELBO (Evidence Lower Bound) — the VAE loss function combining reconstruction quality and latent space regularity; doubles as an anomaly score
- KL divergence — a measure of how different the learned latent distribution is from the prior; acts as a regularizer
- Reparameterization trick — an algebraic rearrangement that allows backpropagation through the sampling operation: z = mu + sigma * epsilon
- Posterior collapse — a failure mode where the VAE ignores the latent variable; the KL divergence drops to near zero
- Sliding window — a fixed-size buffer of recent observations that advances as new data arrives
- Adaptive thresholding — dynamically adjusting the anomaly threshold based on recent score distributions rather than using a fixed cutoff
- Online detection — processing each observation as it arrives, in contrast to batch processing of historical data
- Phase imbalance — unequal voltage or current across the three phases of a distribution transformer, indicating wiring issues or load asymmetry