"""Closed-loop driver: run a navsim agent against a HUGSIM environment.

The script loads the Hydra config named by ``CONFIG_NAME``, instantiates the
agent it describes, and then repeatedly asks the agent for a trajectory from
the current HUGSIM observation and sends the resulting way points back to the
simulator through ``HugsimClient``.
"""
import argparse
import os

import hydra
import torch
from hydra.utils import instantiate
from omegaconf import DictConfig

from hugsim.dataparser import parse_raw
from hugsim.visualize import to_video, save_frame
from hugsim_client import HugsimClient
from navsim.agents.abstract_agent import AbstractAgent

CONFIG_PATH = "navsim/planning/script/config/HUGSIM"
CONFIG_NAME = "transfuser"

# The client is created and the environment reset at import time; main()
# relies on this module-level instance.
hugsim_client = HugsimClient()
hugsim_client.reset_env()


def get_opts():
    """Parse the command line and return the namespace.

    The only option is ``--output`` (string, required).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--output', type=str, required=True)
    return parser.parse_args()


def _imu_to_lidar_xy(poses):
    """Convert agent poses to the 2-D way points sent to the simulator.

    The first three columns of ``poses`` are taken, columns 0 and 1 are
    swapped, and the new column 0 is negated, i.e. each output row is
    ``(-poses[:, 1], poses[:, 0])``. The source describes this as the
    "imu to lidar" conversion.

    NOTE(review): assumes ``poses`` is a 2-D array/tensor with at least three
    columns, and that list indexing returns a copy so the in-place negation
    does not touch ``poses`` itself -- confirm against the agent's output.
    """
    imu_way_points = poses[:, :3]
    way_points = imu_way_points[:, [1, 0, 2]]
    way_points[:, 0] *= -1
    return way_points[:, :2]


@hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME, version_base=None)
def main(cfg: DictConfig) -> None:
    """Instantiate the agent and drive the HUGSIM environment until it is done.

    Args:
        cfg: Hydra configuration. ``cfg.agent`` is instantiated as the agent
            and ``cfg.output`` is used as the parent of the ``ltf`` output
            folder. NOTE(review): ``cfg.output`` is read but not set here;
            presumably it comes from the config file or a command-line
            override -- confirm.

    The function returns when the environment reports ``done``, or when no
    trajectory could be produced for the current observation.
    """
    # Force the latent variant and a fixed checkpoint regardless of the config.
    cfg.agent.config.latent = True
    cfg.agent.checkpoint_path = "./ckpts/ltf_seed_0.ckpt"
    print(cfg)

    agent: AbstractAgent = instantiate(cfg.agent)
    agent.initialize()
    agent = agent.cuda()
    print('Ready for recieving')

    output_folder = os.path.join(cfg.output, 'ltf')
    os.makedirs(output_folder, exist_ok=True)

    env_state = hugsim_client.get_current_state()
    while not env_state.done:
        data = parse_raw((env_state.state.obs, env_state.state.info))
        # The raw images are dropped before inference; only data['input'] is
        # passed to the agent.
        del data['raw_imgs']

        try:
            with torch.no_grad():
                traj = agent.compute_trajectory(data['input'])
        except RuntimeError as e:
            # A failed inference is reported and handled like a missing result.
            traj = None
            print(e)

        if traj is None:
            print('Waiting for visualize tasks...')
            return

        env_state = hugsim_client.execute_action(_imu_to_lidar_xy(traj.poses))
        print('sent')

        if env_state.cur_scene_done:
            print('Scene done')


if __name__ == '__main__':
    # NOTE: get_opts() is not invoked here; main() is launched through the
    # hydra.main decorator.
    main()