異常検知 (Anomaly Detection)

学習目標: 正常データのみから異常を見つける深層異常検知手法(AE再構成 / Isolation Forest / Deep SVDD / Normalizing Flow)を理解する

異常検知の特徴

  • 正例(異常)は少ない or 存在しない — 教師あり分類が使いにくい
  • 正常分布を学習し、そこから外れるサンプルを「異常」と判定
  • 主要応用: 製造業の外観検査・クレジットカード不正・侵入検知・医療異常検出・予知保全

手法の分類

カテゴリ代表手法適用
密度推定型KDE, GMM, Normalizing Flowテーブル・低次元
距離/隣接型kNN, LOFテーブル・小規模
分離型Isolation Forest, One-Class SVMテーブル・中規模
再構成型Autoencoder, VAE, MAE画像・時系列
分類型(自己教師)Deep SVDD, CSI画像・高次元
特徴量モデル型PaDiM, PatchCore外観検査(MVTec)

評価指標

  • AUROC: 異常スコアの順位性。閾値非依存で公平比較
  • AUPRC: 不均衡時の真の難易度
  • F1@best: 運用時の最適閾値での F1

オートエンコーダによる再構成型異常検知

正常データだけでAEを学習 → 異常は再構成しにくい → 大きな再構成誤差で検出。

実装

import torch
import torch.nn as nn
import torch.nn.functional as F


class AnomalyAE(nn.Module):
    def __init__(self, input_dim, hidden_dim=64, latent_dim=16):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)


def train(model, normal_loader, epochs=100, lr=1e-3):
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    for epoch in range(epochs):
        for x, in normal_loader:                 # 正常のみ
            recon = model(x)
            loss = F.mse_loss(recon, x)
            opt.zero_grad(); loss.backward(); opt.step()


def anomaly_score(model, x):
    """サンプル単位の再構成誤差"""
    model.eval()
    with torch.no_grad():
        recon = model(x)
        return F.mse_loss(recon, x, reduction='none').mean(dim=1)

閾値の決め方

import numpy as np

scores = anomaly_score(model, val_x).numpy()
# 正常データのスコア分布から 99-percentile を閾値に
threshold = np.percentile(scores, 99)

is_anomaly = scores > threshold

変分オートエンコーダ (VAE) による検知

復元誤差に加えて KL(潜在空間からの逸脱)も使える。連続的なスコアが取りやすい。

def vae_anomaly_score(model, x):
    recon, mu, logvar = model(x)
    recon_err = F.mse_loss(recon, x, reduction='none').mean(dim=1)
    kl = -0.5 * (1 + logvar - mu.pow(2) - logvar.exp()).sum(dim=1)
    return recon_err + 0.5 * kl

Deep SVDD — One-Class分類

正常データの特徴ベクトルを1点 c を中心とした超球内に押し込めるよう学習。
球の中心から離れているほど「異常」。

損失

L = (1/N) Σ_i || f_θ(x_i) - c ||²
中心 c は学習開始時の平均で初期化(その後固定 or 学習)

実装

class DeepSVDD(nn.Module):
    def __init__(self, encoder, c):
        super().__init__()
        self.encoder = encoder
        self.register_buffer('c', c)

    def forward(self, x):
        return self.encoder(x)


def init_center(encoder, dataloader, eps=0.1):
    """データ全体の特徴の平均で中心を初期化"""
    encoder.eval()
    n, c = 0, None
    with torch.no_grad():
        for x, in dataloader:
            z = encoder(x)
            c = z.sum(0) if c is None else c + z.sum(0)
            n += z.size(0)
    c = c / n
    # ゼロ近傍を避ける
    c[abs(c) < eps] = eps * torch.sign(c[abs(c) < eps])
    return c


def svdd_train(svdd, normal_loader, epochs=50, lr=1e-3):
    opt = torch.optim.Adam(svdd.parameters(), lr=lr,
                           weight_decay=1e-6)
    for epoch in range(epochs):
        for x, in normal_loader:
            z = svdd(x)
            loss = ((z - svdd.c) ** 2).sum(dim=1).mean()
            opt.zero_grad(); loss.backward(); opt.step()


def svdd_score(svdd, x):
    with torch.no_grad():
        z = svdd(x)
        return ((z - svdd.c) ** 2).sum(dim=1)
注意: 単純なDeep SVDDはbias項が中心に張り付くと崩壊する(trivial solution)。 Affine変換と bias を除外してネットワークを構成する必要があります。

画像異常検知(MVTec ベンチマーク)

製造業の外観検査では PaDiM / PatchCore / FastFlow が現在の主流。
ImageNetで事前学習したCNNの中間特徴 + 統計モデルが強い。

PatchCore のアイデア

1. 正常画像を事前学習 ResNet に通し、中間層のパッチ特徴を全て収集
2. メモリバンクに保存 (Coreset Sampling で 1〜10% に絞る)
3. テスト画像のパッチ特徴を取り出し、メモリバンクとの最小距離を異常スコアに

スケッチ実装

import torch
import torchvision.models as models

class PatchCore:
    def __init__(self, layer_idx=(1, 2)):
        backbone = models.resnet50(weights='IMAGENET1K_V1')
        self.feature_extractor = nn.Sequential(*list(backbone.children())[:6])
        self.feature_extractor.eval()
        self.memory_bank = None

    def extract_patches(self, x):
        with torch.no_grad():
            f = self.feature_extractor(x)         # (N, C, H, W)
        # H*W個のパッチ特徴 (N*H*W, C)
        return f.permute(0, 2, 3, 1).reshape(-1, f.size(1))

    def build_memory(self, normal_loader, coreset_ratio=0.1):
        patches = []
        for x, in normal_loader:
            patches.append(self.extract_patches(x))
        patches = torch.cat(patches, dim=0)

        # Greedy coreset subsampling
        N = patches.size(0)
        k = int(N * coreset_ratio)
        indices = self._coreset(patches, k)
        self.memory_bank = patches[indices]

    def score(self, x):
        patches = self.extract_patches(x)
        # 各パッチ → メモリ最近傍距離
        dists = torch.cdist(patches, self.memory_bank).min(dim=1).values
        # 画像レベルスコアはmax
        N = x.size(0)
        return dists.view(N, -1).max(dim=1).values

手法比較

手法事前学習学習時間MVTec AUROC
AE不要長い0.75 程度
Deep SVDD不要0.80 程度
PaDiMImageNet固定無し(統計のみ)0.95+
PatchCoreImageNet固定無し0.99+
FastFlowImageNet + NF0.99+