Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Archana_Badagi/food-round2
  • eric_a_scuccimarra/food-round2
  • joel_joseph/food-round2
  • darthgera123/food-round2
  • reshmarameshbabu/food-round2
  • nikhil_rayaprolu/food-round2
6 results
Show changes
Showing with 2129 additions and 30 deletions
import inspect
from functools import partial
import mmcv
class Registry(object):
def __init__(self, name):
self._name = name
self._module_dict = dict()
def __repr__(self):
format_str = self.__class__.__name__ + '(name={}, items={})'.format(
self._name, list(self._module_dict.keys()))
return format_str
@property
def name(self):
return self._name
@property
def module_dict(self):
return self._module_dict
def get(self, key):
return self._module_dict.get(key, None)
def _register_module(self, module_class, force=False):
"""Register a module.
Args:
module (:obj:`nn.Module`): Module to be registered.
"""
if not inspect.isclass(module_class):
raise TypeError('module must be a class, but got {}'.format(
type(module_class)))
module_name = module_class.__name__
if not force and module_name in self._module_dict:
raise KeyError('{} is already registered in {}'.format(
module_name, self.name))
self._module_dict[module_name] = module_class
def register_module(self, cls=None, force=False):
if cls is None:
return partial(self.register_module, force=force)
self._register_module(cls, force=force)
return cls
def build_from_cfg(cfg, registry, default_args=None):
"""Build a module from config dict.
Args:
cfg (dict): Config dict. It should at least contain the key "type".
registry (:obj:`Registry`): The registry to search the type from.
default_args (dict, optional): Default initialization arguments.
Returns:
obj: The constructed object.
"""
assert isinstance(cfg, dict) and 'type' in cfg
assert isinstance(default_args, dict) or default_args is None
args = cfg.copy()
obj_type = args.pop('type')
if mmcv.is_str(obj_type):
obj_cls = registry.get(obj_type)
if obj_cls is None:
raise KeyError('{} is not in the {} registry'.format(
obj_type, registry.name))
elif inspect.isclass(obj_type):
obj_cls = obj_type
else:
raise TypeError('type must be a str or valid type, but got {}'.format(
type(obj_type)))
if default_args is not None:
for name, value in default_args.items():
args.setdefault(name, value)
return obj_cls(**args)
# -*- coding: utf-8 -*-
"""
This module defines the :class:`NiceRepr` mixin class, which defines a
``__repr__`` and ``__str__`` method that only depend on a custom ``__nice__``
method, which you must define. This means you only have to overload one
function instead of two. Furthermore, if the object defines a ``__len__``
method, then the ``__nice__`` method defaults to something sensible, otherwise
it is treated as abstract and raises ``NotImplementedError``.
To use simply have your object inherit from :class:`NiceRepr`
(multi-inheritance should be ok).
This code was copied from the ubelt library: https://github.com/Erotemic/ubelt
Example:
>>> # Objects that define __nice__ have a default __str__ and __repr__
>>> class Student(NiceRepr):
... def __init__(self, name):
... self.name = name
... def __nice__(self):
... return self.name
>>> s1 = Student('Alice')
>>> s2 = Student('Bob')
>>> print('s1 = {}'.format(s1))
>>> print('s2 = {}'.format(s2))
s1 = <Student(Alice)>
s2 = <Student(Bob)>
Example:
>>> # Objects that define __len__ have a default __nice__
>>> class Group(NiceRepr):
... def __init__(self, data):
... self.data = data
... def __len__(self):
... return len(self.data)
>>> g = Group([1, 2, 3])
>>> print('g = {}'.format(g))
g = <Group(3)>
"""
import warnings
class NiceRepr(object):
"""
Inherit from this class and define ``__nice__`` to "nicely" print your
objects.
Defines ``__str__`` and ``__repr__`` in terms of ``__nice__`` function
Classes that inherit from :class:`NiceRepr` should redefine ``__nice__``.
If the inheriting class has a ``__len__``, method then the default
``__nice__`` method will return its length.
Example:
>>> class Foo(NiceRepr):
... def __nice__(self):
... return 'info'
>>> foo = Foo()
>>> assert str(foo) == '<Foo(info)>'
>>> assert repr(foo).startswith('<Foo(info) at ')
Example:
>>> class Bar(NiceRepr):
... pass
>>> bar = Bar()
>>> import pytest
>>> with pytest.warns(None) as record:
>>> assert 'object at' in str(bar)
>>> assert 'object at' in repr(bar)
Example:
>>> class Baz(NiceRepr):
... def __len__(self):
... return 5
>>> baz = Baz()
>>> assert str(baz) == '<Baz(5)>'
"""
def __nice__(self):
if hasattr(self, '__len__'):
# It is a common pattern for objects to use __len__ in __nice__
# As a convenience we define a default __nice__ for these objects
return str(len(self))
else:
# In all other cases force the subclass to overload __nice__
raise NotImplementedError(
'Define the __nice__ method for {!r}'.format(self.__class__))
def __repr__(self):
try:
nice = self.__nice__()
classname = self.__class__.__name__
return '<{0}({1}) at {2}>'.format(classname, nice, hex(id(self)))
except NotImplementedError as ex:
warnings.warn(str(ex), category=RuntimeWarning)
return object.__repr__(self)
def __str__(self):
try:
classname = self.__class__.__name__
nice = self.__nice__()
return '<{0}({1})>'.format(classname, nice)
except NotImplementedError as ex:
warnings.warn(str(ex), category=RuntimeWarning)
return object.__repr__(self)
[pytest]
addopts = --xdoctest --xdoctest-style=auto
norecursedirs = .git ignore build __pycache__ data docker docs .eggs
filterwarnings= default
ignore:.*No cfgstr given in Cacher constructor or call.*:Warning
ignore:.*Define the __nice__ method for.*:Warning
-r requirements/runtime.txt
-r requirements/optional.txt
-r requirements/tests.txt
-r requirements/build.txt
# These must be installed before building mmdetection
cython
numpy
torch>=1.1
albumentations>=0.3.2
imagecorruptions
matplotlib
mmcv>=0.2.15
numpy
# need older pillow until torchvision is fixed
Pillow<=6.2.2
six
terminaltables
torch>=1.1
torchvision
asynctest
codecov
flake8
isort
pytest
pytest-cov
pytest-runner
xdoctest >= 0.10.0
yapf
# Note: used for kwarray.group_items, this may be ported to mmcv in the future.
kwarray
#!/bin/bash
python tools/infer.py configs/htc_r50.py epoch_20.pth --json_out $AICROWD_PREDICTIONS_OUTPUT_PATH
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import platform
import subprocess
import time
from setuptools import find_packages, setup
from setuptools import Extension, dist, find_packages, setup
import torch
from torch.utils.cpp_extension import BuildExtension, CUDAExtension
dist.Distribution().fetch_build_eggs(['Cython', 'numpy>=1.11.1'])
import numpy as np # noqa: E402, isort:skip
from Cython.Build import cythonize # noqa: E402, isort:skip
def readme():
......@@ -10,11 +20,14 @@ def readme():
return content
MAJOR = 0
MINOR = 6
MAJOR = 1
MINOR = 0
PATCH = 0
SUFFIX = ''
SHORT_VERSION = '{}.{}.{}{}'.format(MAJOR, MINOR, PATCH, SUFFIX)
if PATCH:
SHORT_VERSION = '{}.{}.{}{}'.format(MAJOR, MINOR, PATCH, SUFFIX)
else:
SHORT_VERSION = '{}.{}{}'.format(MAJOR, MINOR, SUFFIX)
version_file = 'mmdet/version.py'
......@@ -80,13 +93,134 @@ def get_version():
return locals()['__version__']
def make_cuda_ext(name, module, sources):
define_macros = []
if torch.cuda.is_available() or os.getenv('FORCE_CUDA', '0') == '1':
define_macros += [("WITH_CUDA", None)]
else:
raise EnvironmentError('CUDA is required to compile MMDetection!')
return CUDAExtension(
name='{}.{}'.format(module, name),
sources=[os.path.join(*module.split('.'), p) for p in sources],
define_macros=define_macros,
extra_compile_args={
'cxx': [],
'nvcc': [
'-D__CUDA_NO_HALF_OPERATORS__',
'-D__CUDA_NO_HALF_CONVERSIONS__',
'-D__CUDA_NO_HALF2_OPERATORS__',
]
})
def make_cython_ext(name, module, sources):
extra_compile_args = None
if platform.system() != 'Windows':
extra_compile_args = {
'cxx': ['-Wno-unused-function', '-Wno-write-strings']
}
extension = Extension(
'{}.{}'.format(module, name),
[os.path.join(*module.split('.'), p) for p in sources],
include_dirs=[np.get_include()],
language='c++',
extra_compile_args=extra_compile_args)
extension, = cythonize(extension)
return extension
def parse_requirements(fname='requirements.txt', with_version=True):
"""
Parse the package dependencies listed in a requirements file but strips
specific versioning information.
Args:
fname (str): path to requirements file
with_version (bool, default=False): if True include version specs
Returns:
List[str]: list of requirements items
CommandLine:
python -c "import setup; print(setup.parse_requirements())"
"""
import sys
from os.path import exists
import re
require_fpath = fname
def parse_line(line):
"""
Parse information from a line in a requirements text file
"""
if line.startswith('-r '):
# Allow specifying requirements in other files
target = line.split(' ')[1]
for info in parse_require_file(target):
yield info
else:
info = {'line': line}
if line.startswith('-e '):
info['package'] = line.split('#egg=')[1]
else:
# Remove versioning from the package
pat = '(' + '|'.join(['>=', '==', '>']) + ')'
parts = re.split(pat, line, maxsplit=1)
parts = [p.strip() for p in parts]
info['package'] = parts[0]
if len(parts) > 1:
op, rest = parts[1:]
if ';' in rest:
# Handle platform specific dependencies
# http://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-platform-specific-dependencies
version, platform_deps = map(str.strip,
rest.split(';'))
info['platform_deps'] = platform_deps
else:
version = rest # NOQA
info['version'] = (op, version)
yield info
def parse_require_file(fpath):
with open(fpath, 'r') as f:
for line in f.readlines():
line = line.strip()
if line and not line.startswith('#'):
for info in parse_line(line):
yield info
def gen_packages_items():
if exists(require_fpath):
for info in parse_require_file(require_fpath):
parts = [info['package']]
if with_version and 'version' in info:
parts.extend(info['version'])
if not sys.version.startswith('3.4'):
# apparently package_deps are broken in 3.4
platform_deps = info.get('platform_deps')
if platform_deps is not None:
parts.append(';' + platform_deps)
item = ''.join(parts)
yield item
packages = list(gen_packages_items())
return packages
if __name__ == '__main__':
write_version_py()
setup(
name='mmdet',
version=get_version(),
description='Open MMLab Detection Toolbox',
description='Open MMLab Detection Toolbox and Benchmark',
long_description=readme(),
author='OpenMMLab',
author_email='chenkaidev@gmail.com',
keywords='computer vision, object detection',
url='https://github.com/open-mmlab/mmdetection',
packages=find_packages(exclude=('configs', 'tools', 'demo')),
......@@ -95,18 +229,73 @@ if __name__ == '__main__':
'Development Status :: 4 - Beta',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
],
license='Apache License 2.0',
setup_requires=['pytest-runner'],
tests_require=['pytest'],
install_requires=[
'mmcv>=0.2.6', 'numpy', 'matplotlib', 'six', 'terminaltables',
'pycocotools'
setup_requires=parse_requirements('requirements/build.txt'),
tests_require=parse_requirements('requirements/tests.txt'),
install_requires=parse_requirements('requirements/runtime.txt'),
extras_require={
'all': parse_requirements('requirements.txt'),
'tests': parse_requirements('requirements/tests.txt'),
'build': parse_requirements('requirements/build.txt'),
'optional': parse_requirements('requirements/optional.txt'),
},
ext_modules=[
make_cuda_ext(
name='compiling_info',
module='mmdet.ops.utils',
sources=['src/compiling_info.cpp']),
make_cython_ext(
name='soft_nms_cpu',
module='mmdet.ops.nms',
sources=['src/soft_nms_cpu.pyx']),
make_cuda_ext(
name='nms_cpu',
module='mmdet.ops.nms',
sources=['src/nms_cpu.cpp']),
make_cuda_ext(
name='nms_cuda',
module='mmdet.ops.nms',
sources=['src/nms_cuda.cpp', 'src/nms_kernel.cu']),
make_cuda_ext(
name='roi_align_cuda',
module='mmdet.ops.roi_align',
sources=['src/roi_align_cuda.cpp', 'src/roi_align_kernel.cu']),
make_cuda_ext(
name='roi_pool_cuda',
module='mmdet.ops.roi_pool',
sources=['src/roi_pool_cuda.cpp', 'src/roi_pool_kernel.cu']),
make_cuda_ext(
name='deform_conv_cuda',
module='mmdet.ops.dcn',
sources=[
'src/deform_conv_cuda.cpp',
'src/deform_conv_cuda_kernel.cu'
]),
make_cuda_ext(
name='deform_pool_cuda',
module='mmdet.ops.dcn',
sources=[
'src/deform_pool_cuda.cpp',
'src/deform_pool_cuda_kernel.cu'
]),
make_cuda_ext(
name='sigmoid_focal_loss_cuda',
module='mmdet.ops.sigmoid_focal_loss',
sources=[
'src/sigmoid_focal_loss.cpp',
'src/sigmoid_focal_loss_cuda.cu'
]),
make_cuda_ext(
name='masked_conv2d_cuda',
module='mmdet.ops.masked_conv',
sources=[
'src/masked_conv2d_cuda.cpp', 'src/masked_conv2d_kernel.cu'
]),
],
cmdclass={'build_ext': BuildExtension},
zip_safe=False)
# coding: utf-8
import asyncio
import os
import shutil
import urllib
import mmcv
import torch
from mmdet.apis import (async_inference_detector, inference_detector,
init_detector, show_result)
from mmdet.utils.contextmanagers import concurrent
from mmdet.utils.profiling import profile_time
async def main():
"""
Benchmark between async and synchronous inference interfaces.
Sample runs for 20 demo images on K80 GPU, model - mask_rcnn_r50_fpn_1x:
async sync
7981.79 ms 9660.82 ms
8074.52 ms 9660.94 ms
7976.44 ms 9406.83 ms
Async variant takes about 0.83-0.85 of the time of the synchronous
interface.
"""
project_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
config_file = os.path.join(project_dir, 'configs/mask_rcnn_r50_fpn_1x.py')
checkpoint_file = os.path.join(
project_dir, 'checkpoints/mask_rcnn_r50_fpn_1x_20181010-069fa190.pth')
if not os.path.exists(checkpoint_file):
url = ('https://s3.ap-northeast-2.amazonaws.com/open-mmlab/mmdetection'
'/models/mask_rcnn_r50_fpn_1x_20181010-069fa190.pth')
print('Downloading {} ...'.format(url))
local_filename, _ = urllib.request.urlretrieve(url)
os.makedirs(os.path.dirname(checkpoint_file), exist_ok=True)
shutil.move(local_filename, checkpoint_file)
print('Saved as {}'.format(checkpoint_file))
else:
print('Using existing checkpoint {}'.format(checkpoint_file))
device = 'cuda:0'
model = init_detector(
config_file, checkpoint=checkpoint_file, device=device)
# queue is used for concurrent inference of multiple images
streamqueue = asyncio.Queue()
# queue size defines concurrency level
streamqueue_size = 4
for _ in range(streamqueue_size):
streamqueue.put_nowait(torch.cuda.Stream(device=device))
# test a single image and show the results
img = mmcv.imread(os.path.join(project_dir, 'demo/demo.jpg'))
# warmup
await async_inference_detector(model, img)
async def detect(img):
async with concurrent(streamqueue):
return await async_inference_detector(model, img)
num_of_images = 20
with profile_time('benchmark', 'async'):
tasks = [
asyncio.create_task(detect(img)) for _ in range(num_of_images)
]
async_results = await asyncio.gather(*tasks)
with torch.cuda.stream(torch.cuda.default_stream()):
with profile_time('benchmark', 'sync'):
sync_results = [
inference_detector(model, img) for _ in range(num_of_images)
]
result_dir = os.path.join(project_dir, 'demo')
show_result(
img,
async_results[0],
model.CLASSES,
score_thr=0.5,
show=False,
out_file=os.path.join(result_dir, 'result_async.jpg'))
show_result(
img,
sync_results[0],
model.CLASSES,
score_thr=0.5,
show=False,
out_file=os.path.join(result_dir, 'result_sync.jpg'))
if __name__ == '__main__':
asyncio.run(main())
"""
Tests the Assigner objects.
CommandLine:
pytest tests/test_assigner.py
xdoctest tests/test_assigner.py zero
"""
import torch
from mmdet.core import MaxIoUAssigner
from mmdet.core.bbox.assigners import ApproxMaxIoUAssigner, PointAssigner
def test_max_iou_assigner():
self = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
)
bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 9],
[0, 10, 10, 19],
])
gt_labels = torch.LongTensor([2, 3])
assign_result = self.assign(bboxes, gt_bboxes, gt_labels=gt_labels)
assert len(assign_result.gt_inds) == 4
assert len(assign_result.labels) == 4
expected_gt_inds = torch.LongTensor([1, 0, 2, 0])
assert torch.all(assign_result.gt_inds == expected_gt_inds)
def test_max_iou_assigner_with_ignore():
self = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
ignore_iof_thr=0.5,
ignore_wrt_candidates=False,
)
bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 9],
[0, 10, 10, 19],
])
gt_bboxes_ignore = torch.Tensor([
[30, 30, 40, 40],
])
assign_result = self.assign(
bboxes, gt_bboxes, gt_bboxes_ignore=gt_bboxes_ignore)
expected_gt_inds = torch.LongTensor([1, 0, 2, -1])
assert torch.all(assign_result.gt_inds == expected_gt_inds)
def test_max_iou_assigner_with_empty_gt():
"""
Test corner case where an image might have no true detections
"""
self = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
)
bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_bboxes = torch.FloatTensor([])
assign_result = self.assign(bboxes, gt_bboxes)
expected_gt_inds = torch.LongTensor([0, 0, 0, 0])
assert torch.all(assign_result.gt_inds == expected_gt_inds)
def test_max_iou_assigner_with_empty_boxes():
"""
Test corner case where an network might predict no boxes
"""
self = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
)
bboxes = torch.empty((0, 4))
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 9],
[0, 10, 10, 19],
])
gt_labels = torch.LongTensor([2, 3])
# Test with gt_labels
assign_result = self.assign(bboxes, gt_bboxes, gt_labels=gt_labels)
assert len(assign_result.gt_inds) == 0
assert tuple(assign_result.labels.shape) == (0, )
# Test without gt_labels
assign_result = self.assign(bboxes, gt_bboxes, gt_labels=None)
assert len(assign_result.gt_inds) == 0
assert assign_result.labels is None
def test_max_iou_assigner_with_empty_boxes_and_gt():
"""
Test corner case where an network might predict no boxes and no gt
"""
self = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
)
bboxes = torch.empty((0, 4))
gt_bboxes = torch.empty((0, 4))
assign_result = self.assign(bboxes, gt_bboxes)
assert len(assign_result.gt_inds) == 0
def test_point_assigner():
self = PointAssigner()
points = torch.FloatTensor([ # [x, y, stride]
[0, 0, 1],
[10, 10, 1],
[5, 5, 1],
[32, 32, 1],
])
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 9],
[0, 10, 10, 19],
])
assign_result = self.assign(points, gt_bboxes)
expected_gt_inds = torch.LongTensor([1, 2, 1, 0])
assert torch.all(assign_result.gt_inds == expected_gt_inds)
def test_point_assigner_with_empty_gt():
"""
Test corner case where an image might have no true detections
"""
self = PointAssigner()
points = torch.FloatTensor([ # [x, y, stride]
[0, 0, 1],
[10, 10, 1],
[5, 5, 1],
[32, 32, 1],
])
gt_bboxes = torch.FloatTensor([])
assign_result = self.assign(points, gt_bboxes)
expected_gt_inds = torch.LongTensor([0, 0, 0, 0])
assert torch.all(assign_result.gt_inds == expected_gt_inds)
def test_point_assigner_with_empty_boxes_and_gt():
"""
Test corner case where an image might predict no points and no gt
"""
self = PointAssigner()
points = torch.FloatTensor([])
gt_bboxes = torch.FloatTensor([])
assign_result = self.assign(points, gt_bboxes)
assert len(assign_result.gt_inds) == 0
def test_approx_iou_assigner():
self = ApproxMaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
)
bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 9],
[0, 10, 10, 19],
])
approxs_per_octave = 1
approxs = bboxes
squares = bboxes
assign_result = self.assign(approxs, squares, approxs_per_octave,
gt_bboxes)
expected_gt_inds = torch.LongTensor([1, 0, 2, 0])
assert torch.all(assign_result.gt_inds == expected_gt_inds)
def test_approx_iou_assigner_with_empty_gt():
"""
Test corner case where an image might have no true detections
"""
self = ApproxMaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
)
bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_bboxes = torch.FloatTensor([])
approxs_per_octave = 1
approxs = bboxes
squares = bboxes
assign_result = self.assign(approxs, squares, approxs_per_octave,
gt_bboxes)
expected_gt_inds = torch.LongTensor([0, 0, 0, 0])
assert torch.all(assign_result.gt_inds == expected_gt_inds)
def test_approx_iou_assigner_with_empty_boxes():
"""
Test corner case where an network might predict no boxes
"""
self = ApproxMaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
)
bboxes = torch.empty((0, 4))
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 9],
[0, 10, 10, 19],
])
approxs_per_octave = 1
approxs = bboxes
squares = bboxes
assign_result = self.assign(approxs, squares, approxs_per_octave,
gt_bboxes)
assert len(assign_result.gt_inds) == 0
def test_approx_iou_assigner_with_empty_boxes_and_gt():
"""
Test corner case where an network might predict no boxes and no gt
"""
self = ApproxMaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
)
bboxes = torch.empty((0, 4))
gt_bboxes = torch.empty((0, 4))
approxs_per_octave = 1
approxs = bboxes
squares = bboxes
assign_result = self.assign(approxs, squares, approxs_per_octave,
gt_bboxes)
assert len(assign_result.gt_inds) == 0
def test_random_assign_result():
"""
Test random instantiation of assign result to catch corner cases
"""
from mmdet.core.bbox.assigners.assign_result import AssignResult
AssignResult.random()
AssignResult.random(num_gts=0, num_preds=0)
AssignResult.random(num_gts=0, num_preds=3)
AssignResult.random(num_gts=3, num_preds=3)
AssignResult.random(num_gts=0, num_preds=3)
AssignResult.random(num_gts=7, num_preds=7)
AssignResult.random(num_gts=7, num_preds=64)
AssignResult.random(num_gts=24, num_preds=3)
"""Tests for async interface."""
import asyncio
import os
import sys
import asynctest
import mmcv
import torch
from mmdet.apis import async_inference_detector, init_detector
if sys.version_info >= (3, 7):
from mmdet.utils.contextmanagers import concurrent
class AsyncTestCase(asynctest.TestCase):
use_default_loop = False
forbid_get_event_loop = True
TEST_TIMEOUT = int(os.getenv("ASYNCIO_TEST_TIMEOUT", "30"))
def _run_test_method(self, method):
result = method()
if asyncio.iscoroutine(result):
self.loop.run_until_complete(
asyncio.wait_for(result, timeout=self.TEST_TIMEOUT))
class MaskRCNNDetector:
def __init__(self,
model_config,
checkpoint=None,
streamqueue_size=3,
device="cuda:0"):
self.streamqueue_size = streamqueue_size
self.device = device
# build the model and load checkpoint
self.model = init_detector(
model_config, checkpoint=None, device=self.device)
self.streamqueue = None
async def init(self):
self.streamqueue = asyncio.Queue()
for _ in range(self.streamqueue_size):
stream = torch.cuda.Stream(device=self.device)
self.streamqueue.put_nowait(stream)
if sys.version_info >= (3, 7):
async def apredict(self, img):
if isinstance(img, str):
img = mmcv.imread(img)
async with concurrent(self.streamqueue):
result = await async_inference_detector(self.model, img)
return result
class AsyncInferenceTestCase(AsyncTestCase):
if sys.version_info >= (3, 7):
async def test_simple_inference(self):
if not torch.cuda.is_available():
import pytest
pytest.skip("test requires GPU and torch+cuda")
root_dir = os.path.dirname(os.path.dirname(__name__))
model_config = os.path.join(root_dir,
"configs/mask_rcnn_r50_fpn_1x.py")
detector = MaskRCNNDetector(model_config)
await detector.init()
img_path = os.path.join(root_dir, "demo/demo.jpg")
bboxes, _ = await detector.apredict(img_path)
self.assertTrue(bboxes)
from os.path import dirname, exists, join
def _get_config_directory():
""" Find the predefined detector config directory """
try:
# Assume we are running in the source mmdetection repo
repo_dpath = dirname(dirname(__file__))
except NameError:
# For IPython development when this __file__ is not defined
import mmdet
repo_dpath = dirname(dirname(mmdet.__file__))
config_dpath = join(repo_dpath, 'configs')
if not exists(config_dpath):
raise Exception('Cannot find config path')
return config_dpath
def test_config_build_detector():
"""
Test that all detection models defined in the configs can be initialized.
"""
from xdoctest.utils import import_module_from_path
from mmdet.models import build_detector
config_dpath = _get_config_directory()
print('Found config_dpath = {!r}'.format(config_dpath))
# import glob
# config_fpaths = list(glob.glob(join(config_dpath, '**', '*.py')))
# config_names = [relpath(p, config_dpath) for p in config_fpaths]
# Only tests a representative subset of configurations
config_names = [
# 'dcn/faster_rcnn_dconv_c3-c5_r50_fpn_1x.py',
# 'dcn/cascade_mask_rcnn_dconv_c3-c5_r50_fpn_1x.py',
# 'dcn/faster_rcnn_dpool_r50_fpn_1x.py',
'dcn/mask_rcnn_dconv_c3-c5_r50_fpn_1x.py',
# 'dcn/faster_rcnn_dconv_c3-c5_x101_32x4d_fpn_1x.py',
# 'dcn/cascade_rcnn_dconv_c3-c5_r50_fpn_1x.py',
# 'dcn/faster_rcnn_mdpool_r50_fpn_1x.py',
# 'dcn/faster_rcnn_mdconv_c3-c5_group4_r50_fpn_1x.py',
# 'dcn/faster_rcnn_mdconv_c3-c5_r50_fpn_1x.py',
# ---
# 'htc/htc_x101_32x4d_fpn_20e_16gpu.py',
'htc/htc_without_semantic_r50_fpn_1x.py',
# 'htc/htc_dconv_c3-c5_mstrain_400_1400_x101_64x4d_fpn_20e.py',
# 'htc/htc_x101_64x4d_fpn_20e_16gpu.py',
# 'htc/htc_r50_fpn_1x.py',
# 'htc/htc_r101_fpn_20e.py',
# 'htc/htc_r50_fpn_20e.py',
# ---
'cityscapes/mask_rcnn_r50_fpn_1x_cityscapes.py',
# 'cityscapes/faster_rcnn_r50_fpn_1x_cityscapes.py',
# ---
# 'scratch/scratch_faster_rcnn_r50_fpn_gn_6x.py',
# 'scratch/scratch_mask_rcnn_r50_fpn_gn_6x.py',
# ---
# 'grid_rcnn/grid_rcnn_gn_head_x101_32x4d_fpn_2x.py',
'grid_rcnn/grid_rcnn_gn_head_r50_fpn_2x.py',
# ---
'double_heads/dh_faster_rcnn_r50_fpn_1x.py',
# ---
'empirical_attention/faster_rcnn_r50_fpn_attention_0010_dcn_1x.py',
# 'empirical_attention/faster_rcnn_r50_fpn_attention_1111_1x.py',
# 'empirical_attention/faster_rcnn_r50_fpn_attention_0010_1x.py',
# 'empirical_attention/faster_rcnn_r50_fpn_attention_1111_dcn_1x.py',
# ---
# 'ms_rcnn/ms_rcnn_r101_caffe_fpn_1x.py',
# 'ms_rcnn/ms_rcnn_x101_64x4d_fpn_1x.py',
# 'ms_rcnn/ms_rcnn_r50_caffe_fpn_1x.py',
# ---
# 'guided_anchoring/ga_faster_x101_32x4d_fpn_1x.py',
# 'guided_anchoring/ga_rpn_x101_32x4d_fpn_1x.py',
# 'guided_anchoring/ga_retinanet_r50_caffe_fpn_1x.py',
# 'guided_anchoring/ga_fast_r50_caffe_fpn_1x.py',
# 'guided_anchoring/ga_retinanet_x101_32x4d_fpn_1x.py',
# 'guided_anchoring/ga_rpn_r101_caffe_rpn_1x.py',
# 'guided_anchoring/ga_faster_r50_caffe_fpn_1x.py',
'guided_anchoring/ga_rpn_r50_caffe_fpn_1x.py',
# ---
'foveabox/fovea_r50_fpn_4gpu_1x.py',
# 'foveabox/fovea_align_gn_ms_r101_fpn_4gpu_2x.py',
# 'foveabox/fovea_align_gn_r50_fpn_4gpu_2x.py',
# 'foveabox/fovea_align_gn_r101_fpn_4gpu_2x.py',
'foveabox/fovea_align_gn_ms_r50_fpn_4gpu_2x.py',
# ---
# 'hrnet/cascade_rcnn_hrnetv2p_w32_20e.py',
# 'hrnet/mask_rcnn_hrnetv2p_w32_1x.py',
# 'hrnet/cascade_mask_rcnn_hrnetv2p_w32_20e.py',
# 'hrnet/htc_hrnetv2p_w32_20e.py',
# 'hrnet/faster_rcnn_hrnetv2p_w18_1x.py',
# 'hrnet/mask_rcnn_hrnetv2p_w18_1x.py',
# 'hrnet/faster_rcnn_hrnetv2p_w32_1x.py',
# 'hrnet/faster_rcnn_hrnetv2p_w40_1x.py',
'hrnet/fcos_hrnetv2p_w32_gn_1x_4gpu.py',
# ---
# 'gn+ws/faster_rcnn_r50_fpn_gn_ws_1x.py',
# 'gn+ws/mask_rcnn_x101_32x4d_fpn_gn_ws_2x.py',
'gn+ws/mask_rcnn_r50_fpn_gn_ws_2x.py',
# 'gn+ws/mask_rcnn_r50_fpn_gn_ws_20_23_24e.py',
# ---
# 'wider_face/ssd300_wider_face.py',
# ---
'pascal_voc/ssd300_voc.py',
'pascal_voc/faster_rcnn_r50_fpn_1x_voc0712.py',
'pascal_voc/ssd512_voc.py',
# ---
# 'gcnet/mask_rcnn_r4_gcb_c3-c5_r50_fpn_syncbn_1x.py',
# 'gcnet/mask_rcnn_r16_gcb_c3-c5_r50_fpn_syncbn_1x.py',
# 'gcnet/mask_rcnn_r4_gcb_c3-c5_r50_fpn_1x.py',
# 'gcnet/mask_rcnn_r16_gcb_c3-c5_r50_fpn_1x.py',
'gcnet/mask_rcnn_r50_fpn_sbn_1x.py',
# ---
'gn/mask_rcnn_r50_fpn_gn_contrib_2x.py',
# 'gn/mask_rcnn_r50_fpn_gn_2x.py',
# 'gn/mask_rcnn_r101_fpn_gn_2x.py',
# ---
# 'reppoints/reppoints_moment_x101_dcn_fpn_2x.py',
'reppoints/reppoints_moment_r50_fpn_2x.py',
# 'reppoints/reppoints_moment_x101_dcn_fpn_2x_mt.py',
'reppoints/reppoints_partial_minmax_r50_fpn_1x.py',
'reppoints/bbox_r50_grid_center_fpn_1x.py',
# 'reppoints/reppoints_moment_r101_dcn_fpn_2x.py',
# 'reppoints/reppoints_moment_r101_fpn_2x_mt.py',
# 'reppoints/reppoints_moment_r50_fpn_2x_mt.py',
'reppoints/reppoints_minmax_r50_fpn_1x.py',
# 'reppoints/reppoints_moment_r50_fpn_1x.py',
# 'reppoints/reppoints_moment_r101_fpn_2x.py',
# 'reppoints/reppoints_moment_r101_dcn_fpn_2x_mt.py',
'reppoints/bbox_r50_grid_fpn_1x.py',
# ---
# 'fcos/fcos_mstrain_640_800_x101_64x4d_fpn_gn_2x.py',
# 'fcos/fcos_mstrain_640_800_r101_caffe_fpn_gn_2x_4gpu.py',
'fcos/fcos_r50_caffe_fpn_gn_1x_4gpu.py',
# ---
'albu_example/mask_rcnn_r50_fpn_1x.py',
# ---
'libra_rcnn/libra_faster_rcnn_r50_fpn_1x.py',
# 'libra_rcnn/libra_retinanet_r50_fpn_1x.py',
# 'libra_rcnn/libra_faster_rcnn_r101_fpn_1x.py',
# 'libra_rcnn/libra_faster_rcnn_x101_64x4d_fpn_1x.py',
# 'libra_rcnn/libra_fast_rcnn_r50_fpn_1x.py',
# ---
# 'ghm/retinanet_ghm_r50_fpn_1x.py',
# ---
# 'fp16/retinanet_r50_fpn_fp16_1x.py',
'fp16/mask_rcnn_r50_fpn_fp16_1x.py',
'fp16/faster_rcnn_r50_fpn_fp16_1x.py'
]
print('Using {} config files'.format(len(config_names)))
for config_fname in config_names:
config_fpath = join(config_dpath, config_fname)
config_mod = import_module_from_path(config_fpath)
config_mod.model
config_mod.train_cfg
config_mod.test_cfg
print('Building detector, config_fpath = {!r}'.format(config_fpath))
# Remove pretrained keys to allow for testing in an offline environment
if 'pretrained' in config_mod.model:
config_mod.model['pretrained'] = None
detector = build_detector(
config_mod.model,
train_cfg=config_mod.train_cfg,
test_cfg=config_mod.test_cfg)
assert detector is not None
"""
pytest tests/test_forward.py
"""
import copy
from os.path import dirname, exists, join
import numpy as np
import torch
def _get_config_directory():
""" Find the predefined detector config directory """
try:
# Assume we are running in the source mmdetection repo
repo_dpath = dirname(dirname(__file__))
except NameError:
# For IPython development when this __file__ is not defined
import mmdet
repo_dpath = dirname(dirname(mmdet.__file__))
config_dpath = join(repo_dpath, 'configs')
if not exists(config_dpath):
raise Exception('Cannot find config path')
return config_dpath
def _get_config_module(fname):
"""
Load a configuration as a python module
"""
from xdoctest.utils import import_module_from_path
config_dpath = _get_config_directory()
config_fpath = join(config_dpath, fname)
config_mod = import_module_from_path(config_fpath)
return config_mod
def _get_detector_cfg(fname):
"""
Grab configs necessary to create a detector. These are deep copied to allow
for safe modification of parameters without influencing other tests.
"""
import mmcv
config = _get_config_module(fname)
model = copy.deepcopy(config.model)
train_cfg = mmcv.Config(copy.deepcopy(config.train_cfg))
test_cfg = mmcv.Config(copy.deepcopy(config.test_cfg))
return model, train_cfg, test_cfg
def test_ssd300_forward():
model, train_cfg, test_cfg = _get_detector_cfg('ssd300_coco.py')
model['pretrained'] = None
from mmdet.models import build_detector
detector = build_detector(model, train_cfg=train_cfg, test_cfg=test_cfg)
input_shape = (1, 3, 300, 300)
mm_inputs = _demo_mm_inputs(input_shape)
imgs = mm_inputs.pop('imgs')
img_metas = mm_inputs.pop('img_metas')
# Test forward train
gt_bboxes = mm_inputs['gt_bboxes']
gt_labels = mm_inputs['gt_labels']
losses = detector.forward(
imgs,
img_metas,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
return_loss=True)
assert isinstance(losses, dict)
# Test forward test
with torch.no_grad():
img_list = [g[None, :] for g in imgs]
batch_results = []
for one_img, one_meta in zip(img_list, img_metas):
result = detector.forward([one_img], [[one_meta]],
return_loss=False)
batch_results.append(result)
def test_rpn_forward():
model, train_cfg, test_cfg = _get_detector_cfg('rpn_r50_fpn_1x.py')
model['pretrained'] = None
from mmdet.models import build_detector
detector = build_detector(model, train_cfg=train_cfg, test_cfg=test_cfg)
input_shape = (1, 3, 224, 224)
mm_inputs = _demo_mm_inputs(input_shape)
imgs = mm_inputs.pop('imgs')
img_metas = mm_inputs.pop('img_metas')
# Test forward train
gt_bboxes = mm_inputs['gt_bboxes']
losses = detector.forward(
imgs, img_metas, gt_bboxes=gt_bboxes, return_loss=True)
assert isinstance(losses, dict)
# Test forward test
with torch.no_grad():
img_list = [g[None, :] for g in imgs]
batch_results = []
for one_img, one_meta in zip(img_list, img_metas):
result = detector.forward([one_img], [[one_meta]],
return_loss=False)
batch_results.append(result)
def test_retina_ghm_forward():
model, train_cfg, test_cfg = _get_detector_cfg(
'ghm/retinanet_ghm_r50_fpn_1x.py')
model['pretrained'] = None
from mmdet.models import build_detector
detector = build_detector(model, train_cfg=train_cfg, test_cfg=test_cfg)
input_shape = (3, 3, 224, 224)
mm_inputs = _demo_mm_inputs(input_shape)
imgs = mm_inputs.pop('imgs')
img_metas = mm_inputs.pop('img_metas')
# Test forward train
gt_bboxes = mm_inputs['gt_bboxes']
gt_labels = mm_inputs['gt_labels']
losses = detector.forward(
imgs,
img_metas,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
return_loss=True)
assert isinstance(losses, dict)
# Test forward test
with torch.no_grad():
img_list = [g[None, :] for g in imgs]
batch_results = []
for one_img, one_meta in zip(img_list, img_metas):
result = detector.forward([one_img], [[one_meta]],
return_loss=False)
batch_results.append(result)
if torch.cuda.is_available():
detector = detector.cuda()
imgs = imgs.cuda()
# Test forward train
gt_bboxes = [b.cuda() for b in mm_inputs['gt_bboxes']]
gt_labels = [g.cuda() for g in mm_inputs['gt_labels']]
losses = detector.forward(
imgs,
img_metas,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
return_loss=True)
assert isinstance(losses, dict)
# Test forward test
with torch.no_grad():
img_list = [g[None, :] for g in imgs]
batch_results = []
for one_img, one_meta in zip(img_list, img_metas):
result = detector.forward([one_img], [[one_meta]],
return_loss=False)
batch_results.append(result)
def test_cascade_forward():
try:
from torchvision import _C as C # NOQA
except ImportError:
import pytest
raise pytest.skip('requires torchvision on cpu')
model, train_cfg, test_cfg = _get_detector_cfg(
'cascade_rcnn_r50_fpn_1x.py')
model['pretrained'] = None
# torchvision roi align supports CPU
model['bbox_roi_extractor']['roi_layer']['use_torchvision'] = True
from mmdet.models import build_detector
detector = build_detector(model, train_cfg=train_cfg, test_cfg=test_cfg)
input_shape = (1, 3, 256, 256)
# Test forward train with a non-empty truth batch
mm_inputs = _demo_mm_inputs(input_shape, num_items=[10])
imgs = mm_inputs.pop('imgs')
img_metas = mm_inputs.pop('img_metas')
gt_bboxes = mm_inputs['gt_bboxes']
gt_labels = mm_inputs['gt_labels']
losses = detector.forward(
imgs,
img_metas,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
return_loss=True)
assert isinstance(losses, dict)
from mmdet.apis.train import parse_losses
total_loss = float(parse_losses(losses)[0].item())
assert total_loss > 0
# Test forward train with an empty truth batch
mm_inputs = _demo_mm_inputs(input_shape, num_items=[0])
imgs = mm_inputs.pop('imgs')
img_metas = mm_inputs.pop('img_metas')
gt_bboxes = mm_inputs['gt_bboxes']
gt_labels = mm_inputs['gt_labels']
losses = detector.forward(
imgs,
img_metas,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
return_loss=True)
assert isinstance(losses, dict)
from mmdet.apis.train import parse_losses
total_loss = float(parse_losses(losses)[0].item())
assert total_loss > 0
def test_faster_rcnn_forward():
try:
from torchvision import _C as C # NOQA
except ImportError:
import pytest
raise pytest.skip('requires torchvision on cpu')
model, train_cfg, test_cfg = _get_detector_cfg('faster_rcnn_r50_fpn_1x.py')
model['pretrained'] = None
# torchvision roi align supports CPU
model['bbox_roi_extractor']['roi_layer']['use_torchvision'] = True
from mmdet.models import build_detector
detector = build_detector(model, train_cfg=train_cfg, test_cfg=test_cfg)
input_shape = (1, 3, 256, 256)
# Test forward train with a non-empty truth batch
mm_inputs = _demo_mm_inputs(input_shape, num_items=[10])
imgs = mm_inputs.pop('imgs')
img_metas = mm_inputs.pop('img_metas')
gt_bboxes = mm_inputs['gt_bboxes']
gt_labels = mm_inputs['gt_labels']
losses = detector.forward(
imgs,
img_metas,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
return_loss=True)
assert isinstance(losses, dict)
from mmdet.apis.train import parse_losses
total_loss = float(parse_losses(losses)[0].item())
assert total_loss > 0
# Test forward train with an empty truth batch
mm_inputs = _demo_mm_inputs(input_shape, num_items=[0])
imgs = mm_inputs.pop('imgs')
img_metas = mm_inputs.pop('img_metas')
gt_bboxes = mm_inputs['gt_bboxes']
gt_labels = mm_inputs['gt_labels']
losses = detector.forward(
imgs,
img_metas,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
return_loss=True)
assert isinstance(losses, dict)
from mmdet.apis.train import parse_losses
total_loss = float(parse_losses(losses)[0].item())
assert total_loss > 0
def test_faster_rcnn_ohem_forward():
try:
from torchvision import _C as C # NOQA
except ImportError:
import pytest
raise pytest.skip('requires torchvision on cpu')
model, train_cfg, test_cfg = _get_detector_cfg(
'faster_rcnn_ohem_r50_fpn_1x.py')
model['pretrained'] = None
# torchvision roi align supports CPU
model['bbox_roi_extractor']['roi_layer']['use_torchvision'] = True
from mmdet.models import build_detector
detector = build_detector(model, train_cfg=train_cfg, test_cfg=test_cfg)
input_shape = (1, 3, 256, 256)
# Test forward train with a non-empty truth batch
mm_inputs = _demo_mm_inputs(input_shape, num_items=[10])
imgs = mm_inputs.pop('imgs')
img_metas = mm_inputs.pop('img_metas')
gt_bboxes = mm_inputs['gt_bboxes']
gt_labels = mm_inputs['gt_labels']
losses = detector.forward(
imgs,
img_metas,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
return_loss=True)
assert isinstance(losses, dict)
from mmdet.apis.train import parse_losses
total_loss = float(parse_losses(losses)[0].item())
assert total_loss > 0
# Test forward train with an empty truth batch
mm_inputs = _demo_mm_inputs(input_shape, num_items=[0])
imgs = mm_inputs.pop('imgs')
img_metas = mm_inputs.pop('img_metas')
gt_bboxes = mm_inputs['gt_bboxes']
gt_labels = mm_inputs['gt_labels']
losses = detector.forward(
imgs,
img_metas,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
return_loss=True)
assert isinstance(losses, dict)
from mmdet.apis.train import parse_losses
total_loss = float(parse_losses(losses)[0].item())
assert total_loss > 0
def _demo_mm_inputs(input_shape=(1, 3, 300, 300),
num_items=None, num_classes=10): # yapf: disable
"""
Create a superset of inputs needed to run test or train batches.
Args:
input_shape (tuple):
input batch dimensions
num_items (None | List[int]):
specifies the number of boxes in each batch item
num_classes (int):
number of different labels a box might have
"""
(N, C, H, W) = input_shape
rng = np.random.RandomState(0)
imgs = rng.rand(*input_shape)
img_metas = [{
'img_shape': (H, W, C),
'ori_shape': (H, W, C),
'pad_shape': (H, W, C),
'filename': '<demo>.png',
'scale_factor': 1.0,
'flip': False,
} for _ in range(N)]
gt_bboxes = []
gt_labels = []
for batch_idx in range(N):
if num_items is None:
num_boxes = rng.randint(1, 10)
else:
num_boxes = num_items[batch_idx]
cx, cy, bw, bh = rng.rand(num_boxes, 4).T
tl_x = ((cx * W) - (W * bw / 2)).clip(0, W)
tl_y = ((cy * H) - (H * bh / 2)).clip(0, H)
br_x = ((cx * W) + (W * bw / 2)).clip(0, W)
br_y = ((cy * H) + (H * bh / 2)).clip(0, H)
boxes = np.vstack([tl_x, tl_y, br_x, br_y]).T
class_idxs = rng.randint(1, num_classes, size=num_boxes)
gt_bboxes.append(torch.FloatTensor(boxes))
gt_labels.append(torch.LongTensor(class_idxs))
mm_inputs = {
'imgs': torch.FloatTensor(imgs),
'img_metas': img_metas,
'gt_bboxes': gt_bboxes,
'gt_labels': gt_labels,
'gt_bboxes_ignore': None,
}
return mm_inputs
import mmcv
import torch
from mmdet.core import build_assigner, build_sampler
from mmdet.models.anchor_heads import AnchorHead
from mmdet.models.bbox_heads import BBoxHead
def test_anchor_head_loss():
"""
Tests anchor head loss when truth is empty and non-empty
"""
self = AnchorHead(num_classes=4, in_channels=1)
s = 256
img_metas = [{
'img_shape': (s, s, 3),
'scale_factor': 1,
'pad_shape': (s, s, 3)
}]
cfg = mmcv.Config({
'assigner': {
'type': 'MaxIoUAssigner',
'pos_iou_thr': 0.7,
'neg_iou_thr': 0.3,
'min_pos_iou': 0.3,
'ignore_iof_thr': -1
},
'sampler': {
'type': 'RandomSampler',
'num': 256,
'pos_fraction': 0.5,
'neg_pos_ub': -1,
'add_gt_as_proposals': False
},
'allowed_border': 0,
'pos_weight': -1,
'debug': False
})
# Anchor head expects a multiple levels of features per image
feat = [
torch.rand(1, 1, s // (2**(i + 2)), s // (2**(i + 2)))
for i in range(len(self.anchor_generators))
]
cls_scores, bbox_preds = self.forward(feat)
# Test that empty ground truth encourages the network to predict background
gt_bboxes = [torch.empty((0, 4))]
gt_labels = [torch.LongTensor([])]
gt_bboxes_ignore = None
empty_gt_losses = self.loss(cls_scores, bbox_preds, gt_bboxes, gt_labels,
img_metas, cfg, gt_bboxes_ignore)
# When there is no truth, the cls loss should be nonzero but there should
# be no box loss.
empty_cls_loss = sum(empty_gt_losses['loss_cls'])
empty_box_loss = sum(empty_gt_losses['loss_bbox'])
assert empty_cls_loss.item() > 0, 'cls loss should be non-zero'
assert empty_box_loss.item() == 0, (
'there should be no box loss when there are no true boxes')
# When truth is non-empty then both cls and box loss should be nonzero for
# random inputs
gt_bboxes = [
torch.Tensor([[23.6667, 23.8757, 238.6326, 151.8874]]),
]
gt_labels = [torch.LongTensor([2])]
one_gt_losses = self.loss(cls_scores, bbox_preds, gt_bboxes, gt_labels,
img_metas, cfg, gt_bboxes_ignore)
onegt_cls_loss = sum(one_gt_losses['loss_cls'])
onegt_box_loss = sum(one_gt_losses['loss_bbox'])
assert onegt_cls_loss.item() > 0, 'cls loss should be non-zero'
assert onegt_box_loss.item() > 0, 'box loss should be non-zero'
def test_bbox_head_loss():
"""
Tests bbox head loss when truth is empty and non-empty
"""
self = BBoxHead(in_channels=8, roi_feat_size=3)
num_imgs = 1
feat = torch.rand(1, 1, 3, 3)
# Dummy proposals
proposal_list = [
torch.Tensor([[23.6667, 23.8757, 228.6326, 153.8874]]),
]
target_cfg = mmcv.Config({'pos_weight': 1})
def _dummy_bbox_sampling(proposal_list, gt_bboxes, gt_labels):
"""
Create sample results that can be passed to BBoxHead.get_target
"""
assign_config = {
'type': 'MaxIoUAssigner',
'pos_iou_thr': 0.5,
'neg_iou_thr': 0.5,
'min_pos_iou': 0.5,
'ignore_iof_thr': -1
}
sampler_config = {
'type': 'RandomSampler',
'num': 512,
'pos_fraction': 0.25,
'neg_pos_ub': -1,
'add_gt_as_proposals': True
}
bbox_assigner = build_assigner(assign_config)
bbox_sampler = build_sampler(sampler_config)
gt_bboxes_ignore = [None for _ in range(num_imgs)]
sampling_results = []
for i in range(num_imgs):
assign_result = bbox_assigner.assign(proposal_list[i],
gt_bboxes[i],
gt_bboxes_ignore[i],
gt_labels[i])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[i],
gt_bboxes[i],
gt_labels[i],
feats=feat)
sampling_results.append(sampling_result)
return sampling_results
# Test bbox loss when truth is empty
gt_bboxes = [torch.empty((0, 4))]
gt_labels = [torch.LongTensor([])]
sampling_results = _dummy_bbox_sampling(proposal_list, gt_bboxes,
gt_labels)
bbox_targets = self.get_target(sampling_results, gt_bboxes, gt_labels,
target_cfg)
labels, label_weights, bbox_targets, bbox_weights = bbox_targets
# Create dummy features "extracted" for each sampled bbox
num_sampled = sum(len(res.bboxes) for res in sampling_results)
dummy_feats = torch.rand(num_sampled, 8 * 3 * 3)
cls_scores, bbox_preds = self.forward(dummy_feats)
losses = self.loss(cls_scores, bbox_preds, labels, label_weights,
bbox_targets, bbox_weights)
assert losses.get('loss_cls', 0) > 0, 'cls-loss should be non-zero'
assert losses.get('loss_bbox', 0) == 0, 'empty gt loss should be zero'
# Test bbox loss when truth is non-empty
gt_bboxes = [
torch.Tensor([[23.6667, 23.8757, 238.6326, 151.8874]]),
]
gt_labels = [torch.LongTensor([2])]
sampling_results = _dummy_bbox_sampling(proposal_list, gt_bboxes,
gt_labels)
bbox_targets = self.get_target(sampling_results, gt_bboxes, gt_labels,
target_cfg)
labels, label_weights, bbox_targets, bbox_weights = bbox_targets
# Create dummy features "extracted" for each sampled bbox
num_sampled = sum(len(res.bboxes) for res in sampling_results)
dummy_feats = torch.rand(num_sampled, 8 * 3 * 3)
cls_scores, bbox_preds = self.forward(dummy_feats)
losses = self.loss(cls_scores, bbox_preds, labels, label_weights,
bbox_targets, bbox_weights)
assert losses.get('loss_cls', 0) > 0, 'cls-loss should be non-zero'
assert losses.get('loss_bbox', 0) > 0, 'box-loss should be non-zero'
def test_refine_boxes():
"""
Mirrors the doctest in
``mmdet.models.bbox_heads.bbox_head.BBoxHead.refine_boxes`` but checks for
multiple values of n_roi / n_img.
"""
self = BBoxHead(reg_class_agnostic=True)
test_settings = [
# Corner case: less rois than images
{
'n_roi': 2,
'n_img': 4,
'rng': 34285940
},
# Corner case: no images
{
'n_roi': 0,
'n_img': 0,
'rng': 52925222
},
# Corner cases: few images / rois
{
'n_roi': 1,
'n_img': 1,
'rng': 1200281
},
{
'n_roi': 2,
'n_img': 1,
'rng': 1200282
},
{
'n_roi': 2,
'n_img': 2,
'rng': 1200283
},
{
'n_roi': 1,
'n_img': 2,
'rng': 1200284
},
# Corner case: no rois few images
{
'n_roi': 0,
'n_img': 1,
'rng': 23955860
},
{
'n_roi': 0,
'n_img': 2,
'rng': 25830516
},
# Corner case: no rois many images
{
'n_roi': 0,
'n_img': 10,
'rng': 671346
},
{
'n_roi': 0,
'n_img': 20,
'rng': 699807
},
# Corner case: similar num rois and images
{
'n_roi': 20,
'n_img': 20,
'rng': 1200238
},
{
'n_roi': 10,
'n_img': 20,
'rng': 1200238
},
{
'n_roi': 5,
'n_img': 5,
'rng': 1200238
},
# ----------------------------------
# Common case: more rois than images
{
'n_roi': 100,
'n_img': 1,
'rng': 337156
},
{
'n_roi': 150,
'n_img': 2,
'rng': 275898
},
{
'n_roi': 500,
'n_img': 5,
'rng': 4903221
},
]
for demokw in test_settings:
try:
n_roi = demokw['n_roi']
n_img = demokw['n_img']
rng = demokw['rng']
print('Test refine_boxes case: {!r}'.format(demokw))
tup = _demodata_refine_boxes(n_roi, n_img, rng=rng)
rois, labels, bbox_preds, pos_is_gts, img_metas = tup
bboxes_list = self.refine_bboxes(rois, labels, bbox_preds,
pos_is_gts, img_metas)
assert len(bboxes_list) == n_img
assert sum(map(len, bboxes_list)) <= n_roi
assert all(b.shape[1] == 4 for b in bboxes_list)
except Exception:
print('Test failed with demokw={!r}'.format(demokw))
raise
def _demodata_refine_boxes(n_roi, n_img, rng=0):
"""
Create random test data for the
``mmdet.models.bbox_heads.bbox_head.BBoxHead.refine_boxes`` method
"""
import numpy as np
from mmdet.core.bbox.demodata import random_boxes
from mmdet.core.bbox.demodata import ensure_rng
try:
import kwarray
except ImportError:
import pytest
pytest.skip('kwarray is required for this test')
scale = 512
rng = ensure_rng(rng)
img_metas = [{'img_shape': (scale, scale)} for _ in range(n_img)]
# Create rois in the expected format
roi_boxes = random_boxes(n_roi, scale=scale, rng=rng)
if n_img == 0:
assert n_roi == 0, 'cannot have any rois if there are no images'
img_ids = torch.empty((0, ), dtype=torch.long)
roi_boxes = torch.empty((0, 4), dtype=torch.float32)
else:
img_ids = rng.randint(0, n_img, (n_roi, ))
img_ids = torch.from_numpy(img_ids)
rois = torch.cat([img_ids[:, None].float(), roi_boxes], dim=1)
# Create other args
labels = rng.randint(0, 2, (n_roi, ))
labels = torch.from_numpy(labels).long()
bbox_preds = random_boxes(n_roi, scale=scale, rng=rng)
# For each image, pretend random positive boxes are gts
is_label_pos = (labels.numpy() > 0).astype(np.int)
lbl_per_img = kwarray.group_items(is_label_pos, img_ids.numpy())
pos_per_img = [sum(lbl_per_img.get(gid, [])) for gid in range(n_img)]
# randomly generate with numpy then sort with torch
_pos_is_gts = [
rng.randint(0, 2, (npos, )).astype(np.uint8) for npos in pos_per_img
]
pos_is_gts = [
torch.from_numpy(p).sort(descending=True)[0] for p in _pos_is_gts
]
return rois, labels, bbox_preds, pos_is_gts, img_metas
"""
CommandLine:
pytest tests/test_nms.py
"""
import numpy as np
import torch
from mmdet.ops.nms.nms_wrapper import nms
def test_nms_device_and_dtypes_cpu():
"""
CommandLine:
xdoctest -m tests/test_nms.py test_nms_device_and_dtypes_cpu
"""
iou_thr = 0.7
base_dets = np.array([[49.1, 32.4, 51.0, 35.9, 0.9],
[49.3, 32.9, 51.0, 35.3, 0.9],
[35.3, 11.5, 39.9, 14.5, 0.4],
[35.2, 11.7, 39.7, 15.7, 0.3]])
# CPU can handle float32 and float64
dets = base_dets.astype(np.float32)
supressed, inds = nms(dets, iou_thr)
assert dets.dtype == supressed.dtype
assert len(inds) == len(supressed) == 3
dets = torch.FloatTensor(base_dets)
surpressed, inds = nms(dets, iou_thr)
assert dets.dtype == surpressed.dtype
assert len(inds) == len(surpressed) == 3
dets = base_dets.astype(np.float64)
supressed, inds = nms(dets, iou_thr)
assert dets.dtype == supressed.dtype
assert len(inds) == len(supressed) == 3
dets = torch.DoubleTensor(base_dets)
surpressed, inds = nms(dets, iou_thr)
assert dets.dtype == surpressed.dtype
assert len(inds) == len(surpressed) == 3
def test_nms_device_and_dtypes_gpu():
"""
CommandLine:
xdoctest -m tests/test_nms.py test_nms_device_and_dtypes_gpu
"""
if not torch.cuda.is_available():
import pytest
pytest.skip('test requires GPU and torch+cuda')
iou_thr = 0.7
base_dets = np.array([[49.1, 32.4, 51.0, 35.9, 0.9],
[49.3, 32.9, 51.0, 35.3, 0.9],
[35.3, 11.5, 39.9, 14.5, 0.4],
[35.2, 11.7, 39.7, 15.7, 0.3]])
for device_id in range(torch.cuda.device_count()):
print('Run NMS on device_id = {!r}'.format(device_id))
# GPU can handle float32 but not float64
dets = base_dets.astype(np.float32)
supressed, inds = nms(dets, iou_thr, device_id)
assert dets.dtype == supressed.dtype
assert len(inds) == len(supressed) == 3
dets = torch.FloatTensor(base_dets).to(device_id)
surpressed, inds = nms(dets, iou_thr)
assert dets.dtype == surpressed.dtype
assert len(inds) == len(surpressed) == 3
import torch
from mmdet.core import MaxIoUAssigner
from mmdet.core.bbox.samplers import OHEMSampler, RandomSampler
def test_random_sampler():
assigner = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
ignore_iof_thr=0.5,
ignore_wrt_candidates=False,
)
bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 9],
[0, 10, 10, 19],
])
gt_labels = torch.LongTensor([1, 2])
gt_bboxes_ignore = torch.Tensor([
[30, 30, 40, 40],
])
assign_result = assigner.assign(
bboxes,
gt_bboxes,
gt_bboxes_ignore=gt_bboxes_ignore,
gt_labels=gt_labels)
sampler = RandomSampler(
num=10, pos_fraction=0.5, neg_pos_ub=-1, add_gt_as_proposals=True)
sample_result = sampler.sample(assign_result, bboxes, gt_bboxes, gt_labels)
assert len(sample_result.pos_bboxes) == len(sample_result.pos_inds)
assert len(sample_result.neg_bboxes) == len(sample_result.neg_inds)
def test_random_sampler_empty_gt():
assigner = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
ignore_iof_thr=0.5,
ignore_wrt_candidates=False,
)
bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_bboxes = torch.empty(0, 4)
gt_labels = torch.empty(0, ).long()
assign_result = assigner.assign(bboxes, gt_bboxes, gt_labels=gt_labels)
sampler = RandomSampler(
num=10, pos_fraction=0.5, neg_pos_ub=-1, add_gt_as_proposals=True)
sample_result = sampler.sample(assign_result, bboxes, gt_bboxes, gt_labels)
assert len(sample_result.pos_bboxes) == len(sample_result.pos_inds)
assert len(sample_result.neg_bboxes) == len(sample_result.neg_inds)
def test_random_sampler_empty_pred():
assigner = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
ignore_iof_thr=0.5,
ignore_wrt_candidates=False,
)
bboxes = torch.empty(0, 4)
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 9],
[0, 10, 10, 19],
])
gt_labels = torch.LongTensor([1, 2])
assign_result = assigner.assign(bboxes, gt_bboxes, gt_labels=gt_labels)
sampler = RandomSampler(
num=10, pos_fraction=0.5, neg_pos_ub=-1, add_gt_as_proposals=True)
sample_result = sampler.sample(assign_result, bboxes, gt_bboxes, gt_labels)
assert len(sample_result.pos_bboxes) == len(sample_result.pos_inds)
assert len(sample_result.neg_bboxes) == len(sample_result.neg_inds)
def _context_for_ohem():
try:
from test_forward import _get_detector_cfg
except ImportError:
# Hack: grab testing utils from test_forward to make a context for ohem
import sys
from os.path import dirname
sys.path.insert(0, dirname(__file__))
from test_forward import _get_detector_cfg
model, train_cfg, test_cfg = _get_detector_cfg(
'faster_rcnn_ohem_r50_fpn_1x.py')
model['pretrained'] = None
# torchvision roi align supports CPU
model['bbox_roi_extractor']['roi_layer']['use_torchvision'] = True
from mmdet.models import build_detector
context = build_detector(model, train_cfg=train_cfg, test_cfg=test_cfg)
return context
def test_ohem_sampler():
assigner = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
ignore_iof_thr=0.5,
ignore_wrt_candidates=False,
)
bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 9],
[0, 10, 10, 19],
])
gt_labels = torch.LongTensor([1, 2])
gt_bboxes_ignore = torch.Tensor([
[30, 30, 40, 40],
])
assign_result = assigner.assign(
bboxes,
gt_bboxes,
gt_bboxes_ignore=gt_bboxes_ignore,
gt_labels=gt_labels)
context = _context_for_ohem()
sampler = OHEMSampler(
num=10,
pos_fraction=0.5,
context=context,
neg_pos_ub=-1,
add_gt_as_proposals=True)
feats = [torch.rand(1, 256, int(2**i), int(2**i)) for i in [6, 5, 4, 3, 2]]
sample_result = sampler.sample(
assign_result, bboxes, gt_bboxes, gt_labels, feats=feats)
assert len(sample_result.pos_bboxes) == len(sample_result.pos_inds)
assert len(sample_result.neg_bboxes) == len(sample_result.neg_inds)
def test_ohem_sampler_empty_gt():
assigner = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
ignore_iof_thr=0.5,
ignore_wrt_candidates=False,
)
bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_bboxes = torch.empty(0, 4)
gt_labels = torch.LongTensor([])
gt_bboxes_ignore = torch.Tensor([])
assign_result = assigner.assign(
bboxes,
gt_bboxes,
gt_bboxes_ignore=gt_bboxes_ignore,
gt_labels=gt_labels)
context = _context_for_ohem()
sampler = OHEMSampler(
num=10,
pos_fraction=0.5,
context=context,
neg_pos_ub=-1,
add_gt_as_proposals=True)
feats = [torch.rand(1, 256, int(2**i), int(2**i)) for i in [6, 5, 4, 3, 2]]
sample_result = sampler.sample(
assign_result, bboxes, gt_bboxes, gt_labels, feats=feats)
assert len(sample_result.pos_bboxes) == len(sample_result.pos_inds)
assert len(sample_result.neg_bboxes) == len(sample_result.neg_inds)
def test_ohem_sampler_empty_pred():
assigner = MaxIoUAssigner(
pos_iou_thr=0.5,
neg_iou_thr=0.5,
ignore_iof_thr=0.5,
ignore_wrt_candidates=False,
)
bboxes = torch.empty(0, 4)
gt_bboxes = torch.FloatTensor([
[0, 0, 10, 10],
[10, 10, 20, 20],
[5, 5, 15, 15],
[32, 32, 38, 42],
])
gt_labels = torch.LongTensor([1, 2, 2, 3])
gt_bboxes_ignore = torch.Tensor([])
assign_result = assigner.assign(
bboxes,
gt_bboxes,
gt_bboxes_ignore=gt_bboxes_ignore,
gt_labels=gt_labels)
context = _context_for_ohem()
sampler = OHEMSampler(
num=10,
pos_fraction=0.5,
context=context,
neg_pos_ub=-1,
add_gt_as_proposals=True)
feats = [torch.rand(1, 256, int(2**i), int(2**i)) for i in [6, 5, 4, 3, 2]]
sample_result = sampler.sample(
assign_result, bboxes, gt_bboxes, gt_labels, feats=feats)
assert len(sample_result.pos_bboxes) == len(sample_result.pos_inds)
assert len(sample_result.neg_bboxes) == len(sample_result.neg_inds)
def test_random_sample_result():
from mmdet.core.bbox.samplers.sampling_result import SamplingResult
SamplingResult.random(num_gts=0, num_preds=0)
SamplingResult.random(num_gts=0, num_preds=3)
SamplingResult.random(num_gts=3, num_preds=3)
SamplingResult.random(num_gts=0, num_preds=3)
SamplingResult.random(num_gts=7, num_preds=7)
SamplingResult.random(num_gts=7, num_preds=64)
SamplingResult.random(num_gts=24, num_preds=3)
for i in range(3):
SamplingResult.random(rng=i)
import numpy.testing as npt
from mmdet.utils.flops_counter import params_to_string
def test_params_to_string():
npt.assert_equal(params_to_string(1e9), '1000.0 M')
npt.assert_equal(params_to_string(2e5), '200.0 k')
npt.assert_equal(params_to_string(3e-9), '3e-09')
......@@ -32,28 +32,27 @@ def cal_train_time(log_dicts, args):
def plot_curve(log_dicts, args):
if args.backend is not None:
plt.switch_backend(args.backend)
sns.set_style(args.style)
if args['backend'] is not None:
plt.switch_backend(args['backend'])
# if legend is None, use {filename}_{key} as legend
legend = args.legend
legend = args['legend']
if legend is None:
legend = []
for json_log in args.json_logs:
for metric in args.keys:
for json_log in args['json_logs']:
for metric in args['keys']:
legend.append('{}_{}'.format(json_log, metric))
assert len(legend) == (len(args.json_logs) * len(args.keys))
metrics = args.keys
assert len(legend) == (len(args['json_logs']) * len(args['keys']))
metrics = args['keys']
num_metrics = len(metrics)
for i, log_dict in enumerate(log_dicts):
epochs = list(log_dict.keys())
for j, metric in enumerate(metrics):
print('plot curve of {}, metric is {}'.format(
args.json_logs[i], metric))
assert metric in log_dict[epochs[
0]], '{} does not contain metric {}'.format(
args.json_logs[i], metric)
args['json_logs'][i], metric))
if metric not in log_dict[epochs[0]]:
raise KeyError('{} does not contain metric {}'.format(
args['json_logs'][i], metric))
if 'mAP' in metric:
xs = np.arange(1, max(epochs) + 1)
......@@ -81,13 +80,13 @@ def plot_curve(log_dicts, args):
plt.plot(
xs, ys, label=legend[i * num_metrics + j], linewidth=0.5)
plt.legend()
if args.title is not None:
plt.title(args.title)
if args.out is None:
if args['title'] is not None:
plt.title(args['title'])
if args['out'] is None:
plt.show()
else:
print('save curve to: {}'.format(args.out))
plt.savefig(args.out)
print('save curve to: {}'.format(args['out']))
plt.savefig(args['out'])
plt.cla()
......