fsrl.data

Data package.

class fsrl.data.FastCollector(policy: BasePolicy, env: Env | BaseVectorEnv, buffer: ReplayBuffer | None = None, preprocess_fn: Callable[[...], Batch] | None = None, exploration_noise: bool = False)

Bases: object

Collector that lets the policy interact with different types of environments for an exact number of episodes.

This collector is a simplified version of Tianshou’s collector, so Tianshou’s documentation can be consulted for details. The main change is support for extracting the cost signals from the interaction data.

Parameters:
  • policy – an instance of the BasePolicy class.

  • env – a gym.Env environment or an instance of the BaseVectorEnv class.

  • buffer – an instance of the ReplayBuffer class. If set to None, the collected data is not stored. Defaults to None.

  • preprocess_fn (function) – a function called before the data is added to the buffer. Defaults to None.

  • exploration_noise (bool) – whether the action should be modified with the policy’s exploration noise. If True, “policy.exploration_noise(act, batch)” is called automatically to add the exploration noise to the action. Defaults to False.
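The exploration_noise flag only decides whether the policy’s own noise hook is applied; the kind of noise depends on the policy. As an illustration, the minimal sketch below assumes a continuous action space bounded in [-1, 1] and zero-mean Gaussian noise, a common choice for deterministic continuous-control policies. apply_exploration_noise, sigma, low and high are hypothetical names; this is a stand-in for policy.exploration_noise(act, batch), not FSRL’s implementation.

```python
import random
from typing import List, Optional


def apply_exploration_noise(
    act: List[float],
    exploration_noise: bool,
    sigma: float = 0.1,
    low: float = -1.0,
    high: float = 1.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Hypothetical stand-in for policy.exploration_noise(act, batch)."""
    if not exploration_noise:
        # flag off: the actions reach the environment unchanged
        return list(act)
    rng = rng or random.Random()
    # add zero-mean Gaussian noise, then clip back into the action bounds
    return [min(max(a + rng.gauss(0.0, sigma), low), high) for a in act]


clean = apply_exploration_noise([0.5, -0.5], exploration_noise=False)
noisy = apply_exploration_noise([0.5, 0.99], exploration_noise=True, sigma=0.5, rng=random.Random(0))
```

Clipping after adding noise keeps the perturbed action valid for the environment even when the original action sits close to a bound.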

Note

Please make sure the given environment has a time limit, because only the n_episode collection option is supported.
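Because collection only stops after whole episodes, an environment that never signals the end of an episode could keep collect() running indefinitely. With Gymnasium, the usual remedy is the gymnasium.wrappers.TimeLimit wrapper (max_episode_steps argument). The stdlib-only sketch below shows the same idea; NeverEndingEnv, TimeLimitSketch and run_episode are hypothetical names, and the five-element step return follows the Gymnasium convention rather than any FSRL-specific API.

```python
from typing import Any, Dict, Tuple

StepResult = Tuple[int, float, bool, bool, Dict[str, Any]]


class NeverEndingEnv:
    """Hypothetical toy environment whose episodes never end on their own."""

    def reset(self) -> int:
        self.state = 0
        return self.state

    def step(self, action: int) -> StepResult:
        self.state += 1
        # (obs, reward, terminated, truncated, info), Gymnasium-style
        return self.state, 0.0, False, False, {}


class TimeLimitSketch:
    """Hypothetical wrapper: truncate each episode after max_episode_steps steps."""

    def __init__(self, env: NeverEndingEnv, max_episode_steps: int) -> None:
        self.env = env
        self.max_episode_steps = max_episode_steps
        self._elapsed = 0

    def reset(self) -> int:
        self._elapsed = 0
        return self.env.reset()

    def step(self, action: int) -> StepResult:
        obs, rew, terminated, truncated, info = self.env.step(action)
        self._elapsed += 1
        if self._elapsed >= self.max_episode_steps:
            truncated = True  # the time limit ends the episode
        return obs, rew, terminated, truncated, info


def run_episode(env: TimeLimitSketch) -> int:
    """Step with a dummy action until the episode ends; return its length."""
    env.reset()
    length = 0
    while True:
        _, _, terminated, truncated, _ = env.step(0)
        length += 1
        if terminated or truncated:
            return length
```

Without the wrapper, run_episode(NeverEndingEnv()) would never return; with it, every episode ends after exactly max_episode_steps steps.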

reset(reset_buffer: bool = True, gym_reset_kwargs: Dict[str, Any] | None = None) → None

Reset the environment, statistics, current data and possibly replay memory.

Parameters:
  • reset_buffer (bool) – if True, reset the replay buffer that is attached to the collector.

  • gym_reset_kwargs – extra keyword arguments to pass to the environment’s reset function. Defaults to None (no extra keyword arguments).

reset_stat() → None

Reset the statistic variables.

reset_buffer(keep_statistics: bool = False) → None

Reset the data buffer.

reset_env(gym_reset_kwargs: Dict[str, Any] | None = None) → None

Reset all of the environments.

collect(n_episode: int = 1, random: bool = False, render: bool = False, no_grad: bool = True, gym_reset_kwargs: Dict[str, Any] | None = None) → Dict[str, Any]

Collect a specified number of episodes.

To ensure an unbiased sampling result with the n_episode option, this function first collects n_episode - env_num episodes and then collects the last env_num episodes evenly, one from each environment.
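The even split over environments can be illustrated with a small simulation. It assumes FastCollector keeps Tianshou’s rule: never keep more environments running than there are episodes still to collect, and retire an environment when it finishes an episode that makes it surplus. simulate_episode_split and its inputs are hypothetical; each environment is given a fixed cycle of episode lengths.

```python
from typing import List


def simulate_episode_split(episode_lengths: List[List[int]], n_episode: int) -> List[int]:
    """Count how many of the n_episode episodes each environment contributes.

    episode_lengths[i] is a (hypothetical) cycle of positive episode lengths for env i.
    """
    env_num = len(episode_lengths)
    # never run more environments than there are episodes to collect
    active = list(range(min(env_num, n_episode)))
    ep_index = [0] * env_num
    steps_left = [lengths[0] for lengths in episode_lengths]
    counts = [0] * env_num
    collected = 0
    while collected < n_episode:
        finished = []
        for i in active:  # every active env advances one step
            steps_left[i] -= 1
            if steps_left[i] == 0:
                finished.append(i)
        for i in finished:
            counts[i] += 1
            collected += 1
            ep_index[i] += 1
            cycle = episode_lengths[i]
            steps_left[i] = cycle[ep_index[i] % len(cycle)]
        # retire surplus finished envs so that no more envs run than episodes remain
        surplus = len(active) - (n_episode - collected)
        for i in finished[:max(surplus, 0)]:
            active.remove(i)
    return counts


split = simulate_episode_split([[1], [10]], n_episode=4)
```

With one fast environment (1-step episodes) and one slow one (10-step episodes), n_episode=4 gives [3, 1]: the slow environment still contributes its episode, whereas stopping at the first four finished episodes would have kept only short ones.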

Parameters:
  • n_episode (int) – how many episodes you want to collect.

  • random (bool) – whether to use a random policy for collecting data. Defaults to False.

  • render (bool) – whether to render the environment during evaluation. Defaults to False.

  • no_grad (bool) – whether to disable gradient computation in policy.forward(). Defaults to True (gradients are not retained).

  • gym_reset_kwargs – extra keyword arguments to pass to the environment’s reset function. Defaults to None (no extra keyword arguments).

Note

We do not support Tianshou’s n_step collection method, because collecting only by n_episode facilitates the episodic cost computation and gives a better evaluation of the agent.

Returns:

A dict including the following keys:

  • n/ep – number of collected episodes.

  • n/st – number of collected steps.

  • rew – mean of the episodic rewards.

  • len – mean of the episode lengths.

  • total_cost – cumulative cost over this collect call.

  • cost – mean of the episodic costs.

  • truncated – mean of the episode truncation flags (the fraction of truncated episodes).

  • terminated – mean of the episode termination flags (the fraction of terminated episodes).
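To make the relationship between the returned keys concrete, the minimal sketch below aggregates per-episode data into a dict with the same keys. It assumes the episodic reward and cost are already summed per episode and that at least one episode was collected; summarize_episodes is a hypothetical helper, not FSRL’s code.

```python
from statistics import mean
from typing import Dict, List


def summarize_episodes(
    rewards: List[float],
    costs: List[float],
    lengths: List[int],
    truncated: List[bool],
    terminated: List[bool],
) -> Dict[str, float]:
    """Hypothetical helper: aggregate per-episode data into collect()-style keys."""
    return {
        "n/ep": len(rewards),
        "n/st": sum(lengths),
        "rew": mean(rewards),
        "len": mean(lengths),
        "total_cost": sum(costs),  # summed over all episodes in this call
        "cost": mean(costs),       # averaged per episode
        # the mean of boolean flags is the fraction of episodes with the flag set
        "truncated": sum(truncated) / len(truncated),
        "terminated": sum(terminated) / len(terminated),
    }


stats = summarize_episodes([10.0, 20.0], [1.0, 3.0], [100, 200], [True, False], [False, True])
```

Note the difference between total_cost and cost: with n/ep episodes, total_cost equals cost multiplied by n/ep.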

class fsrl.data.BasicCollector(policy: BasePolicy, env: Env, buffer: ReplayBuffer | None = None, exploration_noise: bool | None = False, traj_buffer: TrajectoryBuffer | None = None)

Bases: object

A basic collector for a single environment.

This collector does not support vectorized environments and is intended for experimental purposes. It can store the collected data in a TrajectoryBuffer with a grid filter, which makes it possible to collect a trajectory-wise interaction dataset in a memory-efficient way.

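Example of data saving:

    from fsrl.data import BasicCollector, TrajectoryBuffer

    # policy, env, max_traj_num, n_episodes and logdir are placeholders
    traj_buffer = TrajectoryBuffer(max_traj_num)
    collector = BasicCollector(policy, env, traj_buffer=traj_buffer)
    collector.collect(n_episodes)
    traj_buffer.save(logdir)

Parameters: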
  • policy – an instance of the BasePolicy class.

  • env – a gym.Env environment. Vectorized environments (BaseVectorEnv) are not supported by this collector.

  • buffer – an instance of the ReplayBuffer class. If set to None, the collected data is not stored. Defaults to None.

  • exploration_noise (bool) – whether the action should be modified with the policy’s exploration noise. If True, “policy.exploration_noise(act, batch)” is called automatically to add the exploration noise to the action. Defaults to False.

  • traj_buffer (TrajectoryBuffer) – the buffer used to store trajectories. Defaults to None.

Note

Please make sure the given environment has a time limit, because only the n_episode collection option is supported.

reset(reset_buffer: bool = True, gym_reset_kwargs: Dict[str, Any] | None = None) → None

Reset the environment, statistics, current data and possibly replay memory.

Parameters:
  • reset_buffer (bool) – if True, reset the replay buffer that is attached to the collector.

  • gym_reset_kwargs – extra keyword arguments to pass to the environment’s reset function. Defaults to None (no extra keyword arguments).

reset_buffer(keep_statistics: bool = False) → None

Reset the data buffer.

reset_stat() → None

Reset the statistic variables.

reset_env(gym_reset_kwargs: Dict[str, Any] | None = None) → None

Reset all of the environments.

collect(n_episode: int = 0, random: bool = False, render: float | None = None, no_grad: bool = True, gym_reset_kwargs: Dict[str, Any] | None = None) → Dict[str, Any]

Collect a specified number of episodes.

To ensure an unbiased sampling result with the n_episode option, this function first collects n_episode - env_num episodes and then collects the last env_num episodes evenly, one from each environment.

Parameters:
  • n_episode (int) – how many episodes you want to collect.

  • random (bool) – whether to use a random policy for collecting data. Defaults to False.

  • render (float) – the sleep time between rendering consecutive frames. Defaults to None (no rendering).

  • no_grad (bool) – whether to disable gradient computation in policy.forward(). Defaults to True (gradients are not retained).

  • gym_reset_kwargs – extra keyword arguments to pass to the environment’s reset function. Defaults to None (no extra keyword arguments).

Note

We do not support Tianshou’s n_step collection method, because collecting only by n_episode facilitates the episodic cost computation and gives a better evaluation of the agent.

Returns:

A dict including the following keys:

  • n/ep – number of collected episodes.

  • n/st – number of collected steps.

  • rew – mean of the episodic rewards.

  • len – mean of the episode lengths.

  • total_cost – cumulative cost over this collect call.

  • cost – mean of the episodic costs.

  • truncated – mean of the episode truncation flags (the fraction of truncated episodes).

  • terminated – mean of the episode termination flags (the fraction of terminated episodes).

class fsrl.data.TrajectoryBuffer(max_trajectory: int = 99999, use_grid_filter: bool = True, rmin: float = -inf, rmax: float = inf, cmin: float = -inf, cmax: float = inf, filter_interval: float = 2)

Bases: object

Buffer for storing trajectories collected during training.

If use_grid_filter is True, the buffer discards excess trajectories based on their density in the reward-return and cost-return space. A trajectory is stored in the buffer only if its reward return and cost return lie within the user-defined ranges given by rmin, rmax, cmin and cmax.

Parameters:
  • max_trajectory (int) – Maximum number of trajectories to store. (default=99999)

  • use_grid_filter (bool) – If True, use grid filtering to downsample the data. (default=True)

  • rmin (float) – The minimum reward return of a trajectory that can be stored in the buffer. (default=-inf)

  • rmax (float) – The maximum reward return of a trajectory that can be stored in the buffer. (default=inf)

  • cmin (float) – The minimum cost return of a trajectory that can be stored in the buffer. (default=-inf)

  • cmax (float) – The maximum cost return of a trajectory that can be stored in the buffer. (default=inf)

  • filter_interval (float) – Only used when use_grid_filter is True. The filter interval is the ratio of trajectory numbers to keep in the buffer. (default=2.0)
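The admission rule for the reward and cost ranges can be expressed as a small check. This minimal sketch assumes the reward and cost returns are undiscounted sums of the per-step rewards and costs, and treats the bounds as inclusive; ReturnRange and accepts are hypothetical names that mirror the rmin, rmax, cmin and cmax arguments.

```python
import math
from dataclasses import dataclass
from typing import Sequence


@dataclass
class ReturnRange:
    """Hypothetical container mirroring the rmin/rmax/cmin/cmax arguments."""

    rmin: float = -math.inf
    rmax: float = math.inf
    cmin: float = -math.inf
    cmax: float = math.inf

    def accepts(self, rewards: Sequence[float], costs: Sequence[float]) -> bool:
        # assumed: returns are plain (undiscounted) sums over the trajectory
        reward_return = sum(rewards)
        cost_return = sum(costs)
        # keep the trajectory only if both returns fall inside the (inclusive) ranges
        return (self.rmin <= reward_return <= self.rmax
                and self.cmin <= cost_return <= self.cmax)


limits = ReturnRange(cmax=10.0)
```

With the default infinite bounds every trajectory passes, so the range check only filters when at least one bound is set explicitly.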

store(data: Batch) → None

Stores a batch of data in the buffer.

Parameters:

data (Batch) – Batch of data to store.

apply_grid_filter() → None

Apply grid filtering to the buffer and metrics data.

The filter removes some of the trajectories in the regions of highest density.

Note: This method modifies the buffer and metrics arrays in place.

static filter_points(points: list, target_size: int) → list

Filters a list of 2D points and returns a list of the indices of the points that are kept.

The filtering is done by keeping a certain number of points (determined by the target_size parameter) while trying to preserve the spatial distribution of the original points as much as possible.

Parameters:
  • points – A list of 2D points, for example a numpy array of shape (N, 2).

  • target_size – The number of points to keep after filtering.

Returns:

A list of indices that represent the filtered points.
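The exact algorithm behind filter_points is not described here. One simple way to thin points while preserving their spatial spread is to bin them on a uniform grid and repeatedly drop a point from the most crowded cell, which is what the minimal sketch below does. filter_points_sketch and n_bins are hypothetical names, and the selected indices need not match FSRL’s output.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def filter_points_sketch(
    points: Sequence[Tuple[float, float]], target_size: int, n_bins: int = 10
) -> List[int]:
    """Keep at most target_size points, thinning the densest grid cells first."""
    target_size = max(target_size, 0)
    if target_size >= len(points):
        return list(range(len(points)))
    xs = [x for x, _ in points]
    ys = [y for _, y in points]
    x_min, y_min = min(xs), min(ys)
    # cell width per axis; fall back to 1.0 when all points share a coordinate
    x_step = (max(xs) - x_min) / n_bins or 1.0
    y_step = (max(ys) - y_min) / n_bins or 1.0
    cells: Dict[Tuple[int, int], List[int]] = defaultdict(list)
    for idx, (x, y) in enumerate(points):
        cx = min(int((x - x_min) / x_step), n_bins - 1)
        cy = min(int((y - y_min) / y_step), n_bins - 1)
        cells[(cx, cy)].append(idx)
    for _ in range(len(points) - target_size):
        # drop one point from the most crowded cell so sparse regions survive
        densest = max(cells, key=lambda c: len(cells[c]))
        cells[densest].pop()
    return sorted(i for members in cells.values() for i in members)


pts = [(0.0, 0.0), (0.1, 0.0), (0.0, 0.1), (0.1, 0.1), (0.05, 0.05), (0.02, 0.03), (10.0, 10.0), (10.0, 0.0)]
kept = filter_points_sketch(pts, target_size=4)
```

In this example the six clustered points share one cell and are thinned to two, while both isolated points (indices 6 and 7) are kept, which is the density-preserving behaviour the docstring describes.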

sample(batch_size: int) → Batch

Samples a batch of transitions from the buffer.

Parameters:

batch_size (int) – Number of transitions to sample.

Returns:

Batch of sampled transitions.

get_all() → Batch

Returns all the transitions stored in the buffer as a single batch.

Returns:

All stored transitions as a single batch.

Return type:

Batch

save(log_dir: str, dataset_name: str = 'dataset.hdf5') → None

Saves the entire buffer to disk as an HDF5 file.

Parameters:
  • log_dir (str) – Directory to save the dataset in.

  • dataset_name (str, optional (default="dataset.hdf5")) – Name of the dataset file to save.