# Copyright (c) 2022-2024, InterDigital Communications, Inc
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of InterDigital Communications, Inc nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
# NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import json
import math
from pathlib import Path
import motmetrics as mm
import numpy as np
import pandas as pd
import torch
from detectron2.evaluation import COCOEvaluator
from jde.utils.io import unzip_objs
from pytorch_msssim import ms_ssim
from tqdm import tqdm
from compressai_vision.datasets import deccode_compressed_rle
from compressai_vision.registry import register_evaluator
from compressai_vision.utils import time_measure, to_cpu
from .base_evaluator import BaseEvaluator
from .tf_evaluation_utils import (
DetectionResultFields,
InputDataFields,
OpenImagesChallengeEvaluator,
decode_gt_raw_data_into_masks_and_boxes,
decode_masks,
encode_masks,
)
[docs]@register_evaluator("COCO-EVAL")
class COCOEVal(BaseEvaluator):
def __init__(
self,
datacatalog_name,
dataset_name,
dataset,
output_dir="./vision_output/",
criteria="AP",
):
super().__init__(datacatalog_name, dataset_name, dataset, output_dir, criteria)
self.set_annotation_info(dataset)
self._evaluator = COCOEvaluator(
dataset_name, False, output_dir=output_dir, use_fast_impl=False
)
if datacatalog_name == "MPEGOIV6":
deccode_compressed_rle(self._evaluator._coco_api.anns)
self.reset()
def reset(self):
self._evaluator.reset()
def digest(self, gt, pred):
return self._evaluator.process(gt, pred)
def results(self, save_path: str = None):
out = self._evaluator.evaluate()
if save_path:
self.write_results(out, save_path)
self.write_results(out)
# summary = {}
# for key, item_dict in out.items():
# summary[f"{key}"] = item_dict["AP"]
return out
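# A minimal usage sketch for COCOEVal (hypothetical pipeline objects; the real call
# sites live in the compressai-vision pipelines): digest() forwards each batch to
# Detectron2's COCOEvaluator.process(), and results() runs its evaluate() pass.
#
#   evaluator = COCOEVal(datacatalog_name, dataset_name, dataset)  # names as in the registry
#   for gt, pred in model_outputs:   # Detectron2-style inputs/outputs, one batch at a time
#       evaluator.digest(gt, pred)
#   summary = evaluator.results(save_path="./vision_output/")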
[docs]@register_evaluator("OIC-EVAL")
class OpenImagesChallengeEval(BaseEvaluator):
def __init__(
self,
datacatalog_name,
dataset_name,
dataset,
output_dir="./vision_output/",
criteria="AP50",
):
super().__init__(datacatalog_name, dataset_name, dataset, output_dir, criteria)
self.set_annotation_info(dataset)
with open(dataset.annotation_path) as f:
json_dict = json.load(f)
def _search_category(name):
for e, item in enumerate(self.thing_classes):
if name == self._normalize_labelname(item):
return e
self._logger.error(f"Not found Item {name} in 'thing_classes'")
raise ValueError
assert len(json_dict["annotations"]) > 0
has_segmentation = "segmentation" in json_dict["annotations"][0]
self._valid_contiguous_id = []
self._oic_labelmap_dict = {}
self._oic_categories = []
assert "oic_labelmap" in json_dict
assert "oic_annotations" in json_dict
valid_categories = json_dict["oic_labelmap"]
gt_annotations = json_dict["oic_annotations"]
for item in valid_categories:
e = _search_category(self._normalize_labelname(item["name"]))
if e is not None:
self._valid_contiguous_id.append(e)
self._oic_labelmap_dict[self._normalize_labelname(item["name"])] = item[
"id"
]
self._oic_categories.append({"id": item["id"], "name": item["name"]})
self._oic_evaluator = OpenImagesChallengeEvaluator(
self._oic_categories, evaluate_masks=has_segmentation
)
self._logger.info(
f"Loading annotations for {len(gt_annotations)} images and reformatting them for the OpenImagesChallengeEvaluator."
)
for gt in tqdm(gt_annotations):
img_id = gt["image_id"]
annotations = gt["annotations"]
anno_dict = {}
anno_dict[InputDataFields.groundtruth_boxes] = np.array(annotations["bbox"])
anno_dict[InputDataFields.groundtruth_classes] = np.array(
annotations["cls"]
)
anno_dict[InputDataFields.groundtruth_group_of] = np.array(
annotations["group_of"]
)
img_cls = []
for name in annotations["img_cls"]:
img_cls.append(self._oic_labelmap_dict[self._normalize_labelname(name)])
anno_dict[InputDataFields.groundtruth_image_classes] = np.array(img_cls)
if "mask" in annotations:
segments, _ = decode_gt_raw_data_into_masks_and_boxes(
annotations["mask"], annotations["img_size"]
)
anno_dict[InputDataFields.groundtruth_instance_masks] = segments
self._oic_evaluator.add_single_ground_truth_image_info(img_id, anno_dict)
self._logger.info(
f"All ground-truth annotations for {len(gt_annotations)} images have been successfully registered with the evaluator."
)
self.reset()
def reset(self):
self._predictions = []
self._cc = 0
@staticmethod
def _normalize_labelname(name: str):
return name.lower().replace(" ", "_")
def digest(self, gt, pred):
assert len(gt) == len(pred) == 1, "Batch size must be 1 for the evaluation"
if self._oic_evaluator is None:
self._logger.warning(
"No evaluator is assigned to this class; evaluation will not run properly"
)
return
img_id = gt[0]["image_id"]
test_fields = to_cpu(pred[0]["instances"])
imgH, imgW = test_fields.image_size
classes = test_fields.pred_classes
scores = test_fields.scores
bboxes = test_fields.pred_boxes.tensor
assert len(classes) == len(scores) == len(bboxes)
masks = []
has_mask = test_fields.has("pred_masks")
if has_mask:
masks = test_fields.pred_masks
assert len(masks) == len(bboxes)
valid_indexes = []
for e, cid in enumerate(classes):
if cid in self._valid_contiguous_id:
valid_indexes.append(e)
if len(valid_indexes) > 0:
pred_dict = {
"img_id": img_id,
"img_size": (imgH, imgW),
"classes": classes[valid_indexes].tolist(),
"scores": scores[valid_indexes].tolist(),
"bboxes": bboxes[valid_indexes].tolist(),
}
if has_mask:
pred_dict["masks"] = encode_masks(masks[valid_indexes])
self._predictions.append(pred_dict)
self._cc += 1
return
def _process_prediction(self, pred_dict):
valid_cls = []
valid_scores = []
valid_bboxes = []
valid_segment_masks = []
valid_segment_boxes = []
imgH, imgW = pred_dict["img_size"]
classes = pred_dict["classes"]
scores = pred_dict["scores"]
bboxes = pred_dict["bboxes"]
has_mask = "masks" in pred_dict
if has_mask:
masks = pred_dict["masks"]
for e, _items in enumerate(zip(classes, scores, bboxes)):
_class, _score, _bbox = _items
cate_name = self._normalize_labelname(self.thing_classes[_class])
valid_cls.append(self._oic_labelmap_dict[cate_name])
valid_scores.append(_score)
norm_bbox = np.array(_bbox) / [imgW, imgH, imgW, imgH]
# XMin, YMin, XMax, YMax --> YMin, XMin, YMax, XMax
valid_bboxes.append(norm_bbox[[1, 0, 3, 2]])
if has_mask:
segment, box = decode_masks(masks[e])
valid_segment_masks.append(segment)
valid_segment_boxes.append(box)
res_dict = {
DetectionResultFields.detection_classes: np.array(valid_cls),
DetectionResultFields.detection_scores: np.array(valid_scores).astype(
float
),
}
if has_mask:
res_dict[DetectionResultFields.detection_masks] = np.concatenate(
valid_segment_masks, axis=0
)
res_dict[DetectionResultFields.detection_boxes] = np.concatenate(
valid_segment_boxes, axis=0
)
else:
res_dict[DetectionResultFields.detection_boxes] = np.array(
valid_bboxes
).astype(float)
return res_dict
def results(self, save_path: str = None):
if self._oic_evaluator is None:
self._logger.warning(
"No evaluator is assigned to this class; evaluation will not run properly"
)
return
if len(self._predictions) == 0:
self._logger.warning("There is no detected objects to evaluate")
return
start = time_measure()
for pred_dict in self._predictions:
img_id = pred_dict["img_id"]
processed_dict = self._process_prediction(pred_dict)
self._oic_evaluator.add_single_detected_image_info(img_id, processed_dict)
self._logger.info(
f"Elapsed time to process and register the predicted items with the evaluator: {time_measure() - start:.02f} sec"
)
out = self._oic_evaluator.evaluate()
self._logger.info(f"Total evaluation time: {time_measure() - start:.02f} sec")
if save_path:
self.write_results(out, save_path)
self.write_results(out)
summary = {}
for key, value in out.items():
name = "-".join(key.split("/")[1:])
summary[name] = value
return summary
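# Notes on the interface expected by OpenImagesChallengeEval (derived from the code
# above, not a formal spec): digest() assumes a batch size of 1 and a Detectron2
# ``Instances`` object under pred[0]["instances"] providing pred_classes, scores,
# pred_boxes and, optionally, pred_masks. During results(), boxes are normalized by
# the image size and reordered from [XMin, YMin, XMax, YMax] to
# [YMin, XMin, YMax, XMax] before being handed to the TF-style evaluator, e.g.:
#
#   evaluator = OpenImagesChallengeEval(datacatalog_name, dataset_name, dataset)
#   evaluator.digest(gt_batch, pred_batch)   # one image per call
#   summary = evaluator.results()            # metric names with their prefix stripped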
[docs]@register_evaluator("MOT-JDE-EVAL")
class MOT_JDE_Eval(BaseEvaluator):
"""
A Multiple Object Tracking Evaluator
This class evaluates MOT performance of tracking model such as JDE in compressai-vision.
BaseEvaluator is inherited to interface with pipeline architecture in compressai-vision
Functions below in this class refers to
The class Evaluator inin Towards-Realtime-MOT/utils/evaluation.py at
<https://github.com/Zhongdao/Towards-Realtime-MOT/blob/master/utils/evaluation.py>
<https://github.com/Zhongdao/Towards-Realtime-MOT/blob/master/track.py>
Full license statement can be found at
<https://github.com/Zhongdao/Towards-Realtime-MOT/blob/master/LICENSE>
"""
def __init__(
self,
datacatalog_name,
dataset_name,
dataset,
output_dir="./vision_output/",
criteria="MOTA",
):
super().__init__(datacatalog_name, dataset_name, dataset, output_dir, criteria)
self.set_annotation_info(dataset)
mm.lap.default_solver = "lap"
self.dataset = dataset.dataset
self.eval_info_file_name = self.get_jde_eval_info_name(self.dataset_name)
self.reset()
def reset(self):
self.acc = mm.MOTAccumulator(auto_id=True)
self._predictions = {}
@staticmethod
def _load_gt_in_motchallenge(filepath, fmt="mot15-2D", min_confidence=-1):
return mm.io.loadtxt(filepath, fmt=fmt, min_confidence=min_confidence)
@staticmethod
def _format_pd_in_motchallenge(predictions: dict):
all_indexes = []
all_columns = []
for frmID, preds in predictions.items():
if len(preds) > 0:
for data in preds:
tlwh, objID, conf = data
all_indexes.append((frmID, objID))
column_data = tlwh + (conf, -1, -1)
all_columns.append(column_data)
columns = ["X", "Y", "Width", "Height", "Confidence", "ClassId", "Visibility"]
idx = pd.MultiIndex.from_tuples(all_indexes, names=["FrameId", "Id"])
return pd.DataFrame(all_columns, idx, columns)
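# The DataFrame built above mirrors the layout mm.io.loadtxt() returns for the
# "mot15-2D" format: a (FrameId, Id) MultiIndex with X/Y/Width/Height in pixels
# plus Confidence, ClassId and Visibility (the last two filled with -1 here), so
# it can be matched directly against the ground-truth DataFrame by motmetrics.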
def digest(self, gt, pred):
pred_list = []
for tlwh, id in zip(pred["tlwhs"], pred["ids"]):
x1, y1, w, h = tlwh
x2, y2 = x1 + w, y1 + h
parsed_pred = ((x1, y1, w, h), id, 1.0)
pred_list.append(parsed_pred)
self._predictions[int(gt[0]["image_id"])] = pred_list
def results(self, save_path: str = None):
out = self.mot_eval()
if save_path:
self.write_results(out, save_path)
self.write_results(out)
return out
@staticmethod
def digest_summary(summary):
ret = {}
keys_lists = [
(
[
"idf1",
"idp",
"idr",
"recall",
"precision",
"num_unique_objects",
"mota",
"motp",
],
None,
),
(
[
"mostly_tracked",
"partially_tracked",
"mostly_lost",
"num_false_positives",
"num_misses",
"num_switches",
"num_fragmentations",
"num_transfer",
"num_ascend",
"num_migrate",
],
int,
),
]
for keys, dtype in keys_lists:
selected_keys = [key for key in keys if key in summary]
for key in selected_keys:
ret[key] = summary[key] if dtype is None else dtype(summary[key])
return ret
def mot_eval(self):
assert len(self.dataset) == len(
self._predictions
), "Total number of frames and predictions do not match"
# skip the very first frame
for gt_frame in self.dataset[1:]:
frm_id = int(gt_frame["image_id"])
pred_objs = self._predictions[frm_id].copy()
pred_tlwhs, pred_ids, _ = unzip_objs(pred_objs)
gt_objs = gt_frame["annotations"]["gt"].copy()
gt_tlwhs, gt_ids, _ = unzip_objs(gt_objs)
gt_ignore = gt_frame["annotations"]["gt_ignore"].copy()
gt_ignore_tlwhs, _, _ = unzip_objs(gt_ignore)
# remove ignored results
keep = np.ones(len(pred_tlwhs), dtype=bool)
iou_distance = mm.distances.iou_matrix(
gt_ignore_tlwhs, pred_tlwhs, max_iou=0.5
)
if len(iou_distance) > 0:
match_is, match_js = mm.lap.linear_sum_assignment(iou_distance)
match_is, match_js = map(
lambda a: np.asarray(a, dtype=int), [match_is, match_js]
)
match_ious = iou_distance[match_is, match_js]
match_js = np.asarray(match_js, dtype=int)
match_js = match_js[np.logical_not(np.isnan(match_ious))]
keep[match_js] = False
pred_tlwhs = pred_tlwhs[keep]
pred_ids = pred_ids[keep]
# get distance matrix
iou_distance = mm.distances.iou_matrix(gt_tlwhs, pred_tlwhs, max_iou=0.5)
# accumulate
self.acc.update(gt_ids, pred_ids, iou_distance)
# get summary
metrics = mm.metrics.motchallenge_metrics
mh = mm.metrics.create()
summary = mh.compute(
self.acc,
metrics=metrics,
name=self.dataset_name,
return_dataframe=False,
return_cached=True,
)
return self.digest_summary(summary)
def _save_all_eval_info(self, pred: dict):
Path(self.output_dir).mkdir(parents=True, exist_ok=True)
file_name = f"{self.output_dir}/{self.eval_info_file_name}"
torch.save(pred, file_name)
def _load_all_eval_info(self):
file_name = f"{self.output_dir}/{self.eval_info_file_name}"
return torch.load(file_name)
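# A minimal sketch of the per-frame tracking interface (hypothetical values):
# digest() expects the tracker output as top-left (x, y, w, h) boxes with track ids,
# keyed to the frame via gt[0]["image_id"], and mot_eval() then scores the collected
# predictions with motmetrics.
#
#   pred = {"tlwhs": [(10.0, 20.0, 50.0, 80.0)], "ids": [3]}
#   evaluator.digest(gt_batch, pred)
#   summary = evaluator.results()   # CLEAR-MOT / identity metrics per digest_summary()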
[docs]@register_evaluator("MOT-TVD-EVAL")
class MOT_TVD_Eval(MOT_JDE_Eval):
"""
A Multiple Object Tracking Evaluator for TVD
This class evaluates MOT performance of tracking model such as JDE specifically on TVD
"""
def __init__(
self,
datacatalog_name,
dataset_name,
dataset,
output_dir="./vision_output/",
criteria="MOTA",
):
super().__init__(datacatalog_name, dataset_name, dataset, output_dir, criteria)
self.set_annotation_info(dataset)
self._gt_pd = self._load_gt_in_motchallenge(self.annotation_path)
assert self.seqinfo_path is not None, "Sequence Information must be provided"
def mot_eval(self):
assert len(self.dataset) == len(
self._predictions
), "Total number of frames and predictions do not match"
self._save_all_eval_info(self._predictions)
_pd_pd = self._format_pd_in_motchallenge(self._predictions)
acc, ana = mm.utils.CLEAR_MOT_M(self._gt_pd, _pd_pd, self.seqinfo_path)
# get summary
metrics = mm.metrics.motchallenge_metrics
mh = mm.metrics.create()
summary = mh.compute(
acc,
ana=ana,
metrics=metrics,
name=self.dataset_name,
return_dataframe=False,
return_cached=True,
)
return self.digest_summary(summary)
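# Unlike the base JDE evaluation, the TVD variant scores against a MOTChallenge-format
# ground-truth file using mm.utils.CLEAR_MOT_M(), which additionally requires the
# sequence's seqinfo file (hence the seqinfo_path assertion in __init__).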
[docs]@register_evaluator("MOT-HIEVE-EVAL")
class MOT_HiEve_Eval(MOT_JDE_Eval):
"""
A Multiple Object Tracking Evaluator for HiEve
This class evaluates MOT performance of tracking model such as JDE specifically on HiEve
"""
def __init__(
self,
datacatalog_name,
dataset_name,
dataset,
output_dir="./vision_output/",
criteria="MOTA",
):
super().__init__(datacatalog_name, dataset_name, dataset, output_dir, criteria)
self.set_annotation_info(dataset)
mm.lap.default_solver = "munkres"
self._gt_pd = self._load_gt_in_motchallenge(
self.annotation_path, min_confidence=1
)
def mot_eval(self):
assert len(self.dataset) == len(
self._predictions
), "Total number of frames and predictions do not match"
self._save_all_eval_info(self._predictions)
_pd_pd = self._format_pd_in_motchallenge(self._predictions)
acc = mm.utils.compare_to_groundtruth(self._gt_pd, _pd_pd)
# get summary
metrics = mm.metrics.motchallenge_metrics
mh = mm.metrics.create()
summary = mh.compute(
acc,
metrics=metrics,
name=self.dataset_name,
return_dataframe=False,
return_cached=True,
)
return self.digest_summary(summary)
[docs]@register_evaluator("YOLO-EVAL")
class YOLOEval(BaseEvaluator):
def __init__(
self,
datacatalog_name,
dataset_name,
dataset,
output_dir="./vision_output/",
criteria="AP50",
):
super().__init__(datacatalog_name, dataset_name, dataset, output_dir, criteria)
self.set_annotation_info(dataset)
[docs]@register_evaluator("VISUAL-QUALITY-EVAL")
class VisualQualityEval(BaseEvaluator):
def __init__(
self,
datacatalog_name,
dataset_name,
dataset,
output_dir="./vision_output/",
criteria="psnr",
):
super().__init__(datacatalog_name, dataset_name, dataset, output_dir, criteria)
self.reset()
@staticmethod
def compute_psnr(a, b):
mse = torch.mean((a - b) ** 2).item()
return -10 * math.log10(mse)
@staticmethod
def compute_msssim(a, b):
return ms_ssim(a, b, data_range=1.0).item()
def reset(self):
self._evaluations = []
self._sum_psnr = 0
self._sum_msssim = 0
self._cc = 0
def write_results(self, path: str = None):
if path is None:
path = f"{self.output_dir}"
path = Path(path)
if not path.is_dir():
self._logger.info(f"creating output folder: {path}")
path.mkdir(parents=True, exist_ok=True)
with open(f"{path}/{self.output_file_name}.json", "w", encoding="utf-8") as f:
json.dump(self._evaluations, f, ensure_ascii=False, indent=4)
def digest(self, gt, pred):
ref = gt[0]["image"].unsqueeze(0).cpu()
tst = pred.unsqueeze(0).cpu()
assert ref.shape == tst.shape
psnr = self.compute_psnr(ref, tst)
msssim = self.compute_msssim(ref, tst)
eval_dict = {
"img_id": gt[0]["image_id"],
"img_size": (gt[0]["height"], gt[0]["width"]),
"msssim": msssim,
"psnr": psnr,
}
self._sum_psnr += psnr
self._sum_msssim += msssim
self._evaluations.append(eval_dict)
self._cc += 1
def results(self, save_path: str = None):
if save_path:
self.write_results(save_path)
self.write_results()
summary = {
"msssim": (self._sum_msssim / self._cc),
"psnr": (self._sum_psnr / self._cc),
}
return summary
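# A minimal usage sketch for VisualQualityEval (hypothetical tensors; PSNR and MS-SSIM
# here assume images normalized to [0, 1], matching the data_range=1.0 used above):
#
#   evaluator = VisualQualityEval(datacatalog_name, dataset_name, dataset)
#   for gt, rec in zip(dataset, reconstructions):   # rec: CxHxW float tensor, same shape as gt["image"]
#       evaluator.digest([gt], rec)
#   print(evaluator.results())   # {"msssim": ..., "psnr": ...} averaged over all frames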