# Copyright (c) 2022-2024, InterDigital Communications, Inc
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of InterDigital Communications, Inc nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
# NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import base64
import logging
import re
from glob import glob
from pathlib import Path
from detectron2.data import DatasetCatalog, MetadataCatalog
from detectron2.data.common import DatasetFromList, MapDataset
from detectron2.data.dataset_mapper import DatasetMapper
from detectron2.data.datasets import load_coco_json, register_coco_instances
from detectron2.data.samplers import InferenceSampler
from detectron2.utils.serialize import PicklableWrapper
from jde.utils.io import read_results
from PIL import Image
from torch.utils.data import Dataset
from compressai_vision.registry import register_datacatalog, register_dataset
from .utils import JDECustomMapper, LinearMapper
def manual_load_data(path, ext):
img_list = sorted(glob(f"{path}/*.{ext}"))
datalist = []
for img_addr in img_list:
img_id = Path(img_addr).stem
        # Open the image only to read its geometry, releasing the file handle.
        with Image.open(img_addr) as img:
            fW, fH = img.size
d = {
"file_name": img_addr,
"height": fH,
"width": fW,
"image_id": img_id,
"annotations": None,
}
datalist.append(d)
return datalist
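
# Identity collator: Detectron2-style task models consume a list of
# per-sample dicts rather than stacked tensors, so batching is a no-op here.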
def bypass_collator(batch):
return batch
def deccode_compressed_rle(data):
    """Decode base64-encoded compressed RLE segmentations in place, making
    them compatible with pycocotools."""
    assert isinstance(data, (dict, list))
    if isinstance(data, dict):
        data = list(data.values())
    for anno in data:
        segm = anno.get("segmentation", None)
        if segm:
            # Decode compressed RLEs with base64 to be compatible with pycoco tools
            if not isinstance(segm, list) and not isinstance(segm["counts"], list):
                segm["counts"] = base64.b64decode(segm["counts"])
class BaseDataset(Dataset):
def __init__(self, root, dataset_name, imgs_folder, **kwargs):
super().__init__()
self.logger = logging.getLogger(self.__class__.__name__)
self.dataset_name = dataset_name
self.annotation_path = None
if "annotation_file" in kwargs:
if kwargs["annotation_file"].lower() != "none":
self.annotation_path = Path(root) / kwargs["annotation_file"]
assert self.annotation_path == kwargs["dataset"].annotation_path
self.seqinfo_path = None
if "seqinfo" in kwargs:
if kwargs["seqinfo"].lower() != "none":
self.seqinfo_path = kwargs["dataset"].seqinfo_path
self.images_folder = Path(root) / imgs_folder
assert self.images_folder == kwargs["dataset"].imgs_folder_path
self.sampler = None
self.collate_fn = None
self.mapDataset = None
        self._org_mapper_func = None
        self.thing_classes = []
        self.thing_dataset_id_to_contiguous_id = {}
[docs]@register_dataset("DefaultDataset")
class DefaultDataset(BaseDataset):
"""
Loads an image folder database. testing image samples
are respectively stored in separate directories
(Currently, this class does not support any of the training related operations):
.. code-block:: text
|--rootdir
|-- img000.png
|-- img001.png
Attributes
----------
root : string
root directory of the dataset
transform : (callable, optional)
a function or transform that takes in a PIL image and returns a transformed version
use_BGR : Bool
if True the color order of the sample is BGR otherwise RGB returned
"""
def __init__(
self,
root,
dataset_name,
imgs_folder: str = "valid",
**kwargs,
):
super().__init__(root, dataset_name, imgs_folder, **kwargs)
if not self.images_folder.is_dir():
raise RuntimeError(f'Invalid directory "{root}"')
self.samples = [f for f in sorted(self.images_folder.iterdir()) if f.is_file()]
self.use_BGR = kwargs["use_BGR"]
self.transform = kwargs["transforms"]
self.ret_name = kwargs["ret_name"]
self.sampler = InferenceSampler(len(kwargs["dataset"]))
self.collate_fn = bypass_collator
_dataset = DatasetFromList(kwargs["dataset"].dataset, copy=False)
if "cfg" in kwargs:
if kwargs["cfg"] is not None:
mapper = DatasetMapper(kwargs["cfg"], False)
self.mapDataset = MapDataset(_dataset, mapper)
return
self.mapDataset = MapDataset(_dataset, LinearMapper(bgr=self.use_BGR))
def __getitem__(self, index):
"""
Args:
index (int): Index
        Returns:
            the mapped sample dict when a mapper is configured, otherwise a
            `PIL.Image.Image` (optionally transformed).
        """
        if self.mapDataset is not None:
            return self.mapDataset[index]
        # Fallback path: read the image straight from disk.
        img = Image.open(self.samples[index]).convert("RGB")
        if self.use_BGR:
            r, g, b = img.split()
            img = Image.merge("RGB", (b, g, r))
        if self.transform:
            if self.ret_name:
                return (self.transform(img), str(self.samples[index]))
            return self.transform(img)
        if self.ret_name:
            return (img, str(self.samples[index]))
        return img
def __len__(self):
        if self.mapDataset is not None:
return len(self.mapDataset)
return len(self.samples)
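

# Hedged usage sketch (assumed wiring, not part of this module's API): a
# DefaultDataset exposes its own sampler and collator, which a torch
# DataLoader consumes directly. ``catalog`` stands in for a registered
# DataCatalog instance supplying the ``dataset`` keyword used above, whose
# ``imgs_folder_path`` must match ``root``/``imgs_folder``.
def _example_default_dataset_loader(root, catalog):
    from torch.utils.data import DataLoader

    dataset = DefaultDataset(
        root,
        "example_dataset",
        imgs_folder="valid",
        dataset=catalog,
        cfg=None,  # no Detectron2 cfg, so the LinearMapper branch is taken
        use_BGR=False,
        transforms=None,
        ret_name=False,
    )
    return DataLoader(
        dataset,
        batch_size=1,
        sampler=dataset.sampler,
        collate_fn=dataset.collate_fn,
    )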
[docs]@register_dataset("Detectron2Dataset")
class Detectron2Dataset(BaseDataset):
def __init__(self, root, dataset_name, imgs_folder, **kwargs):
super().__init__(root, dataset_name, imgs_folder, **kwargs)
self.dataset = kwargs["dataset"].dataset
try:
DatasetCatalog.get(dataset_name)
except KeyError:
if self.annotation_path:
register_coco_instances(
dataset_name, {}, self.annotation_path, self.images_folder
)
self.logger.info(f'"{dataset_name}" successfully registred.')
self.sampler = InferenceSampler(len(kwargs["dataset"]))
self.collate_fn = bypass_collator
_dataset = DatasetFromList(self.dataset, copy=False)
if kwargs["linear_mapper"] is True:
mapper = LinearMapper()
else:
assert (
kwargs["cfg"] is not None
), "A proper mapper information via cfg must be provided"
mapper = DatasetMapper(kwargs["cfg"], False)
self.mapDataset = MapDataset(_dataset, mapper)
        if kwargs.get("cfg") is not None:
            self._org_mapper_func = PicklableWrapper(
                DatasetMapper(kwargs["cfg"], False)
            )
        metadata = MetadataCatalog.get(dataset_name)
        try:
            self.thing_classes = metadata.thing_classes
            self.thing_dataset_id_to_contiguous_id = (
                metadata.thing_dataset_id_to_contiguous_id
            )
        except AttributeError:
            self.logger.warning(
                "Metadata is missing thing_classes / thing_dataset_id_to_contiguous_id"
            )
    def get_org_mapper_func(self):
return self._org_mapper_func
def __getitem__(self, idx):
return self.mapDataset[idx]
def __len__(self):
return len(self.mapDataset)
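

# Hedged sketch of the split between the two mappers above: LinearMapper
# passes pre-decoded tensors through with minimal processing, while the
# pickled original DatasetMapper re-maps raw records the way the vision task
# model expects. ``d2_dataset`` is assumed to be a Detectron2Dataset built
# with a valid Detectron2 cfg; this helper is illustrative only.
def _example_remap_record(d2_dataset, idx=0):
    mapper = d2_dataset.get_org_mapper_func()
    raw_record = d2_dataset.dataset[idx]  # COCO-style dict from load_coco_json
    return mapper(raw_record)  # dict carrying an "image" tensor plus metadata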
[docs]@register_dataset("TrackingDataset")
class TrackingDataset(BaseDataset):
def __init__(self, root, dataset_name, imgs_folder, **kwargs):
super().__init__(root, dataset_name, imgs_folder, **kwargs)
self.dataset = kwargs["dataset"].dataset
self.sampler = InferenceSampler(len(kwargs["dataset"]))
self.collate_fn = bypass_collator
_dataset = DatasetFromList(self.dataset, copy=False)
if kwargs["linear_mapper"] is True:
mapper = LinearMapper()
else:
mapper = JDECustomMapper(kwargs["patch_size"])
self.mapDataset = MapDataset(_dataset, mapper)
self._org_mapper_func = PicklableWrapper(JDECustomMapper(kwargs["patch_size"]))
    def get_org_mapper_func(self):
return self._org_mapper_func
def __getitem__(self, idx):
return self.mapDataset[idx]
def __len__(self):
return len(self.mapDataset)
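

# Hedged sketch for the tracking path: records mapped through JDECustomMapper
# are resized to ``patch_size`` for the JDE tracker, while the raw MOT-style
# ground truth stays attached to each record for evaluation. ``track_ds`` is
# assumed to be a TrackingDataset built from a tracking DataCatalog.
def _example_tracking_sample(track_ds):
    sample = track_ds[0]  # mapped sample ready for the tracker
    gt_rows = track_ds.dataset[0]["annotations"]["gt"]  # MOT ground-truth rows
    return sample, gt_rows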
class DataCatalog:
def __init__(
self,
root,
imgs_folder="images",
annotation_file="sample.json",
seqinfo="seqinfo.ini",
dataset_name="sample_dataset",
ext=".png",
):
self.logger = logging.getLogger(self.__class__.__name__)
_imgs_folder = Path(root) / imgs_folder
if not _imgs_folder.is_dir():
raise RuntimeError(f'Invalid image sample directory "{_imgs_folder}"')
self._annotation_file = None
if annotation_file.lower() != "none":
_annotation_file = Path(root) / annotation_file
if not _annotation_file.is_file():
raise RuntimeError(f'Invalid annotation file "{_annotation_file}"')
self._annotation_file = _annotation_file
else: # annotation_file is not available
            self.logger.warning(
                "No annotation file found; evaluation against ground truth may not be available.\n"
            )
self._sequence_info_file = None
if seqinfo.lower() != "none":
_sequence_info_file = Path(root) / seqinfo
            if not _sequence_info_file.is_file():
self.logger.warning(
f"Sequence information does not exist at the given path {_sequence_info_file}"
)
self._sequence_info_file = None
else:
self._sequence_info_file = _sequence_info_file
else: # seqinfo is not available
self.logger.warning("No sequence information provided\n")
self._dataset_name = dataset_name
self._dataset = None
self._imgs_folder = _imgs_folder
self._img_ext = ext
@property
def dataset_name(self):
return self._dataset_name
@property
def dataset(self):
return self._dataset
@property
def annotation_path(self):
return self._annotation_file
@property
def seqinfo_path(self):
return self._sequence_info_file
@property
def imgs_folder_path(self):
return self._imgs_folder
def __len__(self):
return len(self._dataset)
@register_datacatalog("MPEGTVDTRACKING")
class MPEGTVDTRACKING(DataCatalog):
"""Load an image folder database to support testing image samples extracted from MPEG-TVD Objects Tracking videos:
.. code-block:: none
- mpeg-TVD-Tracking/
            - annotations/
-
- images/
- 00001.png
- ....png
Args:
root (string): root directory of the dataset
transform (callable, optional): a function or transform that takes in a
PIL image and returns a transformed version
"""
def __init__(
self,
root,
imgs_folder="images",
annotation_file="gt.txt",
seqinfo="seqinfo.ini",
dataset_name="mpeg-tvd-tracking",
ext="png",
):
super().__init__(
root,
imgs_folder=imgs_folder,
annotation_file=annotation_file,
seqinfo=seqinfo,
dataset_name=dataset_name,
ext=ext,
)
self.data_type = "mot"
gt_frame_dict = read_results(
str(self.annotation_path), self.data_type, is_gt=True
)
gt_ignore_frame_dict = read_results(
str(self.annotation_path), self.data_type, is_ignore=True
)
img_lists = sorted(self.imgs_folder_path.glob(f"*.{ext}"))
assert len(gt_frame_dict) == len(gt_ignore_frame_dict)
self._dataset = []
self._gt_labels = gt_frame_dict
self._gt_ignore_labels = gt_ignore_frame_dict
for file_name in img_lists:
img_id = file_name.name.split(f".{ext}")[0]
new_d = {
"file_name": str(file_name),
"image_id": img_id,
"annotations": {
"gt": gt_frame_dict.get(int(img_id), []),
"gt_ignore": gt_ignore_frame_dict.get(int(img_id), []),
},
}
self._dataset.append(new_d)
    def get_ground_truth_labels(self, frame_id: int):
        return {
            "gt": self._gt_labels.get(frame_id, []),
            "gt_ignore": self._gt_ignore_labels.get(frame_id, []),
        }
def get_min_max_across_tensors(self):
maxv = 48.58344268798828
minv = -4.722218990325928
return (minv, maxv)
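

# Hedged sketch: the global extrema returned by get_min_max_across_tensors
# are typically used to min-max normalize intermediate feature tensors to
# [0, 1] before coding, and to invert the scaling afterwards. This helper is
# illustrative, not part of the pipeline; ``tensor`` can be a torch or numpy
# array and ``catalog`` any DataCatalog exposing the bounds.
def _example_minmax_scale(tensor, catalog):
    minv, maxv = catalog.get_min_max_across_tensors()
    scaled = (tensor - minv) / (maxv - minv)  # maps [minv, maxv] -> [0, 1]
    restored = scaled * (maxv - minv) + minv  # exact inverse mapping
    return scaled, restored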
@register_datacatalog("MPEGHIEVE")
class MPEGHIEVE(MPEGTVDTRACKING):
"""Load an image folder database to support testing image samples extracted from MPEG-HiEve videos:
.. code-block:: none
- mpeg-HiEve/
            - annotations/
-
- images/
- 00001.png
- ....png
Args:
root (string): root directory of the dataset
transform (callable, optional): a function or transform that takes in a
PIL image and returns a transformed version
"""
def __init__(
self,
root,
imgs_folder="images",
annotation_file="gt.txt",
seqinfo="seqinfo.ini",
dataset_name="mpeg-hieve-tracking",
ext="png",
):
super().__init__(
root,
imgs_folder=imgs_folder,
annotation_file=annotation_file,
seqinfo=seqinfo,
dataset_name=dataset_name,
ext=ext,
)
def get_min_max_across_tensors(self):
maxv = 11.823183059692383
minv = -1.0795124769210815
return (minv, maxv)
@register_datacatalog("MPEGOIV6")
class MPEGOIV6(DataCatalog):
"""Load an image folder database to support testing image samples from MPEG-OpenimagesV6:
.. code-block:: none
- mpeg-oiv6/
            - annotations/
-
- images/
- 452c856678a9b284.jpg
- ....jpg
Args:
root (string): root directory of the dataset
transform (callable, optional): a function or transform that takes in a
PIL image and returns a transformed version
        use_BGR (bool): if True, samples are returned in BGR color order; otherwise RGB
"""
def __init__(
self,
root,
imgs_folder="images",
annotation_file="mpeg-oiv6-segmentation-coco.json",
seqinfo="",
dataset_name="mpeg-oiv6-segmentation",
ext="",
):
super().__init__(
root,
imgs_folder=imgs_folder,
annotation_file=annotation_file,
seqinfo=seqinfo,
dataset_name=dataset_name,
ext=ext,
)
if self.annotation_path:
self._dataset = load_coco_json(
self.annotation_path, self.imgs_folder_path, dataset_name=dataset_name
)
else:
self._dataset = manual_load_data(self.imgs_folder_path, "jpg")
self.task = "detection"
if "segmentation" in dataset_name:
self.task = "segmentation"
# TODO [hyomin]
# if self.task == 'segmentation':
# self.deccode_compressed_rle(dataset)
def get_min_max_across_tensors(self):
if self.task == "segmentation":
maxv = 28.397489547729492
minv = -26.426830291748047
return (minv, maxv)
assert self.task == "detection"
maxv = 20.246625900268555
minv = -23.09193229675293
return (minv, maxv)
@register_datacatalog("SFUHW")
class SFUHW(DataCatalog):
"""Load an image folder database with Detectron2 Cfg. testing image samples
and annotations are respectively stored in separate directories
(Currently this class supports none of training related operation ):
.. code-block:: none
- rootdir/
- images
- img000.png
- img001.png
- imgxxx.png
- annotations
- xxxx.json
Args:
root (string): root directory of the dataset
"""
def __init__(
self,
root,
imgs_folder="images",
annotation_file=None,
seqinfo="seqinfo.ini",
dataset_name="sfu-hw-object-v1",
ext="png",
):
super().__init__(
root,
imgs_folder=imgs_folder,
annotation_file=annotation_file,
seqinfo=seqinfo,
dataset_name=dataset_name,
ext=ext,
)
self._dataset = load_coco_json(
self.annotation_path, self.imgs_folder_path, dataset_name=dataset_name
)
def get_min_max_across_tensors(self):
# from mpeg-fcvcm
minv = -17.884761810302734
maxv = 16.694171905517578
return (minv, maxv)
@register_datacatalog("COCO")
class COCO(DataCatalog):
"""Load an image folder database with Detectron2 Cfg. testing image samples
and annotations are respectively stored in separate directories
(Currently this class supports none of training related operation ):
.. code-block:: none
- rootdir/
- [train_folder]
- img000.jpg
- img001.jpg
- imgxxx.jpg
- [validation_folder]
- img000.jpg
- img001.jpg
- imgxxx.jpg
- [test_folder]
- img000.jpg
- img001.jpg
- imgxxx.jpg
- annotations
- [instances_val].json
- [captions_val].json
- ...
Args:
root (string): root directory of the dataset
"""
def __init__(
self,
root,
imgs_folder="val2017",
annotation_file="instances_val2017.json",
seqinfo="",
dataset_name="mpeg-coco",
ext="",
):
super().__init__(
root,
imgs_folder=imgs_folder,
annotation_file=annotation_file,
seqinfo=seqinfo,
dataset_name=dataset_name,
ext=ext,
)
self._dataset = load_coco_json(
self.annotation_path, self.imgs_folder_path, dataset_name=dataset_name
)
def get_min_max_across_tensors(self):
raise NotImplementedError
@register_datacatalog("IMAGES")
class IMAGES(DataCatalog):
"""Load an image folder with images and no annotations
(Currently this class supports none of training related operation ):
.. code-block:: none
- rootdir/
- [test_folder]
- img000.jpg
- img001.jpg
- imgxxx.jpg
Args:
root (string): root directory of the dataset
"""
def __init__(
self,
root,
imgs_folder="test",
annotation_file=None,
seqinfo=None,
dataset_name="kodak",
ext="",
):
super().__init__(
root,
imgs_folder,
annotation_file,
seqinfo,
dataset_name,
ext,
)
all_files = [
f
for f in sorted(self.imgs_folder_path.iterdir())
if f.is_file() and f.suffix[1:].lower() == ext.lower()
]
self._dataset = []
for p in all_files:
img_id = re.findall(r"[\d]+", str(Path(p).stem))
assert len(img_id) == 1
fw, fh = Image.open(p).size
d = {
"file_name": str(p),
"height": fh,
"width": fw,
"image_id": img_id[0],
}
self._dataset.append(d)
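

# Hedged usage sketch (the path is an assumption): pointing the catalog at a
# Kodak-style folder of numbered images yields plain records with geometry
# but no annotations.
def _example_images_catalog(root="/path/to/kodak"):
    catalog = IMAGES(root, imgs_folder="test", ext="png")
    return len(catalog), catalog.dataset[0]["file_name"]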