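I. About the Author
Author: Li Yichao (李逸超), School of Electronics and Information, Xi'an Polytechnic University; graduate student (2025 intake) in Zhang Hongwei's artificial intelligence research group
Research interests: machine vision and artificial intelligence
Contact email: 2317314922@qq.com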
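II. Core Principles of AnomalyDINO
2.1 Overview of Industrial Image Anomaly Detection
Anomaly detection (Anomaly Detection) in industrial visual inspection aims to identify samples that deviate markedly from the distribution of normal data. Traditional full-shot (Full-shot) anomaly detection methods rely on a large number of normal samples to train a classifier or a generative model. In industrial production, however, collecting large amounts of data at low cost is often difficult.
Few-shot (Few-shot) anomaly detection can make accurate predictions from only a handful of normal reference images, which suits the industrial need for rapid deployment. The task is a typical unsupervised learning problem: no paired defect images are required for pre-training, and the method relies mainly on an accurate model of the feature distribution of normal samples to localize low-level visual anomalies such as scratches, contamination, and missing structures.
2.2 Overview of AnomalyDINO
AnomalyDINO is a vision-only model for industrial visual anomaly detection. It follows the classic patch-level deep nearest neighbor (Patch-level deep nearest neighbor) paradigm.
Unlike multimodal models that depend on text prompts, it relies on the high-quality visual features extracted by DINOv2 and achieves image-level anomaly prediction and pixel-level anomaly segmentation in a "training-free" setting, with no fine-tuning (Fine-tuning) or meta-learning (Meta-learning). It reaches state-of-the-art few-shot results on benchmark datasets such as MVTec AD, and its low deployment overhead and robustness make it a strong candidate for industrial inspection.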
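2.3 The DINOv2 Feature Extraction Network
The backbone of AnomalyDINO builds on the strong feature representations of DINOv2, a self-supervised learning framework based on the Vision Transformer (ViT). Its feature learning relies mainly on the following mechanisms:
Figure 1. DINOv2 model framework
Two-branch input and cropping: the model learns features through self-distillation, in which the network learns to recognize different views of the same image. Global crops (Global Crops) of the original image are fed to the teacher network (Teacher ViT) and keep the full context; local crops (Local Crops) are fed to the student network (Student ViT), which forces the model to predict global features from local detail and makes it sensitive to subtle texture changes.
Feature outputs (CLS & Patch Tokens): the network outputs a CLS token that represents global semantics and patch tokens that carry dense local features. In pixel-level few-shot tasks, these patch tokens form the feature bank against which anomalies are compared.
Joint loss (DINO + iBOT): the DINO loss is a cross-entropy between the student's and the teacher's image-level outputs, while the iBOT loss makes the student recover the local features of masked (Masking) regions, which substantially improves dense prediction and fine-grained detection.
Asymmetric parameter update: the student network is updated by gradients from backpropagation, whereas the teacher network receives no gradients and is updated only as an exponential moving average (EMA) of the student's past parameters, so that the teacher remains a stable feature extractor.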
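The asymmetric update can be illustrated with a toy example. This is a minimal sketch, not DINOv2's training code: the function name `ema_update`, the dictionary-of-arrays parameter format, and the momentum values are assumptions chosen for illustration.

```python
import numpy as np

def ema_update(teacher, student, momentum=0.99):
    """Return new teacher parameters as an exponential moving average of the student."""
    return {name: momentum * teacher[name] + (1.0 - momentum) * student[name]
            for name in teacher}

# Toy parameters standing in for real network weights (hypothetical values).
teacher = {"w": np.zeros(3)}
student = {"w": np.ones(3)}

# No gradient ever flows into the teacher; it only tracks the student.
teacher = ema_update(teacher, student, momentum=0.9)
print(teacher["w"])  # each entry has moved 10% of the way toward the student
```

A momentum close to 1 makes the teacher change slowly, which is what keeps its targets stable while the student is still learning.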
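2.4 Adaptive Foreground Separation and Masking (Adaptive Masking)
Industrial images often have cluttered backgrounds, and comparing features over the whole image introduces background noise that causes false alarms. AnomalyDINO uses the zero-shot capability of DINOv2 features to obtain an unsupervised foreground segmentation:
Extracting the principal component: PCA is applied to the patch features of an image and only the first principal component (First PC) is kept. Its values usually separate foreground from background well, so thresholding yields an initial binary mask.
Adaptive inversion (Adaptive Masking): the sign of a PCA direction is arbitrary, so the mask may select the background by mistake. The code therefore includes a self-check: when the foreground covers less than 35% of the central region of the image, the algorithm flips the sign, which keeps background removal reliable across industrial datasets.
Morphological post-processing: OpenCV dilation (Dilate) and closing (MORPH_CLOSE) smooth the mask edges and fill small holes inside the foreground, so that the retained features are clean and complete.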
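The first two steps above can be sketched in NumPy. This is an illustrative sketch under stated assumptions, not the project's `compute_background_mask`: the function name `first_pc_mask`, the threshold of 0, and the choice of the central half of the grid as the "center region" are all hypothetical, and the OpenCV morphology step is left out so that the example needs only NumPy.

```python
import numpy as np

def first_pc_mask(features, grid_size, threshold=0.0, min_center_ratio=0.35):
    """Foreground mask from the first principal component of patch features.

    features: (num_patches, dim) array, row-major over a (rows, cols) patch grid.
    """
    centered = features - features.mean(axis=0, keepdims=True)
    # The first right-singular vector is the first principal direction.
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    first_pc = centered @ vt[0]
    mask = (first_pc > threshold).reshape(grid_size)

    # The sign of a principal component is arbitrary. If the central region is
    # mostly classified as background, assume the sign is flipped and redo it.
    rows, cols = grid_size
    center = mask[rows // 4: rows - rows // 4, cols // 4: cols - cols // 4]
    if center.mean() < min_center_ratio:
        mask = (-first_pc > threshold).reshape(grid_size)
    return mask

# Synthetic 8x8 patch grid: a centered 4x4 "object" whose features differ
# from the background along one feature dimension.
rng = np.random.default_rng(0)
grid = (8, 8)
truth = np.zeros(grid, dtype=bool)
truth[2:6, 2:6] = True
feats = rng.normal(0.0, 0.01, size=(64, 4))
feats[truth.ravel(), 0] += 5.0

mask = first_pc_mask(feats, grid)
print(mask.astype(int))
```

Whatever sign the decomposition happens to return, the center check makes the centered object come out as foreground.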
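2.5 Building the Memory Bank of Normal Features (Memory Bank)
This is the core mechanism of few-shot detection. To build the memory bank, the model iterates over the few normal reference samples, filters their features, and concatenates them:
Feature filtering: using the adaptive mask from the previous step, the code keeps only the patch features of the foreground object. np.concatenate then joins these filtered features into a single large feature matrix. This reduces memory usage and keeps background noise out of the normal feature distribution.
FAISS index for high-dimensional search: to remove the retrieval bottleneck caused by the large number of dense texture features, the algorithm uses FAISS, the open-source vector search library from Meta. faiss.GpuIndexFlatL2 places the index on the GPU, and the features are L2-normalized (normalize_L2) before they are added, which prepares the later cosine-similarity computation.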
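The filtering, concatenation, and normalization steps can be sketched as follows. This is a NumPy stand-in written for illustration: the helper name `build_memory_bank` and the random toy features are assumptions, and the manual normalization plays the role that `faiss.normalize_L2` plays in the real code.

```python
import numpy as np

def build_memory_bank(features_per_image, masks_per_image):
    """Stack the foreground patch features of all reference images and L2-normalize them."""
    kept = [f[m] for f, m in zip(features_per_image, masks_per_image)]
    bank = np.concatenate(kept, axis=0).astype("float32")
    norms = np.linalg.norm(bank, axis=1, keepdims=True)
    return bank / np.maximum(norms, 1e-12)  # guard against zero-length rows

# Two toy reference images with 16 patches of dimension 8 each (hypothetical data).
rng = np.random.default_rng(0)
features = [rng.normal(size=(16, 8)) for _ in range(2)]
masks = [np.arange(16) % 2 == 0,       # first image: half of the patches are foreground
         np.ones(16, dtype=bool)]      # second image: everything is foreground

bank = build_memory_bank(features, masks)
print(bank.shape)  # (24, 8)
```

With FAISS installed, an array like `bank` is what would be passed to the `add` method of a flat L2 index.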
Figure 2. Model framework
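2.6 Distance Metric and Anomaly Score Computation
At inference time, the key question is how distances between low-level features are turned into the final anomaly score:
Metric conversion: the magnitude of a deep feature vector is easily affected by illumination or contrast, whereas its direction better reflects the underlying texture semantics. The search therefore runs on L2-normalized features: for unit vectors, the squared L2 distance returned by the index equals twice the cosine distance, so halving it gives the cosine distance (Cosine Distance) exactly, which makes matching of complex features more robust.
2D spatial reconstruction: the one-dimensional array of retrieved distances is mapped back to the spatial positions of the image. An all-zero array of the same size as the original mask is initialized, the distances are written into the foreground positions, and the result is reshaped into a two-dimensional anomaly heat map (Anomaly Map). Background regions stay at zero and therefore cannot raise false alarms.
Tail-aggregated scoring: to turn patch-level distances into a single image-level anomaly score, the method uses neither the plain maximum nor the plain mean but an aggregate statistic: the mean of the top 1% of the distance distribution, $\mathrm{mean}(H_{0.01}(\mathcal{D}))$. This truncation captures the strongest anomaly signal while remaining robust to a single outlying patch.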
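For reference, the metric conversion and the score can be written out as a short derivation. The symbols $a$, $b$, $\theta$, and $s$ are introduced here for illustration; $a$ and $b$ are assumed to be L2-normalized patch features, and $\mathcal{D}$ is the set of patch distances of one test image.

```latex
\begin{aligned}
\lVert a - b \rVert_2^2
  &= \lVert a \rVert_2^2 + \lVert b \rVert_2^2 - 2\,a^\top b
   = 2 - 2\cos\theta
  && (\lVert a \rVert_2 = \lVert b \rVert_2 = 1), \\
d_{\cos}(a, b)
  &= 1 - \cos\theta
   = \tfrac{1}{2}\,\lVert a - b \rVert_2^2, \\
s
  &= \mathrm{mean}\bigl(H_{0.01}(\mathcal{D})\bigr),
  && H_{0.01}(\mathcal{D}) = \text{the 1\% largest values in } \mathcal{D}.
\end{aligned}
```

The factor of one half is the division by 2 that the detection code applies to the distances returned by the flat L2 index, which are squared L2 distances.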
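III. Dataset Introduction and Preprocessing
3.1 Dataset Composition
MVTec AD (MVTec Anomaly Detection) is the most classic and most widely used benchmark (Benchmark) dataset in industrial visual anomaly detection. It is designed for real industrial inspection scenarios, contains more than 5000 high-resolution images, and is divided into 15 industrial product categories:
Figure 3. MVTec AD dataset website
Figure 4. Contents of the MVTec AD dataset
5 texture categories (Textures): Carpet, Grid, Leather, Tile, and Wood. Their surfaces show periodic or complex dense textures, which requires the model to capture global texture consistency.
10 object categories (Objects): Bottle, Cable, Capsule, Hazelnut, Metal Nut, Pill, Screw, Toothbrush, Transistor, and Zipper. Objects usually vary more in shape and pose, so the model must attend to local structure, alignment, and the completeness of logical components.
3.2 Training and Testing Protocol
To simulate real production, where defective samples are extremely scarce, the dataset follows the unsupervised anomaly detection (Unsupervised Anomaly Detection) protocol:
Training set: contains only defect-free normal (Good) images. In the few-shot (Few-shot) setting, the algorithm may use the features of only 1 or 4 of these samples to build the reference distribution of normal features.
Test set: contains both normal images and anomalous images with various real industrial defects.
Two-level evaluation: the dataset provides fine-grained pixel-level ground-truth masks (Masks). Evaluation therefore includes image-level metrics that judge whether an image contains an anomaly (ROC-AUC, F1-max) as well as pixel-level localization metrics (Pixel-F1, Pixel-AUPRO). The per-region overlap (PRO) metric weights defect regions of different sizes equally and so reflects how well the model covers and localizes small local defects.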
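To make the image-level metric concrete, the sketch below computes ROC-AUC through its ranking interpretation: the probability that a randomly chosen anomalous image receives a higher score than a randomly chosen normal one. The function name `image_auroc` and the toy scores are assumptions for illustration; the project's own evaluation lives in `src.post_eval` and is not reproduced here.

```python
import numpy as np

def image_auroc(scores, labels):
    """ROC-AUC as the fraction of (anomalous, normal) pairs ranked correctly."""
    scores = np.asarray(scores, dtype=float)
    labels = np.asarray(labels, dtype=bool)
    pos, neg = scores[labels], scores[~labels]
    # Compare every anomalous score with every normal score; ties count as half.
    wins = (pos[:, None] > neg[None, :]).sum()
    ties = (pos[:, None] == neg[None, :]).sum()
    return (wins + 0.5 * ties) / (len(pos) * len(neg))

# Hypothetical anomaly scores; label 1 marks an anomalous image.
scores = [0.10, 0.20, 0.15, 0.80, 0.18]
labels = [0, 0, 0, 1, 1]
auroc = image_auroc(scores, labels)
print(f"{auroc:.3f}")  # 5 of the 6 anomalous/normal pairs are ranked correctly
```

The pairwise comparison is quadratic in the number of images, which is acceptable for a sketch; library implementations sort the scores instead.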
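3.3 Data Processing and Augmentation
Because the imaging conditions and object placement in industrial images differ considerably from the natural-image distribution on which open-source foundation models are pre-trained, the data must be cleaned and augmented by a few-shot preprocessing pipeline:
Uniform size and multiple resolutions: to balance inference latency against dense-prediction accuracy, input images are resized and cropped, usually to 448×448 or 672×672 pixels, before entering the backbone (the resolution must be an integer multiple of the DINOv2 patch size of 14). Experiments show that a higher resolution noticeably improves pixel-level localization of small defects.
Zero-shot foreground mask generation (Masking): for the 10 object categories, the background often contains conveyor belts, metal trays, and similar structures, and comparing features directly would bring in a lot of environmental noise. Thresholding the first principal component (First PC) of the DINOv2 features produces an unsupervised mask, which OpenCV dilation (Dilate) and closing (MORPH_CLOSE) then smooth and fill, removing the background without any manual annotation. For the 5 dense texture categories, masking is disabled by default.
Few-shot rotation augmentation (Rotation): in the few-shot setting, test samples on the production line often vary more in rotation angle than the few reference samples do. For the Screw category in the 1-shot setting, for example, rotating the single normal reference image to several angles raises the image-level AUROC of feature matching from 65.6% to 89.2%.
Preprocessing modes:
Agnostic mode (Agnostic): the default strategy. When nothing is known about rotation or deformation in the test samples, rotation augmentation and adaptive masking are applied to all images by default.
Informed mode (Informed): in controlled industrial automation settings (or when test images are already aligned online), rotation augmentation can be switched off and only foreground masking is kept. This reduces the cost of building the memory bank and prevents angle-related semantic anomalies, such as a misplaced transistor (misplaced), from being matched as normal.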
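Two of the preprocessing steps above lend themselves to a small sketch: snapping the input size to a multiple of the patch size, and switching rotation augmentation on or off by mode. Everything here is hypothetical illustration: the function names, the smaller-edge resizing rule, and the restriction to 90-degree rotations (which NumPy can do without interpolation) are assumptions, and the project's `augment_image` may use different angles.

```python
import numpy as np

PATCH_SIZE = 14  # DINOv2 patch size, as stated above

def snap_to_patch_multiple(height, width, smaller_edge=448, patch=PATCH_SIZE):
    """Scale so the smaller edge equals `smaller_edge`, then crop to multiples of `patch`."""
    scale = smaller_edge / min(height, width)
    new_h, new_w = round(height * scale), round(width * scale)
    return new_h - new_h % patch, new_w - new_w % patch

def reference_views(image, mode="agnostic"):
    """Return the views of one reference image that would enter the memory bank."""
    if mode == "agnostic":
        # Simplification: 0, 90, 180 and 270 degrees only.
        return [np.rot90(image, k) for k in range(4)]
    return [image]  # "informed": test images are assumed to be aligned already

print(snap_to_patch_multiple(1024, 1024))      # (448, 448)
print(snap_to_patch_multiple(900, 1000, 672))  # (672, 742)

image = np.zeros((4, 6, 3), dtype=np.uint8)    # toy image, height 4, width 6
print(len(reference_views(image, "agnostic")), len(reference_views(image, "informed")))
```

In agnostic mode the memory bank grows with the number of rotated views, which is the construction cost that informed mode avoids.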
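3.4 Dataset Directory Structure
The dataset and feature paths in the project follow the standard layout of industrial vision benchmarks. The directory hierarchy is as follows:
Figure 5. Dataset directory structure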
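As a text companion to the figure, the sketch below prints the folders expected for one category. The `train/good` and `test/<anomaly type>` paths match the ones built in the code of Section V; the `ground_truth` folder follows the standard MVTec AD release. The helper name `expected_dirs` is hypothetical, and the example uses the Bottle category with its three defect types.

```python
from pathlib import Path

def expected_dirs(data_root, object_name, anomaly_types):
    """List the folders of the standard MVTec AD layout for one category."""
    root = Path(data_root) / object_name
    dirs = [root / "train" / "good", root / "test" / "good"]
    for anomaly in anomaly_types:
        dirs.append(root / "test" / anomaly)          # anomalous test images
        dirs.append(root / "ground_truth" / anomaly)  # pixel-level masks
    return dirs

dirs = expected_dirs("data/mvtec_anomaly_detection", "bottle",
                     ["broken_large", "broken_small", "contamination"])
for d in dirs:
    print(d.as_posix())
```

The `data_root` value used here is the default of the `--data_root` argument in the main program.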
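IV. Training-Free Algorithm Pipeline
Because AnomalyDINO follows the training-free (Training-Free) deep nearest neighbor paradigm, it needs no adversarial training or parameter fine-tuning of the kind that GAN-based methods require. The pipeline has two stages: a "purification stage" that builds a high-dimensional feature memory bank from a few normal samples, and an "inference stage" that retrieves and compares features for each test sample.
4.1 Memory Bank Construction
Load reference samples: read the small set of defect-free normal samples.
Apply data augmentation (optional): in agnostic (Agnostic) mode, rotate the normal samples to several angles to widen their coverage of viewpoints in feature space.
Dense feature extraction: feed the augmented images into the pre-trained DINOv2 backbone and extract a high-dimensional token feature for every patch (Patch).
Adaptive foreground separation: generate an unsupervised mask from the first principal component (First PC), check the foreground share with the adaptive self-check, and apply OpenCV dilation and closing to remove background patches.
Feature matrix concatenation: take the foreground patch features that pass the mask and join them with np.concatenate into a single high-dimensional matrix of normal features.
Normalization and indexing: L2-normalize the concatenated matrix (normalize_L2) and add it to a GPU-accelerated FAISS index (such as faiss.GpuIndexFlatL2), which completes the feature database M.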
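4.2 Inference and Defect Detection
Read the test sample: take one industrial test image as input.
Resolution preprocessing: resize and crop the test image to the resolution expected by the backbone, so that each side is an integer multiple of the DINOv2 patch size of 14.
Compute test features: feed the preprocessed image into DINOv2 and obtain a high-dimensional local feature vector for every patch.
High-dimensional nearest neighbor search: with FAISS, compute in parallel on the GPU the nearest-neighbor L2 distance between each test patch feature and all normal patch features in the memory bank M; on normalized features, half of the squared L2 distance is exactly the cosine distance.
Pixel-level anomaly map reconstruction: write the one-dimensional distance array back to the positions given by the original mask, leave background regions at zero, then apply bilinear upsampling and Gaussian smoothing to obtain a pixel-level two-dimensional anomaly heat map (Anomaly Map).
Image-level aggregated score: take the mean of the top 1% largest patch distances as the anomaly score of the whole image, which balances sensitivity to local defects against robustness to single noisy patches, and output the final inspection result.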
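The search, reconstruction, and scoring steps can be sketched end to end in NumPy. This is a simplified stand-in, not the project's implementation: the brute-force distance computation replaces the FAISS index, the helper names (`l2_normalize`, `mean_top_fraction`, `score_image`) and the toy features are assumptions, and the upsampling and Gaussian smoothing of the map are omitted.

```python
import numpy as np

def l2_normalize(x):
    """Scale every row to unit length."""
    return x / np.maximum(np.linalg.norm(x, axis=1, keepdims=True), 1e-12)

def mean_top_fraction(values, fraction=0.01):
    """Mean of the largest `fraction` of the values (at least one value)."""
    flat = np.sort(np.asarray(values, dtype=float).ravel())[::-1]
    k = max(1, int(len(flat) * fraction))
    return float(flat[:k].mean())

def score_image(test_features, mask, memory_bank, grid_size):
    """Return (patch-level anomaly map, image-level anomaly score)."""
    query = l2_normalize(test_features[mask])
    # Brute-force squared L2 distance to every memory-bank entry.
    sq_dists = ((query[:, None, :] - memory_bank[None, :, :]) ** 2).sum(axis=2)
    cosine_dist = sq_dists.min(axis=1) / 2.0  # unit vectors: ||a - b||^2 / 2 = 1 - cos
    flat_map = np.zeros(mask.shape[0], dtype=float)  # background stays at zero
    flat_map[mask] = cosine_dist
    return flat_map.reshape(grid_size), mean_top_fraction(flat_map)

# Toy example: a 2x2 patch grid with 2-dimensional features (hypothetical values).
bank = l2_normalize(np.array([[1.0, 0.0], [1.0, 0.1]]))
feats = np.array([[2.0, 0.0], [0.0, 3.0], [1.0, 0.0], [5.0, 5.0]])
mask = np.array([True, True, True, False])  # last patch is background

amap, score = score_image(feats, mask, bank, (2, 2))
print(amap.round(3))
print(round(score, 3))
```

Only the second patch points in a direction that the memory bank does not cover, so it alone receives a large distance and determines the image score.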
Figure 6. Results
V. Code Implementation
1. Main program code
import argparse
import os
from argparse import ArgumentParser, Action
import yaml
from tqdm import trange
import csv

from src.utils import get_dataset_info
from src.detection import run_anomaly_detection
from src.post_eval import eval_finished_run
from src.visualize import create_sample_plots
from src.backbones import get_model


class IntListAction(Action):
    """
    Define a custom action to always return a list.
    This allows --shots 1 to be treated as a list of one element [1].
    """
    # argparse calls actions as action(parser, namespace, values, option_string)
    def __call__(self, parser, namespace, values, option_string=None):
        if not isinstance(values, list):
            values = [values]
        setattr(namespace, self.dest, values)


def parse_args():
    parser = ArgumentParser()
    parser.add_argument("--dataset", type=str, default="MVTec")
    parser.add_argument("--model_name", type=str, default="dinov2_vits14",
                        help="Name of the backbone model. Choose from ['dinov2_vits14', 'dinov2_vitb14', 'dinov2_vitl14', 'dinov2_vitg14', 'vit_b_16'].")
    parser.add_argument("--data_root", type=str, default="data/mvtec_anomaly_detection",
                        help="Path to the root directory of the dataset.")
    parser.add_argument("--preprocess", type=str, default="agnostic",
                        help="Preprocessing method. Choose from ['agnostic', 'informed', 'masking_only'].")
    parser.add_argument("--resolution", type=int, default=448)
    parser.add_argument("--knn_metric", type=str, default="L2_normalized")
    parser.add_argument("--k_neighbors", type=int, default=1)
    parser.add_argument("--faiss_on_cpu", default=False, action=argparse.BooleanOptionalAction,
                        help="Run the FAISS kNN search on the CPU instead of the GPU. (For GPU search, a conda install of faiss-gpu is recommended; pip install usually does not work.)")
    parser.add_argument("--shots", nargs='+', type=int, default=[1],  #action=IntListAction,
                        help="List of shots to evaluate. Full-shot scenario is -1.")
    parser.add_argument("--num_seeds", type=int, default=1)
    # Note: no type is given here, so any value passed on the command line
    # arrives as a non-empty string and is therefore truthy.
    parser.add_argument("--mask_ref_images", default=False)
    parser.add_argument("--just_seed", type=int, default=None)
    parser.add_argument('--save_examples', default=True, action=argparse.BooleanOptionalAction,
                        help="Save example plots.")
    parser.add_argument("--eval_clf", default=True, action=argparse.BooleanOptionalAction,
                        help="Evaluate anomaly detection performance.")
    parser.add_argument("--eval_segm", default=False, action=argparse.BooleanOptionalAction,
                        help="Evaluate anomaly segmentation performance.")
    parser.add_argument("--device", default='cuda:0')
    parser.add_argument("--warmup_iters", type=int, default=25,
                        help="Number of warmup iterations, relevant when benchmarking inference time.")
    #parser.add_argument("--tag", help="Optional tag for the saving directory.")
    # tag: custom suffix appended to the name of the results directory
    parser.add_argument("--tag", type=str, default="test",
                        help="Optional tag for the saving directory.")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = parse_args()

    print(f"Requested to run {len(args.shots)} (different) shot(s):", args.shots)
    print(f"Requested to repeat the experiments {args.num_seeds} time(s).")

    objects, object_anomalies, masking_default, rotation_default = get_dataset_info(args.dataset, args.preprocess)

    # set CUDA device
    os.environ["CUDA_VISIBLE_DEVICES"] = str(args.device[-1])
    model = get_model(args.model_name, 'cuda', smaller_edge_size=args.resolution)

    if not args.model_name.startswith("dinov2"):
        masking_default = {o: False for o in objects}
        print("Caution: Only DINOv2 supports 0-shot masking (for now)!")

    if args.just_seed is not None:
        seeds = [args.just_seed]
    else:
        seeds = range(args.num_seeds)

    for shot in list(args.shots):
        save_examples = args.save_examples

        results_dir = f"results_{args.dataset}/{args.model_name}_{args.resolution}/{shot}-shot_preprocess={args.preprocess}"
        if args.tag is not None:
            results_dir += "_" + args.tag
        plots_dir = results_dir
        os.makedirs(f"{results_dir}", exist_ok=True)

        # save preprocessing setups (masking and rotation) to file
        with open(f"{results_dir}/preprocess.yaml", "w") as f:
            yaml.dump({"masking": masking_default, "rotation": rotation_default}, f)

        # save arguments to file
        with open(f"{results_dir}/args.yaml", "w") as f:
            yaml.dump(vars(args), f)

        if args.faiss_on_cpu:
            print("Warning: Running similarity search on CPU. Consider using faiss-gpu for faster inference.")

        print("Results will be saved to", results_dir)

        for seed in seeds:
            print(f"=========== Shot = {shot}, Seed = {seed} ===========")

            if os.path.exists(f"{results_dir}/metrics_seed={seed}.json"):
                print(f"Results for shot {shot}, seed {seed} already exist. Skipping.")
                continue
            else:
                timeit_file = results_dir + "/time_measurements.csv"
                with open(timeit_file, 'w', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(["Object", "Sample", "Anomaly_Score", "MemoryBank_Time", "Inference_Time"])

                    for object_name in objects:

                        if save_examples:
                            os.makedirs(f"{plots_dir}/{object_name}", exist_ok=True)
                            os.makedirs(f"{plots_dir}/{object_name}/examples", exist_ok=True)

                        # CUDA warmup
                        for _ in trange(args.warmup_iters, desc="CUDA warmup", leave=False):
                            first_image = os.listdir(f"{args.data_root}/{object_name}/train/good")[0]
                            img_tensor, grid_size = model.prepare_image(f"{args.data_root}/{object_name}/train/good/{first_image}")
                            features = model.extract_features(img_tensor)

                        anomaly_scores, time_memorybank, time_inference = run_anomaly_detection(
                            model,
                            object_name,
                            data_root = args.data_root,
                            n_ref_samples = shot,
                            object_anomalies = object_anomalies,
                            plots_dir = plots_dir,
                            save_examples = save_examples,
                            knn_metric = args.knn_metric,
                            knn_neighbors = args.k_neighbors,
                            faiss_on_cpu = args.faiss_on_cpu,
                            masking = masking_default[object_name],
                            mask_ref_images = args.mask_ref_images,
                            rotation = rotation_default[object_name],
                            seed = seed,
                            save_patch_dists = args.eval_clf,  # save patch distances for detection evaluation
                            save_tiffs = args.eval_segm)       # save anomaly maps as tiffs for segmentation evaluation

                        # write anomaly scores and inference times to file
                        for counter, sample in enumerate(anomaly_scores.keys()):
                            anomaly_score = anomaly_scores[sample]
                            inference_time = time_inference[sample]
                            writer.writerow([object_name, sample, f"{anomaly_score:.5f}", f"{time_memorybank:.5f}", f"{inference_time:.5f}"])

                        # print(f"Mean inference time ({object_name}): {sum(time_inference.values())/len(time_inference):.5f} s/sample")

                # read inference times from file
                with open(timeit_file, 'r') as file:
                    reader = csv.reader(file)
                    next(reader)
                    inference_times = [float(row[4]) for row in reader]
                print(f"Finished AD for {len(objects)} objects (seed {seed}), mean inference time: {sum(inference_times)/len(inference_times):.5f} s/sample")

                # evaluate all finished runs and create sample anomaly maps for inspection
                print(f"=========== Evaluate seed = {seed} ===========")
                eval_finished_run(args.dataset,
                                  args.data_root,
                                  anomaly_maps_dir = results_dir + f"/anomaly_maps/seed={seed}",
                                  output_dir = results_dir,
                                  seed = seed,
                                  pro_integration_limit = 0.3,
                                  eval_clf = args.eval_clf,
                                  eval_segm = args.eval_segm)
                create_sample_plots(results_dir,
                                    anomaly_maps_dir = results_dir + f"/anomaly_maps/seed={seed}",
                                    seed = seed,
                                    dataset = args.dataset,
                                    data_root = args.data_root)

                # deactivate creation of examples for the next seeds...
                save_examples = False

    print("Finished and evaluated all runs!")
2. Detection code
import matplotlib.pyplot as plt
import os
import cv2
import numpy as np
from tqdm import tqdm
import faiss
import tifffile as tiff
import time
import torch

from src.utils import augment_image, dists2map, plot_ref_images
from src.post_eval import mean_top1p


def run_anomaly_detection(
        model,
        object_name,
        data_root,
        n_ref_samples,
        object_anomalies,
        plots_dir,
        save_examples = False,
        masking = None,
        mask_ref_images = False,
        rotation = False,
        knn_metric = 'L2_normalized',
        knn_neighbors = 1,
        faiss_on_cpu = False,
        seed = 0,
        save_patch_dists = True,
        save_tiffs = False):
    """
    Main function to evaluate the anomaly detection performance of a given object/product.

    Parameters:
    - model: The backbone model for feature extraction (and, in case of DINOv2, masking).
    - object_name: The name of the object/product to evaluate.
    - data_root: The root directory of the dataset.
    - n_ref_samples: The number of reference samples to use for evaluation (k-shot). Set to -1 for full-shot setting.
    - object_anomalies: The anomaly types for each object/product.
    - plots_dir: The directory to save the example plots.
    - save_examples: Whether to save example images and plots. Default is False.
    - masking: Whether to apply DINOv2 to estimate the foreground mask (and discard background patches).
    - mask_ref_images: Whether to also mask the reference images (only takes effect together with masking).
    - rotation: Whether to augment reference samples with rotation.
    - knn_metric: The metric to use for kNN search. Default is 'L2_normalized' (1 - cosine similarity)
    - knn_neighbors: The number of nearest neighbors to consider. Default is 1.
    - faiss_on_cpu: Whether to run the FAISS search on the CPU instead of the GPU. Default is False.
    - seed: The seed value for deterministic sampling in few-shot setting. Default is 0.
    - save_patch_dists: Whether to save the patch distances. Default is True. Required to eval detection.
    - save_tiffs: Whether to save the anomaly maps as TIFF files. Default is False. Required to eval segmentation.
    """

    assert knn_metric in ["L2", "L2_normalized"]

    # add 'good' to the anomaly types (work on a copy so the caller's dict is not modified)
    type_anomalies = list(object_anomalies[object_name])
    type_anomalies.append('good')

    # ensure that each type is only evaluated once
    type_anomalies = list(set(type_anomalies))

    # Extract reference features
    features_ref = []
    images_ref = []
    masks_ref = []
    vis_backgroud = []

    img_ref_folder = f"{data_root}/{object_name}/train/good/"
    if n_ref_samples == -1:
        # full-shot setting
        img_ref_samples = sorted(os.listdir(img_ref_folder))
    else:
        # few-shot setting, pick samples in deterministic fashion according to seed
        img_ref_samples = sorted(os.listdir(img_ref_folder))[seed*n_ref_samples:(seed + 1)*n_ref_samples]

    if len(img_ref_samples) < n_ref_samples:
        print(f"Warning: Not enough reference samples for {object_name}! Only {len(img_ref_samples)} samples available.")

    with torch.inference_mode():
        # start measuring time (feature extraction/memory bank set up)
        start_time = time.time()
        for img_ref_n in tqdm(img_ref_samples, desc="Building memory bank", leave=False):
            # load reference image...
            img_ref = f"{img_ref_folder}{img_ref_n}"
            image_ref = cv2.cvtColor(cv2.imread(img_ref, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB)

            # augment reference image (if applicable)...
            if rotation:
                img_augmented = augment_image(image_ref)
            else:
                img_augmented = [image_ref]
            for i in range(len(img_augmented)):
                image_ref = img_augmented[i]
                image_ref_tensor, grid_size1 = model.prepare_image(image_ref)
                features_ref_i = model.extract_features(image_ref_tensor)

                # compute background mask and discard background patches
                mask_ref = model.compute_background_mask(features_ref_i, grid_size1, threshold=10, masking_type=(mask_ref_images and masking))
                features_ref.append(features_ref_i[mask_ref])
                if save_examples:
                    images_ref.append(image_ref)
                    vis_image_background = model.get_embedding_visualization(features_ref_i, grid_size1, mask_ref)
                    masks_ref.append(mask_ref)
                    vis_backgroud.append(vis_image_background)

        features_ref = np.concatenate(features_ref, axis=0).astype('float32')

        if faiss_on_cpu:
            # similarity search on CPU
            knn_index = faiss.IndexFlatL2(features_ref.shape[1])
        else:
            # similarity search on GPU
            res = faiss.StandardGpuResources()
            knn_index = faiss.GpuIndexFlatL2(res, features_ref.shape[1])
            # knn_index = faiss.IndexFlatL2(features_ref.shape[1])
            # knn_index = faiss.index_cpu_to_gpu(res, int(model.device[-1]), knn_index)

        if knn_metric == "L2_normalized":
            faiss.normalize_L2(features_ref)
        knn_index.add(features_ref)

        # end measuring time (for memory bank set up; in seconds, same for all test samples of this object)
        time_memorybank = time.time() - start_time

        # plot some reference samples for inspection
        if save_examples:
            plots_dir_ = f"{plots_dir}/{object_name}/"
            plot_ref_images(images_ref, masks_ref, vis_backgroud, grid_size1, plots_dir_, title = "Reference Images", img_names = img_ref_samples)

        inference_times = {}
        anomaly_scores = {}

        idx = 0
        # Evaluate anomalies for each anomaly type (and "good")
        for type_anomaly in tqdm(type_anomalies, desc = f"processing test samples ({object_name})"):
            data_dir = f"{data_root}/{object_name}/test/{type_anomaly}"

            if save_patch_dists or save_tiffs:
                os.makedirs(f"{plots_dir}/anomaly_maps/seed={seed}/{object_name}/test/{type_anomaly}", exist_ok=True)

            for idx, img_test_nr in enumerate(sorted(os.listdir(data_dir))):
                # start measuring time (inference)
                start_time = time.time()
                image_test_path = f"{data_dir}/{img_test_nr}"

                # Extract test features
                image_test = cv2.cvtColor(cv2.imread(image_test_path, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB)
                image_tensor2, grid_size2 = model.prepare_image(image_test)
                features2 = model.extract_features(image_tensor2)

                # Compute background mask
                if masking:
                    mask2 = model.compute_background_mask(features2, grid_size2, threshold=10, masking_type=masking)
                else:
                    mask2 = np.ones(features2.shape[0], dtype=bool)

                if save_examples and idx < 3:
                    vis_image_test_background = model.get_embedding_visualization(features2, grid_size2, mask2)

                # Discard irrelevant features
                features2 = features2[mask2]

                # Compute distances to nearest neighbors in M
                # (the flat L2 index returns squared L2 distances)
                if knn_metric == "L2":
                    distances, match2to1 = knn_index.search(features2, k = knn_neighbors)
                    if knn_neighbors > 1:
                        distances = distances.mean(axis=1)
                    distances = np.sqrt(distances)

                elif knn_metric == "L2_normalized":
                    faiss.normalize_L2(features2)
                    distances, match2to1 = knn_index.search(features2, k = knn_neighbors)
                    if knn_neighbors > 1:
                        distances = distances.mean(axis=1)
                    distances = distances / 2  # equivalent to cosine distance (1 - cosine similarity)

                output_distances = np.zeros_like(mask2, dtype=float)
                output_distances[mask2] = distances.squeeze()
                d_masked = output_distances.reshape(grid_size2)

                # save inference time
                torch.cuda.synchronize()  # Synchronize CUDA kernels before measuring time
                inf_time = time.time() - start_time
                inference_times[f"{type_anomaly}/{img_test_nr}"] = inf_time
                anomaly_scores[f"{type_anomaly}/{img_test_nr}"] = mean_top1p(output_distances.flatten())

                # Save the anomaly maps (raw as .npy or full resolution .tiff files)
                img_test_nr = img_test_nr.split(".")[0]
                if save_tiffs:
                    anomaly_map = dists2map(d_masked, image_test.shape)
                    tiff.imwrite(f"{plots_dir}/anomaly_maps/seed={seed}/{object_name}/test/{type_anomaly}/{img_test_nr}.tiff", anomaly_map)
                if save_patch_dists:
                    np.save(f"{plots_dir}/anomaly_maps/seed={seed}/{object_name}/test/{type_anomaly}/{img_test_nr}.npy", d_masked)

                # Save some example plots (3 per anomaly type)
                if save_examples and idx < 3:

                    fig, (ax1, ax2, ax3, ax4,) = plt.subplots(1, 4, figsize=(18, 4.5))

                    # plot test image, PCA + mask
                    ax1.imshow(image_test)
                    ax2.imshow(vis_image_test_background)

                    # plot patch distances
                    d_masked[~mask2.reshape(grid_size2)] = 0.0
                    plt.colorbar(ax3.imshow(d_masked), ax=ax3, fraction=0.12, pad=0.05, orientation="horizontal")

                    # compute image level anomaly score (mean(top 1%) of patches = empirical tail value at risk for quantile 0.99)
                    score_top1p = mean_top1p(distances)
                    ax4.axvline(score_top1p, color='r', linestyle='dashed', linewidth=1, label=round(score_top1p, 2))
                    ax4.legend()
                    ax4.hist(distances.flatten())