diff --git a/rdev_requirements.txt b/rdev_requirements.txt index 47fa2266..23568350 100644 --- a/rdev_requirements.txt +++ b/rdev_requirements.txt @@ -4,7 +4,7 @@ packaging pydantic<2,!=1.10.20 pytest requests -setuptools +setuptools < 80.0 setuptools_scm >= 6.2, < 8 tomlkit tomli diff --git a/subprojects/robotpy-wpilib/tests/conftest.py b/subprojects/robotpy-wpilib/tests/conftest.py index 88728aa9..1c61662d 100644 --- a/subprojects/robotpy-wpilib/tests/conftest.py +++ b/subprojects/robotpy-wpilib/tests/conftest.py @@ -5,6 +5,25 @@ import wpilib from wpilib.simulation._simulation import _resetWpilibSimulationData +from pathlib import Path + +import gc + +import weakref + +import hal +import hal.simulation +import wpilib.shuffleboard +from wpilib.simulation import DriverStationSim, pauseTiming, restartTiming +import wpilib.simulation +from pyfrc.test_support.controller import TestController +from pyfrc.physics.core import PhysicsInterface + +try: + import commands2 +except ImportError: + commands2 = None + @pytest.fixture def cfg_logging(caplog): @@ -29,3 +48,133 @@ def nt(cfg_logging, wpilib_state): finally: instance.stopLocal() instance._reset() + + +@pytest.fixture(scope="class", autouse=False) +def physics_and_decorated_robot_class( + myrobot_class, robots_sim_enable_physics +) -> tuple: + # attach physics + + robotClass = myrobot_class + physicsInterface = None + if robots_sim_enable_physics: + physicsInterface, robotClass = PhysicsInterface._create_and_attach( + myrobot_class, + Path(__file__).parent, + ) + + if physicsInterface: + physicsInterface.log_init_errors = False + + # Tests need to know when robotInit is called, so override the robot + # to do that + class TestRobot(robotClass): + def robotInit(self): + try: + super().robotInit() + finally: + self.__robotInitialized() + + TestRobot.__name__ = robotClass.__name__ + TestRobot.__module__ = robotClass.__module__ + TestRobot.__qualname__ = robotClass.__qualname__ + + return (physicsInterface, TestRobot) + + +@pytest.fixture(scope="function", autouse=False) +def robot_with_sim_setup_teardown(physics_and_decorated_robot_class): + """ + Your robot instance + + .. note:: RobotPy/WPILib testing infrastructure is really sensitive + to ensuring that things get cleaned up properly. Make sure + that you don't store references to your robot or other + WPILib objects in a global or static context. + """ + + # + # This function needs to do the same things that RobotBase.main does + # plus some extra things needed for testing + # + # Previously this was separate from robot fixture, but we need to + # ensure that the robot cleanup happens deterministically relative to + # when handle cleanup/etc happens, otherwise unnecessary HAL errors will + # bubble up to the user + # + + nt_inst = ntcore.NetworkTableInstance.getDefault() + nt_inst.startLocal() + + pauseTiming() + restartTiming() + + wpilib.DriverStation.silenceJoystickConnectionWarning(True) + DriverStationSim.setAutonomous(False) + DriverStationSim.setEnabled(False) + DriverStationSim.notifyNewData() + + robot = physics_and_decorated_robot_class[1]() + + # Tests only get a proxy to ensure cleanup is more reliable + yield weakref.proxy(robot) + + # If running in separate processes, no need to do cleanup + # if ISOLATED: + # # .. and funny enough, in isolated mode we *don't* want the + # # robot to be cleaned up, as that can deadlock + # self._saved_robot = robot + # return + + # reset engine to ensure it gets cleaned up too + # -> might be holding wpilib objects, or the robot + if physics_and_decorated_robot_class[0]: + physics_and_decorated_robot_class[0].engine = None + + # HACK: avoid motor safety deadlock + wpilib.simulation._simulation._resetMotorSafety() + + del robot + + if commands2 is not None: + commands2.CommandScheduler.resetInstance() + + # Double-check all objects are destroyed so that HAL handles are released + gc.collect() + + # shutdown networktables before other kinds of global cleanup + # -> some reset functions will re-register listeners, so it's important + # to do this before so that the listeners are active on the current + # NetworkTables instance + nt_inst.stopLocal() + nt_inst._reset() + + # Cleanup WPILib globals + # -> preferences, SmartDashboard, Shuffleboard, LiveWindow, MotorSafety + wpilib.simulation._simulation._resetWpilibSimulationData() + wpilib._wpilib._clearSmartDashboardData() + wpilib.shuffleboard._shuffleboard._clearShuffleboardData() + + # Cancel all periodic callbacks + hal.simulation.cancelAllSimPeriodicCallbacks() + + # Reset the HAL handles + hal.simulation.resetGlobalHandles() + + # Reset the HAL data + hal.simulation.resetAllSimData() + + # Don't call HAL shutdown! This is only used to cleanup HAL extensions, + # and functions will only be called the first time (unless re-registered) + # hal.shutdown() + + +@pytest.fixture(scope="function") +def getTestController( + reraise, robot_with_sim_setup_teardown: wpilib.RobotBase +) -> TestController: + """ + A pytest fixture that provides control over your robot_with_sim_setup_teardown + """ + return TestController(reraise, robot_with_sim_setup_teardown) diff --git a/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py new file mode 100644 index 00000000..25900c85 --- /dev/null +++ b/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py @@ -0,0 +1,112 @@ +""" +Proof of concept to test TimedRobotPy using PyTest + +This POC was made by deconstructing pytest_plugin.py so that it is no longer a plugin but a class that provides +fixtures. + +To run / debug this: + +pytest subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py --no-header -vvv -s + +""" + +import pytest + +from wpilib.timedrobotpy import TimedRobotPy + + +def run_practice(control: "TestController"): + """Runs through the entire span of a practice match""" + + with control.run_robot(): + # Run disabled for a short period + control.step_timing(seconds=0.5, autonomous=True, enabled=False) + + # Run autonomous + enabled for 15 seconds + control.step_timing(seconds=15, autonomous=True, enabled=True) + + # Disabled for another short period + control.step_timing(seconds=0.5, autonomous=False, enabled=False) + + # Run teleop + enabled for 2 minutes + control.step_timing(seconds=120, autonomous=False, enabled=True) + + +class TestThings: + + class MyRobot(TimedRobotPy): + def __init__(self): + super().__init__(period=0.020) + self.robotInitialized = False + self.robotPeriodicCount = 0 + + ######################################################### + ## Common init/update for all modes + def robotInit(self): + self.robotInitialized = True + + def robotPeriodic(self): + self.robotPeriodicCount += 1 + + ######################################################### + ## Autonomous-Specific init and update + def autonomousInit(self): + pass + + def autonomousPeriodic(self): + pass + + def autonomousExit(self): + pass + + ######################################################### + ## Teleop-Specific init and update + def teleopInit(self): + pass + + def teleopPeriodic(self): + pass + + ######################################################### + ## Disabled-Specific init and update + def disabledPeriodic(self): + pass + + def disabledInit(self): + pass + + ######################################################### + ## Test-Specific init and update + def testInit(self): + pass + + def testPeriodic(self): + pass + + @classmethod + @pytest.fixture(scope="class", autouse=True) + def myrobot_class(cls) -> type[MyRobot]: + return cls.MyRobot + + @classmethod + @pytest.fixture(scope="class", autouse=True) + def robots_sim_enable_physics(cls) -> bool: + return False + + def test_iterative(self, getTestController, robot_with_sim_setup_teardown): + """Ensure that all states of the iterative robot run""" + assert robot_with_sim_setup_teardown.robotInitialized == False + assert robot_with_sim_setup_teardown.robotPeriodicCount == 0 + run_practice(getTestController) + + assert robot_with_sim_setup_teardown.robotInitialized == True + assert robot_with_sim_setup_teardown.robotPeriodicCount > 0 + + def test_iterative_again(self, getTestController, robot_with_sim_setup_teardown): + """Ensure that all states of the iterative robot run""" + assert robot_with_sim_setup_teardown.robotInitialized == False + assert robot_with_sim_setup_teardown.robotPeriodicCount == 0 + run_practice(getTestController) + + assert robot_with_sim_setup_teardown.robotInitialized == True + assert robot_with_sim_setup_teardown.robotPeriodicCount > 0 diff --git a/subprojects/robotpy-wpilib/tests/test_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_timedrobot.py new file mode 100644 index 00000000..168d5b87 --- /dev/null +++ b/subprojects/robotpy-wpilib/tests/test_timedrobot.py @@ -0,0 +1,41 @@ +import pytest + +from wpilib.timedrobotpy import _Callback + + +def test_calcFutureExpirationUs() -> None: + cb = _Callback(func=None, periodUs=20_000, expirationUs=100) + assert cb.calcFutureExpirationUs(100) == 20_100 + assert cb.calcFutureExpirationUs(101) == 20_100 + assert cb.calcFutureExpirationUs(20_099) == 20_100 + assert cb.calcFutureExpirationUs(20_100) == 40_100 + assert cb.calcFutureExpirationUs(20_101) == 40_100 + + cb = _Callback(func=None, periodUs=40_000, expirationUs=500) + assert cb.calcFutureExpirationUs(500) == 40_500 + assert cb.calcFutureExpirationUs(501) == 40_500 + assert cb.calcFutureExpirationUs(40_499) == 40_500 + assert cb.calcFutureExpirationUs(40_500) == 80_500 + assert cb.calcFutureExpirationUs(40_501) == 80_500 + + cb = _Callback(func=None, periodUs=1_000, expirationUs=0) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_000_000) + == 1_000_000_000_000_001_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_000_001) + == 1_000_000_000_000_001_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_000_999) + == 1_000_000_000_000_001_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_001_000) + == 1_000_000_000_000_002_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_001_001) + == 1_000_000_000_000_002_000 + ) diff --git a/subprojects/robotpy-wpilib/wpilib/__init__.py b/subprojects/robotpy-wpilib/wpilib/__init__.py index 9ee20f8d..ca43bbf5 100644 --- a/subprojects/robotpy-wpilib/wpilib/__init__.py +++ b/subprojects/robotpy-wpilib/wpilib/__init__.py @@ -233,6 +233,8 @@ from .cameraserver import CameraServer from .deployinfo import getDeployData +from .iterativerobotpy import IterativeRobotPy +from .timedrobotpy import TimedRobotPy try: from .version import version as __version__ @@ -241,4 +243,4 @@ from ._impl.main import run -__all__ += ["CameraServer", "run"] +__all__ += ["CameraServer", "run", "IterativeRobotPy", "TimedRobotPy"] diff --git a/subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py b/subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py new file mode 100644 index 00000000..3a572c92 --- /dev/null +++ b/subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py @@ -0,0 +1,476 @@ +from enum import Enum + +from hal import ( + report, + tResourceType, + tInstances, + observeUserProgramDisabled, + observeUserProgramTest, + observeUserProgramAutonomous, + observeUserProgramTeleop, + simPeriodicBefore, + simPeriodicAfter, +) +from ntcore import NetworkTableInstance +from wpilib import ( + DriverStation, + DSControlWord, + Watchdog, + LiveWindow, + RobotBase, + SmartDashboard, + reportWarning, +) +from wpilib.shuffleboard import Shuffleboard +import wpimath.units + +_kResourceType_SmartDashboard = tResourceType.kResourceType_SmartDashboard +_kSmartDashboard_LiveWindow = tInstances.kSmartDashboard_LiveWindow + + +class IterativeRobotMode(Enum): + kNone = 0 + kDisabled = 1 + kAutonomous = 2 + kTeleop = 3 + kTest = 4 + + +# todo should this be IterativeRobotPy or IterativeRobotBase (replacing) or IterativeRobotBasePy +class IterativeRobotPy(RobotBase): + """ + IterativeRobotPy implements a specific type of robot program framework, + extending the RobotBase class. It provides similar functionality as + IterativeRobotBase does in a python wrapper of C++, but in pure python. + + The IterativeRobotPy class does not implement StartCompetition(), so it + should not be used by teams directly. + + This class provides the following functions which are called by the main + loop, StartCompetition(), at the appropriate times: + + robotInit() -- provide for initialization at robot power-on + + DriverStationConnected() -- provide for initialization the first time the DS + is connected + + Init() functions -- each of the following functions is called once when the + appropriate mode is entered: + + - disabledInit() -- called each and every time disabled is entered from another mode + - autonomousInit() -- called each and every time autonomous is entered from another mode + - teleopInit() -- called each and every time teleop is entered from another mode + - testInit() -- called each and every time test is entered from another mode + + Periodic() functions -- each of these functions is called on an interval: + + - robotPeriodic() + - disabledPeriodic() + - autonomousPeriodic() + - teleopPeriodic() + - testPeriodic() + + Exit() functions -- each of the following functions is called once when the + appropriate mode is exited: + + - disabledExit() -- called each and every time disabled is exited + - autonomousExit() -- called each and every time autonomous is exited + - teleopExit() -- called each and every time teleop is exited + - testExit() -- called each and every time test is exited + """ + + def __init__(self, period: wpimath.units.seconds) -> None: + """ + Constructor for IterativeRobotPy. + + :param period: period of the main robot periodic loop in seconds. + """ + super().__init__() + self._periodS = period + self.watchdog = Watchdog(self._periodS, self.printLoopOverrunMessage) + self._networkTableInstanceDefault = NetworkTableInstance.getDefault() + self._mode: IterativeRobotMode = IterativeRobotMode.kNone + self._lastMode: IterativeRobotMode = IterativeRobotMode.kNone + self._ntFlushEnabled: bool = True + self._lwEnabledInTest: bool = False + self._calledDsConnected: bool = False + self._reportedLw: bool = False + self._robotPeriodicHasRun: bool = False + self._simulationPeriodicHasRun: bool = False + self._disabledPeriodicHasRun: bool = False + self._autonomousPeriodicHasRun: bool = False + self._teleopPeriodicHasRun: bool = False + self._testPeriodicHasRun: bool = False + + def robotInit(self) -> None: + """ + Robot-wide initialization code should go here. + + Users should override this method for default Robot-wide initialization + which will be called when the robot is first powered on. It will be called + exactly one time. + + Note: This method is functionally identical to the class constructor so + that should be used instead. + """ + pass + + def driverStationConnected(self) -> None: + """ + Code that needs to know the DS state should go here. + + Users should override this method for initialization that needs to occur + after the DS is connected, such as needing the alliance information. + """ + pass + + def _simulationInit(self) -> None: + """ + Robot-wide simulation initialization code should go here. + + Users should override this method for default Robot-wide simulation + related initialization which will be called when the robot is first + started. It will be called exactly one time after robotInit is called + only when the robot is in simulation. + """ + pass + + def disabledInit(self) -> None: + """ + Initialization code for disabled mode should go here. + + Users should override this method for initialization code which will be + called each time + the robot enters disabled mode. + """ + pass + + def autonomousInit(self) -> None: + """ + Initialization code for autonomous mode should go here. + + Users should override this method for initialization code which will be + called each time the robot enters autonomous mode. + """ + pass + + def teleopInit(self) -> None: + """ + Initialization code for teleop mode should go here. + + Users should override this method for initialization code which will be + called each time the robot enters teleop mode. + """ + pass + + def testInit(self) -> None: + """ + Initialization code for test mode should go here. + + Users should override this method for initialization code which will be + called each time the robot enters test mode. + """ + pass + + def robotPeriodic(self) -> None: + """ + Periodic code for all modes should go here. + + This function is called each time a new packet is received from the driver + station. + """ + if not self._robotPeriodicHasRun: + print(f"Default robotPeriodic() method...Override me!") + self._robotPeriodicHasRun = True + + def _simulationPeriodic(self) -> None: + """ + Periodic simulation code should go here. + + This function is called in a simulated robot after user code executes. + """ + if not self._simulationPeriodicHasRun: + print(f"Default _simulationPeriodic() method...Override me!") + self._simulationPeriodicHasRun = True + + def disabledPeriodic(self) -> None: + """ + Periodic code for disabled mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in disabled + mode. + """ + if not self._disabledPeriodicHasRun: + print(f"Default disabledPeriodic() method...Override me!") + self._disabledPeriodicHasRun = True + + def autonomousPeriodic(self) -> None: + """ + Periodic code for autonomous mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in + autonomous mode. + """ + if not self._autonomousPeriodicHasRun: + print(f"Default autonomousPeriodic() method...Override me!") + self._autonomousPeriodicHasRun = True + + def teleopPeriodic(self) -> None: + """ + Periodic code for teleop mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in teleop + mode. + """ + if not self._teleopPeriodicHasRun: + print(f"Default teleopPeriodic() method...Override me!") + self._teleopPeriodicHasRun = True + + def testPeriodic(self) -> None: + """ + Periodic code for test mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in test + mode. + """ + if not self._testPeriodicHasRun: + print(f"Default testPeriodic() method...Override me!") + self._testPeriodicHasRun = True + + def disabledExit(self) -> None: + """ + Exit code for disabled mode should go here. + + Users should override this method for code which will be called each time + the robot exits disabled mode. + """ + pass + + def autonomousExit(self) -> None: + """ + Exit code for autonomous mode should go here. + + Users should override this method for code which will be called each time + the robot exits autonomous mode. + """ + pass + + def teleopExit(self) -> None: + """ + Exit code for teleop mode should go here. + + Users should override this method for code which will be called each time + the robot exits teleop mode. + """ + pass + + def testExit(self) -> None: + """ + Exit code for test mode should go here. + + Users should override this method for code which will be called each time + the robot exits test mode. + """ + pass + + def setNetworkTablesFlushEnabled(self, enabled: bool) -> None: + """ + Enables or disables flushing NetworkTables every loop iteration. + By default, this is enabled. + + :deprecated: Deprecated without replacement. + + :param enabled: True to enable, false to disable. + """ + + self._ntFlushEnabled = enabled + + def enableLiveWindowInTest(self, testLW: bool) -> None: + """ + Sets whether LiveWindow operation is enabled during test mode. + + :param testLW: True to enable, false to disable. Defaults to false. + @throws if called in test mode. + """ + if self.isTestEnabled(): + raise RuntimeError("Can't configure test mode while in test mode!") + if not self._reportedLw and testLW: + report(_kResourceType_SmartDashboard, _kSmartDashboard_LiveWindow) + self._reportedLw = True + self._lwEnabledInTest = testLW + + def isLiveWindowEnabledInTest(self) -> bool: + """ + Whether LiveWindow operation is enabled during test mode. + """ + return self._lwEnabledInTest + + def getPeriod(self) -> wpimath.units.seconds: + """ + Gets time period between calls to Periodic() functions. + """ + return self._periodS + + def _loopFunc(self) -> None: + """ + Loop function. + """ + DriverStation.refreshData() + self.watchdog.reset() + + isEnabled, isAutonomous, isTest = self.getControlState() + if not isEnabled: + self._mode = IterativeRobotMode.kDisabled + elif isAutonomous: + self._mode = IterativeRobotMode.kAutonomous + elif isTest: + self._mode = IterativeRobotMode.kTest + else: + self._mode = IterativeRobotMode.kTeleop + + if not self._calledDsConnected and DSControlWord().isDSAttached(): + self._calledDsConnected = True + self.driverStationConnected() + + # If self._mode changed, call self._mode exit and entry functions + if self._lastMode is not self._mode: + + if self._lastMode is IterativeRobotMode.kDisabled: + self.disabledExit() + elif self._lastMode is IterativeRobotMode.kAutonomous: + self.autonomousExit() + elif self._lastMode is IterativeRobotMode.kTeleop: + self.teleopExit() + elif self._lastMode is IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(False) + Shuffleboard.disableActuatorWidgets() + self.testExit() + """ + todo switch to match statements when we don't build with python 3.9 + match self._lastMode: + case IterativeRobotMode.kDisabled: + self.disabledExit() + case IterativeRobotMode.kAutonomous: + self.autonomousExit() + case IterativeRobotMode.kTeleop: + self.teleopExit() + case IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(False) + Shuffleboard.disableActuatorWidgets() + self.testExit() + """ + + if self._mode is IterativeRobotMode.kDisabled: + self.disabledInit() + self.watchdog.addEpoch("disabledInit()") + elif self._mode is IterativeRobotMode.kAutonomous: + self.autonomousInit() + self.watchdog.addEpoch("autonomousInit()") + elif self._mode is IterativeRobotMode.kTeleop: + self.teleopInit() + self.watchdog.addEpoch("teleopInit()") + elif self._mode is IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(True) + Shuffleboard.enableActuatorWidgets() + self.testInit() + self.watchdog.addEpoch("testInit()") + """ + match self._mode: + case IterativeRobotMode.kDisabled: + self.disabledInit() + self._watchdog.addEpoch("disabledInit()") + case IterativeRobotMode.kAutonomous: + self.autonomousInit() + self._watchdog.addEpoch("autonomousInit()") + case IterativeRobotMode.kTeleop: + self.teleopInit() + self._watchdog.addEpoch("teleopInit()") + case IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(True) + Shuffleboard.enableActuatorWidgets() + self.testInit() + self._watchdog.addEpoch("testInit()") + """ + self._lastMode = self._mode + + # Call the appropriate function depending upon the current robot mode + if self._mode is IterativeRobotMode.kDisabled: + observeUserProgramDisabled() + self.disabledPeriodic() + self.watchdog.addEpoch("disabledPeriodic()") + elif self._mode is IterativeRobotMode.kAutonomous: + observeUserProgramAutonomous() + self.autonomousPeriodic() + self.watchdog.addEpoch("autonomousPeriodic()") + elif self._mode is IterativeRobotMode.kTeleop: + observeUserProgramTeleop() + self.teleopPeriodic() + self.watchdog.addEpoch("teleopPeriodic()") + elif self._mode is IterativeRobotMode.kTest: + observeUserProgramTest() + self.testPeriodic() + self.watchdog.addEpoch("testPeriodic()") + """ + match self._mode: + case IterativeRobotMode.kDisabled: + observeUserProgramDisabled() + self.disabledPeriodic() + self.watchdog.addEpoch("disabledPeriodic()") + case IterativeRobotMode.kAutonomous: + observeUserProgramAutonomous() + self.autonomousPeriodic() + self.watchdog.addEpoch("autonomousPeriodic()") + case IterativeRobotMode.kTeleop: + observeUserProgramTeleop() + self.teleopPeriodic() + self.watchdog.addEpoch("teleopPeriodic()") + case IterativeRobotMode.kTest: + observeUserProgramTest() + self.testPeriodic() + self.watchdog.addEpoch("testPeriodic()") + """ + + self.robotPeriodic() + self.watchdog.addEpoch("robotPeriodic()") + + SmartDashboard.updateValues() + self.watchdog.addEpoch("SmartDashboard.updateValues()") + + LiveWindow.updateValues() + self.watchdog.addEpoch("LiveWindow.updateValues()") + + Shuffleboard.update() + self.watchdog.addEpoch("Shuffleboard.update()") + + if self.isSimulation(): + simPeriodicBefore() + self._simulationPeriodic() + simPeriodicAfter() + self.watchdog.addEpoch("_simulationPeriodic()") + + self.watchdog.disable() + + # Flush NetworkTables + if self._ntFlushEnabled: + self._networkTableInstanceDefault.flushLocal() + + # Warn on loop time overruns + if self.watchdog.isExpired(): + self.printWatchdogEpochs() + + def printLoopOverrunMessage(self) -> None: + reportWarning(f"Loop time of {self.watchdog.getTimeout()}s overrun", False) + + def printWatchdogEpochs(self) -> None: + """ + Prints list of epochs added so far and their times. + """ + self.watchdog.printEpochs() diff --git a/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py new file mode 100644 index 00000000..92d3d910 --- /dev/null +++ b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py @@ -0,0 +1,265 @@ +from typing import Any, Callable, Iterable, ClassVar +from heapq import heappush, heappop +from hal import ( + report, + initializeNotifier, + setNotifierName, + observeUserProgramStarting, + updateNotifierAlarm, + waitForNotifierAlarm, + stopNotifier, + tResourceType, + tInstances, +) +from wpilib import RobotController +import wpimath.units + +from .iterativerobotpy import IterativeRobotPy + +_getFPGATime = RobotController.getFPGATime +_kResourceType_Framework = tResourceType.kResourceType_Framework +_kFramework_Timed = tInstances.kFramework_Timed + +microsecondsAsInt = int + + +class _Callback: + def __init__( + self, + func: Callable[[], None], + periodUs: microsecondsAsInt, + expirationUs: microsecondsAsInt, + ) -> None: + self.func = func + self._periodUs = periodUs + self.expirationUs = expirationUs + + @classmethod + def makeCallBack( + cls, + func: Callable[[], None], + startTimeUs: microsecondsAsInt, + periodUs: microsecondsAsInt, + offsetUs: microsecondsAsInt, + ) -> "_Callback": + + callback = _Callback(func=func, periodUs=periodUs, expirationUs=startTimeUs) + + currentTimeUs = _getFPGATime() + callback.expirationUs = offsetUs + callback.calcFutureExpirationUs( + currentTimeUs + ) + return callback + + def calcFutureExpirationUs( + self, currentTimeUs: microsecondsAsInt + ) -> microsecondsAsInt: + # increment the expiration time by the number of full periods it's behind + # plus one to avoid rapid repeat fires from a large loop overrun. + # + # This routine is called when either: + # this callback has never ran and self.expirationUs is + # TimedRobot._starttimeUs and we are calculating where the first + # expiration should be relative to TimedRobot._starttimeUs + # or: + # this call back has ran and self.expirationUs is when it was scheduled to + # expire and we are calculating when the next expirations should be relative + # to the last scheduled expiration + # + # We assume currentTime ≥ self.expirationUs rather than checking for it since the + # callback wouldn't be running otherwise. + # + # We take when we previously expired or when we started: self.expirationUs + # add + self._periodUs to get at least one period in the future + # then calculate how many whole periods we are behind: + # ((currentTimeUs - self.expirationUs) // self._periodUs) + # and multiply that by self._periodUs to calculate how much time in full + # periods we need to skip to catch up, and add that to the sum to calculate + # when we should run again. + return ( + self.expirationUs + + self._periodUs + + ((currentTimeUs - self.expirationUs) // self._periodUs) * self._periodUs + ) + + def setNextStartTimeUs(self, currentTimeUs: microsecondsAsInt) -> None: + self.expirationUs = self.calcFutureExpirationUs(currentTimeUs) + + def __lt__(self, other) -> bool: + return self.expirationUs < other.expirationUs + + def __bool__(self) -> bool: + return True + + +class _OrderedList: + def __init__(self) -> None: + self._data: list[Any] = [] + + def add(self, item: Any) -> None: + heappush(self._data, item) + + def pop(self) -> Any: + return heappop(self._data) + + def peek( + self, + ) -> Any: # todo change to Any | None when we don't build with python 3.9 + if self._data: + return self._data[0] + else: + return None + + def __len__(self) -> int: + return len(self._data) + + def __iter__(self) -> Iterable[Any]: + return iter(sorted(self._data)) + + def __contains__(self, item) -> bool: + return item in self._data + + def __str__(self) -> str: + return str(sorted(self._data)) + + +# todo what should the name of this class be? +class TimedRobotPy(IterativeRobotPy): + """ + TimedRobotPy implements the IterativeRobotBase robot program framework. + + The TimedRobotPy class is intended to be subclassed by a user creating a + robot program. + + Periodic() functions from the base class are called on an interval by a + Notifier instance. + """ + + kDefaultPeriod: ClassVar[wpimath.units.seconds] = ( + 0.020 # todo this is a change to keep consistent units in the API + ) + + def __init__(self, period: wpimath.units.seconds = kDefaultPeriod) -> None: + """ + Constructor for TimedRobotPy. + + :param period: period of the main robot periodic loop in seconds. + """ + super().__init__(period) + + # All periodic functions created by addPeriodic are relative + # to this self._startTimeUs + self._startTimeUs = _getFPGATime() + self._callbacks = _OrderedList() + self._loopStartTimeUs = 0 + self.addPeriodic(self._loopFunc, period=self._periodS) + + self._notifier, status = initializeNotifier() + if status != 0: + raise RuntimeError( + f"initializeNotifier() returned {self._notifier}, {status}" + ) + + status = setNotifierName(self._notifier, "TimedRobotPy") + if status != 0: + raise RuntimeError(f"setNotifierName() returned {status}") + + report(_kResourceType_Framework, _kFramework_Timed) + + def startCompetition(self) -> None: + """ + Provide an alternate "main loop" via startCompetition(). + """ + self.robotInit() + + if self.isSimulation(): + self._simulationInit() + + # Tell the DS that the robot is ready to be enabled + print("********** Robot program startup complete **********") + observeUserProgramStarting() + + # Loop forever, calling the appropriate mode-dependent function + # (really not forever, there is a check for a break) + while True: + # We don't have to check there's an element in the queue first because + # there's always at least one (the constructor adds one). It's re-enqueued + # at the end of the loop. + callback = self._callbacks.pop() + + status = updateNotifierAlarm(self._notifier, callback.expirationUs) + if status != 0: + raise RuntimeError(f"updateNotifierAlarm() returned {status}") + + currentTimeUs, status = waitForNotifierAlarm(self._notifier) + if status != 0: + raise RuntimeError( + f"waitForNotifierAlarm() returned currentTimeUs={currentTimeUs} status={status}" + ) + + if currentTimeUs == 0: + # when HAL_StopNotifier(self.notifier) is called the above waitForNotifierAlarm + # will return a currentTimeUs==0 and the API requires robots to stop any loops. + # See the API for waitForNotifierAlarm + break + + self._loopStartTimeUs = _getFPGATime() + self._runCallbackAndReschedule(callback, currentTimeUs) + + # Process all other callbacks that are ready to run + while self._callbacks.peek().expirationUs <= currentTimeUs: + callback = self._callbacks.pop() + self._runCallbackAndReschedule(callback, currentTimeUs) + + def _runCallbackAndReschedule( + self, callback: _Callback, currentTimeUs: microsecondsAsInt + ) -> None: + callback.func() + callback.setNextStartTimeUs(currentTimeUs) + self._callbacks.add(callback) + + def endCompetition(self) -> None: + """ + Ends the main loop in startCompetition(). + """ + stopNotifier(self._notifier) + + def getLoopStartTime(self) -> microsecondsAsInt: + """ + Return the system clock time in microseconds + for the start of the current + periodic loop. This is in the same time base as Timer.getFPGATimestamp(), + but is stable through a loop. It is updated at the beginning of every + periodic callback (including the normal periodic loop). + + :returns: Robot running time in microseconds, + as of the start of the current + periodic function. + """ + + return self._loopStartTimeUs + + def addPeriodic( + self, + callback: Callable[[], None], + period: wpimath.units.seconds, + offset: wpimath.units.seconds = 0.0, + ) -> None: + """ + Add a callback to run at a specific period with a starting time offset. + + This is scheduled on TimedRobotPy's Notifier, so TimedRobotPy and the callback + run synchronously. Interactions between them are thread-safe. + + :param callback: The callback to run. + :param period: The period at which to run the callback. + :param offset: The offset from the common starting time. This is useful + for scheduling a callback in a different timeslot relative + to TimedRobotPy. + """ + + self._callbacks.add( + _Callback.makeCallBack( + callback, self._startTimeUs, int(period * 1e6), int(offset * 1e6) + ) + )