Commit 017197d6 in NetSys / Optimus Prime (unverified), authored 9 months ago by Alexandru-Mihai GHERGHESCU

    WIP training checkpointing

Parent: cb1a7974
No related branches, tags, or merge requests found.
Pipeline #72405 passed (stage: test_gpu), 9 months ago.

Showing 2 changed files, with 169 additions and 7 deletions:

    optimus/trainer.py                  +85  −7
    optimus/utils/checkpoint_utils.py   +84  −0  (new file)
optimus/trainer.py (+85, −7):
```diff
@@ -8,7 +8,7 @@ import torch.nn as nn
 from torch.utils.data import DataLoader
 from transformers.tokenization_utils_base import PreTrainedTokenizerBase
 
-from optimus.utils import dist_utils, logging_utils
+from optimus.utils import dist_utils, logging_utils, checkpoint_utils
 
 logger = logging.getLogger(__name__)
```
```diff
@@ -81,6 +81,39 @@ class TrainingArguments():
         self.save_limit = save_limit
 
 
+class TrainerState():
+    """
+    State of a trainer. This contains information that needs to be persisted
+    across checkpoints, such as current epoch or current number of steps.
+    """
+
+    def __init__(self):
+        self.current_epoch = 0
+        self.global_steps = 0
+
+    def to_json_file(self, file: Path):
+        """
+        Save the current trainer state as a json file.
+
+        Args:
+            file (Path): Path where the trainer state json file should be saved.
+        """
+        print('saving json trainer state')
+        # TODO probably should just be to_json
+
+    @classmethod
+    def from_json_file(cls, file: Path):
+        """
+        Reload the state of a trainer from a json file. Note: It is very likely
+        that training *will not work as expected* if you resume training from a
+        checkpoint with a trainer state that has been manually altered.
+
+        Args:
+            file (Path): Path to a json file (usually contained inside a
+                checkpoint directory),
+        """
+        print('loading trainer state from json')
+        #TODO
+
+
 class Trainer():
     """
     Generic PyTorch trainer implementation.
```
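The `to_json_file()` / `from_json_file()` bodies above are still `print()` stubs. For reference, a minimal sketch of the JSON round-trip they describe could look like this, assuming the state stays limited to plain attributes such as `current_epoch` and `global_steps` (this is not the commit's implementation):

```python
import json
from pathlib import Path


class TrainerState():
    """Sketch only: persistable trainer state with a JSON round-trip."""

    def __init__(self):
        self.current_epoch = 0
        self.global_steps = 0

    def to_json_file(self, file: Path):
        # write the instance attributes out as a small JSON object
        with open(file, 'w') as f:
            json.dump(self.__dict__, f, indent=4)

    @classmethod
    def from_json_file(cls, file: Path):
        # rebuild a state object from a file written by to_json_file()
        with open(file, 'r') as f:
            data = json.load(f)
        state = cls()
        state.__dict__.update(data)
        return state
```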
```diff
@@ -112,13 +145,25 @@ class Trainer():
         self.eval_dataloader = eval_dataloader
         self.tokenizer = tokenizer
 
+        # initialize an empty trainer state (if resuming from a checkpoint,
+        # will be updated later)
+        self.state = TrainerState()
+
         if self.args.log_on_main_rank_only:
             logger.addFilter(logging_utils.FilterMainRankOnly())
 
-    def train(self) -> None:
+    # TODO also expose 'resume_from_checkpoint' as CLI arg in training.py
+    def train(self, resume_from_checkpoint: bool | Path = False) -> None:
         """
         Training loop of the trainer.
+
+        Args:
+            resume_from_checkpoint (bool or Path): Whether training should
+                resume from a checkpoint or start from scratch. If yes, then
+                training resumes from the checkpoint with the latest
+                modification time saved in `checkpoints_dir` (see
+                `TrainingArguments`).
         """
 
         num_examples = len(self.train_dataloader) * self.args.per_device_batch_size
         num_update_steps_per_epoch = len(self.train_dataloader) // self.args.gradient_accumulation_steps
```
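For context, the new signature would be driven from the calling side roughly as below. The explicit `Path` in the last call is a made-up example; note that this commit only acts on the `bool` branch (`isinstance(resume_from_checkpoint, bool)`), so a `Path` value is accepted by the annotation but not yet handled.

```python
from pathlib import Path

# assuming `trainer` is an already-constructed Trainer (arguments elided)
trainer.train()                             # start from scratch (default)
trainer.train(resume_from_checkpoint=True)  # pick up the newest checkpoint-<step> dir

# hypothetical explicit path; not handled by the resume logic in this commit
trainer.train(resume_from_checkpoint=Path('checkpoints/checkpoint-1500'))
```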
```diff
@@ -134,6 +179,30 @@ class Trainer():
         # scaler used for mixed precision fp16 training on GPU
         self.scaler = torch.cuda.amp.GradScaler(enabled=self.args.use_fp16)
 
+        epochs_trained = 0
+        steps_trained_in_current_epoch = 0
+
+        # potentially resume from saved checkpoint
+        if isinstance(resume_from_checkpoint, bool) and resume_from_checkpoint:
+            last_checkpoint: Path = checkpoint_utils.get_last_checkpoint_dir(
+                self.args.checkpoints_dir)
+            if last_checkpoint is not None:
+                logger.info(f"Resuming training from checkpoint {str(last_checkpoint)}")
+
+                trainer_state = TrainerState.from_json_file(os.path.join('checkpoint-1500', 'trainer_state.json'))
+                #trainer_state.to_json_file(os.path.join('checkpoint-1500', 'trainer_state.json'))
+                # TODO write this function and check it
+                # TODO also write cu.save_checkpoint()
+                # checkpoint_utils.load_from_checkpoint(
+                #     last_checkpoint,
+                #     self.state,
+                #     self.model,
+                #     self.optimizer,
+                #     self.scheduler,
+                #     #self.scaler, # ?
+                # )
+
+        self.model.train()
+
         logger.info("***** Running training *****")
         logger.info(f"Num examples = {num_examples:,}")
         logger.info(f"Num epochs = {self.args.num_train_epochs:,}")
```
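The resume block currently reads the trainer state from a hardcoded `'checkpoint-1500'` path and leaves the `checkpoint_utils.load_from_checkpoint(...)` call commented out. A sketch of where those TODOs appear to be heading, using the directory that `get_last_checkpoint_dir()` actually returned and storing the result on `self.state` (again, not part of the commit):

```python
# sketch only: use the discovered checkpoint instead of 'checkpoint-1500'
if isinstance(resume_from_checkpoint, bool) and resume_from_checkpoint:
    last_checkpoint = checkpoint_utils.get_last_checkpoint_dir(
        self.args.checkpoints_dir)
    if last_checkpoint is not None:
        logger.info(f"Resuming training from checkpoint {str(last_checkpoint)}")
        # load the persisted trainer state from that directory
        self.state = TrainerState.from_json_file(
            os.path.join(last_checkpoint, checkpoint_utils.TRAINER_STATE_NAME))
        # model/optimizer/scheduler restoring would go through
        # checkpoint_utils.load_from_checkpoint() once it is implemented
```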
```diff
@@ -142,16 +211,24 @@ class Trainer():
         logger.info(f"Global batch size (w. distributed & accumulation) = {global_batch_size:,}")
         logger.info(f"Total optimization steps = {max_steps:,}")
 
-        self.model.train()
-
-        # start training for num_train_epochs
-        for epoch in range(self.args.num_train_epochs):
+        # start training for num_train_epochs (or less if resumed from checkpoint)
+        for epoch in range(self.state.current_epoch, self.args.num_train_epochs):
             # needed for distributed sampler RNG state
             if hasattr(self.train_dataloader, "set_epoch"):
                 self.train_dataloader.set_epoch(epoch)
 
+            # resume from checkpoint, update steps
+            # TODO grad_acc_steps for this?
+            for step in range(0, self.state.global_steps):
+                self.args.lr_scheduler.step()
+
             for step, inputs in enumerate(self.train_dataloader):
+                steps_trained_in_current_epoch += 1
+
+                # check if resume from training
+                if steps_trained_in_current_epoch < self.state.global_steps:
+                    continue
+
                 inputs = inputs['input_ids']
                 inputs = inputs.to(self.args.device)
```
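The mid-epoch skip above still iterates over every already-seen batch and only `continue`s past it. A possible alternative (not what the commit does) is to drop those batches before the loop body ever runs, for example with `itertools.islice`; the counter-based version in the diff remains the reference behaviour.

```python
import itertools

# sketch only: skip the first `self.state.global_steps` batches up front,
# keeping `step` aligned with its position in a full epoch
for step, inputs in itertools.islice(
        enumerate(self.train_dataloader), self.state.global_steps, None):
    ...  # same loop body as above
```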
```diff
@@ -164,7 +241,7 @@ class Trainer():
                 logits = logits[..., :-1, :].contiguous().view(
                     -1,
                     self.model.module.vocab_size
                     if dist_utils.is_dist_available_and_initialized()
+                    # TODO this should probably be a hasattr(self.model, 'module')
                     else self.model.vocab_size
                 )
```
```diff
@@ -195,6 +272,7 @@ class Trainer():
                 if (step + 1) % self.args.log_steps * self.args.gradient_accumulation_steps == 0:
                     logger.info(f"Loss is {tr_loss:,}")
+                    trainer_state.to_json_file(os.path.join('checkpoint-1500', 'trainer_state.json'))
 
                     self.progress.update(1)
```
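The periodic save in the last hunk also writes into the hardcoded `'checkpoint-1500'` directory. A sketch of a step-numbered variant that matches the `checkpoint-<number>` pattern `get_last_checkpoint_dir()` scans for, using the `TRAINER_STATE_NAME` constant from the new module (not part of the commit):

```python
# sketch only: write the trainer state into a step-numbered checkpoint dir
chkpt_dir = os.path.join(self.args.checkpoints_dir,
                         f'checkpoint-{self.state.global_steps}')
os.makedirs(chkpt_dir, exist_ok=True)
self.state.to_json_file(
    os.path.join(chkpt_dir, checkpoint_utils.TRAINER_STATE_NAME))
```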
optimus/utils/checkpoint_utils.py (new file, +84, −0):
```python
import os
import re
import logging
from pathlib import Path

from optimus.trainer import TrainerState

logger = logging.getLogger(__name__)

# files used for checkpointing
TRAINING_ARGS_NAME = "training_args.bin"
TRAINER_STATE_NAME = "trainer_state.json"
OPTIMIZER_NAME = "optimizer.pt"
OPTIMIZER_NAME_BIN = "optimizer.bin"
SCHEDULER_NAME = "scheduler.pt"
SCALER_NAME = "scaler.pt"
# FSDP_MODEL_NAME = "pytorch_model_fsdp"


def get_last_checkpoint_dir(dir: Path):
    """
    Get last saved (highest global step) checkpoint in a directory, matching the
    pattern `checkpoint-<number>`, where number is the global step of training.
    Each checkpoint should be in itself another directory.

    Args:
        dir (Path): The checkpoints directory.

    Returns (Path or None): Returns the path to the last checkpoint, or `None`
        if no valid checkpoints were found.
    """
    checkpoint_regex = re.compile(r"^checkpoint-(\d+)$")
    checkpoints_dirs = os.listdir(dir)
    checkpoints = [
        path
        for path in checkpoints_dirs
        if checkpoint_regex.search(path) is not None
        and os.path.isdir(os.path.join(dir, path))
    ]

    if len(checkpoints) == 0:
        return None

    return os.path.join(
        dir,
        max(checkpoints,
            key=lambda x: int(checkpoint_regex.search(x).groups()[0]))
    )


def load_from_checkpoint(
    chkpt: Path,
    trainer_state: TrainerState,
    # model: nn.Module,
    # optimizer: torch.optim.Optimizer,
    # scheduler: torch.optim.lr_scheduler.LRScheduler,
):
    """
    Load the TODO from an existing checkpoint. This ensures training can
    resume exactly as it was before checkpointing.

    Args:
        chkpt (Path): Path to checkpoint directory.
        trainer_state (TrainerState): Trainer state to load.
    """
    trainer_state = TrainerState.from_json_file(os.path.join(chkpt, TRAINER_STATE_NAME))
    pass


def save_checkpoint(
    chkpt: Path,
    trainer_state: TrainerState,
):
    """
    Save TODO states as a checkpoint. This ensures training can be resumed
    exactly as it was before saving.

    Args:
        chkpt (Path): Path to checkpoint directory.
        trainer_state (TrainerState): Trainer state to save.
    """
    pass
    # chkpt.mkdir(parents=True, exist_ok=True)
    # trainer_state.to_json_file()
```
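Both `load_from_checkpoint()` and `save_checkpoint()` are still `pass` stubs, and the assignment to `trainer_state` inside `load_from_checkpoint()` only rebinds the local name, so the caller never sees the loaded state. A sketch of what filled-in versions could look like, following the commented-out parameter lists and the filename constants above; the model filename and the in-place state update are assumptions, not the commit's implementation:

```python
import torch
import torch.nn as nn

MODEL_NAME = "model.pt"  # assumed filename; the commit does not define one


def save_checkpoint(
    chkpt: Path,
    trainer_state: TrainerState,
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    scheduler: torch.optim.lr_scheduler.LRScheduler,
):
    # create the checkpoint directory and persist each component next to
    # the trainer state JSON
    chkpt.mkdir(parents=True, exist_ok=True)
    trainer_state.to_json_file(chkpt / TRAINER_STATE_NAME)
    torch.save(model.state_dict(), chkpt / MODEL_NAME)
    torch.save(optimizer.state_dict(), chkpt / OPTIMIZER_NAME)
    torch.save(scheduler.state_dict(), chkpt / SCHEDULER_NAME)


def load_from_checkpoint(
    chkpt: Path,
    trainer_state: TrainerState,
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    scheduler: torch.optim.lr_scheduler.LRScheduler,
):
    # update the caller's state object in place instead of rebinding the
    # local name, then restore the remaining components
    loaded = TrainerState.from_json_file(chkpt / TRAINER_STATE_NAME)
    trainer_state.__dict__.update(loaded.__dict__)
    model.load_state_dict(torch.load(chkpt / MODEL_NAME))
    optimizer.load_state_dict(torch.load(chkpt / OPTIMIZER_NAME))
    scheduler.load_state_dict(torch.load(chkpt / SCHEDULER_NAME))
```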