
Draft: Add support for data parallelism on a single node

Open Vlad-Andrei BĂDOIU (78692) requested to merge vladb/ddp into main
+32 −10
@@ -2,12 +2,13 @@ import fire
 import torch
 from torch import nn
-from optimus.datasets import WikiText103Dataset
+from optimus.datasets import TinyStoriesDataset
 from optimus.tokenizers import SentencePieceTokenizer
-from optimus.dataloader import OptimusDataLoader
+from optimus.distributon.dataloader import build_dataloader
 from optimus.models import OptimusTransformer
 from optimus.trainer import Trainer
+from optimus.distributon import Distributon
 def main(batch_size: int = 8,
          grad_acc_steps: int = 1,
@@ -21,7 +22,8 @@ def main(batch_size: int = 8,
          n_layers: int = 6,
          n_heads: int = 8,
          dropout: float = 0.0,
-         use_fp16: bool = True):
+         use_fp16: bool = True,
+         distributed: bool = False):
     """
     Run the main training loop for the model.
@@ -61,21 +63,37 @@ def main(batch_size: int = 8,
f"\t- 16-bit floating-point training (fp16): {use_fp16}\n"
f"Please see '--help' if you want to change these settings")
+    # Launch the distributed processes
+    if distributed:
+        distributon = Distributon([f"cuda:{i}" for i in range(torch.cuda.device_count())])
+        distributon.launch()
+        device = distributon._strategy.root_device
+    else:
+        distributon = None
+        device = 'cuda'  # non-distributed runs train on a single GPU
     # load tokenizer
     tok = SentencePieceTokenizer(model_path=tokenizer_path)
     # load dataset splits
-    train_ds = WikiText103Dataset(split='train')
-    test_ds = WikiText103Dataset(split='test')
+    train_ds = TinyStoriesDataset(split='train', tokenizer=tok)
+    test_ds = TinyStoriesDataset(split='test', tokenizer=tok)
     print(f"Number of examples in training set: {len(train_ds)}")
     print(f"Number of examples in testing set: {len(test_ds)}")
-    # create dataloader object and move to device
-    dl = OptimusDataLoader(train_ds, test_ds, tok,
+    # create the dataloaders
+    train_loader = build_dataloader(train_ds,
                                     bs=batch_size,
                                     seq_len=seq_len,
-                                    device='cuda')
+                                    device=device,
+                                    distributed=distributed)
+    valid_loader = build_dataloader(test_ds,
+                                    bs=batch_size,
+                                    seq_len=seq_len,
+                                    device=device,
+                                    distributed=distributed)
     # create model and move to device
     model = OptimusTransformer(len(tok),
@@ -84,7 +102,9 @@ def main(batch_size: int = 8,
                                n_heads=n_heads,
                                p_drop=dropout,
                                weight_tying=False)
-    model = model.to('cuda')
+    if not distributed:
+        model = model.to('cuda')
     _total_params = sum(p.numel() for p in model.parameters())
     print(f"Number of model parameters: {_total_params}")
@@ -101,7 +121,8 @@ def main(batch_size: int = 8,
print("Starting training...")
# create trainer and start fitting
trainer = Trainer(dl=dl,
trainer = Trainer(train_loader=train_loader,
valid_loader=valid_loader,
model=model,
criterion=criterion,
optimizer=optimizer,
@@ -110,6 +131,7 @@ def main(batch_size: int = 8,
                       grad_clip_norm=grad_clip_norm,
                       model_save_path=checkpoints_path,
                       use_fp16=use_fp16,
+                      distributon=distributon,
                       progress_bar=True)
     trainer.fit(epochs)
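
Note for reviewers: the Distributon internals are not part of this diff, but the single-node data parallelism it wraps is conceptually the standard torch.distributed recipe: spawn one process per local GPU, have the processes rendezvous in a process group, and wrap the model in DistributedDataParallel so gradients are all-reduced after each backward pass. The following is a minimal, self-contained sketch of that recipe using a stand-in model and hypothetical names; it is an illustration of the mechanism, not the Distributon API.

# Sketch of single-node data parallelism with plain PyTorch DDP.
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP


def worker(rank: int, world_size: int):
    # One process per GPU; all ranks rendezvous over localhost.
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    model = torch.nn.Linear(512, 512).to(rank)  # stand-in for OptimusTransformer
    model = DDP(model, device_ids=[rank])       # gradients are averaged across ranks

    # ... build the per-rank dataloader and run the training loop here ...

    dist.destroy_process_group()


if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(worker, args=(world_size,), nprocs=world_size)

Since the script uses fire, the new code path would presumably be enabled with --distributed=True on the command line; the non-distributed path keeps the previous single-GPU behaviour.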
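Likewise, build_dataloader(..., distributed=True) presumably has to shard the dataset so that each rank sees a disjoint subset per epoch; the standard PyTorch tool for this is DistributedSampler. A hedged sketch follows; the builder name and signature are illustrative, not the actual optimus.distributon.dataloader implementation, which also handles sequence length and device placement.

# Sketch of a distributed-aware dataloader builder using DistributedSampler.
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler


def build_dataloader_sketch(dataset: Dataset, bs: int, distributed: bool) -> DataLoader:
    if distributed:
        # Rank and world size are read from the default process group, so this
        # must run after init_process_group(). Each rank gets a distinct shard;
        # the sampler does the shuffling, so the loader itself must not.
        sampler = DistributedSampler(dataset, shuffle=True)
        return DataLoader(dataset, batch_size=bs, sampler=sampler)
    return DataLoader(dataset, batch_size=bs, shuffle=True)

One caveat with DistributedSampler: whichever component owns the epoch loop (here presumably the Trainer) needs to call sampler.set_epoch(epoch) at the start of each epoch, otherwise every epoch reuses the same shuffle order.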