Source code for protomotions.agents.callbacks.slurm_autoresume_srun

# SPDX-FileCopyrightText: Copyright (c) 2025 The ProtoMotions Developers
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import time

import wandb

from pytorch_lightning import Callback

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from protomotions.agents.ppo import PPO
else:
    PPO = object

log = logging.getLogger(__name__)


[docs] def wandb_run_exists(): return isinstance(wandb.run, wandb.sdk.wandb_run.Run)
[docs] class AutoResumeCallbackSrun(Callback):
[docs] def __init__(self, autoresume_after=12600) -> None: self.start_time = None self.autoresume_after = autoresume_after print("************************************") print("will autoresume after ", self.autoresume_after)
def _check_autoresume(self, agent: PPO): agent.fabric.strategy.barrier() if self.start_time is None: self.start_time = time.time() if time.time() - self.start_time >= self.autoresume_after: log.info("Should autoresume!") agent.save() # if agent.fabric.global_rank == 0: # wandb_id = wandb.run.id if wandb_run_exists() else "", # message = f"Terminating... wandb_id: {wandb_id}" # log.info(message) # print(f"[Auto Resume] Rank {agent.fabric.global_rank} exiting.", flush=True) agent._should_stop = True log.info(f"should stop, {agent.should_stop}") # agent.fabric.strategy.barrier()
[docs] def before_play_steps(self, agent: PPO) -> None: self._check_autoresume(agent)
[docs] def on_fit_start(self, agent: PPO) -> None: pass
[docs] def on_fit_end(self, agent: PPO) -> None: pass
# # class AutoResumeCallbackSrun(Callback): # def __init__(self, autoresume_after=12600) -> None: # self.start_time = time.time() # self.exit_signal_received = False # # Register the signal handler for SIGUSR2 (signal 12) # signal.signal(signal.SIGUSR2, self._handle_exit_signal) # def _handle_exit_signal(self, signum, frame): # """ # Signal handler for SIGUSR2 (signal 12). # """ # log.info("Received signal %d, initiating save and exit.", signum) # self.exit_signal_received = True # def _check_autoresume(self, agent: PPO): # # Check if signal has been received # if self.exit_signal_received: # log.info("Signal received, saving agent state and stopping.") # agent.save() # # Set the stop flag # agent._should_stop = True # log.info(f"should stop, {agent.should_stop}") # return # def before_play_steps(self, agent: PPO) -> None: # self._check_autoresume(agent) # def on_fit_start(self, agent: PPO) -> None: # pass # def on_fit_end(self, agent: PPO) -> None: # pass