from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.data_transformations import RegexFirstOccurrenceExtractor, EndOfInteraction

# Module-level logger, namespaced under "aiflows.<module name>".
log = logging.get_logger(f"aiflows.{__name__}")
|
|
|
|
class ChatHumanFlowModule(CompositeFlow):
    r""" This class implements a Chat Human Flow Module. It is a flow that consists of two sub-flows that are executed circularly. It contains the following subflows:

    - A User Flow: A flow that makes queries to the Assistant Flow. E.g. The user asks the assistant (LLM) a question.
    - An Assistant Flow: A flow that responds to queries made by the User Flow. E.g. The assistant (LLM) answers the user's question.

    To end the interaction, the user must type "\<END\>"

    An illustration of the flow is as follows:

        |------> User Flow -----------> |
        ^                               |
        |                               |
        |                               v
        |<------ Assistant Flow <-------|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "ChatHumanFlowModule"
    - `description` (str): A description of the flow. This description is used to generate the help message of the flow.
    Default: "Flow that enables chatting between a ChatAtomicFlow and a user providing the input."
    - `max_rounds` (int): The maximum number of rounds the flow can run for. Default: None, which means that there is no limit on the number of rounds.
    - `early_exit_key` (str): The key that is used to exit the flow. Default: "end_of_interaction"
    - `regex_first_occurrence_extractor` (Dict[str,Any]): Keyword arguments used to build the RegexFirstOccurrenceExtractor
    applied to the user's message before it is sent to the Assistant Flow.
    - `end_of_interaction` (Dict[str,Any]): Keyword arguments used to build the EndOfInteraction transformation
    applied to the user's message before it is sent to the Assistant Flow.
    - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default:
        - `Assistant Flow`: The configuration of the Assistant Flow. By default, it is a ChatAtomicFlow. Its default parameters are defined in ChatAtomicFlowModule.
        - `User Flow`: The configuration of the User Flow. By default, it is a HumanStandardInputFlow. Its default parameters are defined in HumanStandardInputFlowModule.
    - `topology` (List[Dict[str,Any]]): The topology of the flow which is "circular".
    By default, the topology is the one shown in the illustration above (the topology is also described in ChatHumanFlowModule.yaml).

    *Input Interface*:

    - None. By default, the input interface doesn't expect any input.

    *Output Interface*:

    - `user_inputs` (List): The user queries recorded during the interaction.
    - `assistant_outputs` (List): The assistant outputs recorded during the interaction.
    - `end_of_interaction` (bool): Whether the interaction is finished or not.

    :param \**kwargs: Arguments to be passed to the parent class CompositeFlow constructor.
    :type \**kwargs: Dict[str, Any]
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Both transformations are configured straight from the flow config;
        # a missing key raises KeyError at construction time.
        self.regex_extractor = RegexFirstOccurrenceExtractor(
            **self.flow_config["regex_first_occurrence_extractor"]
        )
        self.end_of_interaction = EndOfInteraction(**self.flow_config["end_of_interaction"])

        # Interface applied to messages on their way to the Assistant:
        # renames "human_input" to "query" and applies the two transformations.
        self.input_interface_assistant = KeyInterface(
            keys_to_rename={"human_input": "query"},
            additional_transformations=[self.regex_extractor, self.end_of_interaction],
        )

        # Interface applied to messages on their way to the User (no transformations configured).
        self.input_interface_user = KeyInterface()

    def set_up_flow_state(self) -> None:
        """ This method sets up the flow state. It is called when the flow is executed."""
        super().set_up_flow_state()
        self.flow_state["last_state"] = None  # None, "User" or "Assistant": which subflow was called last
        self.flow_state["current_round"] = 0
        self.flow_state["user_inputs"] = []
        self.flow_state["assistant_outputs"] = []
        self.flow_state["input_message"] = None  # first message received; the final reply answers it
        self.flow_state["end_of_interaction"] = False

    @classmethod
    def type(cls) -> str:
        """ This method returns the type of the flow."""
        return "OpenAIChatHumanFlowModule"

    def max_rounds_reached(self) -> bool:
        """ This method checks if the maximum number of rounds has been reached.

        :return: True if `max_rounds` is set and the current round has reached it. False otherwise (including when `max_rounds` is None).
        :rtype: bool
        """
        max_rounds = self.flow_config["max_rounds"]
        return max_rounds is not None and self.flow_state["current_round"] >= max_rounds

    def generate_reply(self) -> None:
        """ This method generates the reply message and sends it. It is called when the interaction is finished.

        The reply answers the input message stored in the flow state and contains the recorded
        `user_inputs`, `assistant_outputs` and the `end_of_interaction` flag.
        """
        reply = self.package_output_message(
            input_message=self.flow_state["input_message"],
            response={
                "user_inputs": self.flow_state["user_inputs"],
                "assistant_outputs": self.flow_state["assistant_outputs"],
                "end_of_interaction": self.flow_state["end_of_interaction"],
            },
        )

        self.send_message(reply)

    def call_to_user(self, input_message: FlowMessage) -> None:
        """ This method calls the User Flow. (Human)

        It records the assistant's output, then either finishes the interaction (if the maximum
        number of rounds has been reached) or forwards the message to the User Flow.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        msg = self.input_interface_user(input_message)
        self.flow_state["assistant_outputs"].append(msg.data["api_output"])
        message = self.package_input_message(data=msg.data)

        if self.max_rounds_reached():
            self.generate_reply()
        else:
            self.subflows["User"].get_reply(
                message,
            )
            self.flow_state["last_state"] = "User"

        # The round counter advances on every call, whichever branch was taken.
        self.flow_state["current_round"] += 1

    def call_to_assistant(self, input_message: FlowMessage) -> None:
        """ This method calls the Assistant Flow.

        On the first call the input message is stored so the final reply can answer it;
        on later calls the user's query is recorded. If the transformed message is flagged
        with `end_of_interaction`, the reply is generated instead of calling the Assistant Flow.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        message = self.input_interface_assistant(input_message)
        message = self.package_input_message(data=message.data)

        if self.flow_state["last_state"] is None:
            # First message of the interaction: remember it for the final reply.
            self.flow_state["input_message"] = input_message
        else:
            # NOTE(review): "query" is read from the original message, which presumably relies on
            # the interface renaming "human_input" in place -- confirm against KeyInterface.
            self.flow_state["user_inputs"].append(input_message.data["query"])

        # presumably this key is set by the EndOfInteraction transformation -- verify.
        if message.data["end_of_interaction"]:
            self.flow_state["end_of_interaction"] = True
            self.generate_reply()
        else:
            self.subflows["Assistant"].get_reply(
                message,
            )
            self.flow_state["last_state"] = "Assistant"

    def run(self, input_message: FlowMessage) -> None:
        """ This method runs the flow. It is the main method of the flow and it is called when the flow is executed.

        The message is dispatched to the subflow that was not called last: the Assistant Flow
        at the start of the interaction or after the User Flow, the User Flow after the Assistant Flow.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        last_state = self.flow_state["last_state"]

        if last_state is None or last_state == "User":
            self.call_to_assistant(input_message=input_message)
        elif last_state == "Assistant":
            self.call_to_user(input_message=input_message)
| |
| |
| |
| |
| |
| |
| |
| |
|
|