異常検知 (Anomaly Detection)
学習目標: 正常データのみから異常を見つける深層異常検知手法(AE再構成 / Isolation Forest / Deep SVDD / Normalizing Flow)を理解する
異常検知の特徴
- 正例(異常)は少ない or 存在しない — 教師あり分類が使いにくい
- 正常分布を学習し、そこから外れるサンプルを「異常」と判定
- 主要応用: 製造業の外観検査・クレジットカード不正・侵入検知・医療異常検出・予知保全
手法の分類
| カテゴリ | 代表手法 | 適用 |
|---|---|---|
| 密度推定型 | KDE, GMM, Normalizing Flow | テーブル・低次元 |
| 距離/隣接型 | kNN, LOF | テーブル・小規模 |
| 分離型 | Isolation Forest, One-Class SVM | テーブル・中規模 |
| 再構成型 | Autoencoder, VAE, MAE | 画像・時系列 |
| 分類型(自己教師) | Deep SVDD, CSI | 画像・高次元 |
| 特徴量モデル型 | PaDiM, PatchCore | 外観検査(MVTec) |
評価指標
- AUROC: 異常スコアの順位性。閾値非依存で公平比較
- AUPRC: 不均衡時の真の難易度
- F1@best: 運用時の最適閾値での F1
オートエンコーダによる再構成型異常検知
正常データだけでAEを学習 → 異常は再構成しにくい → 大きな再構成誤差で検出。
実装
import torch
import torch.nn as nn
import torch.nn.functional as F
class AnomalyAE(nn.Module):
def __init__(self, input_dim, hidden_dim=64, latent_dim=16):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, hidden_dim), nn.ReLU(),
nn.Linear(hidden_dim, latent_dim),
)
self.decoder = nn.Sequential(
nn.Linear(latent_dim, hidden_dim), nn.ReLU(),
nn.Linear(hidden_dim, input_dim),
)
def forward(self, x):
z = self.encoder(x)
return self.decoder(z)
def train(model, normal_loader, epochs=100, lr=1e-3):
opt = torch.optim.Adam(model.parameters(), lr=lr)
for epoch in range(epochs):
for x, in normal_loader: # 正常のみ
recon = model(x)
loss = F.mse_loss(recon, x)
opt.zero_grad(); loss.backward(); opt.step()
def anomaly_score(model, x):
"""サンプル単位の再構成誤差"""
model.eval()
with torch.no_grad():
recon = model(x)
return F.mse_loss(recon, x, reduction='none').mean(dim=1)
閾値の決め方
import numpy as np
scores = anomaly_score(model, val_x).numpy()
# 正常データのスコア分布から 99-percentile を閾値に
threshold = np.percentile(scores, 99)
is_anomaly = scores > threshold
変分オートエンコーダ (VAE) による検知
復元誤差に加えて KL(潜在空間からの逸脱)も使える。連続的なスコアが取りやすい。
def vae_anomaly_score(model, x):
recon, mu, logvar = model(x)
recon_err = F.mse_loss(recon, x, reduction='none').mean(dim=1)
kl = -0.5 * (1 + logvar - mu.pow(2) - logvar.exp()).sum(dim=1)
return recon_err + 0.5 * kl
Deep SVDD — One-Class分類
正常データの特徴ベクトルを1点 c を中心とした超球内に押し込めるよう学習。
球の中心から離れているほど「異常」。
損失
L = (1/N) Σ_i || f_θ(x_i) - c ||² 中心 c は学習開始時の平均で初期化(その後固定 or 学習)
実装
class DeepSVDD(nn.Module):
def __init__(self, encoder, c):
super().__init__()
self.encoder = encoder
self.register_buffer('c', c)
def forward(self, x):
return self.encoder(x)
def init_center(encoder, dataloader, eps=0.1):
"""データ全体の特徴の平均で中心を初期化"""
encoder.eval()
n, c = 0, None
with torch.no_grad():
for x, in dataloader:
z = encoder(x)
c = z.sum(0) if c is None else c + z.sum(0)
n += z.size(0)
c = c / n
# ゼロ近傍を避ける
c[abs(c) < eps] = eps * torch.sign(c[abs(c) < eps])
return c
def svdd_train(svdd, normal_loader, epochs=50, lr=1e-3):
opt = torch.optim.Adam(svdd.parameters(), lr=lr,
weight_decay=1e-6)
for epoch in range(epochs):
for x, in normal_loader:
z = svdd(x)
loss = ((z - svdd.c) ** 2).sum(dim=1).mean()
opt.zero_grad(); loss.backward(); opt.step()
def svdd_score(svdd, x):
with torch.no_grad():
z = svdd(x)
return ((z - svdd.c) ** 2).sum(dim=1)
注意: 単純なDeep SVDDはbias項が中心に張り付くと崩壊する(trivial solution)。
Affine変換と bias を除外してネットワークを構成する必要があります。
画像異常検知(MVTec ベンチマーク)
製造業の外観検査では PaDiM / PatchCore / FastFlow が現在の主流。
ImageNetで事前学習したCNNの中間特徴 + 統計モデルが強い。
PatchCore のアイデア
1. 正常画像を事前学習 ResNet に通し、中間層のパッチ特徴を全て収集 2. メモリバンクに保存 (Coreset Sampling で 1〜10% に絞る) 3. テスト画像のパッチ特徴を取り出し、メモリバンクとの最小距離を異常スコアに
スケッチ実装
import torch
import torchvision.models as models
class PatchCore:
def __init__(self, layer_idx=(1, 2)):
backbone = models.resnet50(weights='IMAGENET1K_V1')
self.feature_extractor = nn.Sequential(*list(backbone.children())[:6])
self.feature_extractor.eval()
self.memory_bank = None
def extract_patches(self, x):
with torch.no_grad():
f = self.feature_extractor(x) # (N, C, H, W)
# H*W個のパッチ特徴 (N*H*W, C)
return f.permute(0, 2, 3, 1).reshape(-1, f.size(1))
def build_memory(self, normal_loader, coreset_ratio=0.1):
patches = []
for x, in normal_loader:
patches.append(self.extract_patches(x))
patches = torch.cat(patches, dim=0)
# Greedy coreset subsampling
N = patches.size(0)
k = int(N * coreset_ratio)
indices = self._coreset(patches, k)
self.memory_bank = patches[indices]
def score(self, x):
patches = self.extract_patches(x)
# 各パッチ → メモリ最近傍距離
dists = torch.cdist(patches, self.memory_bank).min(dim=1).values
# 画像レベルスコアはmax
N = x.size(0)
return dists.view(N, -1).max(dim=1).values
手法比較
| 手法 | 事前学習 | 学習時間 | MVTec AUROC |
|---|---|---|---|
| AE | 不要 | 長い | 0.75 程度 |
| Deep SVDD | 不要 | 中 | 0.80 程度 |
| PaDiM | ImageNet固定 | 無し(統計のみ) | 0.95+ |
| PatchCore | ImageNet固定 | 無し | 0.99+ |
| FastFlow | ImageNet + NF | 中 | 0.99+ |