There are several frameworks that support multi-agent communication, for example autogen, crewai, or AGENTS. The problem is that each framework implements its own LLM infrastructure and doesn't play nicely with llamaIndex. This is a deep dive into how the agents framework works and how it designs a multi-agent environment.
Initialization
Starting with the code from the examples directory, which calls `init` and `run`.
agents, sop, environment = init(args.agent)
prepare(agents, sop, environment)
run(agents, sop, environment)
`init` creates the environment, agents, and SOP from config files, and connects them together.
def init(config):
    sop = SOP.from_config(config)
    agents, roles_to_names, names_to_roles = Agent.from_config(config)
    environment = Environment.from_config(config)
    ...
    environment.agents = agents
    environment.roles_to_names, environment.names_to_roles = roles_to_names, names_to_roles
    sop.roles_to_names, sop.names_to_roles = roles_to_names, names_to_roles
    for name, agent in agents.items():
        agent.environment = environment
`run` starts a loop in which the SOP, environment, and agents work together.
def run(agents, sop, environment):
    while True:
        current_state, current_agent = sop.next(environment, agents)
        action = current_agent.step(current_state, True)
        memory = process(action)
        environment.update_memory(memory, current_state)
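One thing the listing hides: as quoted, the `while True` loop never exits. Since `next` (below) sets `self.finished` and returns `(None, None)` once the finish state is reached, the driver presumably breaks on that signal. Here is a self-contained toy of the same orchestration shape; `ToySOP` and `ToyAgent` are my stand-ins, not the framework's classes:

# Self-contained toy of the same control loop (ToySOP/ToyAgent are
# hypothetical stand-ins, not the framework's API). The point: the SOP
# signals termination, and the driver must check for it before stepping.
class ToySOP:
    def __init__(self, turns):
        self.turns = turns        # list of (state_name, agent_name) pairs
        self.i = 0
        self.finished = False

    def next(self, environment, agents):
        if self.i >= len(self.turns):
            self.finished = True
            return None, None     # mirrors SOP.next at the finish state
        state, agent_name = self.turns[self.i]
        self.i += 1
        return state, agents[agent_name]

class ToyAgent:
    def __init__(self, name):
        self.name = name

    def step(self, state, _is_begin):
        return f"{self.name} speaking in {state}"

def run_toy(agents, sop, environment):
    while True:
        state, agent = sop.next(environment, agents)
        if sop.finished:          # the exit condition the quoted loop omits
            break
        environment.append(agent.step(state, True))

env = []
run_toy({"a": ToyAgent("a"), "b": ToyAgent("b")},
        ToySOP([("draft", "a"), ("review", "b")]), env)
print(env)  # ['a speaking in draft', 'b speaking in review']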
SOP next
The loop starts by calling `sop.next()`, which takes the environment and the agents and returns `current_state` and `current_agent`. `next` is important because it uses the states and relations defined in the config files to decide the next state and the next agent to run, by calling two methods: `transit` and `route`.
def next(self, environment, agents):
    """
    Determine the next state and the agent that needs action based on the current situation
    """
    # If it is the first time to enter this state
    if self.current_state.is_begin:
        environment.current_chat_history_idx = len(environment.shared_memory["long_term_memory"])
        agent_name = self.roles_to_names[self.current_state.name][self.current_state.begin_role]
        agent = agents[agent_name]
        return self.current_state, agent

    # get relevant history
    query = environment.shared_memory["long_term_memory"][-1].content
    relevant_history = get_relevant_history(
        query,
        environment.shared_memory["long_term_memory"][:-1],
        environment.shared_memory["chat_embeddings"][:-1],
    )
    relevant_history = Memory.get_chat_history(relevant_history)

    next_state = self.transit(
        chat_history=environment.shared_memory["long_term_memory"][
            environment.current_chat_history_idx:
        ],
        relevant_history=relevant_history,
        environment=environment,
    )
    # If you enter the termination node, terminate directly
    if next_state.name == self.finish_state_name:
        self.finished = True
        return None, None

    self.current_state = next_state
    if next_state.name != self.current_state.name:
        self.current_state.index = (self.current_state.index + 1) % len(self.current_state.roles)

    # If it is the first time to enter the state and there is a begin query, it will be directly assigned to the begin role.
    if self.current_state.is_begin and self.current_state.begin_role:
        environment.current_chat_history_idx = len(environment.shared_memory["long_term_memory"])
        agent_name = self.roles_to_names[self.current_state.name][self.current_state.begin_role]
        agent = agents[agent_name]
        return self.current_state, agent

    next_agent = self.route(
        chat_history=environment.shared_memory["long_term_memory"][
            environment.current_chat_history_idx:
        ],
        agents=agents,
        relevant_history=relevant_history,
    )
    return self.current_state, next_agent
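For orientation, here is a hypothetical sketch of the config shape that `next` walks. The key names are my inference from how the code indexes into the config (states with `roles` and a `begin_role`, relations keyed by the digit that `transit` returns); it is not copied from the repo's example configs:

# Hypothetical sketch of the config shape SOP.from_config consumes. Key
# names are inferred from how next()/transit()/route() index into it,
# NOT copied from the repo's example configs.
config = {
    "root": "brainstorm_state",                  # initial state
    "states": {
        "brainstorm_state": {
            "roles": ["leader", "critic"],       # speaking order for "order" routing
            "begin_role": "leader",
            "begin_query": "Let's start brainstorming.",
            "controller": {
                "controller_type": "order",
                "judge_system_prompt": "Decide whether this phase should end.",
                "judge_extract_words": "end",    # tag the verdict is wrapped in
            },
        },
        "summary_state": {"roles": ["leader"]},
        "end_state": {},                         # the finish state
    },
    # transit() returns a digit; current_state.next_states[digit] resolves
    # it through these relations: "0" = stay, "1" = move on.
    "relations": {
        "brainstorm_state": {"0": "brainstorm_state", "1": "summary_state"},
        "summary_state": {"0": "summary_state", "1": "end_state"},
    },
}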
Basically, `transit` combines the current state with the chat history and asks the LLM to decide the next transition, if any.
def transit(self, chat_history, **kwargs):
    ...
    # Otherwise, let the controller judge whether to end
    judge_system_prompt = controller_dict["judge_system_prompt"] if "judge_system_prompt" in controller_dict else ""
    environment_prompt = eval(Get_environment_prompt) if current_state.environment_prompt else ""
    transit_system_prompt = eval(Transit_system_prompt)

    judge_last_prompt = controller_dict["judge_last_prompt"] if "judge_last_prompt" in controller_dict else ""
    transit_last_prompt = eval(Transit_last_prompt)

    environment = kwargs["environment"]
    environment_summary = environment.shared_memory["short_term_memory"]
    chat_history_message = Memory.get_chat_history(chat_history)
    query = chat_history[-1].get_query()

    chat_messages = [
        {
            "role": "user",
            "content": eval(Transit_message)
        }
    ]

    extract_words = controller_dict["judge_extract_words"] if "judge_extract_words" in controller_dict else "end"

    response = self.LLM.get_response(
        chat_messages, transit_system_prompt, transit_last_prompt, stream=False, **kwargs
    )
    next_state = (
        response if response.isdigit() else extract(response, extract_words)
    )
    next_state = self.current_state.next_states[next_state]
    return next_state
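`extract` isn't shown in the post. Given that the judge prompts ask the model to wrap its verdict in tags named after `extract_words` (so a reply like `<end>1</end>`), a plausible reconstruction — an assumption on my part, not the repo's code — is a small regex helper:

import re

# Plausible reconstruction of extract() (an assumption, not the repo's
# code): pull the text between <word>...</word> tags in the LLM reply.
def extract(text: str, word: str) -> str:
    match = re.search(rf"<{word}>(.*?)</{word}>", text, re.DOTALL)
    return match.group(1).strip() if match else ""

print(extract("I think we should stop. <end>1</end>", "end"))  # -> 1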
`route` determines the next role, and there are several ways to determine it: `rule`, `order`, or `random`. For rule-based routing, it asks the LLM and extracts the next role from the response; the other modes are decided locally. Eventually it maps the role back to an agent for the next step (pun intended).
def route(self, chat_history, **kwargs):
    agents = kwargs["agents"]
    relevant_history = kwargs["relevant_history"]
    controller_type = (
        self.controller_dict[self.current_state.name]["controller_type"]
        if "controller_type" in self.controller_dict[self.current_state.name]
        else "order"
    )

    if controller_type == "rule":
        controller_dict = self.controller_dict[self.current_state.name]

        call_last_prompt = controller_dict["call_last_prompt"] if "call_last_prompt" in controller_dict else ""

        allocate_prompt = ""
        roles = list(set(self.current_state.roles))
        for role in roles:
            allocate_prompt += eval(Allocate_component)

        call_system_prompt = controller_dict["call_system_prompt"] if "call_system_prompt" in controller_dict else ""
        environment_prompt = eval(Get_environment_prompt) if self.current_state.environment_prompt else ""
        # call_system_prompt + environment + allocate_prompt
        call_system_prompt = eval(Call_system_prompt)

        query = chat_history[-1].get_query()
        last_name = chat_history[-1].send_name
        # last_prompt: note + last_prompt + query
        call_last_prompt = eval(Call_last_prompt)

        chat_history_message = Memory.get_chat_history(chat_history)
        # Intermediate historical conversation records
        chat_messages = [
            {
                "role": "user",
                "content": eval(Call_message),
            }
        ]

        extract_words = controller_dict["call_extract_words"] if "call_extract_words" in controller_dict else "end"

        response = self.LLM.get_response(
            chat_messages, call_system_prompt, call_last_prompt, stream=False, **kwargs
        )

        # get next role
        next_role = extract(response, extract_words)

    # Speak in order
    elif controller_type == "order":
        # If there is no begin role, it will be given directly to the first person.
        if not self.current_state.current_role:
            next_role = self.current_state.roles[0]
        # otherwise advance to the next role in order
        else:
            self.current_state.index += 1
            self.current_state.index = (self.current_state.index) % len(self.current_state.roles)
            next_role = self.current_state.roles[self.current_state.index]

    # random speak
    elif controller_type == "random":
        next_role = random.choice(self.current_state.roles)

    # If the next role is not available, pick one at random
    if next_role not in self.current_state.roles:
        next_role = random.choice(self.current_state.roles)

    self.current_state.current_role = next_role
    next_agent = agents[self.roles_to_names[self.current_state.name][next_role]]
    return next_agent
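Stripped of the prompt plumbing, the non-LLM strategies are a few lines of list arithmetic. A self-contained simplification (my paraphrase of `route`, omitting the LLM-backed `rule` branch):

import random

# Simplified view of the local routing modes (my paraphrase of route(),
# omitting the LLM-backed "rule" branch).
def pick_next_role(roles, current_index, mode):
    if mode == "order":                        # round-robin through roles
        next_index = (current_index + 1) % len(roles)
        return roles[next_index], next_index
    if mode == "random":                       # any role may speak next
        role = random.choice(roles)
        return role, roles.index(role)
    raise ValueError(f"unknown controller_type: {mode}")

roles = ["leader", "critic", "scribe"]
print(pick_next_role(roles, 0, "order"))  # ('critic', 1)
print(pick_next_role(roles, 2, "order"))  # ('leader', 0) -- wraps around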
Agent step
In `src/agents/Agent/Agent.py`, `step` is called with the current state from the SOP.
def step(self, current_state, input=""):
    """
    return actions by current state and environment
    Return: action(Action)
    """
    current_state.chat_nums += 1
    state_begin = current_state.is_begin
    agent_begin = self.begins[current_state.name]["is_begin"]
    self.begins[current_state.name]["is_begin"] = False
    current_state.is_begin = False
    environment = self.environment

    self.current_state = current_state
    response = " "
    res_dict = {}

    if self.is_user:
        response = f"{self.name}:{input}"
    else:
        if len(environment.shared_memory["long_term_memory"]) > 0:
            current_history = self.observe()
            self.long_term_memory.append(current_history)
        if agent_begin:
            response = (char for char in self.begins[current_state.name]["begin_query"])
        else:
            response, res_dict = self.act()

    action_dict = {
        "response": response,
        "res_dict": res_dict,
        "role": self.state_roles[current_state.name],
        "name": self.name,
        "state_begin": state_begin,
        "agent_begin": agent_begin,
        "is_user": self.is_user
    }
    return Action(**action_dict)
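A subtle detail in `step`: the scripted `begin_query` is wrapped in a generator expression, so downstream code can consume it exactly like a streamed LLM response (`act` below calls the LLM with `stream=True`, which yields chunks). A toy illustration of why the two are interchangeable:

# Toy illustration: a scripted string wrapped in a generator expression is
# consumed the same way as a streaming LLM response (an iterator of chunks).
def fake_llm_stream():
    for chunk in ["Hel", "lo, ", "world"]:
        yield chunk

scripted = (char for char in "Hi there")  # same trick as step()'s begin_query

for source in (fake_llm_stream(), scripted):
    print("".join(source))  # both join into a full message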
Then `act` calls the LLM to generate a response.
def act(self):
    """
    return actions by the current state
    """
    current_state = self.current_state
    chat_history = self.long_term_memory
    current_LLM = self.LLMs[current_state.name]
    system_prompt, last_prompt, res_dict = self.compile()

    response = current_LLM.get_response(
        chat_history, system_prompt, last_prompt, stream=True
    )
    return response, res_dict
Before acting, `step` also calls `observe`, which updates the agent's own memory from the current environment.
def observe(self):
    """
    Update one's own memory according to the current environment, including: updating short-term memory; updating long-term memory
    """
    return self.environment._observe(self)
`_observe` assembles the agent's memory from the environment's current state (only the first lines are shown here).
def _observe(self, agent):
    MAX_CHAT_HISTORY = eval(os.environ["MAX_CHAT_HISTORY"])
    current_state = agent.current_state
    ...
process
`process` just extracts some attributes from the `Action` returned by the agent's `step` and creates a memory.
def process(action):
    response = action.response
    send_name = action.name
    send_role = action.role
    if not action.is_user:
        print(f"{send_name}({send_role}):{response}")
    memory = Memory(send_role, send_name, response)
    return memory
environment update_memory
The environment summary is triggered from `update_memory`, which also calls `update_memory` on the agent that produced the memory.
def update_memory(self, memory, current_state):
    """
    update chat embeddings and long term memory, short term memory, agents' long term memory
    """
    MAX_CHAT_HISTORY = eval(os.environ["MAX_CHAT_HISTORY"])
    self.shared_memory["long_term_memory"].append(memory)
    current_embedding = get_embedding(memory.content)
    if "chat_embeddings" not in self.shared_memory:
        self.shared_memory["chat_embeddings"] = current_embedding
    else:
        self.shared_memory["chat_embeddings"] = torch.cat(
            [self.shared_memory["chat_embeddings"], current_embedding], dim=0
        )
    if len(self.shared_memory["long_term_memory"]) % MAX_CHAT_HISTORY == 0:
        summary = self.summary(current_state)
        self.shared_memory["short_term_memory"] = summary

    self.agents[memory.send_name].update_memory(memory)
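`get_relevant_history` isn't listed either. Since the environment stacks per-message embeddings into a single tensor with `torch.cat`, it is presumably a top-k similarity lookup over that tensor. A hedged sketch of the pattern — my reconstruction, with a hypothetical `retrieve_relevant` that assumes the query is already embedded:

import torch
import torch.nn.functional as F

# Sketch of a top-k lookup over the stacked chat embeddings (my
# reconstruction of what get_relevant_history plausibly does; the real
# function also has to embed the query text itself).
def retrieve_relevant(query_embedding, memories, embeddings, k=3):
    # embeddings: (n, d); query_embedding: (1, d)
    scores = F.cosine_similarity(embeddings, query_embedding, dim=1)
    top = torch.topk(scores, k=min(k, len(memories))).indices
    return [memories[i] for i in top.tolist()]

memories = ["plan the API", "argue about tabs", "review the PR"]
embeddings = torch.randn(3, 8)
query = torch.randn(1, 8)
print(retrieve_relevant(query, memories, embeddings, k=2))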
On the environment side, only `summary` calls the LLM, to produce a summary of the current environment.
def summary(self, current_state):
    """
    Summarize the situation in the current environment every once in a while
    """
    MAX_CHAT_HISTORY = eval(os.environ["MAX_CHAT_HISTORY"])
    current_state_name = current_state.name

    if len(self.shared_memory["long_term_memory"]) > 1:
        query = self.shared_memory["long_term_memory"][-1].content
        relevant_history = get_relevant_history(
            query,
            self.shared_memory["long_term_memory"][:-1],
            self.shared_memory["chat_embeddings"][:-1],
        )
        relevant_history = Memory.get_chat_history(relevant_history)
    else:
        relevant_history = ""
    chat_history = Memory.get_chat_history(
        self.shared_memory["long_term_memory"][-MAX_CHAT_HISTORY + 1:]
    )
    summary = self.shared_memory["short_term_memory"]
    ...
    current_memory = eval(Environment_summary_memory)
    environment_prompt = self.environment_prompt[current_state_name]
    summary_system_prompt = self.summary_system_prompt[current_state_name]
    environment_summary_system_prompt = eval(Environment_summary_system_prompt)
    response = self.LLMs[current_state_name].get_response(None, ...
Back to `update_memory`: it calls the agent's `update_memory`, which can also call the LLM (this is the second place an agent calls the LLM, besides `act`).
def update_memory(self, memory):
    self.long_term_memory.append(
        {"role": "assistant", "content": memory.content}
    )

    MAX_CHAT_HISTORY = eval(os.environ["MAX_CHAT_HISTORY"])
    environment = self.environment
    current_chat_history_idx = environment.current_chat_history_idx if environment.environment_type == "competive" else 0

    current_long_term_memory = environment.shared_memory["long_term_memory"][current_chat_history_idx:]
    last_conversation_idx = environment._get_agent_last_conversation_idx(self, current_long_term_memory)
    if len(current_long_term_memory) - last_conversation_idx >= MAX_CHAT_HISTORY:
        current_state = self.current_state
        current_role = self.state_roles[current_state.name]
        current_component_dict = current_state.components[current_role]

        # get chat history from new conversation
        conversations = environment._get_agent_new_memory(self, current_long_term_memory)

        # get summary
        summary_prompt = (
            current_state.summary_prompt[current_role]
            if current_state.summary_prompt
            else f"""your name is {self.name},your role is{current_component_dict["style"].role},your task is {current_component_dict["task"].task}.\n"""
        )
        summary_prompt = eval(Agent_summary_system_prompt)
        summary = self.LLMs[current_state.name].get_response(None, summary_prompt, stream=False)
        self.short_term_memory = summary
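The pattern on both the environment and the agent side is the same rolling summarization: once the unsummarized history reaches `MAX_CHAT_HISTORY`, compress it into short-term memory and keep appending to long-term memory. A self-contained sketch of that bookkeeping, with a hypothetical `summarize` standing in for the LLM call:

MAX_CHAT_HISTORY = 4

# Sketch of the rolling-summary bookkeeping shared by
# Environment.update_memory and Agent.update_memory; summarize() is a
# hypothetical stand-in for the LLM call.
def summarize(previous_summary, messages):
    return f"({previous_summary}) + summary of {len(messages)} msgs"

class RollingMemory:
    def __init__(self):
        self.long_term = []       # every message, verbatim
        self.short_term = ""      # periodically refreshed summary

    def update(self, message):
        self.long_term.append(message)
        if len(self.long_term) % MAX_CHAT_HISTORY == 0:
            recent = self.long_term[-MAX_CHAT_HISTORY:]
            self.short_term = summarize(self.short_term, recent)

mem = RollingMemory()
for i in range(9):
    mem.update(f"msg {i}")
print(mem.short_term)  # refreshed after the 4th and 8th messages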