# Copyright (c) Meta Platforms, Inc. and affiliates. All Rights Reserved

import copy
import gc
import logging
import os
from collections import defaultdict
from operator import xor
from pathlib import Path
from typing import List, Optional

import numpy as np
import pycocotools.mask as mask_util
import torch
from pycocotools.cocoeval import COCOeval

from sam3.eval.cgf1_eval import CGF1Eval
from sam3.eval.coco_eval_offline import convert_to_xywh
from sam3.model.box_ops import box_xywh_inter_union
from sam3.train.masks_ops import rle_encode
from sam3.train.utils import distributed as dist
from typing_extensions import override

try:
    import rapidjson as json
except ModuleNotFoundError:
    import json

from iopath.common.file_io import g_pathmgr


class YTVISevalMixin:
    """
    Identical to COCOeval but adapts computeIoU to compute IoU between tracklets/masklets.
    """

    def _prepare(self):
        """
        Copied from cocoeval.py but doesn't convert masks to RLEs (we assume they already are RLEs)
        """
        p = self.params
        if p.useCats:
            gts = self.cocoGt.loadAnns(
                self.cocoGt.getAnnIds(imgIds=p.imgIds, catIds=p.catIds)
            )
            dts = self.cocoDt.loadAnns(
                self.cocoDt.getAnnIds(imgIds=p.imgIds, catIds=p.catIds)
            )
        else:
            gts = self.cocoGt.loadAnns(self.cocoGt.getAnnIds(imgIds=p.imgIds))
            dts = self.cocoDt.loadAnns(self.cocoDt.getAnnIds(imgIds=p.imgIds))

        # set ignore flag
        for gt in gts:
            gt["ignore"] = gt["ignore"] if "ignore" in gt else 0
            gt["ignore"] = "iscrowd" in gt and gt["iscrowd"]
            if p.iouType == "keypoints":
                gt["ignore"] = (gt["num_keypoints"] == 0) or gt["ignore"]
        self._gts = defaultdict(list)  # gt for evaluation
        self._dts = defaultdict(list)  # dt for evaluation
        for gt in gts:
            self._gts[gt["image_id"], gt["category_id"]].append(gt)
        for dt in dts:
            self._dts[dt["image_id"], dt["category_id"]].append(dt)
        self.evalImgs = defaultdict(list)  # per-image per-category evaluation results
        self.eval = {}  # accumulated evaluation results

    def computeIoU(self, imgId, catId):
        """
        Compute IoU between tracklets. Copied from cocoeval.py but adapted for videos (in YT-VIS format)
        """
        p = self.params
        if p.useCats:
            gt = self._gts[imgId, catId]
            dt = self._dts[imgId, catId]
        else:
            gt = [_ for cId in p.catIds for _ in self._gts[imgId, cId]]
            dt = [_ for cId in p.catIds for _ in self._dts[imgId, cId]]
        if len(gt) == 0 or len(dt) == 0:
            return []

        # For class mAP and phrase AP evaluation, we sort the detections in descending order of scores (as in COCOeval).
        # For demo F1 evaluation, we DO NOT sort the detections (but match them with GTs via Hungarian matching).
        assert hasattr(self, "sort_inds_by_scores_in_iou"), (
            "subclasses that inherit YTVISevalMixin should set `self.sort_inds_by_scores_in_iou` "
            "(True for class mAP and phrase AP, False for demo F1)"
        )
        if self.sort_inds_by_scores_in_iou:
            inds = np.argsort([-d["score"] for d in dt], kind="mergesort")
            dt = [dt[i] for i in inds]
            if len(dt) > p.maxDets[-1]:
                dt = dt[0 : p.maxDets[-1]]

        if p.iouType == "segm":
            g = [g["segmentations"] for g in gt]
            d = [d["segmentations"] for d in dt]
        elif p.iouType == "bbox":
            g = [g["bboxes"] for g in gt]
            d = [d["bboxes"] for d in dt]
        else:
            raise Exception("unknown iouType for iou computation")

        def iou_tracklets(preds, gts):
            preds = torch.tensor(preds)
            gts = torch.tensor(gts)
            inter, union = box_xywh_inter_union(
                preds.unsqueeze(1), gts.unsqueeze(0)
            )  # Num preds x Num GTs x Num frames
            inter = inter.sum(-1)
            union = union.sum(-1)
            assert (
                union > 0
            ).all(), (
                "There exists a tracklet with zero GTs across time. This is suspicious"
            )
            return inter / union
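
        # Illustrative note (not from the original source): `iou_tracklets` computes a
        # spatio-temporal IoU, i.e. per-frame box intersections and unions are summed over
        # time before dividing. For example, with xywh boxes over two frames,
        #     pred = [[0, 0, 10, 10], [0, 0, 10, 10]]
        #     gt   = [[0, 0, 10, 10], [20, 20, 10, 10]]
        # frame 0 contributes inter=100, union=100 and frame 1 contributes inter=0,
        # union=200, so the tracklet IoU is 100 / 300 = 1/3.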

        def iou_masklets(preds, gts):
            inter = 0
            union = 0
            for p_i, gt_i in zip(preds, gts):
                if p_i and gt_i:
                    # Compute areas of intersection and union
                    inter += mask_util.area(
                        mask_util.merge([p_i, gt_i], intersect=True)
                    )
                    union += mask_util.area(
                        mask_util.merge([p_i, gt_i], intersect=False)
                    )
                elif gt_i:
                    union += mask_util.area(gt_i)
                elif p_i:
                    union += mask_util.area(p_i)
            if union > 0:
                iou = inter / union
                assert iou >= 0 and iou <= 1, "Encountered an error in IoU computation"
            else:
                assert np.isclose(inter, 0) and np.isclose(
                    union, 0
                ), "Encountered an error in IoU computation"
                iou = 1
            return iou
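
        # Illustrative note (not from the original source): `iou_masklets` aggregates
        # per-frame RLE areas over the whole masklet. Frames where only one of the two
        # masklets has a mask still add that mask's area to the union (so missing frames
        # are penalized), and a pair with no mask in any frame is defined to have IoU 1.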

        if p.iouType == "segm":
            ious = [[iou_masklets(d_i, g_i) for g_i in g] for d_i in d]
        else:
            ious = iou_tracklets(d, g)
        return np.array(ious)


class YTVISeval(YTVISevalMixin, COCOeval):
    # For class mAP and phrase AP evaluation, we sort the detections in descending order of scores (as in COCOeval).
    sort_inds_by_scores_in_iou = True


class VideoDemoF1Eval(YTVISevalMixin, CGF1Eval):
    # For demo F1 evaluation, we DO NOT sort the detections (but match them with GTs via Hungarian matching).
    sort_inds_by_scores_in_iou = False
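
# A minimal usage sketch (illustration only, not from the original source), assuming
# `ytvis_gt` and `ytvis_dt` are COCO-API-like objects (exposing getAnnIds/loadAnns) built
# from YT-VIS-format ground truth and from predictions dumped by YTVISResultsWriter below:
#
#     evaluator = YTVISeval(ytvis_gt, ytvis_dt, iouType="segm")
#     evaluator.evaluate()    # COCOeval.evaluate() calls the overridden _prepare()/computeIoU()
#     evaluator.accumulate()
#     evaluator.summarize()   # standard COCOeval mAP summary, here over masklets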

class YTVISResultsWriter:
    """
    Gathers and dumps predictions in YT-VIS format.

    Expected flow of API calls: reset() -> N * update() -> compute_synced()
    """

    def __init__(
        self,
        dump_file: str,
        postprocessor,
        gather_pred_via_filesys=False,
        pred_file_evaluators: Optional[List] = None,
        save_per_frame_scores: bool = False,
        write_eval_metrics_file: bool = True,
        eval_metrics_file_suffix: str = ".sam3_eval_metrics",
    ):
        self.dump_file = dump_file
        self.dump = []
        self.postprocessor = postprocessor
        self.gather_pred_via_filesys = gather_pred_via_filesys
        if dist.is_main_process():
            dirname = os.path.dirname(self.dump_file)
            if not os.path.exists(dirname):
                os.makedirs(dirname, exist_ok=True)
                logging.info(f"Creating folder: {dirname}")
        # the evaluation hooks to be applied to the prediction files
        self.pred_file_evaluators = pred_file_evaluators or []
        self.save_per_frame_scores = save_per_frame_scores
        # in addition to the prediction file, we also write the evaluation metrics
        # for easier debugging and analysis (stored in another eval_metrics_file
        # so that we can keep the dumped prediction file under YT-VIS format)
        self.write_eval_metrics_file = write_eval_metrics_file
        if self.write_eval_metrics_file:
            self.eval_metrics_file = self.dump_file + eval_metrics_file_suffix
            os.makedirs(os.path.dirname(self.eval_metrics_file), exist_ok=True)

    def _dump_vid_preds(self, results):
        dumped_results = copy.deepcopy(results)
        self.dump.extend(dumped_results)

    def prepare(self, predictions):
        ytvis_results = []
        for video_id, prediction in predictions.items():
            if len(prediction) == 0:
                continue
            for k in ["boxes", "scores", "labels"]:
                assert (
                    k in prediction
                ), f"Expected predictions to have `{k}` key, available keys are {prediction.keys()}"
            if self.save_per_frame_scores:
                assert (
                    "per_frame_scores" in prediction
                ), f"Expected predictions to have `per_frame_scores` key, available keys are {prediction.keys()}"
            assert xor(
                "masks" in prediction, "masks_rle" in prediction
            ), f"Expected predictions to have either `masks` key or `masks_rle` key, available keys are {prediction.keys()}"
            boxes = prediction["boxes"]
            boxes = convert_to_xywh(boxes).tolist()
            scores = prediction["scores"].tolist()
            labels = prediction["labels"].tolist()
            if "masks" in prediction:
                masks = prediction["masks"].squeeze(2)
                assert (
                    masks.ndim == 4
                ), "Expected masks to be of shape (N_preds, T_frames, H, W)"
                areas = [mask.flatten(1).sum(1).tolist() for mask in masks]
                rles = [rle_encode(masklet) for masklet in masks]
                # memory clean
                del masks
                del prediction["masks"]
            elif "masks_rle" in prediction:
                rles = prediction.pop("masks_rle")
                areas = [
                    [0 if rle is None else rle.pop("area") for rle in rles_per_obj]
                    for rles_per_obj in rles
                ]
            else:
                raise ValueError(
                    "Expected either `masks` or `masks_rle` key in the predictions."
                )
            new_results = [
                {
                    "video_id": video_id,
                    "category_id": track_label,
                    "bboxes": track_boxes,
                    "score": track_score,
                    "segmentations": track_masks,
                    "areas": track_areas,
                }
                for (
                    track_boxes,
                    track_masks,
                    track_areas,
                    track_score,
                    track_label,
                ) in zip(boxes, rles, areas, scores, labels)
            ]
            # Optionally, save per-frame scores
            if self.save_per_frame_scores:
                per_frame_scores = prediction["per_frame_scores"].tolist()
                for res, track_per_frame_scores in zip(new_results, per_frame_scores):
                    res["per_frame_scores"] = track_per_frame_scores
            ytvis_results.extend(new_results)
        return ytvis_results
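
    # Illustrative note (not from the original source): each entry produced by `prepare()`
    # is one predicted masklet in YT-VIS result format, roughly of the form
    #
    #     {
    #         "video_id": 3,                        # video this track belongs to
    #         "category_id": 7,                     # predicted label / prompt id
    #         "score": 0.92,                        # track-level confidence
    #         "bboxes": [[x, y, w, h], ...],        # one xywh box per frame
    #         "segmentations": [rle_0, rle_1, ...], # one COCO RLE (or None) per frame
    #         "areas": [a_0, a_1, ...],             # per-frame mask areas
    #     }
    #
    # (field values here are made up for illustration).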

    def set_sync_device(self, device: torch.device):
        self._sync_device = device

    def update(self, *args, **kwargs):
        predictions = self.postprocessor.process_results(*args, **kwargs)
        results = self.prepare(predictions)
        self._dump_vid_preds(results)

    def _dump_preds(self):
        if not dist.is_main_process():
            self.dump = []
            gc.collect()
            return
        dumped_file = Path(self.dump_file)
        logging.info(f"YTVIS evaluator: Dumping predictions to {dumped_file}")
        with g_pathmgr.open(str(dumped_file), "w") as f:
            json.dump(self.dump, f)
        self.dump = []
        gc.collect()
        return str(dumped_file)

    def synchronize_between_processes(self):
        logging.info("YT-VIS evaluator: Synchronizing between processes")
        dump_dict = self._dedup_pre_gather(self.dump)
        if self.gather_pred_via_filesys:
            dump_dict_all_gpus = dist.gather_to_rank_0_via_filesys(dump_dict)
        else:
            dump_dict_all_gpus = dist.all_gather(dump_dict, force_cpu=True)
        self.dump = self._dedup_post_gather(dump_dict_all_gpus)
        logging.info(f"Gathered all {len(self.dump)} predictions")

    def _dedup_pre_gather(self, predictions):
        """
        Organize the predictions as a dict-of-list using (video_id, category_id) as keys
        for deduplication after gathering them across GPUs.

        During evaluation, the PyTorch data loader under `drop_last: False` wraps around
        the dataset so that its length becomes a multiple of the world size (GPU num) and
        duplicates the remaining batches. This causes the same test sample to appear
        simultaneously on multiple GPUs, resulting in duplicated predictions being saved
        into the prediction files. These duplicates are then counted as false positives
        under detection mAP metrics (since a ground truth can be matched with only one
        prediction).

        For example, if there are 4 GPUs and 6 samples [A1, A2, B1, B2, C1, C2], the data
        loader (under `drop_last: False`) would wrap it around like
        `[A1, A2, B1, B2, C1, C2, *A1*, *A2*]` to make a multiple of 4 and then split it as
        - GPU 0: A1, C1
        - GPU 1: A2, C2
        - GPU 2: B1, **A1**
        - GPU 3: B2, **A2**
        (as in DistributedSampler in https://github.com/pytorch/pytorch/blob/521588519da9f4876d90ddd7a17c10d0eca89dc6/torch/utils/data/distributed.py#L116-L124)
        so the predictions on A1 and A2 will occur twice in the final gathered outputs
        in the prediction file (and be counted as false positives). This also affects our
        YT-VIS official val evaluation, but to a lesser extent than YT-VIS dev since
        the latter is much smaller and more susceptible to false positives.

        So we need to deduplicate this. The tricky part is that we cannot deduplicate
        simply using the video id, given that we are sharding the classes in each video
        across multiple batches (with 20 prompts per batch) in our "orig_cats" eval dbs.
        The solution is to deduplicate based on the (video_id, category_id) tuple as key.
        We organize the predictions as a dict-of-list using (video_id, category_id) as
        keys on each GPU, with the list of masklets under this (video_id, category_id)
        on this GPU as values. Then, we all-gather this dict-of-list across GPUs and,
        if a key (video_id, category_id) appears on multiple GPUs, we only take the
        prediction masklet list from one GPU.
        """
        prediction_dict = defaultdict(list)
        for p in predictions:
            prediction_dict[(p["video_id"], p["category_id"])].append(p)
        return prediction_dict
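
    # Illustrative note (not from the original source): suppose 2 GPUs both predict for
    # (video_id=5, category_id=2) because of data-loader wrap-around. The pre-gather dicts
    #     rank 0: {(5, 2): [masklet_a]}
    #     rank 1: {(5, 2): [masklet_a_dup], (6, 2): [masklet_b]}
    # are all-gathered, and `_dedup_post_gather` keeps the (5, 2) list from only one rank,
    # yielding [masklet_a, masklet_b] and logging the skipped duplicate key (5, 2).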

    def _dedup_post_gather(self, list_of_prediction_dict):
        """
        Deduplicate the predictions from all GPUs. See `_dedup_pre_gather` for details.
        """
        dedup_prediction_dict = {}
        duplication_keys = []
        for prediction_dict in list_of_prediction_dict:
            for k, v in prediction_dict.items():
                if k not in dedup_prediction_dict:
                    dedup_prediction_dict[k] = v
                else:
                    duplication_keys.append(k)
        logging.info(
            f"skipped {len(duplication_keys)} duplicated predictions in YTVISResultsWriter "
            f"with the following (video_id, category_id) tuples: {duplication_keys}"
        )
        dedup_predictions = sum(dedup_prediction_dict.values(), [])
        return dedup_predictions

    def compute_synced(self):
        self.synchronize_between_processes()
        dumped_file = self._dump_preds()
        if not dist.is_main_process():
            return {"": 0.0}

        # run evaluation hooks on the prediction file
        meters = {}
        all_video_np_level_results = defaultdict(dict)
        for evaluator in self.pred_file_evaluators:
            gc.collect()
            results, video_np_level_results = evaluator.evaluate(dumped_file)
            meters.update(results)
            for (video_id, category_id), res in video_np_level_results.items():
                all_video_np_level_results[(video_id, category_id)].update(res)
        gc.collect()

        if self.write_eval_metrics_file:
            # convert the nested dict of {(video_id, category_id): per_sample_metric_dict}
            # to a list of per-sample metric dicts (with video_id and category_id) for JSON,
            # as JSON doesn't allow using tuples like (video_id, category_id) as dict keys
            video_np_level_metrics = [
                {"video_id": video_id, "category_id": category_id, **res}
                for (video_id, category_id), res in all_video_np_level_results.items()
            ]
            eval_metrics = {
                "dataset_level_metrics": meters,
                "video_np_level_metrics": video_np_level_metrics,
            }
            with g_pathmgr.open(self.eval_metrics_file, "w") as f:
                json.dump(eval_metrics, f)
            logging.info(
                f"YTVIS evaluator: Dumped evaluation metrics to {self.eval_metrics_file}"
            )

        if len(meters) == 0:
            meters = {"": 0.0}
        return meters

    def compute(self):
        return {"": 0.0}

    def reset(self, *args, **kwargs):
        self.dump = []
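

# A minimal usage sketch of the writer (illustration only, not from the original source;
# the postprocessor, the inputs forwarded to its process_results(), and the evaluator
# hooks are assumed to be provided by the surrounding eval code):
#
#     writer = YTVISResultsWriter(
#         dump_file="/path/to/preds.json",
#         postprocessor=my_postprocessor,             # hypothetical, must expose process_results()
#         pred_file_evaluators=[my_pred_file_eval],   # hypothetical hooks with .evaluate(pred_file)
#     )
#     writer.reset()
#     for batch in eval_loader:                       # hypothetical eval data loader
#         outputs = model(batch)                      # hypothetical model forward
#         writer.update(outputs, batch)               # args are forwarded to postprocessor.process_results()
#     metrics = writer.compute_synced()               # gathers, dedups, dumps, and runs the evaluator hooks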