| | |
| | |
| | |
| | |
| | |
| |
|
| | """ |
| | Echo Environment Client. |
| | |
| | This module provides the client for connecting to an Echo Environment server |
| | via WebSocket for persistent sessions. |
| | """ |
| |
|
| | from typing import Any, Dict |
| |
|
| | |
| | try: |
| | |
| | from openenv.core.client_types import StepResult |
| | from openenv.core.env_server.types import State |
| | from openenv.core.env_client import EnvClient |
| | from .models import EchoAction, EchoObservation |
| | except ImportError: |
| | |
| | from openenv.core.client_types import StepResult |
| | from openenv.core.env_server.types import State |
| | from openenv.core.env_client import EnvClient |
| | from models import EchoAction, EchoObservation |
| |
|
| |
|
| | class EchoEnv(EnvClient[EchoAction, EchoObservation, State]): |
| | """ |
| | Client for the Echo Environment. |
| | |
| | This client maintains a persistent WebSocket connection to the environment |
| | server, enabling efficient multi-step interactions with lower latency. |
| | Each client instance has its own dedicated environment session on the server. |
| | |
| | Example: |
| | >>> # Connect to a running server |
| | >>> with EchoEnv(base_url="http://localhost:8000") as client: |
| | ... result = client.reset() |
| | ... print(result.observation.echoed_message) |
| | ... |
| | ... result = client.step(EchoAction(message="Hello!")) |
| | ... print(result.observation.echoed_message) |
| | ... print(result.reward) |
| | |
| | Example with Docker: |
| | >>> # Automatically start container and connect |
| | >>> client = EchoEnv.from_docker_image("echo-env:latest") |
| | >>> try: |
| | ... result = client.reset() |
| | ... result = client.step(EchoAction(message="Test")) |
| | ... finally: |
| | ... client.close() |
| | """ |
| |
|
| | def _step_payload(self, action: EchoAction) -> Dict: |
| | """ |
| | Convert EchoAction to JSON payload for step request. |
| | |
| | Args: |
| | action: EchoAction instance |
| | |
| | Returns: |
| | Dictionary representation suitable for JSON encoding |
| | """ |
| | return { |
| | "message": action.message, |
| | } |
| |
|
| | def _parse_result(self, payload: Dict) -> StepResult[EchoObservation]: |
| | """ |
| | Parse server response into StepResult[EchoObservation]. |
| | |
| | Args: |
| | payload: JSON response from server |
| | |
| | Returns: |
| | StepResult with EchoObservation |
| | """ |
| | obs_data = payload.get("observation", {}) |
| | observation = EchoObservation( |
| | echoed_message=obs_data.get("echoed_message", ""), |
| | message_length=obs_data.get("message_length", 0), |
| | done=payload.get("done", False), |
| | reward=payload.get("reward"), |
| | metadata=obs_data.get("metadata", {}), |
| | ) |
| |
|
| | return StepResult( |
| | observation=observation, |
| | reward=payload.get("reward"), |
| | done=payload.get("done", False), |
| | ) |
| |
|
| | def _parse_state(self, payload: Dict) -> State: |
| | """ |
| | Parse server response into State object. |
| | |
| | Args: |
| | payload: JSON response from /state endpoint |
| | |
| | Returns: |
| | State object with episode_id and step_count |
| | """ |
| | return State( |
| | episode_id=payload.get("episode_id"), |
| | step_count=payload.get("step_count", 0), |
| | ) |
| |
|