From 2d4de5d848432ce9dbab83567e5d18a2cadd3cf3 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 22 Jan 2025 14:55:29 +0100 Subject: [PATCH] fix Signed-off-by: sven1977 --- rllib/connectors/tests/test_action.py | 113 --- rllib/connectors/tests/test_agent.py | 646 ------------------ rllib/connectors/tests/test_connector.py | 100 --- .../models/tests/test_action_distributions.py | 12 +- 4 files changed, 2 insertions(+), 869 deletions(-) delete mode 100644 rllib/connectors/tests/test_action.py delete mode 100644 rllib/connectors/tests/test_agent.py delete mode 100644 rllib/connectors/tests/test_connector.py diff --git a/rllib/connectors/tests/test_action.py b/rllib/connectors/tests/test_action.py deleted file mode 100644 index ff6b8c47f513..000000000000 --- a/rllib/connectors/tests/test_action.py +++ /dev/null @@ -1,113 +0,0 @@ -# @OldAPIStack - -import unittest - -import gymnasium as gym -import numpy as np - -from ray.rllib.connectors.action.clip import ClipActionsConnector -from ray.rllib.connectors.action.immutable import ImmutableActionsConnector -from ray.rllib.connectors.action.lambdas import ConvertToNumpyConnector -from ray.rllib.connectors.action.normalize import NormalizeActionsConnector -from ray.rllib.connectors.action.pipeline import ActionConnectorPipeline -from ray.rllib.connectors.connector import ConnectorContext -from ray.rllib.connectors.registry import get_connector -from ray.rllib.utils.framework import try_import_torch -from ray.rllib.utils.typing import ActionConnectorDataType - -torch, _ = try_import_torch() - - -class TestActionConnector(unittest.TestCase): - def test_connector_pipeline(self): - ctx = ConnectorContext() - connectors = [ConvertToNumpyConnector(ctx)] - pipeline = ActionConnectorPipeline(ctx, connectors) - name, params = pipeline.to_state() - restored = get_connector(name, ctx, params) - self.assertTrue(isinstance(restored, ActionConnectorPipeline)) - self.assertTrue(isinstance(restored.connectors[0], ConvertToNumpyConnector)) - # There should not be any timer yet - self.assertFalse(bool(pipeline.timers.values())) - pipeline(ActionConnectorDataType(0, 0, {}, ([1], [], None))) - # After a first input, there should be one timer - self.assertEqual(len(pipeline.timers.values()), 1) - - def test_convert_to_numpy_connector(self): - ctx = ConnectorContext() - c = ConvertToNumpyConnector(ctx) - - name, params = c.to_state() - - self.assertEqual(name, "ConvertToNumpyConnector") - - restored = get_connector(name, ctx, params) - self.assertTrue(isinstance(restored, ConvertToNumpyConnector)) - - action = torch.Tensor([8, 9]) - states = torch.Tensor([[1, 1, 1], [2, 2, 2]]) - ac_data = ActionConnectorDataType(0, 1, {}, (action, states, {})) - - converted = c(ac_data) - self.assertTrue(isinstance(converted.output[0], np.ndarray)) - self.assertTrue(isinstance(converted.output[1], np.ndarray)) - - def test_normalize_action_connector(self): - ctx = ConnectorContext( - action_space=gym.spaces.Box(low=0.0, high=6.0, shape=[1]) - ) - c = NormalizeActionsConnector(ctx) - - name, params = c.to_state() - self.assertEqual(name, "NormalizeActionsConnector") - - restored = get_connector(name, ctx, params) - self.assertTrue(isinstance(restored, NormalizeActionsConnector)) - - ac_data = ActionConnectorDataType(0, 1, {}, (0.5, [], {})) - - normalized = c(ac_data) - self.assertEqual(normalized.output[0], 4.5) - - def test_clip_action_connector(self): - ctx = ConnectorContext( - action_space=gym.spaces.Box(low=0.0, high=6.0, shape=[1]) - ) - c = ClipActionsConnector(ctx) - - name, 
params = c.to_state() - self.assertEqual(name, "ClipActionsConnector") - - restored = get_connector(name, ctx, params) - self.assertTrue(isinstance(restored, ClipActionsConnector)) - - ac_data = ActionConnectorDataType(0, 1, {}, (8.8, [], {})) - - clipped = c(ac_data) - self.assertEqual(clipped.output[0], 6.0) - - def test_immutable_action_connector(self): - ctx = ConnectorContext( - action_space=gym.spaces.Box(low=0.0, high=6.0, shape=[1]) - ) - c = ImmutableActionsConnector(ctx) - - name, params = c.to_state() - self.assertEqual(name, "ImmutableActionsConnector") - - restored = get_connector(name, ctx, params) - self.assertTrue(isinstance(restored, ImmutableActionsConnector)) - - ac_data = ActionConnectorDataType(0, 1, {}, (np.array([8.8]), [], {})) - - immutable = c(ac_data) - - with self.assertRaises(ValueError): - immutable.output[0][0] = 5 - - -if __name__ == "__main__": - import pytest - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/connectors/tests/test_agent.py b/rllib/connectors/tests/test_agent.py deleted file mode 100644 index cc1acab22588..000000000000 --- a/rllib/connectors/tests/test_agent.py +++ /dev/null @@ -1,646 +0,0 @@ -# @OldAPIStack - -import gymnasium as gym -from gymnasium.spaces import Box -import numpy as np -import tree # pip install dm_tree -import unittest - -from ray.rllib.algorithms.ppo.ppo import PPOConfig -from ray.rllib.connectors.agent.clip_reward import ClipRewardAgentConnector -from ray.rllib.connectors.agent.lambdas import FlattenDataAgentConnector -from ray.rllib.connectors.agent.obs_preproc import ObsPreprocessorConnector -from ray.rllib.connectors.agent.pipeline import AgentConnectorPipeline -from ray.rllib.connectors.agent.state_buffer import StateBufferConnector -from ray.rllib.connectors.agent.view_requirement import ViewRequirementAgentConnector -from ray.rllib.connectors.connector import ConnectorContext -from ray.rllib.connectors.registry import get_connector -from ray.rllib.policy.view_requirement import ViewRequirement -from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.utils.test_utils import check -from ray.rllib.utils.typing import ( - ActionConnectorDataType, - AgentConnectorDataType, - AgentConnectorsOutput, -) -from ray.rllib.connectors.agent.mean_std_filter import ( - MeanStdObservationFilterAgentConnector, -) - - -class TestAgentConnector(unittest.TestCase): - def test_connector_pipeline(self): - ctx = ConnectorContext() - connectors = [ClipRewardAgentConnector(ctx, False, 1.0)] - pipeline = AgentConnectorPipeline(ctx, connectors) - name, params = pipeline.to_state() - restored = get_connector(name, ctx, params) - self.assertTrue(isinstance(restored, AgentConnectorPipeline)) - self.assertTrue(isinstance(restored.connectors[0], ClipRewardAgentConnector)) - - def test_obs_preprocessor_connector(self): - obs_space = gym.spaces.Dict( - { - "a": gym.spaces.Box(low=0, high=1, shape=(1,)), - "b": gym.spaces.Tuple( - [gym.spaces.Discrete(2), gym.spaces.MultiDiscrete(nvec=[2, 3])] - ), - } - ) - ctx = ConnectorContext(config={}, observation_space=obs_space) - - c = ObsPreprocessorConnector(ctx) - name, params = c.to_state() - - restored = get_connector(name, ctx, params) - self.assertTrue(isinstance(restored, ObsPreprocessorConnector)) - - obs = obs_space.sample() - # Fake deterministic data. - obs["a"][0] = 0.5 - obs["b"] = (1, np.array([0, 2])) - - d = AgentConnectorDataType( - 0, - 1, - { - SampleBatch.OBS: obs, - }, - ) - preprocessed = c([d]) - - # obs is completely flattened. 
- self.assertTrue( - (preprocessed[0].data[SampleBatch.OBS] == [0.5, 0, 1, 1, 0, 0, 0, 1]).all() - ) - - def test_clip_reward_connector(self): - ctx = ConnectorContext() - - c = ClipRewardAgentConnector(ctx, limit=2.0) - name, params = c.to_state() - - self.assertEqual(name, "ClipRewardAgentConnector") - self.assertAlmostEqual(params["limit"], 2.0) - - restored = get_connector(name, ctx, params) - self.assertTrue(isinstance(restored, ClipRewardAgentConnector)) - - d = AgentConnectorDataType( - 0, - 1, - { - SampleBatch.REWARDS: 5.8, - }, - ) - clipped = restored([d]) - - self.assertEqual(len(clipped), 1) - self.assertEqual(clipped[0].data[SampleBatch.REWARDS], 2.0) - - def test_flatten_data_connector(self): - ctx = ConnectorContext() - - c = FlattenDataAgentConnector(ctx) - - name, params = c.to_state() - restored = get_connector(name, ctx, params) - self.assertTrue(isinstance(restored, FlattenDataAgentConnector)) - - sample_batch = { - SampleBatch.NEXT_OBS: { - "sensor1": [[1, 1], [2, 2]], - "sensor2": 8.8, - }, - SampleBatch.REWARDS: 5.8, - SampleBatch.ACTIONS: [[1, 1], [2, 2]], - SampleBatch.INFOS: {"random": "info"}, - } - - d = AgentConnectorDataType( - 0, - 1, - # FlattenDataAgentConnector does NOT touch raw_dict, - # so simply pass None here. - AgentConnectorsOutput(None, sample_batch), - ) - - flattened = c([d]) - self.assertEqual(len(flattened), 1) - - batch = flattened[0].data.sample_batch - self.assertTrue((batch[SampleBatch.NEXT_OBS] == [1, 1, 2, 2, 8.8]).all()) - self.assertEqual(batch[SampleBatch.REWARDS][0], 5.8) - # Not flattened. - self.assertEqual(len(batch[SampleBatch.ACTIONS]), 2) - self.assertEqual(batch[SampleBatch.INFOS]["random"], "info") - - def test_state_buffer_connector(self): - ctx = ConnectorContext( - action_space=gym.spaces.Box(low=-1.0, high=1.0, shape=(3,)), - ) - c = StateBufferConnector(ctx) - - # Reset without any buffered data should do nothing. 
-        c.reset(env_id=0)
-
-        d = AgentConnectorDataType(
-            0,
-            1,
-            {
-                SampleBatch.NEXT_OBS: {
-                    "sensor1": [[1, 1], [2, 2]],
-                    "sensor2": 8.8,
-                },
-            },
-        )
-
-        with_buffered = c([d])
-        self.assertEqual(len(with_buffered), 1)
-        self.assertTrue((with_buffered[0].data[SampleBatch.ACTIONS] == [0, 0, 0]).all())
-
-        c.on_policy_output(ActionConnectorDataType(0, 1, {}, ([1, 2, 3], [], {})))
-
-        with_buffered = c([d])
-        self.assertEqual(len(with_buffered), 1)
-        self.assertEqual(with_buffered[0].data[SampleBatch.ACTIONS], [1, 2, 3])
-
-    def test_mean_std_observation_filter_connector(self):
-        for bounds in [
-            (-1, 1),  # normalized
-            (-2, 2),  # scaled
-            (0, 2),  # shifted
-            (0, 4),  # scaled and shifted
-        ]:
-            print("Testing uniform sampling with bounds: {}".format(bounds))
-
-            observation_space = Box(bounds[0], bounds[1], (3, 64, 64))
-            ctx = ConnectorContext(observation_space=observation_space)
-            filter_connector = MeanStdObservationFilterAgentConnector(ctx)
-
-            # Warm up the Mean-Std filter.
-            for i in range(1000):
-                obs = observation_space.sample()
-                sample_batch = {
-                    SampleBatch.NEXT_OBS: obs,
-                }
-                ac = AgentConnectorDataType(0, 0, sample_batch)
-                filter_connector.transform(ac)
-
-            # Create another connector and restore the first one's state onto it.
-            _, state = filter_connector.to_state()
-            another_filter_connector = (
-                MeanStdObservationFilterAgentConnector.from_state(ctx, state)
-            )
-
-            another_filter_connector.in_eval()
-
-            # Collect transformed observations.
-            transformed_observations = []
-            for i in range(1000):
-                obs = observation_space.sample()
-                sample_batch = {
-                    SampleBatch.NEXT_OBS: obs,
-                }
-                ac = AgentConnectorDataType(0, 0, sample_batch)
-                connector_output = another_filter_connector.transform(ac)
-                transformed_observations.append(
-                    connector_output.data[SampleBatch.NEXT_OBS]
-                )
-
-            # Check that transformed observations are actually mean-std filtered.
-            self.assertTrue(np.isclose(np.mean(transformed_observations), 0, atol=0.1))
-            self.assertTrue(np.isclose(np.var(transformed_observations), 1, atol=0.1))
-
-            # Check that the filter parameters were frozen, because we are not
-            # training.
-            self.assertTrue(
-                filter_connector.filter.running_stats.num_pushes
-                == another_filter_connector.filter.running_stats.num_pushes,
-            )
-            self.assertTrue(
-                np.all(
-                    filter_connector.filter.running_stats.mean_array
-                    == another_filter_connector.filter.running_stats.mean_array,
-                )
-            )
-            self.assertTrue(
-                np.all(
-                    filter_connector.filter.running_stats.std_array
-                    == another_filter_connector.filter.running_stats.std_array,
-                )
-            )
-            self.assertTrue(
-                filter_connector.filter.buffer.num_pushes
-                == another_filter_connector.filter.buffer.num_pushes,
-            )
-            self.assertTrue(
-                np.all(
-                    filter_connector.filter.buffer.mean_array
-                    == another_filter_connector.filter.buffer.mean_array,
-                )
-            )
-            self.assertTrue(
-                np.all(
-                    filter_connector.filter.buffer.std_array
-                    == another_filter_connector.filter.buffer.std_array,
-                )
-            )
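The test above exercises RLlib's running mean-std observation filter. As a rough illustration of the underlying technique, here is a minimal, self-contained sketch using plain NumPy and Welford's online algorithm; the RunningMeanStd class and its method names are illustrative assumptions, not RLlib's actual filter API.

import numpy as np

class RunningMeanStd:
    """Tracks a running mean/variance via Welford's online algorithm."""

    def __init__(self, shape):
        self.count = 0
        self.mean = np.zeros(shape, dtype=np.float64)
        self.m2 = np.zeros(shape, dtype=np.float64)  # Sum of squared deviations.

    def push(self, x):
        # Online update: one pass, numerically stable.
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (x - self.mean)

    @property
    def std(self):
        var = self.m2 / max(self.count - 1, 1)
        return np.sqrt(np.maximum(var, 1e-8))

    def normalize(self, x):
        return (x - self.mean) / self.std

# Usage: warm the filter up on raw observations, then normalize new ones.
rms = RunningMeanStd(shape=(3,))
for _ in range(1000):
    rms.push(np.random.uniform(0.0, 4.0, size=(3,)))
normalized = rms.normalize(np.random.uniform(0.0, 4.0, size=(3,)))
# After warm-up, normalized samples are approximately zero-mean, unit-variance,
# which is exactly what the assertions in the test above check for.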
- """ - view_rq_dict = { - "both": ViewRequirement( - "obs", used_for_training=True, used_for_compute_actions=True - ), - "only_inference": ViewRequirement( - "obs", used_for_training=False, used_for_compute_actions=True - ), - "none": ViewRequirement( - "obs", used_for_training=False, used_for_compute_actions=False - ), - "only_training": ViewRequirement( - "obs", used_for_training=True, used_for_compute_actions=False - ), - } - - obs_arr = np.array([0, 1, 2, 3]) - agent_data = {SampleBatch.NEXT_OBS: obs_arr} - data = AgentConnectorDataType(0, 1, agent_data) - - config = PPOConfig().to_dict() - config["_enable_new_api_stack"] = False - ctx = ConnectorContext( - view_requirements=view_rq_dict, - config=config, - is_policy_recurrent=True, - ) - - sample_batch_expected = SampleBatch( - { - "both": obs_arr[None], - # Output in training model as well. - "only_inference": obs_arr[None], - "seq_lens": np.array([1]), - } - ) - - c = ViewRequirementAgentConnector(ctx) - c.in_training() - processed = c([data]) - - raw_dict = processed[0].data.raw_dict - sample_batch = processed[0].data.sample_batch - - check(raw_dict, agent_data) - check(sample_batch, sample_batch_expected) - - def test_vr_connector_shift_by_one(self): - view_rq_dict = { - "state": ViewRequirement("obs"), - "next_state": ViewRequirement( - "obs", shift=1, used_for_compute_actions=False - ), - "prev_state": ViewRequirement("obs", shift=-1), - } - - obs_arrs = np.arange(10)[:, None] + 1 - config = PPOConfig().to_dict() - config["_enable_new_api_stack"] = False - ctx = ConnectorContext( - view_requirements=view_rq_dict, config=config, is_policy_recurrent=True - ) - c = ViewRequirementAgentConnector(ctx) - - # keep a running list of observations - obs_list = [] - for t, obs in enumerate(obs_arrs): - # t=0 is the next state of t=-1 - data = AgentConnectorDataType(0, 1, {SampleBatch.NEXT_OBS: obs}) - processed = c([data]) # env.reset() for t == -1 else env.step() - sample_batch = processed[0].data.sample_batch - # add cur obs to the list - obs_list.append(obs) - - if t == 0: - check(sample_batch["prev_state"], sample_batch["state"]) - else: - # prev state should be equal to the prev time step obs - check(sample_batch["prev_state"], obs_list[-2][None]) - - def test_vr_connector_causal_slice(self): - """Test that the ViewRequirementAgentConnector can handle slice shifts.""" - view_rq_dict = { - "state": ViewRequirement("obs"), - # shift array should be [-2, -1, 0] - "prev_states": ViewRequirement("obs", shift="-2:0"), - # shift array should be [-4, -2, 0] - "prev_strided_states_even": ViewRequirement("obs", shift="-4:0:2"), - # shift array should be [-3, -1] - "prev_strided_states_odd": ViewRequirement("obs", shift="-3:0:2"), - } - - obs_arrs = np.arange(10)[:, None] + 1 - config = PPOConfig().to_dict() - config["_enable_new_api_stack"] = False - ctx = ConnectorContext( - view_requirements=view_rq_dict, config=config, is_policy_recurrent=True - ) - c = ViewRequirementAgentConnector(ctx) - - # keep a queue of observations - obs_list = [] - for t, obs in enumerate(obs_arrs): - # t=0 is the next state of t=-1 - data = AgentConnectorDataType(0, 1, {SampleBatch.NEXT_OBS: obs}) - processed = c([data]) - sample_batch = processed[0].data.sample_batch - - if t == 0: - obs_list.extend([obs for _ in range(5)]) - else: - # remove the first obs and add the current obs to the end - obs_list.pop(0) - obs_list.append(obs) - - # check state - check(sample_batch["state"], obs[None]) - - # check prev_states - check( - sample_batch["prev_states"], - 
-
-    def test_vr_connector_causal_slice(self):
-        """Tests that the ViewRequirementAgentConnector can handle slice shifts."""
-        view_rq_dict = {
-            "state": ViewRequirement("obs"),
-            # Resolves to the shift array [-2, -1, 0].
-            "prev_states": ViewRequirement("obs", shift="-2:0"),
-            # Resolves to the shift array [-4, -2, 0].
-            "prev_strided_states_even": ViewRequirement("obs", shift="-4:0:2"),
-            # Resolves to the shift array [-3, -1].
-            "prev_strided_states_odd": ViewRequirement("obs", shift="-3:0:2"),
-        }
-
-        obs_arrs = np.arange(10)[:, None] + 1
-        config = PPOConfig().to_dict()
-        config["_enable_new_api_stack"] = False
-        ctx = ConnectorContext(
-            view_requirements=view_rq_dict, config=config, is_policy_recurrent=True
-        )
-        c = ViewRequirementAgentConnector(ctx)
-
-        # Keep a queue of observations.
-        obs_list = []
-        for t, obs in enumerate(obs_arrs):
-            # t=0 is the next state of t=-1.
-            data = AgentConnectorDataType(0, 1, {SampleBatch.NEXT_OBS: obs})
-            processed = c([data])
-            sample_batch = processed[0].data.sample_batch
-
-            if t == 0:
-                obs_list.extend([obs for _ in range(5)])
-            else:
-                # Remove the first obs and append the current obs.
-                obs_list.pop(0)
-                obs_list.append(obs)
-
-            # Check state.
-            check(sample_batch["state"], obs[None])
-
-            # Check prev_states.
-            check(
-                sample_batch["prev_states"],
-                np.stack(obs_list)[np.array([-3, -2, -1])][None],
-            )
-
-            # Check prev_strided_states_even.
-            check(
-                sample_batch["prev_strided_states_even"],
-                np.stack(obs_list)[np.array([-5, -3, -1])][None],
-            )
-
-            # Check prev_strided_states_odd.
-            check(
-                sample_batch["prev_strided_states_odd"],
-                np.stack(obs_list)[np.array([-4, -2])][None],
-            )
-
-    def test_vr_connector_with_multiple_buffers(self):
-        """Tests that the ViewRequirementAgentConnector handles slice shifts
-        correctly when it has multiple buffers to shift."""
-        context_len = 5
-        # This view requirement simulates the use case of a decision transformer
-        # without reward-to-go.
-        view_rq_dict = {
-            # obs[t-context_len+1:t]
-            "context_obs": ViewRequirement(
-                "obs",
-                shift=f"-{context_len-1}:0",
-                space=Box(-np.inf, np.inf, shape=(1,), dtype=np.float64),
-            ),
-            # next_obs[t-context_len+1:t]
-            "context_next_obs": ViewRequirement(
-                "obs",
-                shift=f"-{context_len}:1",
-                used_for_compute_actions=False,
-                space=Box(-np.inf, np.inf, shape=(1,), dtype=np.float64),
-            ),
-            # act[t-context_len+1:t]
-            "context_act": ViewRequirement(
-                SampleBatch.ACTIONS,
-                shift=f"-{context_len-1}:-1",
-                space=Box(-np.inf, np.inf, shape=(1,)),
-            ),
-        }
-
-        obs_arrs = np.arange(10)[:, None] + 1
-        act_arrs = (np.arange(10)[:, None] + 1) * 100
-        n_steps = obs_arrs.shape[0]
-        config = PPOConfig().to_dict()
-        config["_enable_new_api_stack"] = False
-        ctx = ConnectorContext(
-            view_requirements=view_rq_dict, config=config, is_policy_recurrent=True
-        )
-        c = ViewRequirementAgentConnector(ctx)
-
-        # Keep queues of length context_len for observations and actions.
-        obs_list, act_list = [], []
-        for t in range(n_steps):
-            # The next state, and the action taken at time t-1.
-            timestep_data = {
-                SampleBatch.NEXT_OBS: obs_arrs[t],
-            }
-            if t > 0:
-                timestep_data[SampleBatch.ACTIONS] = act_arrs[t - 1]
-            data = AgentConnectorDataType(0, 1, timestep_data)
-            processed = c([data])
-            sample_batch = processed[0].data.sample_batch
-
-            if t == 0:
-                obs_list.extend([obs_arrs[0] for _ in range(context_len)])
-                act_list.extend(
-                    [np.zeros_like(act_arrs[0]) for _ in range(context_len)]
-                )
-            else:
-                obs_list.pop(0)
-                act_list.pop(0)
-                obs_list.append(obs_arrs[t])
-                act_list.append(act_arrs[t - 1])
-
-            self.assertTrue("context_next_obs" not in sample_batch)
-            # We should have the context_len most recent observations here.
-            check(sample_batch["context_obs"], np.stack(obs_list)[None])
-            # The context for actions is [t-context_len+1:t]. Since we build the
-            # sample batch for inference in ViewRequirementAgentConnector, it
-            # always includes everything up until the last action (at t-1), but
-            # not the current action (at t).
-            check(sample_batch["context_act"], np.stack(act_list[1:])[None])
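The multiple-buffers test above maintains fixed-length context windows over past observations and actions, front-padded at episode start (the test pads observations with the initial observation and actions with zeros). A minimal standalone sketch of that buffering pattern; ContextBuffer is an illustrative name, not an RLlib class:

from collections import deque
import numpy as np

class ContextBuffer:
    """Fixed-length FIFO over past items, front-padded at episode start."""

    def __init__(self, context_len, pad_item):
        self.buf = deque([pad_item] * context_len, maxlen=context_len)

    def push(self, item):
        self.buf.append(item)  # The oldest entry falls off the left automatically.

    def stack(self):
        return np.stack(list(self.buf))

# Usage: after three steps of a five-step context, two pad entries remain.
obs_ctx = ContextBuffer(context_len=5, pad_item=np.zeros(1))
for t in range(3):
    obs_ctx.push(np.array([t + 1.0]))
print(obs_ctx.stack().squeeze(-1))  # [0. 0. 1. 2. 3.]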
-
-    def test_connector_pipeline_with_view_requirement(self):
-        """A very minimal test that checks whether pipeline connectors work in a
-        simulated rollout."""
-        config = (
-            PPOConfig()
-            .api_stack(
-                enable_rl_module_and_learner=False,
-                enable_env_runner_and_connector_v2=False,
-            )
-            .framework("torch")
-            .environment(env="CartPole-v1")
-            .env_runners(create_env_on_local_worker=True)
-        )
-
-        env = gym.make("CartPole-v1")
-        policy = config.build().get_policy()
-
-        REQUIRED_KEYS = {
-            SampleBatch.OBS,
-            SampleBatch.NEXT_OBS,
-            SampleBatch.REWARDS,
-            SampleBatch.TERMINATEDS,
-            SampleBatch.TRUNCATEDS,
-            SampleBatch.INFOS,
-            SampleBatch.ACTIONS,
-        }
-        policy.view_requirements = {
-            k: v for k, v in policy.view_requirements.items() if k in REQUIRED_KEYS
-        }
-
-        # Create a connector context.
-        ctx = ConnectorContext(
-            view_requirements=policy.view_requirements,
-            config=policy.config,
-            initial_states=policy.get_initial_state(),
-            is_policy_recurrent=policy.is_recurrent(),
-            observation_space=policy.observation_space,
-            action_space=policy.action_space,
-        )
-
-        # Build a chain of connectors.
-        connectors = [
-            ObsPreprocessorConnector(ctx),
-            StateBufferConnector(ctx),
-            ViewRequirementAgentConnector(ctx),
-        ]
-        agent_connector = AgentConnectorPipeline(ctx, connectors)
-
-        name, params = agent_connector.to_state()
-        restored = get_connector(name, ctx, params)
-        self.assertTrue(isinstance(restored, AgentConnectorPipeline))
-        for cidx, c in enumerate(connectors):
-            check(restored.connectors[cidx].to_state(), c.to_state())
-
-        # Simulate a rollout.
-        n_steps = 10
-        obs, info = env.reset()
-        env_out = AgentConnectorDataType(
-            0, 1, {SampleBatch.NEXT_OBS: obs, SampleBatch.T: -1}
-        )
-        agent_obs = agent_connector([env_out])[0]
-        t = 0
-        total_rewards = 0
-        while t < n_steps:
-            policy_output = policy.compute_actions_from_input_dict(
-                agent_obs.data.sample_batch
-            )
-            # Remove the batch dimension.
-            policy_output = tree.map_structure(lambda x: x[0], policy_output)
-
-            agent_connector.on_policy_output(
-                ActionConnectorDataType(0, 1, {}, policy_output)
-            )
-            action = policy_output[0]
-
-            next_obs, rewards, terminateds, truncateds, info = env.step(action)
-            env_out_dict = {
-                SampleBatch.NEXT_OBS: next_obs,
-                SampleBatch.REWARDS: rewards,
-                SampleBatch.TERMINATEDS: terminateds,
-                SampleBatch.TRUNCATEDS: truncateds,
-                SampleBatch.INFOS: info,
-                SampleBatch.ACTIONS: action,
-                # state_out
-            }
-            env_out = AgentConnectorDataType(0, 1, env_out_dict)
-            agent_obs = agent_connector([env_out])[0]
-            total_rewards += rewards
-            t += 1
-        print(total_rewards)
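Conceptually, the agent-connector pipeline used in the rollout above is just an ordered list of per-timestep transformations applied in sequence. A minimal, framework-free sketch of that composition pattern (make_pipeline and the toy connectors are assumptions for illustration, not RLlib APIs):

from typing import Callable, Dict, List

Connector = Callable[[Dict], Dict]

def make_pipeline(connectors: List[Connector]) -> Connector:
    """Compose per-timestep transformations into a single callable."""
    def pipeline(data: Dict) -> Dict:
        for connector in connectors:
            data = connector(data)
        return data
    return pipeline

# Two toy connectors: clip the reward, then rescale the observation.
clip_reward = lambda d: {**d, "reward": max(min(d["reward"], 1.0), -1.0)}
scale_obs = lambda d: {**d, "obs": [x / 10.0 for x in d["obs"]]}

process = make_pipeline([clip_reward, scale_obs])
print(process({"obs": [5.0, 10.0], "reward": 3.2}))
# -> {'obs': [0.5, 1.0], 'reward': 1.0}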
- """ - view_rqs = { - "obs": ViewRequirement( - None, used_for_training=True, used_for_compute_actions=True - ), - } - - config = PPOConfig().to_dict() - config["_enable_new_api_stack"] = False - ctx = ConnectorContext( - view_requirements=view_rqs, - config=config, - is_policy_recurrent=False, - ) - - c = ViewRequirementAgentConnector(ctx) - c.in_training() - - for i in range(5): - obs_arr = np.array([0, 1, 2, 3]) + i - agent_data = {SampleBatch.NEXT_OBS: obs_arr} - data = AgentConnectorDataType(0, 1, agent_data) - - # Feed ViewRequirementAgentConnector 5 samples. - c([data]) - - obs_data = c.agent_collectors[0][1].buffers["obs"][0] - # Only keep data for the last timestep. - self.assertEqual(len(obs_data), 1) - # Data matches the latest timestep. - self.assertTrue(np.array_equal(obs_data[0], np.array([4, 5, 6, 7]))) - - def test_vr_connector_default_agent_collector_is_empty(self): - """Tests that after reset() the view_requirement connector will - create a fresh new agent collector. - """ - view_rqs = { - "obs": ViewRequirement( - None, used_for_training=True, used_for_compute_actions=True - ), - } - - config = PPOConfig().to_dict() - config["_enable_new_api_stack"] = False - ctx = ConnectorContext( - view_requirements=view_rqs, - config=config, - is_policy_recurrent=False, - ) - - c = ViewRequirementAgentConnector(ctx) - c.in_training() - - for i in range(5): - obs_arr = np.array([0, 1, 2, 3]) + i - agent_data = {SampleBatch.NEXT_OBS: obs_arr} - data = AgentConnectorDataType(0, 1, agent_data) - - # Feed ViewRequirementAgentConnector 5 samples. - c([data]) - - # 1 init_obs, plus 4 agent steps. - self.assertEqual(c.agent_collectors[0][1].agent_steps, 4) - - # Reset. - c.reset(0) # env_id = 0 - - # Process a new timestep. - obs_arr = np.array([0, 1, 2, 3]) + i - agent_data = {SampleBatch.NEXT_OBS: obs_arr} - data = AgentConnectorDataType(0, 1, agent_data) - - # Feed ViewRequirementAgentConnector 5 samples. - c([data]) - - # Start fresh with 0 agent step. - self.assertEqual(c.agent_collectors[0][1].agent_steps, 0) - - -if __name__ == "__main__": - import sys - - import pytest - - sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/connectors/tests/test_connector.py b/rllib/connectors/tests/test_connector.py deleted file mode 100644 index 2d1e5a18855c..000000000000 --- a/rllib/connectors/tests/test_connector.py +++ /dev/null @@ -1,100 +0,0 @@ -# @OldAPIStack - -import unittest - -import gymnasium as gym - -from ray.rllib.connectors.connector import Connector, ConnectorPipeline -from ray.rllib.connectors.connector import ConnectorContext -from ray.rllib.connectors.agent.synced_filter import SyncedFilterAgentConnector -from ray.rllib.connectors.agent.mean_std_filter import ( - MeanStdObservationFilterAgentConnector, -) -from ray.rllib.connectors.agent.obs_preproc import ObsPreprocessorConnector -from ray.rllib.connectors.agent.clip_reward import ClipRewardAgentConnector - - -class TestConnectorPipeline(unittest.TestCase): - class Tom(Connector): - def to_state(): - return "tom" - - class Bob(Connector): - def to_state(): - return "bob" - - class Mary(Connector): - def to_state(): - return "mary" - - class MockConnectorPipeline(ConnectorPipeline): - def __init__(self, ctx, connectors): - # Real connector pipelines should keep a list of - # Connectors. - # Use strings here for simple unit tests. 
- self.connectors = connectors - - def test_sanity_check(self): - ctx = {} - - m = self.MockConnectorPipeline(ctx, [self.Tom(ctx), self.Bob(ctx)]) - m.insert_before("Bob", self.Mary(ctx)) - self.assertEqual(len(m.connectors), 3) - self.assertEqual(m.connectors[1].__class__.__name__, "Mary") - - m = self.MockConnectorPipeline(ctx, [self.Tom(ctx), self.Bob(ctx)]) - m.insert_after("Tom", self.Mary(ctx)) - self.assertEqual(len(m.connectors), 3) - self.assertEqual(m.connectors[1].__class__.__name__, "Mary") - - m = self.MockConnectorPipeline(ctx, [self.Tom(ctx), self.Bob(ctx)]) - m.prepend(self.Mary(ctx)) - self.assertEqual(len(m.connectors), 3) - self.assertEqual(m.connectors[0].__class__.__name__, "Mary") - - m = self.MockConnectorPipeline(ctx, [self.Tom(ctx), self.Bob(ctx)]) - m.append(self.Mary(ctx)) - self.assertEqual(len(m.connectors), 3) - self.assertEqual(m.connectors[2].__class__.__name__, "Mary") - - m.remove("Bob") - self.assertEqual(len(m.connectors), 2) - self.assertEqual(m.connectors[0].__class__.__name__, "Tom") - self.assertEqual(m.connectors[1].__class__.__name__, "Mary") - - m.remove("Bob") - # Bob does not exist anymore, still 2. - self.assertEqual(len(m.connectors), 2) - self.assertEqual(m.connectors[0].__class__.__name__, "Tom") - self.assertEqual(m.connectors[1].__class__.__name__, "Mary") - - self.assertEqual(m["Tom"], [m.connectors[0]]) - self.assertEqual(m[0], [m.connectors[0]]) - self.assertEqual(m[m.connectors[1].__class__], [m.connectors[1]]) - - def test_pipeline_indexing(self): - """Tests if ConnectorPipeline.__getitem__() works as intended.""" - context = ConnectorContext({}, observation_space=gym.spaces.Box(-1, 1, (1,))) - some_connector = MeanStdObservationFilterAgentConnector(context) - some_other_connector = ObsPreprocessorConnector(context) - # Create a dummy pipeline just for indexing purposes - pipeline = ConnectorPipeline(context, [some_connector, some_other_connector]) - - for key, expected_value in [ - (MeanStdObservationFilterAgentConnector, [some_connector]), - ("MeanStdObservationFilterAgentConnector", [some_connector]), - (SyncedFilterAgentConnector, [some_connector]), - ("SyncedFilterAgentConnector", []), - (ClipRewardAgentConnector, []), - ("can i get something?", []), - (0, [some_connector]), - (1, [some_other_connector]), - ]: - self.assertEqual(pipeline[key], expected_value) - - -if __name__ == "__main__": - import pytest - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/models/tests/test_action_distributions.py b/rllib/models/tests/test_action_distributions.py index 6aca70f1fca4..254b8ba315cd 100644 --- a/rllib/models/tests/test_action_distributions.py +++ b/rllib/models/tests/test_action_distributions.py @@ -1,22 +1,14 @@ -from functools import partial -from gymnasium.spaces import Box, Dict, Tuple +from gymnasium.spaces import Box import numpy as np -from scipy.stats import beta, norm -import tree # pip install dm_tree +from scipy.stats import norm import unittest from ray.rllib.models.torch.torch_action_dist import ( - TorchBeta, TorchCategorical, TorchDiagGaussian, - TorchMultiActionDistribution, - TorchMultiCategorical, - TorchSquashedGaussian, ) from ray.rllib.utils.framework import try_import_tf, try_import_torch from ray.rllib.utils.numpy import ( - MIN_LOG_NN_OUTPUT, - MAX_LOG_NN_OUTPUT, softmax, SMALL_NUMBER, LARGE_INTEGER,