There are several frameworks that support multi-agent communication, for example AutoGen, CrewAI, or Agents. The problem is that each framework implements its own LLM infrastructure and doesn't play nicely with LlamaIndex. This is a deep dive into how the Agents framework works and how it designs a multi-agent environment.

Initialization

Start with the code from the examples directory, which calls init, prepare, and run.

agents, sop, environment = init(args.agent)
prepare(agents, sop, environment)
run(agents, sop, environment)

init creates the environment, agents, and SOP from the config file and connects them together.

def init(config):
    sop = SOP.from_config(config)
    agents, roles_to_names, names_to_roles = Agent.from_config(config)
    environment = Environment.from_config(config)
    ...
    # wire everything together: the environment holds the agents, and the
    # environment and the SOP share the role/name mappings
    environment.agents = agents
    environment.roles_to_names, environment.names_to_roles = roles_to_names, names_to_roles
    sop.roles_to_names, sop.names_to_roles = roles_to_names, names_to_roles

    # every agent gets a back-reference to the environment
    for name, agent in agents.items():
        agent.environment = environment
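
For context, the config is a JSON file describing the agents, the states, and the relations between states. The sketch below is illustrative only, with field names inferred from how the code consumes them rather than the framework's exact schema:

# Illustrative config shape (assumption: names inferred from usage,
# not the framework's exact schema)
config = {
    "agents": {"Alice": {"roles": {"state1": "teacher"}}},
    # relations: for each state, which controller answer ("0", "1", ...) maps
    # to which next state; current_state.next_states is built from this
    "relations": {"state1": {"0": "state1", "1": "end_state"}},
    "states": {
        "state1": {
            "begin_role": "teacher",
            "begin_query": "Hello class, let's begin.",
            "controller": {"controller_type": "order"},
        },
        "end_state": {},
    },
}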

run starts the loop in which the SOP, environment, and agents work together.

def run(agents, sop, environment):
    while True:
        current_state, current_agent = sop.next(environment, agents)
        if sop.finished:  # next() sets this once the finish state is reached
            break
        action = current_agent.step(current_state, True)
        memory = process(action)
        environment.update_memory(memory, current_state)

SOP next

Each loop iteration starts with sop.next(), which takes the environment and the agents and returns current_state and current_agent.

next is important because it uses the relations and states defined in the config file to decide the next state and the next agent to run, by calling two methods: transit and route.

    def next(self, environment, agents):
        """
        Determine the next state and the agent that needs action based on the current situation
        """

        # If it is the first time to enter this state

        if self.current_state.is_begin:
            environment.current_chat_history_idx = len(environment.shared_memory["long_term_memory"])
            agent_name = self.roles_to_names[self.current_state.name][self.current_state.begin_role]
            agent = agents[agent_name]
            return self.current_state,agent

        # get relevant history
        query = environment.shared_memory["long_term_memory"][-1].content
        relevant_history = get_relevant_history(
            query,
            environment.shared_memory["long_term_memory"][:-1],
            environment.shared_memory["chat_embeddings"][:-1],
        )
        relevant_history = Memory.get_chat_history(relevant_history)

        next_state = self.transit(
            chat_history=environment.shared_memory["long_term_memory"][
                environment.current_chat_history_idx :
            ],
            relevant_history=relevant_history,
            environment=environment,
        )

        # If you enter the termination node, terminate directly
        if next_state.name == self.finish_state_name:
            self.finished = True
            return None, None

        self.current_state = next_state
        # NOTE: this check can never fire, because current_state was just
        # reassigned to next_state one line above, so the role index is
        # never actually rotated when the state changes
        if next_state.name != self.current_state.name:
            self.current_state.index = (self.current_state.index + 1) % len(self.current_state.roles)

        # If it is the first time to enter the state and there is a begin query, it will be directly assigned to the begin role.
        if self.current_state.is_begin and self.current_state.begin_role:
            environment.current_chat_history_idx = len(environment.shared_memory["long_term_memory"])
            agent_name = self.roles_to_names[self.current_state.name][self.current_state.begin_role]
            agent = agents[agent_name]
            return self.current_state,agent


        next_agent = self.route(
            chat_history=environment.shared_memory["long_term_memory"][
                environment.current_chat_history_idx :
            ],
            agents = agents,
            relevant_history=relevant_history,
        )

        return self.current_state, next_agent
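
get_relevant_history retrieves the messages most similar to the last one, using the chat embeddings kept in shared memory. A minimal sketch of the idea, assuming cosine-similarity top-k retrieval (the repo's exact scoring may differ) and reusing the get_embedding helper the environment already uses:

import torch

# Sketch (assumption: cosine top-k, not necessarily the repo's exact logic)
def get_relevant_history(query, history, embeddings, top_k=3):
    query_embedding = get_embedding(query)                         # shape (1, d)
    scores = torch.cosine_similarity(query_embedding, embeddings)  # shape (n,)
    top = torch.topk(scores, k=min(top_k, len(history))).indices
    return [history[i] for i in sorted(top.tolist())]              # keep chat order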

Basically, transit combines the current state with the chat history and asks the LLM to decide the next transition, if any. Note the eval(...) calls: the prompts are stored as f-string templates and rendered against the local variables in scope (more on this after the snippet).

    def transit(self, chat_history, **kwargs):
        ...

            # Otherwise, let the controller judge whether to end
            judge_system_prompt = controller_dict["judge_system_prompt"] if "judge_system_prompt" in controller_dict else ""
            environment_prompt = eval(Get_environment_prompt) if current_state.environment_prompt else ""
            transit_system_prompt = eval(Transit_system_prompt)

            judge_last_prompt = controller_dict["judge_last_prompt"] if "judge_last_prompt" in controller_dict else ""
            transit_last_prompt = eval(Transit_last_prompt)

            environment = kwargs["environment"]
            environment_summary = environment.shared_memory["short_term_memory"]
            chat_history_message = Memory.get_chat_history(chat_history)
            query = chat_history[-1].get_query()

            chat_messages = [
                {
                    "role": "user",
                    "content": eval(Transit_message)
                }
            ]

            extract_words = controller_dict["judge_extract_words"] if "judge_extract_words" in controller_dict else "end"


            response = self.LLM.get_response(
                chat_messages, transit_system_prompt, transit_last_prompt, stream=False, **kwargs
            )
            next_state = (
                response if response.isdigit() else extract(response, extract_words)
            )

        next_state = self.current_state.next_states[next_state]
        return next_state
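
Two details are worth calling out. The controller's numeric answer is resolved through current_state.next_states, which is built from the relations section of the config. And the Transit_* variables are prompt templates stored as strings containing f-string literals, which eval renders against the local variables in scope. A minimal sketch of that pattern, with made-up template text:

# Sketch of the eval-template pattern (the template text is illustrative)
Transit_system_prompt = "f\"{environment_prompt};{judge_system_prompt}\""

environment_prompt = "You are in a debate between two agents."
judge_system_prompt = "Judge whether the discussion should end."
# eval() evaluates the inner f-string, substituting the locals above
print(eval(Transit_system_prompt))
# -> You are in a debate between two agents.;Judge whether the discussion should end.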

route determines the next role. There are three ways to pick it: rule, order, or random. For rule-based routing, the SOP asks the LLM and extracts the chosen role from its response; the other two modes are computed locally. Eventually the role is mapped back to an agent for the next step (pun intended).

    def route(self, chat_history, **kwargs):

        agents = kwargs["agents"]
        ...
            relevant_history = kwargs["relevant_history"]
            controller_type = (
                self.controller_dict[self.current_state.name]["controller_type"]
                if "controller_type" in self.controller_dict[self.current_state.name]
                else "order"
            )


            if controller_type == "rule":
                controller_dict = self.controller_dict[self.current_state.name]

                call_last_prompt = controller_dict["call_last_prompt"] if "call_last_prompt" in controller_dict else ""

                allocate_prompt = ""
                roles = list(set(self.current_state.roles))
                for role in roles:
                    allocate_prompt += eval(Allocate_component)

                call_system_prompt = controller_dict["call_system_prompt"]  if "call_system_prompt" in controller_dict else ""
                environment_prompt = eval(Get_environment_prompt) if self.current_state.environment_prompt else ""
                # call_system_prompt + environment + allocate_prompt
                call_system_prompt = eval(Call_system_prompt)

                query = chat_history[-1].get_query()
                last_name = chat_history[-1].send_name
                # last_prompt: note + last_prompt + query
                call_last_prompt = eval(Call_last_prompt)


                chat_history_message = Memory.get_chat_history(chat_history)
                # Intermediate historical conversation records
                chat_messages = [
                    {
                        "role": "user",
                        "content": eval(Call_message),
                    }
                ]

                extract_words = controller_dict["call_extract_words"] if "call_extract_words" in controller_dict else "end"

                response = self.LLM.get_response(
                    chat_messages, call_system_prompt, call_last_prompt, stream=False, **kwargs
                )

                # get next role
                next_role = extract(response, extract_words)

            # Speak in order
            elif controller_type == "order":
                # If no one has spoken in this state yet, start with the first role
                if not self.current_state.current_role:
                    next_role = self.current_state.roles[0]
                # otherwise advance round-robin to the next role
                else:
                    self.current_state.index += 1
                    self.current_state.index = self.current_state.index % len(self.current_state.roles)
                    next_role = self.current_state.roles[self.current_state.index]
            # random speak
            elif controller_type == "random":
                next_role = random.choice(self.current_state.roles)

        # If the chosen role is not valid for this state, pick one at random
        if next_role not in self.current_state.roles:
            next_role = random.choice(self.current_state.roles)

        self.current_state.current_role = next_role

        next_agent = agents[self.roles_to_names[self.current_state.name][next_role]]

        return next_agent
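
extract is the helper that pulls the controller's answer out of the raw response. Its source isn't shown here, but given that it is called with a keyword like "end", a plausible reconstruction (an assumption, not the repo's verbatim code) is tag-based extraction:

import re

# Assumption: the controller is prompted to wrap its answer in <keyword> tags
def extract(text, keyword):
    matches = re.findall(rf"<{keyword}>(.*?)</{keyword}>", text, re.DOTALL)
    return matches[-1].strip() if matches else ""

print(extract("Alice should speak next. <end>Alice</end>", "end"))  # -> Alice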

Agent step

In src/agents/Agent/Agent.py, step is called with the current state from the SOP.

    def step(self, current_state, input=""):
        """
        return actions by current state and environment
        Return: action(Action)
        """

        current_state.chat_nums += 1
        state_begin = current_state.is_begin
        agent_begin = self.begins[current_state.name]["is_begin"]
        self.begins[current_state.name]["is_begin"] = False
        current_state.is_begin = False
        environment = self.environment

        self.current_state = current_state

        response = " "
        res_dict = {}

        if self.is_user:
            response = f"{self.name}:{input}"
        else:
            if len(environment.shared_memory["long_term_memory"])>0:
                current_history = self.observe()
                self.long_term_memory.append(current_history)
            if agent_begin:
                # stream the canned begin_query character by character,
                # mimicking a streamed LLM response
                response = (char for char in self.begins[current_state.name]["begin_query"])
            else:
                response, res_dict = self.act()


        action_dict = {
            "response": response,
            "res_dict": res_dict,
            "role": self.state_roles[current_state.name],
            "name": self.name,
            "state_begin": state_begin,
            "agent_begin": agent_begin,
            "is_user": self.is_user,
        }
        return Action(**action_dict)
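
Action itself is a small container around these fields. A minimal sketch that is compatible with how it is constructed here and read by process later (the real class likely carries more behavior):

# Sketch: a bag of attributes built from action_dict (assumption)
class Action:
    def __init__(self, **kwargs):
        # exposes response, res_dict, role, name, state_begin, agent_begin, is_user
        for key, value in kwargs.items():
            setattr(self, key, value)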

Then act calls the LLM to generate a response.

    def act(self):
        """
        return actions by the current state
        """
        current_state = self.current_state
        chat_history = self.long_term_memory
        current_LLM = self.LLMs[current_state.name]

        system_prompt, last_prompt, res_dict = self.compile()

        response = current_LLM.get_response(
            chat_history, system_prompt, last_prompt, stream=True
        )
        return response,res_dict
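
Since get_response is called with stream=True, someone downstream has to drain the generator to obtain the full text. A hedged sketch of that consumption (where exactly the repo drains it is not shown here); it works both for act's streamed chunks and for the begin_query character generator in step:

# Sketch: collecting a streamed response into the final text
chunks = []
for chunk in response:
    chunks.append(chunk)
full_text = "".join(chunks)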

step may also call observe, which updates the agent's own memory from the current environment.

    def observe(self):
        """
        Update one's own memory according to the current environment, including: updating short-term memory; updating long-term memory
        """
        return self.environment._observe(self)

_observe (truncated here) pulls the relevant parts of the environment's shared memory into the agent's own memory, starting from the current state.

    def _observe(self, agent):
        MAX_CHAT_HISTORY = eval(os.environ["MAX_CHAT_HISTORY"])
        current_state = agent.current_state
        ...

process

process just extracts a few attributes from the Action returned by the agent's step and creates a Memory from them.

def process(action):
    response = action.response
    send_name = action.name
    send_role = action.role
    if not action.is_user:
        print(f"{send_name}({send_role}):{response}")
    memory = Memory(send_role, send_name, response)
    return memory
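
Memory is likewise a small record. A minimal sketch consistent with its usage in this post: the Memory(send_role, send_name, response) constructor, the .content accesses, and the Memory.get_chat_history helper that flattens messages into prompt text (an approximation, not the repo's exact class):

# Sketch of Memory, inferred from usage (assumption)
class Memory:
    def __init__(self, send_role, send_name, content):
        self.send_role = send_role
        self.send_name = send_name
        self.content = content

    @classmethod
    def get_chat_history(cls, messages):
        # flatten memories into "name(role):content" lines for prompt text
        return "\n".join(f"{m.send_name}({m.send_role}):{m.content}" for m in messages)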

environment update_memory

update_memory appends the new memory to the environment's shared memory and embeddings, periodically refreshes the short-term summary via summary, and finally calls update_memory on the agent that just acted.

    def update_memory(self, memory, current_state):
        """
        update chat embeddings, long-term memory, short-term memory, and agents' long-term memory
        """
        MAX_CHAT_HISTORY = eval(os.environ["MAX_CHAT_HISTORY"])
        self.shared_memory["long_term_memory"].append(memory)
        current_embedding = get_embedding(memory.content)
        if "chat_embeddings" not in self.shared_memory:
            self.shared_memory["chat_embeddings"] = current_embedding
        else:
            self.shared_memory["chat_embeddings"] = torch.cat(
                [self.shared_memory["chat_embeddings"], current_embedding], dim=0
            )
        if len(self.shared_memory["long_term_memory"]) % MAX_CHAT_HISTORY == 0:
            summary = self.summary(current_state)
            self.shared_memory["short_term_memory"] = summary

        self.agents[memory.send_name].update_memory(memory)

On the environment side, only summary calls the LLM, producing a rolling summary of the current situation every MAX_CHAT_HISTORY messages.

    def summary(self, current_state):
        """
        Summarize the situation in the current environment every once in a while
        """
        MAX_CHAT_HISTORY = eval(os.environ["MAX_CHAT_HISTORY"])
        current_state_name = current_state.name

        if len(self.shared_memory["long_term_memory"]) > 1:
            query = self.shared_memory["long_term_memory"][-1].content
            relevant_history = get_relevant_history(
                query,
                self.shared_memory["long_term_memory"][:-1],
                self.shared_memory["chat_embeddings"][:-1],
            )

            relevant_history = Memory.get_chat_history(relevant_history)
        else:
            relevant_history = ""
        chat_history = Memory.get_chat_history(
            self.shared_memory["long_term_memory"][-MAX_CHAT_HISTORY + 1 :]
        )
        summary = self.shared_memory["short_term_memory"]

        ...
        current_memory = eval(Environment_summary_memory)
        environment_prompt = self.environment_prompt[current_state_name]
        summary_system_prompt = self.summary_system_prompt[current_state_name]

        environment_summary_system_prompt = eval(Environment_summary_system_prompt)
        # assumption: call completed here to match get_response's signature used elsewhere
        response = self.LLMs[current_state_name].get_response(
            None, environment_summary_system_prompt, stream=False
        )
        return response
Back to update_memory: it finishes by calling the agent's update_memory, which can also call the LLM (this is the second place an agent calls the LLM, besides act).

    def update_memory(self, memory):
        self.long_term_memory.append(
            {"role": "assistant", "content": memory.content}
        )

        MAX_CHAT_HISTORY = eval(os.environ["MAX_CHAT_HISTORY"])
        environment = self.environment
        current_chat_history_idx = environment.current_chat_history_idx if environment.environment_type == "competive" else 0

        current_long_term_memory = environment.shared_memory["long_term_memory"][current_chat_history_idx:]
        last_conversation_idx = environment._get_agent_last_conversation_idx(self,current_long_term_memory)
        if len(current_long_term_memory)-last_conversation_idx >= MAX_CHAT_HISTORY:
            current_state = self.current_state
            current_role = self.state_roles[current_state.name]
            current_component_dict = current_state.components[current_role]

            # get chat history from new conversation
            conversations = environment._get_agent_new_memory(self,current_long_term_memory)

            # get summary
            summary_prompt = (
                current_state.summary_prompt[current_role]
                if current_state.summary_prompt
                else f"""your name is {self.name}, your role is {current_component_dict["style"].role}, your task is {current_component_dict["task"].task}.\n"""
            )
            # Agent_summary_system_prompt is an eval-template that embeds summary_prompt
            summary_prompt = eval(Agent_summary_system_prompt)
            summary = self.LLMs[current_state.name].get_response(None, summary_prompt, stream=False)
            self.short_term_memory = summary
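
_get_agent_last_conversation_idx isn't shown above; judging by its name and how its result is compared against MAX_CHAT_HISTORY, a plausible sketch (an assumption, not the repo's verbatim code) is a scan for the agent's most recent message:

# Sketch (assumption): index of the agent's most recent message, -1 if none
def _get_agent_last_conversation_idx(self, agent, current_long_term_memory):
    last_idx = -1
    for idx, memory in enumerate(current_long_term_memory):
        if memory.send_name == agent.name:
            last_idx = idx
    return last_idx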