| from typing import Dict, Any |
|
|
| from flows.base_flows import SequentialFlow |
| from flows.utils import logging |
| from .PlanGeneratorAtomicFlow import PlanGeneratorAtomicFlow |
|
|
# Module-level logging setup, executed at import time: call the flows logging
# helper's set_verbosity_debug(), then obtain a logger named after this module.
logging.set_verbosity_debug()
log = logging.get_logger(__name__)
|
|
class WritePlanFlow(SequentialFlow):
    """SequentialFlow subclass that post-processes plan-generation output.

    Its single payload processor signals an early exit whenever the
    produced payload carries no ``"plan"`` entry.
    """

    @SequentialFlow.output_msg_payload_processor
    def detect_finish_or_continue(
        self,
        output_payload: Dict[str, Any],
        src_flow: PlanGeneratorAtomicFlow,
    ) -> Dict[str, Any]:
        """Pass the payload through, or wrap it in an early-exit payload.

        Args:
            output_payload: The payload to inspect.
            src_flow: The flow associated with the payload (annotated as a
                ``PlanGeneratorAtomicFlow``); it is not used by this method.

        Returns:
            ``output_payload`` unchanged when it contains the key ``"plan"``.
            Otherwise a new dict with ``"EARLY_EXIT"`` set to ``True`` and the
            original payload stored under ``"plan_writer_output"``.
        """
        # Guard clause: a payload that already holds a plan is forwarded as-is.
        if "plan" in output_payload:
            return output_payload

        # No plan present: flag an early exit and keep the raw payload for the
        # caller (how "EARLY_EXIT" is consumed is decided by the base flow —
        # not visible here).
        return {
            "EARLY_EXIT": True,
            "plan_writer_output": output_payload,
        }