
How to pretrain in multi gpus? #17

Open
cherrylambo opened this issue Dec 15, 2023 · 7 comments

Comments

@cherrylambo

Interested in the pretraining process of UniSRec, I followed the instructions in README.md to pretrain on multiple GPUs. All the code was downloaded from this GitHub repository correctly.

I ran the code with:

CUDA_VISIBLE_DEVICES=0,1,2,3 python ddp_pretrain.py

After a long wait, however, it failed with the error:

RuntimeError: Default process group has not been initialized, please make sure to call init_process_group.

How can I deal with it?

@louhangyu

Have you solved it?

@hyp1231
Member

hyp1231 commented Mar 19, 2024

Sorry for the late reply! I guess it's a PyTorch version mismatch or something similar. Could you please share the versions of python, torch, cudatoolkit, and recbole in your environment? That would be really helpful for debugging. Thanks! @louhangyu @cherrylambo

@louhangyu

louhangyu commented Mar 19, 2024

python==3.9.7
pytorch==1.11.0
cudatoolkit==11.3.1
recbole==1.1.1
The machine is an A100.

@hyp1231
Member

hyp1231 commented Mar 19, 2024

python==3.9.7 pytorch==1.11.0 cudatoolkit==11.3.1 recbole==1.1.1 The machine is an A100.

Thanks! I'll try to reproduce the bug and get back to you as soon as I can.

@HeyWeCome

Hi, Yupeng. I think the RuntimeError: Default process group has not been initialized, please make sure to call init_process_group error is likely caused by the torch.distributed.init_process_group function being called inside the _build_distribute method, which is invoked from the __init__ method of the DDPPretrainTrainer class.

Maybe you can try moving the initialization of the process group outside the class, before creating an instance of the DDPPretrainTrainer class, like:

def pretrain(rank, world_size, dataset, **kwargs):
    # Initialize the process group outside the class
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    ...

    # trainer loading and initialization
    trainer = DDPPretrainTrainer(config, model)

    # model pre-training
    trainer.pretrain(pretrain_data, show_progress=(rank == 0))

    dist.destroy_process_group()

    return config['model'], config['dataset']

By moving the process group initialization outside the class and before creating an instance of the DDPPretrainTrainer class, the updated code ensures that the process group is properly initialized before any distributed operations are performed. I think it may resolve this error.

However, I haven't run into this problem myself. I hope it helps.
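
If you want to be extra safe, you could also guard the call so it only runs once per process. This is just a sketch of the standard torch.distributed API, not code from this repository, and the helper name is my own:

import torch.distributed as dist

def ensure_process_group(rank, world_size):
    # All torch.distributed calls (e.g. get_rank) require an initialized
    # default process group, which is exactly what the RuntimeError complains about.
    if not dist.is_initialized():
        dist.init_process_group("nccl", rank=rank, world_size=world_size)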

@cywuuuu

cywuuuu commented Nov 11, 2024

I fixed this problem by doing the following steps; hope it will help.

1. trainer.py:
import os
from time import time
from tqdm import tqdm
import torch
from torch.nn.utils.clip_grad import clip_grad_norm_
from recbole.trainer import Trainer
from recbole.utils import set_color, get_gpu_usage


class DDPPretrainTrainer(Trainer):
    def __init__(self, config, model):
        super(DDPPretrainTrainer, self).__init__(config, model)
        self.pretrain_epochs = self.config['pretrain_epochs']
        self.save_step = self.config['save_step']
        self.rank = config['rank']
        self.world_size = config['world_size']
        self.lrank = self._build_distribute(rank=self.rank, world_size=self.world_size)
        self.logger.info(f'Let\'s use {torch.cuda.device_count()} GPUs to train {self.config["model"]} ...')

    def _build_distribute(self, rank, world_size):
        from torch.nn.parallel import DistributedDataParallel
        # credit to @Juyong Jiang
        # The process group is now initialized in ddp_pretrain.py before this
        # trainer is constructed, so init_process_group is no longer called here.
        # torch.distributed.init_process_group(backend='nccl', rank=rank, world_size=world_size)
        # Get the distributed id of this process and bind it to its GPU.
        local_rank = torch.distributed.get_rank()
        torch.cuda.set_device(local_rank)
        device_dis = torch.device("cuda", local_rank)
        # Wrap the model with DistributedDataParallel on its own GPU,
        # keeping a handle to the underlying module.
        # self.model.to(device_dis)
        self.model = DistributedDataParallel(self.model,
                                             device_ids=[local_rank],
                                             output_device=local_rank).module
        return local_rank

    def save_pretrained_model(self, epoch, saved_model_file):
        r"""Store the model parameters information and training information.

        Args:
            epoch (int): the current epoch id
            saved_model_file (str): file name for saved pretrained model

        """
        state = {
            'config': self.config,
            'epoch': epoch,
            'state_dict': self.model.state_dict(),
            'optimizer': self.optimizer.state_dict(),
        }
        torch.save(state, saved_model_file)

    def _trans_dataload(self, interaction):
        from torch.utils.data import DataLoader
        from torch.utils.data.distributed import DistributedSampler

        # Use a PyTorch DataLoader with a DistributedSampler to re-wrap each tensor.
        def sub_trans(dataset):
            dis_loader = DataLoader(dataset=dataset,
                                    batch_size=dataset.shape[0],
                                    sampler=DistributedSampler(dataset, shuffle=False))
            for data in dis_loader:
                batch_data = data

            return batch_data
        # Change the `interaction` data into a plain python `dict`.
        # For some methods, you may need to transfer more data fields in the same way.

        data_dict = {}
        for k, v in interaction.interaction.items():
            data_dict[k] = sub_trans(v)
        return data_dict

    def _train_epoch(self, train_data, epoch_idx, loss_func=None, show_progress=False):
        self.model.train()
        loss_func = loss_func or self.model.calculate_loss
        total_loss = None
        iter_data = (
            tqdm(
                train_data,
                total=len(train_data),
                ncols=100,
                desc=set_color(f"Train {epoch_idx:>5}", 'pink'),
            ) if show_progress else train_data
        )
        for batch_idx, interaction in enumerate(iter_data):
            interaction = interaction.to(self.device)
            interaction = self._trans_dataload(interaction)
            self.optimizer.zero_grad()
            losses = loss_func(interaction)
            if isinstance(losses, tuple):
                loss = sum(losses)
                loss_tuple = tuple(per_loss.item() for per_loss in losses)
                total_loss = loss_tuple if total_loss is None else tuple(map(sum, zip(total_loss, loss_tuple)))
            else:
                loss = losses
                total_loss = losses.item() if total_loss is None else total_loss + losses.item()
            self._check_nan(loss)
            loss.backward()
            if self.clip_grad_norm:
                clip_grad_norm_(self.model.parameters(), **self.clip_grad_norm)
            self.optimizer.step()
            if self.gpu_available and show_progress:
                iter_data.set_postfix_str(set_color('GPU RAM: ' + get_gpu_usage(self.device), 'yellow'))
        return total_loss

    def pretrain(self, train_data, verbose=True, show_progress=False):
        for epoch_idx in range(self.start_epoch, self.pretrain_epochs):
            # train
            training_start_time = time()
            train_loss = self._train_epoch(train_data, epoch_idx, show_progress=show_progress)
            self.train_loss_dict[epoch_idx] = sum(train_loss) if isinstance(train_loss, tuple) else train_loss
            training_end_time = time()
            train_loss_output = \
                self._generate_train_loss_output(epoch_idx, training_start_time, training_end_time, train_loss)
            if verbose:
                self.logger.info(train_loss_output)
            self._add_train_loss_to_tensorboard(epoch_idx, train_loss)

            if (epoch_idx + 1) % self.save_step == 0 and self.lrank == 0:
                saved_model_file = os.path.join(
                    self.checkpoint_dir,
                    '{}-{}-{}.pth'.format(self.config['model'], self.config['dataset'], str(epoch_idx + 1))
                )
                self.save_pretrained_model(epoch_idx, saved_model_file)
                update_output = set_color('Saving current', 'blue') + ': %s' % saved_model_file
                if verbose:
                    self.logger.info(update_output)

        return self.best_valid_score, self.best_valid_result
2. ddp_pretrain.py:
import os
import argparse
from logging import getLogger
import torch
import torch.multiprocessing as mp
import torch.distributed as dist
from recbole.utils import init_seed, init_logger

from config import Config
from unisrec import UniSRec
from data.dataset import PretrainUniSRecDataset
from data.dataloader import CustomizedTrainDataLoader
from trainer import DDPPretrainTrainer


def pretrain(rank, world_size, dataset, **kwargs):
    # configurations initialization
    torch.distributed.init_process_group(backend='nccl', rank=rank, world_size=world_size)
    props = ['props/UniSRec.yaml', 'props/pretrain.yaml']
    if rank == 0:
        print('DDP Pre-training on:', dataset)
        print(props)

    # configurations initialization
    kwargs.update({'ddp': True, 'rank': rank, 'world_size': world_size})
    config = Config(model=UniSRec, dataset=dataset, config_file_list=props, config_dict=kwargs)
    init_seed(config['seed'], config['reproducibility'])
    # logger initialization
    if config['rank'] not in [-1, 0]:
        config['state'] = 'warning'
    init_logger(config)
    logger = getLogger()
    logger.info(config)

    # dataset filtering
    dataset = PretrainUniSRecDataset(config)
    logger.info(dataset)

    pretrain_dataset = dataset.build()[0]
    pretrain_data = CustomizedTrainDataLoader(config, pretrain_dataset, None, shuffle=True)

    # Move model to GPU before wrapping it with DistributedDataParallel
    device = torch.device(f'cuda:{rank}')
    model = UniSRec(config, pretrain_data.dataset).to(device)  # Ensure model is moved to the correct GPU
    logger.info(model)

    # trainer loading and initialization
    trainer = DDPPretrainTrainer(config, model)

    # model pre-training
    trainer.pretrain(pretrain_data, show_progress=(rank == 0))

    dist.destroy_process_group()

    return config['model'], config['dataset']


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', type=str, default='FHCKM', help='dataset name')
    parser.add_argument('-p', type=str, default='12355', help='port for ddp')
    args, unparsed = parser.parse_known_args()

    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}."
    world_size = n_gpus

    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = args.p

    mp.spawn(pretrain,
             args=(world_size, args.d,),
             nprocs=world_size,
             join=True)
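
With these two files in place, launching with the command from the top of the thread should work. For example (the dataset name and port here are just the defaults of the argument parser above):

CUDA_VISIBLE_DEVICES=0,1,2,3 python ddp_pretrain.py -d FHCKM -p 12355

mp.spawn then starts one process per visible GPU and passes each process its rank.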

@louhangyu

(Quoting @cywuuuu's fix above, including the same trainer.py and ddp_pretrain.py code.)
Thanks, love you brother!
