Skip to content
Snippets Groups Projects
Forked from nikhil_rayaprolu / food-round2
723 commits behind the upstream repository.
dist_utils.py 2.79 KiB
import os
from collections import OrderedDict

import torch
import torch.multiprocessing as mp
import torch.distributed as dist
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from torch.nn.utils import clip_grad
from mmcv.torchpack import Hook, OptimizerHook

__all__ = [
    'init_dist', 'reduce_grads', 'DistOptimizerHook', 'DistSamplerSeedHook'
]


def init_dist(launcher, backend='nccl', **kwargs):
    if mp.get_start_method(allow_none=True) is None:
        mp.set_start_method('spawn')
    if launcher == 'pytorch':
        _init_dist_pytorch(backend, **kwargs)
    elif launcher == 'mpi':
        _init_dist_mpi(backend, **kwargs)
    elif launcher == 'slurm':
        _init_dist_slurm(backend, **kwargs)
    else:
        raise ValueError('Invalid launcher type: {}'.format(launcher))


def _init_dist_pytorch(backend, **kwargs):
    # TODO: use local_rank instead of rank % num_gpus
    rank = int(os.environ['RANK'])
    num_gpus = torch.cuda.device_count()
    torch.cuda.set_device(rank % num_gpus)
    dist.init_process_group(backend=backend, **kwargs)


def _init_dist_mpi(backend, **kwargs):
    raise NotImplementedError


def _init_dist_slurm(backend, **kwargs):
    raise NotImplementedError


# modified from https://github.com/NVIDIA/apex/blob/master/apex/parallel/distributed.py#L9
def coalesce_all_reduce(tensors):
    buckets = OrderedDict()
    for tensor in tensors:
        tp = tensor.type()
        if tp not in buckets:
            buckets[tp] = []
        buckets[tp].append(tensor)

    for tp in buckets:
        bucket = buckets[tp]
        coalesced = _flatten_dense_tensors(bucket)
        dist.all_reduce(coalesced)
        coalesced /= dist.get_world_size()

        for buf, synced in zip(bucket,
                               _unflatten_dense_tensors(coalesced, bucket)):
            buf.copy_(synced)


def reduce_grads(model, coalesce=True):
    grads = [
        param.grad.data for param in model.parameters()
        if param.requires_grad and param.grad is not None
    ]
    if coalesce:
        coalesce_all_reduce(grads)
    else:
        for tensor in grads:
            dist.all_reduce(tensor)


class DistOptimizerHook(OptimizerHook):

    def __init__(self, grad_clip=None, coalesce=True):
        self.grad_clip = grad_clip
        self.coalesce = coalesce

    def after_train_iter(self, runner):
        runner.optimizer.zero_grad()
        runner.outputs['loss'].backward()
        reduce_grads(runner.model, self.coalesce)
        if self.grad_clip is not None:
            clip_grad.clip_grad_norm_(
                filter(lambda p: p.requires_grad, runner.model.parameters()),
                **self.grad_clip)
        runner.optimizer.step()


class DistSamplerSeedHook(Hook):

    def before_epoch(self, runner):
        runner.data_loader.sampler.set_epoch(runner.epoch)