from typing import List, Optional, Union
from copy import deepcopy
import invertedai as iai
from invertedai.common import (
AgentProperties,
AgentState,
RecurrentState,
TrafficLightStatesDict
)
from invertedai.large.drive import large_drive
from invertedai.large.initialize import large_initialize, get_regions_default
from invertedai.api.drive import DriveResponse
from invertedai.api.initialize import InitializeResponse
[docs]class BasicCosimulation:
"""
Stateful wrapper around the Inverted AI API to simplify co-simulation.
All keyword arguments to :func:`large_initialize` can be passed to the constructor
here and a sufficient combination of them must be passed as required by :func:`large_initialize`.
This wrapper caches static agent properties and propagates the recurrent state,
so that only states of conditional agents need to be exchanged with it to
perform co-simulation. Typically, each time step requires a single call to :func:`self.step`.
This wrapper only supports a minimal co-simulation functionality.
For more advanced use cases, call :func:`large_initialize` and :func:`large_drive` directly.
location:
Location name as expected by :func:`large_initialize` and :func:`large_drive`.
conditional_agent_properties:
Agent properties for all conditional agents that must be given to the API so the NPC agents are aware of them.
Please refer to the documentation for :func:`large_initialize` for more information on how to format this parameter (treating the
conditional agents as "predefined agents"). Furthermore, any predefined agents for which the user
wishes the IAI API to control must be defined at the end of this list.
conditional_agent_agent_states:
Agent states for all conditional agents that must be given to the API so the NPC agents are aware of their states.
Please refer to the documentation for :func:`large_initialize` for more information on how to format this parameter (treating the
ego agents as "predefined agents"). Furthermore, any predefined agents for which the user
wishes the IAI API to control must be defined at the end of this list.
num_non_ego_conditional_agents:
The ego agents are the subset of the conditional agents that are NOT controlled by the Inverted AI API. This
parameter allows some of the conditional agents with predefined states and properties to nonetheless be
controlled by the Inverted AI API. The non-ego conditional agents must be placed at the end of the conditional
agents list and the ego agents must be placed at the beginning of the conditional agents list.
"""
def __init__(
self,
location: str,
conditional_agent_properties: Optional[List[AgentProperties]] = None,
conditional_agent_agent_states: Optional[List[AgentState]] = None,
num_non_ego_conditional_agents: Optional[int] = 0,
**kwargs # sufficient arguments to initialize must also be included
):
self._conditional_agent_properties = conditional_agent_properties
self._conditional_agent_agent_states = conditional_agent_agent_states
self._location = location
self._response = large_initialize(
location=self._location,
agent_properties=self._conditional_agent_properties,
agent_states=self._conditional_agent_agent_states,
**kwargs,
)
self.init_response = deepcopy(self._response)
self._light_state = self.init_response.traffic_lights_states
self._light_recurrent_state = self.init_response.light_recurrent_states
self._total_agent_count = len(self.init_response.agent_properties) # initialize may produce different agent count
self._conditional_agent_count = len(self._conditional_agent_agent_states) - num_non_ego_conditional_agents
self._npc_agent_count = self._total_agent_count - self._conditional_agent_count
assert self._conditional_agent_count >= 0, "Invalid number of ego and conditional agents."
self._agent_properties = self.init_response.agent_properties
self._agent_states = self.init_response.agent_states
self._recurrent_states = self.init_response.recurrent_states
@property
def location(self) -> str:
"""
Location name as recognized by Inverted AI API.
"""
return self._location
@property
def agent_count(self) -> int:
"""
The total number of agents, both ego and NPCs.
"""
return self._total_agent_count
@property
def agent_states(self) -> List[AgentState]:
"""
The predicted states for all agents, including ego.
"""
return self._agent_states
@property
def agent_properties(self) -> List[AgentProperties]:
"""
The properties (length, width, rear_axis_offset, max_speed) for all agents, including ego.
"""
return self._agent_properties
@property
def ego_states(self) -> List[AgentState]:
"""
Returns the predicted states of ego agents in order.
The NPC agents are excluded.
"""
return self._agent_states[:self._conditional_agent_count]
@property
def ego_properties(self) -> List[AgentProperties]:
"""
Returns the properties of ego agents in order.
The NPC agents are excluded.
"""
return self._agent_properties[:self._conditional_agent_count]
@property
def npc_states(self) -> List[AgentState]:
"""
Returns the predicted states of NPCs (non-ego agents) in order.
The predictions for ego agents are excluded.
"""
return self._agent_states[self._conditional_agent_count:]
@property
def npc_properties(self) -> List[AgentProperties]:
"""
Returns the properties of NPCs (non-ego agents) in order.
The ego agents are excluded.
"""
return self._agent_properties[self._conditional_agent_count:]
@property
def npc_recurrent_states(self) -> List[RecurrentState]:
"""
Returns the recurrent states of NPCs (non-ego agents) in order.
The ego agents are excluded.
"""
return self._recurrent_states[self._conditional_agent_count:]
@property
def light_states(self) -> Optional[TrafficLightStatesDict]:
"""
Returns the traffic light states if any exists on the map.
"""
return self._light_state
@property
def response(self) -> Union[DriveResponse,InitializeResponse]:
"""
Get the current response data object containing all information received from the API.
"""
return self._response
[docs] def step(
self,
current_conditional_agent_states: List[AgentState],
**kwargs
) -> None:
"""
Calls :func:`large_drive` to advance the simulation by one time step.
Current states of ego agents need to be provided to synchronize with
your local simulator.
This function assumes the conditional agents are placed at the beginning
of given agent states list.
All remaining keyword arguments to :func:`large_drive` can be passed
to this function here to receive the desired information from the API.
current_conditional_agent_states:
States of ego agents before the step which must match the number of
given ego, conditional agents during initialization.
"""
self._update_conditional_states(current_conditional_agent_states)
self._response = large_drive(
location=self.location,
agent_properties=self._agent_properties,
agent_states=self._agent_states,
recurrent_states=self._recurrent_states,
light_recurrent_states=self._light_recurrent_state,
**kwargs
)
self._agent_states = self._response.agent_states
self._recurrent_states = self._response.recurrent_states
self._light_state = self._response.traffic_lights_states
self._light_recurrent_state = self._response.light_recurrent_states
def _update_conditional_states(self, conditional_agent_states):
assert len(conditional_agent_states) == self._conditional_agent_count, "Given number of agents in this step must match the number of ego agents in the co-simulation."
self._agent_states[:self._conditional_agent_count] = conditional_agent_states