import mmcv
import numpy as np
import os
import pandas as pd
import tempfile
from lyft_dataset_sdk.lyftdataset import LyftDataset as Lyft
from lyft_dataset_sdk.utils.data_classes import Box as LyftBox
from os import path as osp
from pyquaternion import Quaternion
from mmdet3d.core.evaluation.lyft_eval import lyft_eval
from mmdet.datasets import DATASETS
from ..core import show_result
from ..core.bbox import Box3DMode, Coord3DMode, LiDARInstance3DBoxes
from .custom_3d import Custom3DDataset
from .pipelines import Compose
@DATASETS.register_module()
class LyftDataset(Custom3DDataset):
    r"""Lyft Dataset.

    This class serves as the API for experiments on the Lyft Dataset.

    Please refer to
    `<https://www.kaggle.com/c/3d-object-detection-for-autonomous-vehicles/data>`_
    for data downloading.

    Args:
        ann_file (str): Path of annotation file.
        pipeline (list[dict], optional): Pipeline used for data processing.
            Defaults to None.
        data_root (str): Path of dataset root.
        classes (tuple[str], optional): Classes used in the dataset.
            Defaults to None.
        load_interval (int, optional): Interval of loading the dataset. It is
            used to uniformly sample the dataset. Defaults to 1.
        modality (dict, optional): Modality to specify the sensor data used
            as input. When the resulting ``self.modality`` is None, a
            LiDAR-only configuration is used. Defaults to None.
        box_type_3d (str, optional): Type of 3D box of this dataset.
            Based on the `box_type_3d`, the dataset will encapsulate the box
            to its original format then converted them to `box_type_3d`.
            Defaults to 'LiDAR' in this dataset. Available options includes

            - 'LiDAR': Box in LiDAR coordinates.
            - 'Depth': Box in depth coordinates, usually for indoor dataset.
            - 'Camera': Box in camera coordinates.
        filter_empty_gt (bool, optional): Whether to filter empty GT.
            Defaults to True.
        test_mode (bool, optional): Whether the dataset is in test mode.
            Defaults to False.
    """  # noqa: E501

    # Identity mapping between category names; not referenced by the methods
    # of this class.
    NameMapping = {
        'bicycle': 'bicycle',
        'bus': 'bus',
        'car': 'car',
        'emergency_vehicle': 'emergency_vehicle',
        'motorcycle': 'motorcycle',
        'other_vehicle': 'other_vehicle',
        'pedestrian': 'pedestrian',
        'truck': 'truck',
        'animal': 'animal'
    }
    # Per-category attribute name; not referenced by the methods of this
    # class.
    DefaultAttribute = {
        'car': 'is_stationary',
        'truck': 'is_stationary',
        'bus': 'is_stationary',
        'emergency_vehicle': 'is_stationary',
        'other_vehicle': 'is_stationary',
        'motorcycle': 'is_stationary',
        'bicycle': 'is_stationary',
        'pedestrian': 'is_stationary',
        'animal': 'is_stationary'
    }
    # The position of a name in this tuple is its integer label.
    CLASSES = ('car', 'truck', 'bus', 'emergency_vehicle', 'other_vehicle',
               'motorcycle', 'bicycle', 'pedestrian', 'animal')

    def __init__(self,
                 ann_file,
                 pipeline=None,
                 data_root=None,
                 classes=None,
                 load_interval=1,
                 modality=None,
                 box_type_3d='LiDAR',
                 filter_empty_gt=True,
                 test_mode=False):
        # ``load_annotations`` reads ``self.load_interval``; it is assigned
        # before the base constructor runs (presumably because the base
        # constructor calls ``load_annotations`` -- confirm in
        # Custom3DDataset).
        self.load_interval = load_interval
        super().__init__(
            data_root=data_root,
            ann_file=ann_file,
            pipeline=pipeline,
            classes=classes,
            modality=modality,
            box_type_3d=box_type_3d,
            filter_empty_gt=filter_empty_gt,
            test_mode=test_mode)

        if self.modality is None:
            # Fall back to a LiDAR-only setup.
            self.modality = dict(
                use_camera=False,
                use_lidar=True,
                use_radar=False,
                use_map=False,
                use_external=False,
            )

    def load_annotations(self, ann_file):
        """Load annotations from ann_file.

        Besides returning the infos, this stores ``data['metadata']`` in
        ``self.metadata`` and its ``'version'`` entry in ``self.version``.

        Args:
            ann_file (str): Path of the annotation file.

        Returns:
            list[dict]: List of annotations sorted by timestamps and
                subsampled with a stride of ``self.load_interval``.
        """
        data = mmcv.load(ann_file)
        data_infos = sorted(data['infos'], key=lambda e: e['timestamp'])
        data_infos = data_infos[::self.load_interval]
        self.metadata = data['metadata']
        self.version = self.metadata['version']
        return data_infos

    def get_data_info(self, index):
        """Get data info according to the given index.

        Args:
            index (int): Index of the sample data to get.

        Returns:
            dict: Data information that will be passed to the data
                preprocessing pipelines. It includes the following keys:

                - sample_idx (str): sample index
                - pts_filename (str): filename of point clouds
                - sweeps (list[dict]): infos of sweeps
                - timestamp (float): sample timestamp
                - img_filename (list[str], optional): image filenames,
                  only when ``modality['use_camera']`` is set
                - lidar2img (list[np.ndarray], optional): transformations
                  from lidar to different cameras, only when
                  ``modality['use_camera']`` is set
                - ann_info (dict): annotation info, only when not in
                  test mode
        """
        info = self.data_infos[index]
        # standard protocol modified from SECOND.Pytorch
        input_dict = dict(
            sample_idx=info['token'],
            pts_filename=info['lidar_path'],
            sweeps=info['sweeps'],
            # Stored timestamp scaled by 1e-6 (presumably microseconds to
            # seconds -- confirm against the info file format).
            timestamp=info['timestamp'] / 1e6,
        )

        if self.modality['use_camera']:
            image_paths = []
            lidar2img_rts = []
            for cam_info in info['cams'].values():
                image_paths.append(cam_info['data_path'])
                # obtain lidar to image transformation matrix
                lidar2cam_r = np.linalg.inv(cam_info['sensor2lidar_rotation'])
                lidar2cam_t = cam_info[
                    'sensor2lidar_translation'] @ lidar2cam_r.T
                # Built transposed (rotation transposed, translation in the
                # last row); it is transposed back when multiplied below.
                lidar2cam_rt = np.eye(4)
                lidar2cam_rt[:3, :3] = lidar2cam_r.T
                lidar2cam_rt[3, :3] = -lidar2cam_t
                intrinsic = cam_info['cam_intrinsic']
                # Pad the intrinsic matrix into a 4x4 identity.
                viewpad = np.eye(4)
                viewpad[:intrinsic.shape[0], :intrinsic.shape[1]] = intrinsic
                lidar2img_rt = (viewpad @ lidar2cam_rt.T)
                lidar2img_rts.append(lidar2img_rt)

            input_dict.update(
                dict(
                    img_filename=image_paths,
                    lidar2img=lidar2img_rts,
                ))

        if not self.test_mode:
            annos = self.get_ann_info(index)
            input_dict['ann_info'] = annos

        return input_dict

    def get_ann_info(self, index):
        """Get annotation info according to the given index.

        Args:
            index (int): Index of the annotation data to get.

        Returns:
            dict: Annotation information consists of the following keys:

                - gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`):
                  3D ground truth bboxes, converted to
                  ``self.box_mode_3d``.
                - gt_labels_3d (np.ndarray): Labels of ground truths;
                  names that are not in ``self.CLASSES`` get label -1.
        """
        info = self.data_infos[index]
        gt_bboxes_3d = info['gt_boxes']
        gt_names_3d = info['gt_names']
        gt_labels_3d = np.array([
            self.CLASSES.index(cat) if cat in self.CLASSES else -1
            for cat in gt_names_3d
        ])

        if 'gt_shape' in info:
            # Append the extra per-box columns stored under 'gt_shape'.
            gt_shape = info['gt_shape']
            gt_bboxes_3d = np.concatenate([gt_bboxes_3d, gt_shape], axis=-1)

        # the lyft box center is [0.5, 0.5, 0.5], we change it to be
        # the same as KITTI (0.5, 0.5, 0)
        gt_bboxes_3d = LiDARInstance3DBoxes(
            gt_bboxes_3d,
            box_dim=gt_bboxes_3d.shape[-1],
            origin=(0.5, 0.5, 0.5)).convert_to(self.box_mode_3d)

        anns_results = dict(
            gt_bboxes_3d=gt_bboxes_3d,
            gt_labels_3d=gt_labels_3d,
        )
        return anns_results

    def _format_bbox(self, results, jsonfile_prefix=None):
        """Convert the results to the standard format.

        Args:
            results (list[dict]): Testing results of the dataset.
            jsonfile_prefix (str): Directory that receives the output
                ``results_lyft.json``; it is created when missing. Although
                the default is None, a path is required.

        Returns:
            str: Path of the output json file.

        Raises:
            TypeError: If ``jsonfile_prefix`` is None.
        """
        if jsonfile_prefix is None:
            # Fail before the (potentially long) conversion loop instead of
            # afterwards, when the output path is built from None.
            raise TypeError(
                'jsonfile_prefix must be a directory path, got None')

        lyft_annos = {}
        mapped_class_names = self.CLASSES

        print('Start to convert detection format...')
        for sample_id, det in enumerate(mmcv.track_iter_progress(results)):
            annos = []
            boxes = output_to_lyft_box(det)
            sample_token = self.data_infos[sample_id]['token']
            boxes = lidar_lyft_box_to_global(self.data_infos[sample_id], boxes)
            for box in boxes:
                name = mapped_class_names[box.label]
                lyft_anno = dict(
                    sample_token=sample_token,
                    translation=box.center.tolist(),
                    size=box.wlh.tolist(),
                    rotation=box.orientation.elements.tolist(),
                    name=name,
                    score=box.score)
                annos.append(lyft_anno)
            lyft_annos[sample_token] = annos
        lyft_submissions = {
            'meta': self.modality,
            'results': lyft_annos,
        }

        mmcv.mkdir_or_exist(jsonfile_prefix)
        res_path = osp.join(jsonfile_prefix, 'results_lyft.json')
        print('Results writes to', res_path)
        mmcv.dump(lyft_submissions, res_path)
        return res_path

    def _evaluate_single(self,
                         result_path,
                         logger=None,
                         metric='bbox',
                         result_name='pts_bbox'):
        """Evaluation for a single model in Lyft protocol.

        Args:
            result_path (str): Path of the result file.
            logger (logging.Logger | str | None): Logger used for printing
                related information during evaluation. Default: None.
            metric (str): Metric name used for evaluation. Not read by this
                method. Default: 'bbox'.
            result_name (str): Result name in the metric prefix.
                Default: 'pts_bbox'.

        Returns:
            dict: Dictionary of evaluation details, keyed by
                ``'{result_name}_Lyft/{class}_AP'`` and
                ``'{result_name}_Lyft/mAP'``.

        Raises:
            KeyError: If ``self.version`` has no entry in the evaluation
                split map (only 'v1.01-train' is mapped).
        """
        # Evaluation output goes next to the result file.
        output_dir = osp.dirname(result_path)
        lyft = Lyft(
            data_path=osp.join(self.data_root, self.version),
            json_path=osp.join(self.data_root, self.version, self.version),
            verbose=True)
        eval_set_map = {
            'v1.01-train': 'val',
        }
        metrics = lyft_eval(lyft, self.data_root, result_path,
                            eval_set_map[self.version], output_dir, logger)

        # record metrics
        detail = dict()
        metric_prefix = f'{result_name}_Lyft'

        for name, ap in zip(metrics['class_names'], metrics['mAPs_cate']):
            detail[f'{metric_prefix}/{name}_AP'] = float(ap)

        detail[f'{metric_prefix}/mAP'] = metrics['Final mAP']
        return detail

    def evaluate(self,
                 results,
                 metric='bbox',
                 logger=None,
                 jsonfile_prefix=None,
                 csv_savepath=None,
                 result_names=None,
                 show=False,
                 out_dir=None,
                 pipeline=None):
        """Evaluation in Lyft protocol.

        Args:
            results (list[dict]): Testing results of the dataset.
            metric (str | list[str]): Metrics to be evaluated.
            logger (logging.Logger | str | None): Logger used for printing
                related information during evaluation. Default: None.
            jsonfile_prefix (str | None): The prefix of json files. It includes
                the file path and the prefix of filename, e.g., "a/b/prefix".
                If not specified, a temp file will be created. Default: None.
            csv_savepath (str | None): The path for saving csv files.
                It includes the file path and the csv filename,
                e.g., "a/b/filename.csv". If not specified,
                the result will not be converted to csv file.
            result_names (list[str] | None): Keys of the formatted result
                files to evaluate when they are returned as a dict.
                None means ``['pts_bbox']``. Default: None.
            show (bool): Whether to visualize.
                Default: False.
            out_dir (str): Path to save the visualization results.
                Default: None.
            pipeline (list[dict], optional): raw data loading for showing.
                Default: None.

        Returns:
            dict[str, float]: Evaluation results.

        Raises:
            TypeError: If the formatted result files are neither a dict
                nor a str.
        """
        # A None sentinel avoids sharing one mutable list across calls.
        if result_names is None:
            result_names = ['pts_bbox']

        # ``format_results`` is not defined in this part of the class; it is
        # expected to return the result file path(s) and an optional
        # temporary directory object.
        result_files, tmp_dir = self.format_results(results, jsonfile_prefix,
                                                    csv_savepath)

        try:
            if isinstance(result_files, dict):
                results_dict = dict()
                for name in result_names:
                    print(f'Evaluating bboxes of {name}')
                    ret_dict = self._evaluate_single(
                        result_files[name], logger=logger)
                    results_dict.update(ret_dict)
            elif isinstance(result_files, str):
                results_dict = self._evaluate_single(
                    result_files, logger=logger)
            else:
                raise TypeError(
                    'result_files must be a dict or a str, got '
                    f'{type(result_files).__name__}')
        finally:
            # Remove the temporary directory even when evaluation fails.
            if tmp_dir is not None:
                tmp_dir.cleanup()

        if show:
            self.show(results, out_dir, pipeline=pipeline)
        return results_dict

    def _build_default_pipeline(self):
        """Build the default pipeline for this dataset.

        Returns:
            Compose: Pipeline that loads the points of a sample and of its
                sweeps, formats them and collects the ``points`` key.
        """
        pipeline = [
            dict(
                type='LoadPointsFromFile',
                coord_type='LIDAR',
                load_dim=5,
                use_dim=5,
                file_client_args=dict(backend='disk')),
            dict(
                type='LoadPointsFromMultiSweeps',
                sweeps_num=10,
                file_client_args=dict(backend='disk')),
            dict(
                type='DefaultFormatBundle3D',
                class_names=self.CLASSES,
                with_label=False),
            dict(type='Collect3D', keys=['points'])
        ]
        return Compose(pipeline)

    def show(self, results, out_dir, show=True, pipeline=None):
        """Results visualization.

        Args:
            results (list[dict]): List of bounding boxes results.
            out_dir (str): Output directory of visualization result.
            show (bool): Visualize the results online.
            pipeline (list[dict], optional): raw data loading for showing.
                Default: None.
        """
        assert out_dir is not None, 'Expect out_dir, got none.'
        pipeline = self._get_pipeline(pipeline)
        for i, result in enumerate(results):
            # Results may be nested under the 'pts_bbox' key.
            if 'pts_bbox' in result:
                result = result['pts_bbox']
            data_info = self.data_infos[i]
            pts_path = data_info['lidar_path']
            # Base name of the point cloud file, up to its first dot.
            file_name = osp.split(pts_path)[-1].split('.')[0]
            points = self._extract_data(i, pipeline, 'points').numpy()
            points = Coord3DMode.convert_point(points, Coord3DMode.LIDAR,
                                               Coord3DMode.DEPTH)
            # Only predictions scoring above 0.1 are drawn.
            inds = result['scores_3d'] > 0.1
            gt_bboxes = self.get_ann_info(i)['gt_bboxes_3d'].tensor.numpy()
            show_gt_bboxes = Box3DMode.convert(gt_bboxes, Box3DMode.LIDAR,
                                               Box3DMode.DEPTH)
            pred_bboxes = result['boxes_3d'][inds].tensor.numpy()
            show_pred_bboxes = Box3DMode.convert(pred_bboxes, Box3DMode.LIDAR,
                                                 Box3DMode.DEPTH)
            show_result(points, show_gt_bboxes, show_pred_bboxes, out_dir,
                        file_name, show)

    def json2csv(self, json_path, csv_savepath):
        """Convert the json file to csv format for submission.

        The rows are taken from ``sample_submission.csv`` under
        ``self.data_root``; the ``PredictionString`` of every sample token
        found in the json results is overwritten, other rows are kept.
        Each prediction contributes the space-separated fields
        ``score x y z size[0] size[1] size[2] yaw name``.

        Args:
            json_path (str): Path of the result json file.
            csv_savepath (str): Path to save the csv file.

        Raises:
            ValueError: If a result token is missing from the ``Id`` column
                of the sample submission file.
        """
        results = mmcv.load(json_path)['results']
        sample_list_path = osp.join(self.data_root, 'sample_submission.csv')
        data = pd.read_csv(sample_list_path)
        Id_list = list(data['Id'])
        pred_list = list(data['PredictionString'])

        # Row of the first occurrence of every Id, built once so that each
        # token lookup is O(1) instead of a linear ``list.index`` scan.
        row_of_token = {}
        for row, sample_id in enumerate(Id_list):
            row_of_token.setdefault(sample_id, row)

        print('Converting the json to csv...')
        for token, predictions in results.items():
            fields = []
            for pred in predictions:
                yaw = Quaternion(list(pred['rotation'])).yaw_pitch_roll[0]
                fields.extend([
                    str(pred['score']),
                    str(pred['translation'][0]),
                    str(pred['translation'][1]),
                    str(pred['translation'][2]),
                    str(pred['size'][0]),
                    str(pred['size'][1]),
                    str(pred['size'][2]),
                    str(yaw),
                    pred['name'],
                ])
            if token not in row_of_token:
                raise ValueError(
                    f'{token!r} is not in the Id column of '
                    f'{sample_list_path}')
            pred_list[row_of_token[token]] = ' '.join(fields)

        df = pd.DataFrame({'Id': Id_list, 'PredictionString': pred_list})
        mmcv.mkdir_or_exist(os.path.dirname(csv_savepath))
        df.to_csv(csv_savepath, index=False)
def output_to_lyft_box(detection):
    """Wrap raw detection outputs into Lyft SDK box objects.

    Args:
        detection (dict): Detection results. The keys ``boxes_3d``,
            ``scores_3d`` and ``labels_3d`` are read.

    Returns:
        list[:obj:`LyftBox`]: One LyftBox per detected box, carrying the
            gravity center, dimensions, orientation, label and score.
    """
    boxes_3d = detection['boxes_3d']
    scores = detection['scores_3d'].numpy()
    labels = detection['labels_3d'].numpy()

    centers = boxes_3d.gravity_center.numpy()
    dims = boxes_3d.dims.numpy()
    # TODO: check whether this is necessary
    # with dir_offset & dir_limit in the head
    yaws = -boxes_3d.yaw.numpy() - np.pi / 2

    return [
        LyftBox(
            centers[idx],
            dims[idx],
            # Orientation is a rotation about the z axis.
            Quaternion(axis=[0, 0, 1], radians=yaws[idx]),
            label=labels[idx],
            score=scores[idx]) for idx in range(len(boxes_3d))
    ]
def lidar_lyft_box_to_global(info, boxes):
    """Convert boxes from the LiDAR frame to the global coordinate.

    Every box is first moved into the ego vehicle frame with the
    ``lidar2ego_*`` transform and then into the global frame with the
    ``ego2global_*`` transform. The boxes are modified in place; the
    returned list holds the same box objects.

    Args:
        info (dict): Info for a specific sample data, including the
            calibration information. The keys ``lidar2ego_rotation``,
            ``lidar2ego_translation``, ``ego2global_rotation`` and
            ``ego2global_translation`` are read.
        boxes (list[:obj:`LyftBox`]): List of predicted LyftBoxes.

    Returns:
        list: List of standard LyftBoxes in the global
            coordinate.
    """
    # The transforms are the same for every box of the sample, so they are
    # built once instead of once per box.
    lidar2ego_rotation = Quaternion(info['lidar2ego_rotation'])
    lidar2ego_translation = np.array(info['lidar2ego_translation'])
    ego2global_rotation = Quaternion(info['ego2global_rotation'])
    ego2global_translation = np.array(info['ego2global_translation'])

    box_list = []
    for box in boxes:
        # Move box to ego vehicle coord system
        box.rotate(lidar2ego_rotation)
        box.translate(lidar2ego_translation)
        # Move box to global coord system
        box.rotate(ego2global_rotation)
        box.translate(ego2global_translation)
        box_list.append(box)
    return box_list