From 41ff5d7ffed0806ee0b1097382b0bc9c779e8d3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Badenes?= Date: Tue, 19 May 2026 16:31:44 -0300 Subject: [PATCH 1/2] Remove deprecated InorbitConnectorConfig class BREAKING CHANGE: InorbitConnectorConfig has been removed. Use ConnectorConfig with a fleet list instead. Co-Authored-By: Claude Opus 4.6 --- docs/contents/specification/models.md | 10 - inorbit_connector/connector.py | 26 +- inorbit_connector/models.py | 37 +-- tests/test_connector.py | 156 +++++------- tests/test_models.py | 350 ++++---------------------- 5 files changed, 112 insertions(+), 467 deletions(-) diff --git a/docs/contents/specification/models.md b/docs/contents/specification/models.md index 3ded9e5..2f7376a 100644 --- a/docs/contents/specification/models.md +++ b/docs/contents/specification/models.md @@ -56,13 +56,3 @@ Logging configuration used by the connector at startup: - `defaults`: dictionary passed to the logging config (e.g. `log_file`). See [setup_logger()](logging.md#spec-logging-setup-logger) for how it is applied. - -## Deprecated: `InorbitConnectorConfig` - -`InorbitConnectorConfig` is a deprecated single-robot configuration format. It can be converted to a fleet config via: - -- `to_fleet_config(robot_id) -> ConnectorConfig` - -New implementations should use `ConnectorConfig` directly. - - diff --git a/inorbit_connector/connector.py b/inorbit_connector/connector.py index 5ddce43..6f6260a 100644 --- a/inorbit_connector/connector.py +++ b/inorbit_connector/connector.py @@ -11,7 +11,6 @@ import socket from pathlib import Path import tempfile -import warnings import asyncio import threading import traceback @@ -67,7 +66,6 @@ ) from inorbit_connector.models import ( ConnectorConfig, - InorbitConnectorConfig, MapConfig, MapConfigTemp, RobotConfig, @@ -926,12 +924,10 @@ def __init__(self, robot_id: str, config: ConnectorConfig, **kwargs) -> None: Args: robot_id (str): The ID of the InOrbit robot - config (ConnectorConfig): The connector configuration. - - New API: pass `ConnectorConfig` with a `fleet` field containing - multiple robot configurations. The one for the current robot will be - selected by the `robot_id` parameter. - - Deprecated: pass `InorbitConnectorConfig` (single-robot); it will be - converted to a `ConnectorConfig`. + config (ConnectorConfig): The connector configuration. Pass a + `ConnectorConfig` with a `fleet` field containing robot + configurations. The one for the current robot will be selected + by the `robot_id` parameter. Keyword Args: register_user_scripts (bool): Register user scripts automatically. @@ -944,19 +940,7 @@ def __init__(self, robot_id: str, config: ConnectorConfig, **kwargs) -> None: Relevant only if register_user_scripts is True. """ self.robot_id = robot_id - - if isinstance(config, InorbitConnectorConfig): - # Deprecated behavior - warnings.warn( - "Passing InorbitConnectorConfig to Connector.__init__ is deprecated; " - "pass ConnectorConfig instead.", - DeprecationWarning, - stacklevel=2, - ) - fleet_config = config.to_fleet_config(robot_id) - else: - fleet_config = config.to_singular_config(robot_id) - + fleet_config = config.to_singular_config(robot_id) super().__init__(fleet_config, **kwargs) def _get_session(self) -> RobotSession: diff --git a/inorbit_connector/models.py b/inorbit_connector/models.py index db01fa1..db74924 100644 --- a/inorbit_connector/models.py +++ b/inorbit_connector/models.py @@ -304,7 +304,7 @@ def warn_log_level_deprecated(self) -> "ConnectorConfig": """Warn if log_level is set as it is deprecated. Returns: - InorbitConnectorConfig: The model instance + ConnectorConfig: The model instance """ if self.log_level is not None: warnings.warn( @@ -403,38 +403,3 @@ def check_positive(cls, update_freq: float | None) -> float | None: if update_freq <= 0: raise ValueError("Must be positive and non-zero") return update_freq - - -class InorbitConnectorConfig(ConnectorConfig, RobotConfig): - """Class representing an Inorbit connector model for a single robot. - - This class is deprecated. Use ConnectorConfig instead. - """ - - # Exclude robot_id from single-robot configs - it will be provided when converting - # to fleet - robot_id: str | None = Field(default=None, exclude=True) - fleet: list[RobotConfig] = Field(default_factory=list, exclude=True) - - def to_fleet_config(self, robot_id: str) -> ConnectorConfig: - """Convert a single-robot config to a fleet config. - - Creates a ConnectorConfig with a fleet list containing this robot's - configuration. - - Args: - robot_id: The robot ID to use for the fleet config (ensures consistency) - """ - # Get the full config dump, excluding fleet and robot-specific fields - connector_data = self.model_dump(exclude={"fleet", "robot_id", "cameras"}) - - # Create RobotConfig instance from this config's robot-specific fields - robot_config = RobotConfig( - robot_id=robot_id, - cameras=self.cameras, - ) - - return ConnectorConfig( - **connector_data, - fleet=[robot_config], - ) diff --git a/tests/test_connector.py b/tests/test_connector.py index 18fc353..97f1666 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -24,7 +24,6 @@ ) from inorbit_connector.models import ( ConnectorConfig, - InorbitConnectorConfig, RobotConfig, ) @@ -783,10 +782,7 @@ class SubConnector(Connector): with pytest.raises(TypeError): SubConnector("TestRobot", MagicMock()) - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_can_be_subclassed_with_all_abstract_methods_implemented(self): - """Test subclassing with deprecated InorbitConnectorConfig.""" - class SubConnector(Connector): async def _connect(self): pass @@ -802,15 +798,16 @@ async def _inorbit_command_handler(self, command_name, args, options): connector = SubConnector( "TestRobot", - InorbitConnectorConfig( + ConnectorConfig( api_key="valid_key", api_url="https://valid.com/", connector_type="valid_connector", connector_config=DummyConfig(), + fleet=[RobotConfig(robot_id="TestRobot")], ), ) assert isinstance(connector, Connector) - assert isinstance(connector, FleetConnector) # Connector is a FleetConnector + assert isinstance(connector, FleetConnector) class TestConnector: @@ -830,25 +827,12 @@ def make_connector_not_abstract(self): @pytest.fixture def base_connector(self, base_model): - return Connector("TestRobot", InorbitConnectorConfig(**base_model)) + return Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_init(self, base_model): - """Test initialization with deprecated InorbitConnectorConfig.""" - config = InorbitConnectorConfig(**base_model) - robot_id = "TestRobot" - - connector = Connector(robot_id, config) - assert connector.robot_id == robot_id - assert connector.robot_ids == [robot_id] # Single robot wrapped in list - # Config is converted from InorbitConnectorConfig to ConnectorConfig - assert isinstance(connector.config, ConnectorConfig) - assert len(connector.config.fleet) == 1 - assert connector.config.fleet[0].robot_id == robot_id - assert connector._logger.name == Connector.__module__ - - def test_init_with_connector_config(self, base_model): - """Test initialization with new ConnectorConfig API.""" robot_id = "TestRobot" config = ConnectorConfig( **base_model, @@ -861,31 +845,27 @@ def test_init_with_connector_config(self, base_model): assert isinstance(connector.config, ConnectorConfig) assert len(connector.config.fleet) == 1 assert connector.config.fleet[0].robot_id == robot_id + assert connector._logger.name == Connector.__module__ - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_init_with_robot_key(self, base_model, mock_robot_session_pool): - """Test initialization with robot key using deprecated InorbitConnectorConfig.""" - config = InorbitConnectorConfig( - **base_model, inorbit_robot_key="valid_robot_key" + config = ConnectorConfig( + **base_model, + inorbit_robot_key="valid_robot_key", + fleet=[RobotConfig(robot_id="TestRobot")], ) robot_id = "TestRobot" connector = Connector(robot_id, config) - # Session is created on-demand, check when accessed session = connector._get_session() assert session.robot_id == "TestRobot" - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_get_session(self, base_connector, mock_robot_session_pool): - """Test that _get_session returns the session for the single robot.""" session = base_connector._get_session() assert session is not None assert session.robot_id == "TestRobot" mock_robot_session_pool.get_session.assert_called() - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_publish_map(self, base_model, mock_robot_session_pool): - """Test publish_map delegates to FleetConnector.publish_robot_map (deprecated config).""" base_model["maps"] = { "frameA": { "file": f"{os.path.dirname(__file__)}/dir/test_map.png", @@ -896,16 +876,17 @@ def test_publish_map(self, base_model, mock_robot_session_pool): "resolution": 0.1, } } - connector = Connector("TestRobot", InorbitConnectorConfig(**base_model)) + connector = Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) connector.publish_map("frameA") session = connector._get_session() session.publish_map.assert_called_once() - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_publish_pose(self, base_model, mock_robot_session_pool): - """Test publish_pose delegates to FleetConnector.publish_robot_pose (deprecated config).""" base_model["maps"] = { "frameA": { "file": f"{os.path.dirname(__file__)}/dir/test_map.png", @@ -916,16 +897,17 @@ def test_publish_pose(self, base_model, mock_robot_session_pool): "resolution": 0.1, } } - connector = Connector("TestRobot", InorbitConnectorConfig(**base_model)) + connector = Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) connector.publish_pose(1.0, 2.0, 3.14, "frameA") session = connector._get_session() session.publish_pose.assert_called() - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_publish_pose_updates_maps(self, base_model, mock_robot_session_pool): - """Test that publishing pose with new frame_id updates the map (deprecated config).""" base_model["maps"] = { "frameA": { "file": f"{os.path.dirname(__file__)}/dir/test_map.png", @@ -943,7 +925,10 @@ def test_publish_pose_updates_maps(self, base_model, mock_robot_session_pool): "resolution": 0.1, }, } - connector = Connector("TestRobot", InorbitConnectorConfig(**base_model)) + connector = Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) session = connector._get_session() @@ -961,52 +946,38 @@ def test_publish_pose_updates_maps(self, base_model, mock_robot_session_pool): assert session.publish_map.call_count == 2 # Now 2 assert session.publish_pose.call_count == 3 # Now 3 - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_publish_odometry(self, base_connector, mock_robot_session_pool): - """Test publish_odometry delegates correctly.""" base_connector.publish_odometry(linear_speed=1.0, angular_speed=0.5) session = base_connector._get_session() session.publish_odometry.assert_called() - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_publish_key_values(self, base_connector, mock_robot_session_pool): - """Test publish_key_values delegates correctly.""" base_connector.publish_key_values(key1="value1") session = base_connector._get_session() session.publish_key_values.assert_called() - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_publish_system_stats_stores_stats( self, base_connector, mock_robot_session_pool ): - """Test publish_system_stats stores stats for deferred publishing.""" base_connector.publish_system_stats(cpu_load_percentage=0.5) - # Stats should be stored, not published immediately session = base_connector._get_session() session.publish_system_stats.assert_not_called() - # Verify stats were stored pending_stats = base_connector._FleetConnector__pending_system_stats assert base_connector.robot_id in pending_stats assert pending_stats[base_connector.robot_id]["cpu_load_percentage"] == 0.5 - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_is_robot_online_default_implementation(self, base_connector): - """Test that _is_robot_online returns True by default.""" assert base_connector._is_robot_online() is True - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_is_fleet_robot_online_delegates_to_is_robot_online(self, base_connector): - """Test that _is_fleet_robot_online delegates to _is_robot_online.""" assert base_connector._is_fleet_robot_online("TestRobot") is True @pytest.mark.asyncio - @pytest.mark.filterwarnings("ignore::DeprecationWarning") async def test_inorbit_robot_command_handler_delegates(self, base_connector): - """Test that _inorbit_robot_command_handler delegates to _inorbit_command_handler.""" base_connector._inorbit_command_handler = AsyncMock() await base_connector._inorbit_robot_command_handler( @@ -1018,40 +989,37 @@ async def test_inorbit_robot_command_handler_delegates(self, base_connector): ) @pytest.mark.asyncio - @pytest.mark.filterwarnings("ignore::DeprecationWarning") async def test_register_user_scripts( self, base_model, tmp_path, mock_robot_session_pool ): - """Test user scripts registration for single robot connector (deprecated config).""" - # Create a connector with user scripts enabled connector = Connector( "TestRobot", - InorbitConnectorConfig(**base_model), + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), register_user_scripts=True, default_user_scripts_dir=tmp_path, ) connector._connect = AsyncMock() - # Initialize sessions (this is what happens during start/connect) await connector._FleetConnector__connect() - # Verify register_commands_path was called session = connector._get_session() session.register_commands_path.assert_called_once() - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_uses_env_vars(self, base_model): - """Test environment variables with deprecated config.""" base_model["env_vars"] = {"ENV_VAR": "env_value"} - Connector("TestRobot", InorbitConnectorConfig(**base_model)) + Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) assert "ENV_VAR" in os.environ assert os.environ["ENV_VAR"] == "env_value" @pytest.mark.asyncio - @pytest.mark.filterwarnings("ignore::DeprecationWarning") async def test_start_stop_integration(self, base_model): - """Integration test for start/stop functionality (deprecated config).""" - connector = Connector("TestRobot", InorbitConnectorConfig(**base_model)) + connector = Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) connector._execution_loop = AsyncMock() connector._connect = AsyncMock() connector._disconnect = AsyncMock() @@ -1064,16 +1032,12 @@ async def test_start_stop_integration(self, base_model): assert not connector._FleetConnector__loop.is_running() @pytest.mark.asyncio - @pytest.mark.filterwarnings("ignore::DeprecationWarning") async def test_fetch_map_default_returns_none(self, base_connector): - """Test that default fetch_map returns None.""" result = await base_connector.fetch_map("frame1") assert result is None @pytest.mark.asyncio - @pytest.mark.filterwarnings("ignore::DeprecationWarning") async def test_fetch_robot_map_delegates_to_fetch_map(self, base_connector): - """Test that fetch_robot_map delegates to fetch_map.""" from inorbit_connector.models import MapConfigTemp mock_map = MapConfigTemp( @@ -1087,7 +1051,6 @@ async def test_fetch_robot_map_delegates_to_fetch_map(self, base_connector): result = await base_connector.fetch_robot_map("TestRobot", "frame1") - # Verify fetch_map was called with just frame_id base_connector.fetch_map.assert_awaited_once_with("frame1") assert result == mock_map @@ -1109,70 +1072,67 @@ def make_connector_not_abstract(self): @pytest.fixture def base_connector(self, base_model): - return Connector("TestRobot", InorbitConnectorConfig(**base_model)) + return Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) @pytest.mark.asyncio - @pytest.mark.filterwarnings("ignore::DeprecationWarning") async def test_register_command_handler_by_default( self, base_model, mock_robot_session_pool ): - """Test that command handler is registered by default (deprecated config).""" - connector = Connector("TestRobot", InorbitConnectorConfig(**base_model)) + connector = Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) connector._connect = AsyncMock() - # Initialize sessions (this triggers command handler registration) await connector._FleetConnector__connect() - # Verify register_command_callback was called session = connector._get_session() session.register_command_callback.assert_called_once() @pytest.mark.asyncio - @pytest.mark.filterwarnings("ignore::DeprecationWarning") async def test_does_not_register_when_disabled( self, base_model, mock_robot_session_pool ): - """Test that command handler is not registered when disabled (deprecated config).""" connector = Connector( "TestRobot", - InorbitConnectorConfig(**base_model), + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), register_custom_command_handler=False, ) connector._connect = AsyncMock() - # Initialize sessions await connector._FleetConnector__connect() - # Verify register_command_callback was NOT called session = connector._get_session() session.register_command_callback.assert_not_called() @pytest.mark.asyncio - @pytest.mark.filterwarnings("ignore::DeprecationWarning") async def test_sets_online_status_callback( self, base_model, mock_robot_session_pool ): - """Test that online status callback is set on EdgeSDK (deprecated config).""" - connector = Connector("TestRobot", InorbitConnectorConfig(**base_model)) + connector = Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) connector._connect = AsyncMock() - # Initialize sessions await connector._FleetConnector__connect() - # Verify callback was set session = connector._get_session() session.set_online_status_callback.assert_called_once() - # Verify the callback calls _is_robot_online callback = session.set_online_status_callback.call_args[0][0] - assert callback() is True # Should return True by default + assert callback() is True - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_handle_command_exception_with_command_failure( self, base_model, mock_robot_session_pool ): - """Test that CommandFailure exceptions are properly handled and passed to result_function.""" - connector = Connector("TestRobot", InorbitConnectorConfig(**base_model)) + connector = Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) result_function = MagicMock() command_failure = CommandFailure( @@ -1194,12 +1154,13 @@ def test_handle_command_exception_with_command_failure( stderr="Error details here", ) - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_handle_command_exception_with_generic_exception( self, base_model, mock_robot_session_pool ): - """Test that generic exceptions are handled and passed to result_function with generic message.""" - connector = Connector("TestRobot", InorbitConnectorConfig(**base_model)) + connector = Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) result_function = MagicMock() error = ValueError("Something went wrong") @@ -1218,12 +1179,13 @@ def test_handle_command_exception_with_generic_exception( stderr="Something went wrong", ) - @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_handle_command_exception_without_message( self, base_model, mock_robot_session_pool ): - """Test that exceptions without a message use the class name as stderr.""" - connector = Connector("TestRobot", InorbitConnectorConfig(**base_model)) + connector = Connector( + "TestRobot", + ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), + ) result_function = MagicMock() class CustomException(Exception): diff --git a/tests/test_models.py b/tests/test_models.py index 872f53a..854b461 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -20,7 +20,6 @@ # InOrbit from inorbit_connector.models import ( - InorbitConnectorConfig, ConnectorConfig, MapConfig, MapConfigBase, @@ -41,286 +40,6 @@ class InvalidDummyConfig(IndexError): pass -class TestInorbitConnectorConfig: - - @pytest.fixture - def base_model(self): - return { - "api_key": "valid_key", - "api_url": "https://valid.com/", - "connector_type": "valid_connector", - "connector_config": DummyConfig(), - "update_freq": 2.0, - "location_tz": "Asia/Kolkata", - "logging": LoggingConfig(defaults={"log_file": "./test.log"}), - } - - def test_with_valid_input(self, base_model): - model = InorbitConnectorConfig(**base_model) - assert model.api_key == base_model["api_key"] - assert str(model.api_url) == base_model["api_url"] - assert model.connector_type == base_model["connector_type"] - assert isinstance(model.connector_config, DummyConfig) - assert model.update_freq == base_model["update_freq"] - assert model.location_tz == base_model["location_tz"] - assert model.cameras == [] - assert model.user_scripts_dir is None - assert model.logging == base_model["logging"] - assert model.account_id is None - assert model.inorbit_robot_key is None - assert model.maps == {} - assert model.env_vars == {} - - def test_with_valid_input_and_user_scripts_dir(self, base_model): - model = InorbitConnectorConfig(**base_model, user_scripts_dir=".") - assert str(model.user_scripts_dir) == "." - - def test_with_valid_input_and_cameras(self, base_model): - model = InorbitConnectorConfig( - **base_model, cameras=[CameraConfig(video_url="https://test.com/")] - ) - assert len(model.cameras) == 1 - assert str(model.cameras[0].video_url) == "https://test.com/" - - def test_with_valid_input_and_maps(self, base_model): - model = InorbitConnectorConfig( - **base_model, - maps={ - "frameA": { - "file": f"{os.path.dirname(__file__)}/dir/test_map.png", - "map_id": "valid_map_id", - "origin_x": 0.0, - "origin_y": 0.0, - "resolution": 0.1, - } - }, - ) - assert len(model.maps.keys()) == 1 - - def test_format_version_invalid_value(self, base_model): - with pytest.raises(ValidationError, match="format_version must be 1 or 2"): - InorbitConnectorConfig( - **base_model, - maps={ - "frameA": { - "file": f"{os.path.dirname(__file__)}/dir/test_map.png", - "map_id": "valid_map_id", - "origin_x": 0.0, - "origin_y": 0.0, - "resolution": 0.1, - "format_version": 3, - } - }, - ) - - def test_format_version_defaults_to_2(self, base_model): - model = InorbitConnectorConfig( - **base_model, - maps={ - "frameA": { - "file": f"{os.path.dirname(__file__)}/dir/test_map.png", - "map_id": "valid_map_id", - "origin_x": 0.0, - "origin_y": 0.0, - "resolution": 0.1, - # format_version omitted - } - }, - ) - assert model.maps["frameA"].format_version == 2 - - def test_format_version_accepts_1(self, base_model): - model = InorbitConnectorConfig( - **base_model, - maps={ - "frameA": { - "file": f"{os.path.dirname(__file__)}/dir/test_map.png", - "map_id": "valid_map_id", - "origin_x": 0.0, - "origin_y": 0.0, - "resolution": 0.1, - "format_version": 1, - } - }, - ) - assert model.maps["frameA"].format_version == 1 - - def test_with_valid_input_and_env_vars(self, base_model): - model = InorbitConnectorConfig( - **base_model, - env_vars={"ENV_VAR": "env_value"}, - ) - assert model.env_vars == {"ENV_VAR": "env_value"} - - def test_with_valid_input_and_logging_config(self, base_model): - logging_config = LoggingConfig( - log_level=LogLevels.INFO, defaults={"log_file": "./test.log"} - ) - base_model = base_model.copy() - base_model.pop("logging", None) - model = InorbitConnectorConfig( - **base_model, - logging=logging_config, - ) - assert model.logging.log_level == LogLevels.INFO - assert model.logging.defaults == {"log_file": "./test.log"} - - def test_with_valid_input_and_account_id(self, base_model): - model = InorbitConnectorConfig( - **base_model, - account_id="valid_account_id", - ) - assert model.account_id == "valid_account_id" - - def test_with_valid_input_and_robot_key(self, base_model): - model = InorbitConnectorConfig( - **base_model, - inorbit_robot_key="valid_robot_key", - ) - assert model.inorbit_robot_key == "valid_robot_key" - - def test_invalid_api_key(self, base_model): - init_input = base_model.copy() - init_input["api_key"] = "key with spaces" - with pytest.raises(ValidationError, match="Whitespaces are not allowed"): - InorbitConnectorConfig(**init_input) - - def test_invalid_account_id(self, base_model): - init_input = base_model.copy() - init_input["account_id"] = "account id with spaces" - with pytest.raises(ValidationError, match="Whitespaces are not allowed"): - InorbitConnectorConfig(**init_input) - - def test_invalid_connector_config(self, base_model): - init_input = { - "connector_type": "valid_connector", - "connector_config": InvalidDummyConfig(), - } - - error = re.escape( - "1 validation error for InorbitConnectorConfig\nconnector_config\n " - "Input should be a valid dictionary or instance of BaseModel " - "[type=model_type, input_value=InvalidDummyConfig(), " - "input_type=InvalidDummyConfig]\n For further information visit " - ) - with pytest.raises(ValidationError, match=error): - InorbitConnectorConfig(**init_input) - - def test_invalid_location_tz(self, base_model): - init_input = base_model.copy() - init_input["location_tz"] = "invalid_tz" - with pytest.raises(ValidationError, match="Timezone must exist in pytz"): - InorbitConnectorConfig(**init_input) - - def test_invalid_update_freq(self, base_model): - init_input = base_model.copy() - init_input["update_freq"] = -2.0 - with pytest.raises(ValidationError, match="Must be positive and non-zero"): - InorbitConnectorConfig(**init_input) - - init_input["update_freq"] = 0.0 - with pytest.raises(ValidationError, match="Must be positive and non-zero"): - InorbitConnectorConfig(**init_input) - - def test_invalid_log_level(self, base_model): - init_input = base_model.copy() - init_input["logging"] = {"log_level": "BAD"} - error = r"Input should be 'DEBUG', 'INFO', 'WARNING', 'ERROR' or 'CRITICAL'" - with pytest.raises(ValidationError, match=error): - InorbitConnectorConfig(**init_input) - - def test_invalid_user_scripts_dir(self, base_model): - init_input = base_model.copy() - init_input["user_scripts_dir"] = "/does/not/exist" - - error = ( - "1 validation error for InorbitConnectorConfig\nuser_scripts_dir\n " - "Path does not point to a directory [type=path_not_directory, input_value=" - "'/does/not/exist', input_type=str]" - ) - with pytest.raises(ValidationError, match=re.escape(error)): - InorbitConnectorConfig(**init_input) - - def test_invalid_maps(self, base_model): - init_input = base_model.copy() - init_input["maps"] = { - "frameA": { - "file": f"{os.path.dirname(__file__)}/dir/invalid_map.png", - "map_id": "valid_map_id", - "origin_x": 0.0, - "origin_y": 0.0, - "resolution": 0.1, - } - } - with pytest.raises(ValidationError, match="Path does not point to a file"): - InorbitConnectorConfig(**init_input) - init_input = base_model.copy() - init_input["maps"] = { - "frameA": { - "file": f"{os.path.dirname(__file__)}/dir/not_a_map.txt", - "map_id": "valid_map_id", - "origin_x": 0.0, - "origin_y": 0.0, - "resolution": 0.1, - } - } - with pytest.raises(ValidationError, match="The map file must be a PNG file"): - InorbitConnectorConfig(**init_input) - - @mock.patch.dict(os.environ, {"INORBIT_API_KEY": "env_valid_key"}) - def test_reads_api_key_from_environment_variable(self, base_model): - # Re-import after Mock - importlib.reload(sys.modules["inorbit_connector.models"]) - from inorbit_connector.models import InorbitConnectorConfig - - init_input = { - "api_url": "https://valid.video_url/", - "connector_type": "valid_connector", - "connector_config": DummyConfig(), - } - model = InorbitConnectorConfig(**init_input) - assert model.api_key == "env_valid_key" - - @mock.patch.dict(os.environ, {"INORBIT_API_URL": "https://valid.env/"}) - def test_reads_api_url_from_environment_variable(self, base_model): - # Re-import after Mock - importlib.reload(sys.modules["inorbit_connector.models"]) - from inorbit_connector.models import InorbitConnectorConfig - - init_input = { - "connector_type": "valid_connector", - "connector_config": DummyConfig(), - } - model = InorbitConnectorConfig(**init_input) - assert str(model.api_url) == "https://valid.env/" - - @mock.patch.dict(os.environ, {}, clear=True) - def test_reads_api_url_from_environment_variable_default(self, base_model): - # Re-import after Mock - importlib.reload(sys.modules["inorbit_connector.models"]) - from inorbit_connector.models import InorbitConnectorConfig - - init_input = { - "connector_type": "valid_connector", - "connector_config": DummyConfig(), - } - model = InorbitConnectorConfig(**init_input) - assert str(model.api_url) == INORBIT_CLOUD_SDK_ROBOT_CONFIG_URL - - @mock.patch.dict(os.environ, {}, clear=True) - def test_missing_api_key_environment_variable(self, base_model): - # Re-import after Mock - importlib.reload(sys.modules["inorbit_connector.models"]) - from inorbit_connector.models import InorbitConnectorConfig - - init_input = { - "connector_type": "valid_connector", - "connector_config": DummyConfig(), - } - model = InorbitConnectorConfig(**init_input) - assert model.api_key is None - - class TestRobotConfig: def test_with_valid_input(self): robot_config = RobotConfig(robot_id="test_robot") @@ -437,33 +156,58 @@ def test_use_websockets_preserved_in_to_singular_config(self, base_model): singular = model.to_singular_config("robot1") assert singular.use_websockets is True + @mock.patch.dict(os.environ, {"INORBIT_API_KEY": "env_valid_key"}) + def test_reads_api_key_from_environment_variable(self, base_model): + importlib.reload(sys.modules["inorbit_connector.models"]) + from inorbit_connector.models import ConnectorConfig as ReloadedConfig -class TestInorbitConnectorConfigToFleetConfig: - @pytest.fixture - def base_model(self): - return { - "api_key": "valid_key", - "api_url": "https://valid.com/", + init_input = { + "api_url": "https://valid.video_url/", "connector_type": "valid_connector", "connector_config": DummyConfig(), + "fleet": [{"robot_id": "robot1"}], } + model = ReloadedConfig(**init_input) + assert model.api_key == "env_valid_key" - def test_to_fleet_config(self, base_model): - model = InorbitConnectorConfig(**base_model) - fleet_config = model.to_fleet_config("test_robot") - assert type(fleet_config).__name__ == "ConnectorConfig" - assert len(fleet_config.fleet) == 1 - assert fleet_config.fleet[0].robot_id == "test_robot" - assert fleet_config.connector_type == model.connector_type - assert fleet_config.api_key == model.api_key - - def test_to_fleet_config_with_cameras(self, base_model): - model = InorbitConnectorConfig( - **base_model, cameras=[CameraConfig(video_url="https://test.com/")] - ) - fleet_config = model.to_fleet_config("test_robot") - assert len(fleet_config.fleet[0].cameras) == 1 - assert str(fleet_config.fleet[0].cameras[0].video_url) == "https://test.com/" + @mock.patch.dict(os.environ, {"INORBIT_API_URL": "https://valid.env/"}) + def test_reads_api_url_from_environment_variable(self, base_model): + importlib.reload(sys.modules["inorbit_connector.models"]) + from inorbit_connector.models import ConnectorConfig as ReloadedConfig + + init_input = { + "connector_type": "valid_connector", + "connector_config": DummyConfig(), + "fleet": [{"robot_id": "robot1"}], + } + model = ReloadedConfig(**init_input) + assert str(model.api_url) == "https://valid.env/" + + @mock.patch.dict(os.environ, {}, clear=True) + def test_reads_api_url_from_environment_variable_default(self, base_model): + importlib.reload(sys.modules["inorbit_connector.models"]) + from inorbit_connector.models import ConnectorConfig as ReloadedConfig + + init_input = { + "connector_type": "valid_connector", + "connector_config": DummyConfig(), + "fleet": [{"robot_id": "robot1"}], + } + model = ReloadedConfig(**init_input) + assert str(model.api_url) == INORBIT_CLOUD_SDK_ROBOT_CONFIG_URL + + @mock.patch.dict(os.environ, {}, clear=True) + def test_missing_api_key_environment_variable(self, base_model): + importlib.reload(sys.modules["inorbit_connector.models"]) + from inorbit_connector.models import ConnectorConfig as ReloadedConfig + + init_input = { + "connector_type": "valid_connector", + "connector_config": DummyConfig(), + "fleet": [{"robot_id": "robot1"}], + } + model = ReloadedConfig(**init_input) + assert model.api_key is None class TestMapConfigBase: From 0af406aaae5787c2239477616e89f22c595f6112 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Badenes?= Date: Tue, 19 May 2026 16:37:37 -0300 Subject: [PATCH 2/2] Restore docstrings and comments in test_connector.py Co-Authored-By: Claude Opus 4.6 --- tests/test_connector.py | 42 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/tests/test_connector.py b/tests/test_connector.py index 97f1666..74e2b97 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -783,6 +783,8 @@ class SubConnector(Connector): SubConnector("TestRobot", MagicMock()) def test_can_be_subclassed_with_all_abstract_methods_implemented(self): + """Test subclassing with all abstract methods implemented.""" + class SubConnector(Connector): async def _connect(self): pass @@ -807,7 +809,7 @@ async def _inorbit_command_handler(self, command_name, args, options): ), ) assert isinstance(connector, Connector) - assert isinstance(connector, FleetConnector) + assert isinstance(connector, FleetConnector) # Connector is a FleetConnector class TestConnector: @@ -833,6 +835,7 @@ def base_connector(self, base_model): ) def test_init(self, base_model): + """Test Connector initialization.""" robot_id = "TestRobot" config = ConnectorConfig( **base_model, @@ -848,6 +851,7 @@ def test_init(self, base_model): assert connector._logger.name == Connector.__module__ def test_init_with_robot_key(self, base_model, mock_robot_session_pool): + """Test initialization with robot key.""" config = ConnectorConfig( **base_model, inorbit_robot_key="valid_robot_key", @@ -860,12 +864,14 @@ def test_init_with_robot_key(self, base_model, mock_robot_session_pool): assert session.robot_id == "TestRobot" def test_get_session(self, base_connector, mock_robot_session_pool): + """Test that _get_session returns the session for the single robot.""" session = base_connector._get_session() assert session is not None assert session.robot_id == "TestRobot" mock_robot_session_pool.get_session.assert_called() def test_publish_map(self, base_model, mock_robot_session_pool): + """Test publish_map delegates to FleetConnector.publish_robot_map.""" base_model["maps"] = { "frameA": { "file": f"{os.path.dirname(__file__)}/dir/test_map.png", @@ -887,6 +893,7 @@ def test_publish_map(self, base_model, mock_robot_session_pool): session.publish_map.assert_called_once() def test_publish_pose(self, base_model, mock_robot_session_pool): + """Test publish_pose delegates to FleetConnector.publish_robot_pose.""" base_model["maps"] = { "frameA": { "file": f"{os.path.dirname(__file__)}/dir/test_map.png", @@ -908,6 +915,7 @@ def test_publish_pose(self, base_model, mock_robot_session_pool): session.publish_pose.assert_called() def test_publish_pose_updates_maps(self, base_model, mock_robot_session_pool): + """Test that publishing pose with new frame_id updates the map.""" base_model["maps"] = { "frameA": { "file": f"{os.path.dirname(__file__)}/dir/test_map.png", @@ -947,12 +955,14 @@ def test_publish_pose_updates_maps(self, base_model, mock_robot_session_pool): assert session.publish_pose.call_count == 3 # Now 3 def test_publish_odometry(self, base_connector, mock_robot_session_pool): + """Test publish_odometry delegates correctly.""" base_connector.publish_odometry(linear_speed=1.0, angular_speed=0.5) session = base_connector._get_session() session.publish_odometry.assert_called() def test_publish_key_values(self, base_connector, mock_robot_session_pool): + """Test publish_key_values delegates correctly.""" base_connector.publish_key_values(key1="value1") session = base_connector._get_session() @@ -961,23 +971,29 @@ def test_publish_key_values(self, base_connector, mock_robot_session_pool): def test_publish_system_stats_stores_stats( self, base_connector, mock_robot_session_pool ): + """Test publish_system_stats stores stats for deferred publishing.""" base_connector.publish_system_stats(cpu_load_percentage=0.5) + # Stats should be stored, not published immediately session = base_connector._get_session() session.publish_system_stats.assert_not_called() + # Verify stats were stored pending_stats = base_connector._FleetConnector__pending_system_stats assert base_connector.robot_id in pending_stats assert pending_stats[base_connector.robot_id]["cpu_load_percentage"] == 0.5 def test_is_robot_online_default_implementation(self, base_connector): + """Test that _is_robot_online returns True by default.""" assert base_connector._is_robot_online() is True def test_is_fleet_robot_online_delegates_to_is_robot_online(self, base_connector): + """Test that _is_fleet_robot_online delegates to _is_robot_online.""" assert base_connector._is_fleet_robot_online("TestRobot") is True @pytest.mark.asyncio async def test_inorbit_robot_command_handler_delegates(self, base_connector): + """Test that _inorbit_robot_command_handler delegates to _inorbit_command_handler.""" base_connector._inorbit_command_handler = AsyncMock() await base_connector._inorbit_robot_command_handler( @@ -992,6 +1008,8 @@ async def test_inorbit_robot_command_handler_delegates(self, base_connector): async def test_register_user_scripts( self, base_model, tmp_path, mock_robot_session_pool ): + """Test user scripts registration for single robot connector.""" + # Create a connector with user scripts enabled connector = Connector( "TestRobot", ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), @@ -1000,12 +1018,15 @@ async def test_register_user_scripts( ) connector._connect = AsyncMock() + # Initialize sessions (this is what happens during start/connect) await connector._FleetConnector__connect() + # Verify register_commands_path was called session = connector._get_session() session.register_commands_path.assert_called_once() def test_uses_env_vars(self, base_model): + """Test environment variables are set from config.""" base_model["env_vars"] = {"ENV_VAR": "env_value"} Connector( "TestRobot", @@ -1016,6 +1037,7 @@ def test_uses_env_vars(self, base_model): @pytest.mark.asyncio async def test_start_stop_integration(self, base_model): + """Integration test for start/stop functionality.""" connector = Connector( "TestRobot", ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), @@ -1033,11 +1055,13 @@ async def test_start_stop_integration(self, base_model): @pytest.mark.asyncio async def test_fetch_map_default_returns_none(self, base_connector): + """Test that default fetch_map returns None.""" result = await base_connector.fetch_map("frame1") assert result is None @pytest.mark.asyncio async def test_fetch_robot_map_delegates_to_fetch_map(self, base_connector): + """Test that fetch_robot_map delegates to fetch_map.""" from inorbit_connector.models import MapConfigTemp mock_map = MapConfigTemp( @@ -1051,6 +1075,7 @@ async def test_fetch_robot_map_delegates_to_fetch_map(self, base_connector): result = await base_connector.fetch_robot_map("TestRobot", "frame1") + # Verify fetch_map was called with just frame_id base_connector.fetch_map.assert_awaited_once_with("frame1") assert result == mock_map @@ -1081,14 +1106,17 @@ def base_connector(self, base_model): async def test_register_command_handler_by_default( self, base_model, mock_robot_session_pool ): + """Test that command handler is registered by default.""" connector = Connector( "TestRobot", ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), ) connector._connect = AsyncMock() + # Initialize sessions (this triggers command handler registration) await connector._FleetConnector__connect() + # Verify register_command_callback was called session = connector._get_session() session.register_command_callback.assert_called_once() @@ -1096,6 +1124,7 @@ async def test_register_command_handler_by_default( async def test_does_not_register_when_disabled( self, base_model, mock_robot_session_pool ): + """Test that command handler is not registered when disabled.""" connector = Connector( "TestRobot", ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), @@ -1103,8 +1132,10 @@ async def test_does_not_register_when_disabled( ) connector._connect = AsyncMock() + # Initialize sessions await connector._FleetConnector__connect() + # Verify register_command_callback was NOT called session = connector._get_session() session.register_command_callback.assert_not_called() @@ -1112,23 +1143,28 @@ async def test_does_not_register_when_disabled( async def test_sets_online_status_callback( self, base_model, mock_robot_session_pool ): + """Test that online status callback is set on EdgeSDK.""" connector = Connector( "TestRobot", ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), ) connector._connect = AsyncMock() + # Initialize sessions await connector._FleetConnector__connect() + # Verify callback was set session = connector._get_session() session.set_online_status_callback.assert_called_once() + # Verify the callback calls _is_robot_online callback = session.set_online_status_callback.call_args[0][0] - assert callback() is True + assert callback() is True # Should return True by default def test_handle_command_exception_with_command_failure( self, base_model, mock_robot_session_pool ): + """Test that CommandFailure exceptions are properly handled and passed to result_function.""" connector = Connector( "TestRobot", ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), @@ -1157,6 +1193,7 @@ def test_handle_command_exception_with_command_failure( def test_handle_command_exception_with_generic_exception( self, base_model, mock_robot_session_pool ): + """Test that generic exceptions are handled and passed to result_function with generic message.""" connector = Connector( "TestRobot", ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]), @@ -1182,6 +1219,7 @@ def test_handle_command_exception_with_generic_exception( def test_handle_command_exception_without_message( self, base_model, mock_robot_session_pool ): + """Test that exceptions without a message use the class name as stderr.""" connector = Connector( "TestRobot", ConnectorConfig(**base_model, fleet=[RobotConfig(robot_id="TestRobot")]),