import logging
import os
from typing import Dict, Any

from flow_modules.Tachi67.ContentWriterFlowModule import ContentWriterFlow
from flows.base_flows import CircularFlow
|
|
|
|
class CodeWriterFlow(ContentWriterFlow):
    """Content-writer flow specialised for producing code.

    The class adds three things on top of ``ContentWriterFlow``:

    * a fallback result written to the flow state when the round limit is hit,
    * an output-payload processor that reacts to the ``finish``,
      ``manual_finish`` and ``test`` commands,
    * a ``run`` entry point that drives the rounds and returns the output
      collected from the flow state.
    """

    def _on_reach_max_round(self):
        """Store an "unfinished" placeholder result in the flow state.

        NOTE(review): presumably called by the parent flow machinery once the
        maximum number of rounds is exhausted -- confirm against the base class.
        """
        self._state_update_dict({
            "code": "The maximum amount of rounds was reached before the model generated the code.",
            "status": "unfinished",
        })

    @staticmethod
    def _remove_temp_code_file(location) -> None:
        """Delete the temporary code file at ``location`` if it exists.

        A missing/empty location or an already-deleted file is not an error:
        the cleanup is best-effort. Other OS errors (e.g. permissions) still
        propagate, exactly as they did with a bare ``os.remove``.
        """
        if not location:
            # Nothing was recorded in the state, so there is nothing to clean up.
            return
        try:
            # EAFP instead of exists()+remove(): avoids the race where the file
            # disappears between the check and the removal.
            os.remove(location)
        except FileNotFoundError:
            pass

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """Post-process a sub-flow payload depending on its ``command``.

        Args:
            output_payload: Payload emitted by a sub-flow. Must contain a
                ``"command"`` key. For ``"finish"`` it must also contain
                ``command_args["summary"]``; for ``"test"`` it must contain a
                ``command_args`` mapping.
            src_flow: The flow that produced the payload (not used here).

        Returns:
            * ``"finish"``: an early-exit dict carrying the code stored in the
              flow state, the summary and ``status="finished"``.
            * ``"manual_finish"``: an early-exit dict with placeholder texts and
              ``status="unfinished"``.
            * ``"test"``: the same payload, with the code from the flow state
              injected under ``command_args["code"]``.
            * anything else: the payload unchanged.

        Raises:
            KeyError: If ``"command"`` (or, for ``"finish"``, the summary) is
                missing from ``output_payload``.
        """
        command = output_payload["command"]

        if command == "finish":
            fetched_state = self._fetch_state_attributes_by_keys(
                keys=["temp_code_file_location", "code"]
            )
            code_content = fetched_state["code"]
            # The temp file is only needed while the code is being edited.
            self._remove_temp_code_file(fetched_state["temp_code_file_location"])

            summary = output_payload["command_args"]["summary"]
            return {
                "EARLY_EXIT": True,
                "code": code_content,
                "result": summary,
                "summary": "ExtendLibrary/CodeWriter: " + summary,
                "status": "finished",
            }

        if command == "manual_finish":
            fetched_state = self._fetch_state_attributes_by_keys(
                keys=["temp_code_file_location"]
            )
            self._remove_temp_code_file(fetched_state["temp_code_file_location"])

            return {
                "EARLY_EXIT": True,
                "code": "no code was generated",
                "result": "CodeWriter was terminated explicitly by the user, process is unfinished",
                "summary": "ExtendLibrary/CodeWriter: CodeWriter was terminated explicitly by the user, process is unfinished",
                "status": "unfinished",
            }

        if command == "test":
            # Hand the code currently held in the state to the test command.
            fetched_state = self._fetch_state_attributes_by_keys(keys=["code"])
            output_payload["command_args"]["code"] = fetched_state["code"]

        return output_payload

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the flow on ``input_data`` and return the output read from state.

        Steps: merge ``input_data`` into the flow state, execute
        ``_sequential_run`` with the configured ``max_rounds`` (default ``1``;
        an explicit ``None`` is passed through and only logged), collect the
        output from the state, then fully reset the flow (recursively).

        Args:
            input_data: Initial values written into the flow state.

        Returns:
            The dictionary produced by ``_get_output_from_state``.
        """
        self._state_update_dict(update_data=input_data)

        max_rounds = self.flow_config.get("max_rounds", 1)
        if max_rounds is None:
            # Use the stdlib logger: no `log` object is defined in this module.
            logging.getLogger(__name__).info(
                "Running %s without `max_rounds` until the early exit condition is met.",
                self.flow_config["name"],
            )

        self._sequential_run(max_rounds=max_rounds)

        # Read the output before resetting; the reset is a full one.
        output = self._get_output_from_state()
        self.reset(full_reset=True, recursive=True, src_flow=self)

        return output