| import json |
| from copy import deepcopy |
| from typing import Any, Dict, List |
| from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow |
| from aiflows.messages import FlowMessage |
| from dataclasses import dataclass |
|
|
|
|
| @dataclass |
| class Command: |
| """ The command class is used to store the information about the commands that the user can give to the controller. |
| |
| :param name: The name of the command. |
| :type name: str |
| :param description: The description of the command. |
| :type description: str |
| :param input_args: The input arguments of the command. |
| :type input_args: List[str] |
| """ |
| name: str |
| description: str |
| input_args: List[str] |
|
|
|
|
| class ControllerAtomicFlow(ChatAtomicFlow): |
| """ The ControllerAtomicFlow is an atomic flow that, given an observation and a goal, can call a set of commands and arguments which are then usually executed by an ExecutorAtomicFlow (branching flow). |
| |
| *Configuration Parameters* |
| |
| - `name` (str): The name of the flow. Default: "ControllerFlow" |
| - `description` (str): A description of the flow. This description is used to generate the help message of the flow. |
| Default: "Proposes the next action to take towards achieving the goal, and prepares the input for the executor." |
| - `enable_cache` (bool): Whether to enable caching or not. Default: True |
| - `commands` (List[Dict[str,Any]]): A list of commands that the controller can call. Default: [] |
| - `finish` (Dict[str,Any]): The configuration of the finish command. Default parameters: No default parameters. |
| - `system_message_prompt_template` (Dict[str, Any]): The prompt template used to generate the system message. |
| By default, it's type is aiflows.prompt_template.JinjaPrompt. It's default parameters are: |
| - `template` (str): The template of the prompt. Default: see ControllerAtomicFlow.yaml for the default template. |
| - `input_variables` (List[str]): The input variables of the prompt. Default: ["commands"]. Note that the commands are the commands of the executor |
| (subflows of branching flow) and are actually to the system prompt template via the `_build_commands_manual` function of this class. |
| - `human_message_prompt_template` (Dict[str, Any]): The prompt template of the human/user message (message used everytime the except the first time in). |
| It's passed as the user message to the LLM. By default its of type aiflows.prompt_template.JinjaPrompt and has the following parameters: |
| - `template` (str): The template of the prompt. Default: see ControllerAtomicFlow.yaml for the default template. |
| - `input_variables` (List[str]): The input variables of the prompt. Default: ["observation"] |
| - init_human_message_prompt_template` (Dict[str, Any]): The prompt template of the human/user message used to initialize the conversation |
| (first time in). It is used to generate the human message. It's passed as the user message to the LLM. |
| By default its of type aiflows.prompt_template.JinjaPrompt and has the following parameters: |
| - `template` (str): The template of the prompt. Default: see ControllerAtomicFlow.yaml for the default template. |
| - `input_variables` (List[str]): The input variables of the prompt. Default: ["goal"] |
| - All other parameters are inherited from the default configuration of ChatAtomicFlow (see Flowcard, i.e. README.md, of ChatAtomicFlowModule). |
| |
| *Initial Input Interface (this is the interface used the first time the flow is called)*: |
| - `goal` (str): The goal of the controller. Usually asked by the user/human (e.g. "I want to know the occupation and birth date of Michael Jordan.") |
| |
| *Input Interface (this is the interface used after the first time the flow is called)*: |
| - `observation` (str): The observation of the controller's previous action. Usually the response of the ExecutorAtomicFlow (e.g. "The result of a wikipedia search (if the ExecutorAtomicFlow has a WikipediaExecutorAtomicFlow).") |
| |
| *Output Interface:* |
| - `thought` (str): The thought of the controller on what to do next (which command to call) |
| - `reasoning` (str): The reasoning of the controller on why it thinks the command it wants to call is the right one |
| - `criticism` (str): The criticism of the controller of it's thinking process |
| - `command` (str): The command to the executor chooses to call |
| - `command_args` (Dict[str, Any]): The arguments of the command to call |
| |
| :param commands: The commands that the controller can call (typically the commands of the executor). |
| :type commands: List[Command] |
| :param \**kwargs: The parameters specific to the ChatAtomicFlow. |
| :type \**kwargs: Dict[str, Any] |
| """ |
| def __init__(self, commands: List[Command], **kwargs): |
| super().__init__(**kwargs) |
| self.system_message_prompt_template = self.system_message_prompt_template.partial( |
| commands=self._build_commands_manual(commands) |
| ) |
|
|
| @staticmethod |
| def _build_commands_manual(commands: List[Command]) -> str: |
| """ This method writes the commands that the ControllerAtomicFlow in string to pass it to the system_message_prompt_template. |
| |
| :param commands: The commands that the controller can call. |
| :type commands: List[Command] |
| :return: The string containing the commands. |
| :rtype: str |
| """ |
| ret = "" |
| for i, command in enumerate(commands): |
| command_input_json_schema = json.dumps( |
| {input_arg: f"YOUR_{input_arg.upper()}" for input_arg in command.input_args}) |
| ret += f"{i + 1}. {command.name}: {command.description} Input arguments (given in the JSON schema): {command_input_json_schema}\n" |
| return ret |
|
|
| @classmethod |
| def instantiate_from_config(cls, config): |
| """ This method instantiates the flow from a configuration file. |
| |
| :param config: The configuration of the flow. |
| :type config: Dict[str, Any] |
| :return: The instantiated flow. |
| :rtype: ControllerAtomicFlow |
| """ |
| flow_config = deepcopy(config) |
|
|
| kwargs = {"flow_config": flow_config} |
|
|
| |
| kwargs.update(cls._set_up_prompts(flow_config)) |
| kwargs.update(cls._set_up_backend(flow_config)) |
|
|
| |
| commands = flow_config["commands"] |
| commands = [ |
| Command(name, command_conf["description"], command_conf["input_args"]) for name, command_conf in |
| commands.items() |
| ] |
| kwargs.update({"commands": commands}) |
|
|
| |
| return cls(**kwargs) |
|
|
| def run(self, input_message: FlowMessage): |
| """ This method runs the flow. Note that the response of the LLM is in the JSON format, but it's not a hard constraint (it can hallucinate and return an invalid JSON) |
| |
| :param input_message: The input data of the flow. |
| :type input_message: FlowMessage |
| """ |
| |
| input_data = input_message.data |
| |
| if "goal" in input_data: |
| self.flow_state["goal"] = input_data["goal"] |
| |
| else: |
| input_data["goal"] = self.flow_state["goal"] |
| |
| api_output = self.query_llm(input_data) |
| |
| response = json.loads(api_output) |
| |
| reply = self._package_output_message( |
| input_message=input_message, |
| response=response |
| ) |
| |
| self.reply_to_message(reply = reply, to = input_message) |
|
|