AgentVerse is yet another agent framework, but this one is interesting because:
- It has a game-like UI, implemented with Phaser, an HTML5 game development framework (live and learn).
- It has the most LlamaIndex-like agent I could find (see ToolAgent).
AgentVerse provides three top-level classes and CLI programs:
- Simulation CLI
- Simulation GUI
- Task Solving
I will look deeper into TaskSolving because it looks the most relevant at the moment. Let's start with agentverse_command/main_tasksolving_cli.py, which is really simple: it just parses the task name and tasks directory, creates a TaskSolving object, and run()s it.
# from agentverse.agentverse import AgentVerse
from agentverse.tasksolving import TaskSolving
....

def cli_main():
    agentversepipeline = TaskSolving.from_task(args.task, args.tasks_dir)
    agentversepipeline.run()

if __name__ == "__main__":
    cli_main()
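The argument parsing is elided above; it presumably looks something like this. A sketch only: the flag names are inferred from args.task / args.tasks_dir, and the defaults are my assumptions based on the brainstorming default mentioned below.
import argparse

# Hypothetical reconstruction of the elided parsing; defaults are assumptions.
parser = argparse.ArgumentParser()
parser.add_argument("--task", type=str, default="tasksolving/brainstorming")
parser.add_argument("--tasks_dir", type=str, default="agentverse/tasks")
args = parser.parse_args()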
Run Forrest run
run resets the environment and then steps it in a loop until it's done.
def run(self):
    """Run the environment from scratch until it is done."""
    self.environment.reset()
    self.logs = []
    advice = "No advice yet."
    previous_plan = "No solution yet."
    while not self.environment.is_done():
        result, advice, previous_plan, logs, success = asyncio.run(
            self.environment.step(advice, previous_plan)
        )
        self.logs += logs
    self.environment.report_metrics()
    self.save_result(previous_plan, result, self.environment.get_spend())
    return previous_plan, result, self.logs
is_done checks whether the environment has succeeded (or the max turn count has been reached).
def is_done(self):
    """Check if the environment is done"""
    return self.cnt_turn >= self.max_turn or self.success
They also have reporting and cost calculation: report_metrics and get_spend. Both iterate over the agents and aggregate each agent's get_spend().
def get_spend(self):
    total_spent = sum([agent.get_spend() for (_, agent) in self.iter_agents()])
    return total_spent

def report_metrics(self) -> None:
    logger.info("", "Agent spend:", Fore.GREEN)
    for role, agent in self.iter_agents():
        name = agent.name.split(":")[0]
        logger.info(
            "",
            f"Agent (Role: {role}) {name}: {agent.get_spend_formatted()}",
            Fore.GREEN,
        )
    logger.info("", f"Total spent: ${self.get_spend():.6f}", Fore.GREEN)
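Each agent's get_spend presumably rolls token counts up into dollars. A minimal sketch, assuming per-token pricing on the total_prompt_tokens / total_completion_tokens counters we'll see on OpenAIChat below; the prices here are hypothetical, the real tables live in the LLM wrappers.
# Hypothetical per-1K-token prices, for illustration only.
PROMPT_PRICE_PER_1K = 0.0005
COMPLETION_PRICE_PER_1K = 0.0015

def get_spend(total_prompt_tokens: int, total_completion_tokens: int) -> float:
    """USD spent, given accumulated token counts."""
    return (
        total_prompt_tokens / 1000 * PROMPT_PRICE_PER_1K
        + total_completion_tokens / 1000 * COMPLETION_PRICE_PER_1K
    )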
We will talk more about the environment later.
TaskSolving
So, what does TaskSolving do? By default, it parses the brainstorming task from agentverse/tasks/tasksolving/brainstorming/config.yaml by calling prepare_task_config. It also loads the agents and the environment based on that config.yaml.
@classmethod
def from_task(cls, task: str, tasks_dir: str):
    """Build an AgentVerse from a task name.
    The task name should correspond to a directory in `tasks` directory.
    Then this method will load the configuration from the yaml file in that directory.
    """
    # Prepare the config of the task
    task_config = prepare_task_config(task, tasks_dir)

    # Build the environment
    env_config = task_config["environment"]

    # Build agents for all pipeline (task)
    agents = {}
    for i, agent_config in enumerate(task_config["agents"]):
        if agent_config.get("agent_type", "") == "critic":
            agent = load_agent(agent_config)
            agents[AGENT_TYPES.CRITIC] = [
                copy.deepcopy(agent)
                for _ in range(task_config.get("cnt_agents", 1) - 1)
            ]
        else:
            agent_type = AGENT_TYPES.from_string(agent_config.get("agent_type", ""))
            agents[agent_type] = load_agent(agent_config)

    env_config["agents"] = agents
    env_config["task_description"] = task_config.get("task_description", "")
    env_config["max_rounds"] = task_config.get("max_rounds", 3)
    environment: BasicEnvironment = load_environment(env_config)
    return cls(environment=environment, task=task)
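For orientation, here is roughly what from_task expects task_config to look like, rendered as a Python dict. The field names come from the code above, but only "critic" is confirmed as an agent_type string; the other type names and all values are my guesses, not taken from the repo.
# Hypothetical task_config shape; values are illustrative only.
task_config = {
    "task_description": "Brainstorm marketing ideas for product X",
    "cnt_agents": 3,
    "max_rounds": 3,
    "agents": [
        {"agent_type": "role_assigner", "llm": {"llm_type": "gpt-4"}},
        {"agent_type": "solver", "llm": {"llm_type": "gpt-4"}},
        {"agent_type": "critic", "llm": {"llm_type": "gpt-4"}},
        {"agent_type": "executor", "llm": {"llm_type": "gpt-4"}},
        {"agent_type": "evaluator", "llm": {"llm_type": "gpt-4"}},
    ],
    "environment": {"env_type": "task-basic"},
}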
Let's dig deeper into these load functions in agentverse/initialization.py.
prepare_task_config
prepare_task_config loads the YAML config and builds some other infra based on it:
- memory
- LLM
- tools
def prepare_task_config(task, tasks_dir):
    """Read the yaml config of the given task in `tasks` directory."""
    all_task_dir = tasks_dir
    task_path = os.path.join(all_task_dir, task)
    ...
    task_config = yaml.safe_load(open(config_path))
    for i, agent_configs in enumerate(task_config["agents"]):
        agent_configs["memory"] = load_memory(agent_configs.get("memory", {}))
        if agent_configs.get("tool_memory", None) is not None:
            agent_configs["tool_memory"] = load_memory(agent_configs["tool_memory"])
        llm = load_llm(agent_configs.get("llm", "text-davinci-003"))
        agent_configs["llm"] = llm
        ...
        agent_configs["tools"] = load_tools(agent_configs.get("tools", []))
    return task_config
load_llm
Each framework implements its own LLM wrapper, and this one is no different. load_llm just looks up the LLM in llm_registry.
def load_llm(llm_config: Dict):
    llm_type = llm_config.pop("llm_type", "text-davinci-003")
    return llm_registry.build(llm_type, **llm_config)
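The registries that keep showing up (llm_registry, agent_registry, env_registry) all follow the same register/build pattern. Here is a minimal sketch of the idea; this is a simplified, hypothetical Registry, not AgentVerse's exact implementation.
# Minimal sketch of the register/build pattern (hypothetical simplification).
class Registry:
    def __init__(self, name: str):
        self.name = name
        self.entries: dict = {}

    def register(self, key: str):
        def decorator(cls):
            # Map the string key to the class so build() can construct it later.
            self.entries[key] = cls
            return cls
        return decorator

    def build(self, key: str, **kwargs):
        if key not in self.entries:
            raise KeyError(f"{key!r} is not registered in {self.name}")
        return self.entries[key](**kwargs)

llm_registry = Registry("llm")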
So what populates llm_registry? Let's start with BaseLLM, which defines the common API implemented by each LLM class.
class BaseLLM(BaseModel):
    args: BaseModelArgs = Field(default_factory=BaseModelArgs)
    max_retry: int = Field(default=3)
    client_args: Optional[Dict] = Field(default={})
    is_azure: bool = Field(default=False)

    @abstractmethod
    def get_spend(self) -> float:
        """
        Number of USD spent
        """
        return -1.0

    @abstractmethod
    def generate_response(self, **kwargs) -> LLMResult:
        pass

    @abstractmethod
    def agenerate_response(self, **kwargs) -> LLMResult:
        pass

class BaseChatModel(BaseLLM):
    pass

class BaseCompletionModel(BaseLLM):
    pass
Currently, they support OpenAI, Azure, and local vLLM. At module load, they pick up env vars and initialize the client objects.
api_key = None
base_url = None
model_name = None

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL")
AZURE_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY")
AZURE_API_BASE = os.environ.get("AZURE_OPENAI_API_BASE")
VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL")
VLLM_API_KEY = os.environ.get("VLLM_API_KEY", "EMPTY")

if not OPENAI_API_KEY and not AZURE_API_KEY:
    logger.warn(
        "OpenAI API key is not set. Please set an environment variable OPENAI_API_KEY or "
        "AZURE_OPENAI_API_KEY."
    )
elif OPENAI_API_KEY:
    DEFAULT_CLIENT = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
    DEFAULT_CLIENT_ASYNC = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
    api_key = OPENAI_API_KEY
    base_url = OPENAI_BASE_URL
elif AZURE_API_KEY:
    DEFAULT_CLIENT = AzureOpenAI(
        api_key=AZURE_API_KEY,
        azure_endpoint=AZURE_API_BASE,
        api_version="2024-02-15-preview",
    )
    DEFAULT_CLIENT_ASYNC = AsyncAzureOpenAI(
        api_key=AZURE_API_KEY,
        azure_endpoint=AZURE_API_BASE,
    )
    api_key = AZURE_API_KEY
    base_url = AZURE_API_BASE

if VLLM_BASE_URL:
    if model_name := get_llm_server_modelname(VLLM_BASE_URL, VLLM_API_KEY, logger):
        # model_name = /mnt/llama/hf_models/TheBloke_Llama-2-70B-Chat-GPTQ
        # transform to TheBloke/Llama-2-70B-Chat-GPTQ
        hf_model_name = model_name.split("/")[-1].replace("_", "/")
        LOCAL_LLMS.append(model_name)
        LOCAL_LLMS_MAPPING[model_name] = {
            "hf_model_name": hf_model_name,
            "base_url": VLLM_BASE_URL,
            "api_key": VLLM_API_KEY if VLLM_API_KEY else "EMPTY",
        }
        logger.info(f"Using vLLM model: {hf_model_name}")

if hf_model_name := get_llm_server_modelname(
    "http://localhost:5000", logger=logger
):
    # meta-llama/Llama-2-7b-chat-hf
    # transform to llama-2-7b-chat-hf
    short_model_name = model_name.split("/")[-1].lower()
    LOCAL_LLMS.append(short_model_name)
    LOCAL_LLMS_MAPPING[short_model_name] = {
        "hf_model_name": hf_model_name,
        "base_url": "http://localhost:5000/v1",
        "api_key": "EMPTY",
    }
    logger.info(f"Using FSChat model: {model_name}")
Then, they register the classes with model names. (As an aside, the FSChat branch above derives short_model_name from model_name rather than hf_model_name, which looks like a bug.)
# To support your own local LLMs, register it here and add it into LOCAL_LLMS.
@llm_registry.register("gpt-35-turbo")
@llm_registry.register("gpt-3.5-turbo")
@llm_registry.register("gpt-4")
@llm_registry.register("vllm")
@llm_registry.register("local")
class OpenAIChat(BaseChatModel):
    args: OpenAIChatArgs = Field(default_factory=OpenAIChatArgs)
    client_args: Optional[Dict] = Field(
        default={"api_key": api_key, "base_url": base_url}
    )
    is_azure: bool = Field(default=False)
    total_prompt_tokens: int = 0
    total_completion_tokens: int = 0
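Given those registrations, picking a model is a one-line lookup. A hedged usage example, assuming the remaining OpenAIChat fields can all fall back to their defaults:
# "gpt-3.5-turbo" resolves to OpenAIChat via the decorators above.
llm = load_llm({"llm_type": "gpt-3.5-turbo"})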
load_tools
The tools are defined in agentverse/tasks/tasksolving/tool_using/tools_simplified.json, which uses BMTools. (TODO)
def load_tools(tool_config: List[Dict]):
    if len(tool_config) == 0:
        return []
    all_tools_list = []
    for tool in tool_config:
        _, config = load_single_tools(tool["tool_name"], tool["tool_url"])
        all_tools_list += import_all_apis(config)
    return all_tools_list
An example of agents using tools is the vacation task; a sketch of the config shape follows this list.
- agentverse/tasks/tasksolving/tool_using/vacation/config.yaml
- agentverse/tasks/tasksolving/tool_using/tools_simplified.json
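Judging from the load_single_tools(tool["tool_name"], tool["tool_url"]) call above, a tools entry in the config presumably looks like this. BMTools-style; the names and URLs are illustrative, not from the repo.
# Hypothetical tool entries in the shape load_tools expects.
tool_config = [
    {"tool_name": "weather", "tool_url": "http://127.0.0.1:8079/tools/weather/"},
    {"tool_name": "wikipedia", "tool_url": "http://127.0.0.1:8079/tools/wikipedia/"},
]
tools = load_tools(tool_config)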
load_environment and load_agent
Both functions just load the env or agent from env_registry and agent_registry. I guess we have to look at those.
def load_environment(env_config: Dict) -> BaseEnvironment:
    env_type = env_config.pop("env_type", "basic")
    return env_registry.build(env_type, **env_config)

def load_agent(agent_config: Dict) -> BaseAgent:
    agent_type = agent_config.pop("agent_type", "conversation")
    agent = agent_registry.build(agent_type, **agent_config)
    return agent
Agents
BaseAgent is defined in agentverse/agents/base.py with the common agent API.
class BaseAgent(BaseModel):
    name: str
    llm: BaseLLM
    output_parser: OutputParser
    prepend_prompt_template: str = Field(default="")
    append_prompt_template: str = Field(default="")
    prompt_template: str = Field(default="")
    role_description: str = Field(default="")
    memory: BaseMemory = Field(default_factory=ChatHistoryMemory)
    memory_manipulator: BaseMemoryManipulator = Field(
        default_factory=BaseMemoryManipulator
    )
    max_retry: int = Field(default=3)
    receiver: Set[str] = Field(default=set({"all"}))
    async_mode: bool = Field(default=True)

    @abstractmethod
    def step(self, env_description: str = "") -> Message:
        """Get one step response"""
        pass

    @abstractmethod
    def astep(self, env_description: str = "") -> Message:
        """Asynchronous version of step"""
        pass
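To make the contract concrete, a minimal hypothetical agent might look like this. EchoAgent is not in the repo, and the Message(content=...) signature is my assumption based on the p.content / r.content usage later in this post.
# Hypothetical minimal agent; not from the repo.
@agent_registry.register("echo")
class EchoAgent(BaseAgent):
    def step(self, env_description: str = "") -> Message:
        # Just echo the environment description back as a message.
        return Message(content=f"{self.name} saw: {env_description}")

    async def astep(self, env_description: str = "") -> Message:
        return self.step(env_description)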
Specific agents extend BaseAgent and register themselves with agent_registry.
@agent_registry.register("executor")
class ExecutorAgent(BaseAgent):
    max_history: int = 5

    def step(
        self, task_description: str, solution: str, tools: List[dict] = [], **kwargs
    ) -> ExecutorMessage:
        pass

    async def astep(
        self, task_description: str, solution: str, tools: List[dict] = [], **kwargs
    ) -> ExecutorMessage:
        logger.debug("", self.name, Fore.MAGENTA)
        prepend_prompt, append_prompt, prompt_token = self.get_all_prompts(
            task_description=task_description,
            solution=solution,
            agent_name=self.name,
            **kwargs,
        )
There is a specific type of agent called ToolAgent, which calls a tool to get an observation. This is very similar to the LlamaIndex ReAct agent. See _call_tool and _acall_tool below. It also prepares the prompts by listing all available tools. From agentverse/agents/simulation_agent/tool.py:
@agent_registry.register("tool")
class ToolAgent(BaseAgent):
tools: List[BaseTool] = Field(default=[])
tool_memory: BaseMemory = Field(default_factory=ChatHistoryMemory)
verbose: bool = Field(default=False)
def step(self, env_description: str = "") -> Message:
parsed_response = None
tool_observation = [self.tool_memory.to_string()]
while True:
prompt = self._fill_prompt_template(env_description, tool_observation)
for i in range(self.max_retry):
try:
response = self.llm.generate_response(prompt)
parsed_response = self.output_parser.parse(respon...
def _call_tool(self, response: NamedTuple) -> str:
"""Call a tool and return the output"""
name_to_tool = {tool.name: tool for tool in self.tools}
if response.tool not in name_to_tool:
raise ToolNotExistError(response.tool)
tool = name_to_tool[response.tool]
observation = tool.run(response.tool_input, verbose=self.verbose)
return observation
async def _acall_tool(self, response: NamedTuple) -> str:
"""Call a tool and return the output"""
name_to_tool = {tool.name: tool for tool in self.tools}
if response.tool not in name_to_tool:
raise ToolNotExistError(response.tool)
tool = name_to_tool[response.tool]
observation = await tool.arun(response.tool_input, verbose=self.verbose)
return observation
def _fill_prompt_template(
self, env_description: str = "", tool_observation: List[str] = []
) -> str:
"""Fill the placeholders in the prompt template
In the tool agent, these placeholders are supported:
- ${agent_name}: the name of the agent
- ${env_description}: the description of the environment
- ${role_description}: the description of the role of the agent
- ${chat_history}: the chat history of the agent
- ${tools}: the list of tools and their usage
- ${tool_names}: the list of tool names
- ${tool_observations}: the observation of the tool in this turn
"""
tools = "\n".join([f"> {tool.name}: {tool.description}" for tool in self.tools])
tools = tools.replace("{{", "{").replace("}}", "}")
tool_names = ", ".join([tool.name for tool in self.tools])
input_arguments = {
"agent_name": self.name,
"env_description": env_description,
"role_description": self.role_description,
"chat_history": self.memory.to_string(add_sender_prefix=True),
"tools": tools,
"tool_names": tool_names,
"tool_observation": "\n".join(tool_observation),
}
return Template(self.prompt_template).safe_substitute(input_arguments)
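Since the template is filled with string.Template(...).safe_substitute, a prompt_template is just a plain string with ${...} placeholders. Here is a hypothetical template wired to the supported placeholders (not from the repo); note the docstring advertises ${tool_observations}, while the code actually fills ${tool_observation}.
# Hypothetical prompt_template for a ToolAgent; illustrative only.
prompt_template = (
    "You are ${agent_name}, ${role_description}.\n"
    "Environment: ${env_description}\n"
    "Available tools:\n${tools}\n"
    "Tool names: ${tool_names}\n"
    "Chat history:\n${chat_history}\n"
    "Tool observations:\n${tool_observation}"
)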
Environment
As usual, there is a BaseEnvironment with an API for child envs to implement.
class BaseEnvironment(BaseModel):
    """
    Base class for environment.
    Args:
        agents: List of agents
        rule: Rule for the environment
        max_turns: Maximum number of turns
        cnt_turn: Current turn number
        last_messages: Messages from last turn
        rule_params: Variables set by the rule
    """

    agents: List[BaseAgent]
    rule: BaseRule
    max_turns: int = 10
    cnt_turn: int = 0
    last_messages: List[Message] = []
    rule_params: Dict = {}

    @abstractmethod
    async def step(self) -> List[Message]:
        """Run one step of the environment"""
        pass

    @abstractmethod
    def reset(self) -> None:
        """Reset the environment"""
        pass

    def report_metrics(self) -> None:
        """Report useful metrics"""
        total_spent = sum([agent.get_spend() for agent in self.agents])
        logger.info(f"Total spent: ${total_spent}")
        pass

    def is_done(self) -> bool:
        """Check if the environment is done"""
        return self.cnt_turn >= self.max_turns
For example, the env for TaskSolving is BasicEnvironment:
from agentverse.environments.tasksolving_env.rules import TasksolvingRule

@EnvironmentRegistry.register("task-basic")
class BasicEnvironment(BaseEnvironment):
    rule: TasksolvingRule
    agents: Dict[Enum, Union[BaseAgent, List[BaseAgent]]] = None
    task_description: str
    cnt_turn: int = 0
    max_turn: int = 10
    success: bool = False

    def __init__(self, **kwargs):
        rule_config = kwargs.pop("rule", {})
        role_assigner_config = rule_config.pop(
            "role_assigner", {"type": "role_description"}
        )
        decision_maker_config = rule_config.pop("decision_maker", {"type": "vertical"})
        executor_config = rule_config.pop("executor", {"type": "none"})
        evaluator_config = rule_config.pop("evaluator", {"type": "basic"})
        rule = TasksolvingRule(
            role_assigner_config=role_assigner_config,
            decision_maker_config=decision_maker_config,
            executor_config=executor_config,
            evaluator_config=evaluator_config,
        )
        super().__init__(rule=rule, **kwargs)
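So a task's environment config can override any of the four sub-rules. Rendered as a Python dict, it presumably looks like this; the type strings shown are the defaults taken straight from the __init__ above, everything else would be task-specific.
# Hypothetical env_config with an explicit "rule" section; default types
# come from BasicEnvironment.__init__ above.
env_config = {
    "env_type": "task-basic",
    "rule": {
        "role_assigner": {"type": "role_description"},
        "decision_maker": {"type": "vertical"},
        "executor": {"type": "none"},
        "evaluator": {"type": "basic"},
    },
}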
Now, this is the most important part: how agents talk in the env, driven by TasksolvingRule in agentverse/environments/tasksolving_env/rules. The stages implemented in the env step are:
- assignment
- plan
- execute
- evaluate
step is called repeatedly until the evaluation score clears the threshold (it's set to 8, though a comment admits that's arbitrary) or the max turn count is reached.
async def step(
    self, advice: str = "No advice yet.", previous_plan: str = "No solution yet."
) -> List[Message]:
    logs = []  # collected here and returned to TaskSolving.run

    # ================== EXPERT RECRUITMENT ==================
    agents = await self.rule.role_assign(
        self.task_description, self.agents, self.cnt_turn, advice
    )
    description = "\n".join([agent.role_description for agent in agents])
    # ================== EXPERT RECRUITMENT ==================

    # ================== DECISION MAKING ==================
    plan: List[SolverMessage] = await self.rule.decision_making(
        self.task_description, self.agents, previous_plan, advice
    )
    flatten_plan = "\n".join([p.content for p in plan])
    # ================== DECISION MAKING ==================

    # ================== EXECUTION ==================
    result: List[ExecutorMessage] = await self.rule.execute(
        self.task_description, self.agents, plan
    )
    flatten_result = "\n".join([r.content for r in result])
    # ================== EXECUTION ==================

    # ================== EVALUATION ==================
    score, advice = await self.rule.evaluate(
        self.task_description, self.agents, plan, result
    )
    if score is not None and (
        (isinstance(score, bool) and score is True)
        or (isinstance(score, (list, tuple)) and all([s >= 8 for s in score]))
    ):
        # TODO: 8 is an arbitrary threshold
        logs.append({"agent": "system", "content": "Good score! Accept!"})
        logger.info(
            "", f"Good score! Accept! Final Result:\n{flatten_plan}", Fore.GREEN
        )
        self.success = True
    else:
        logs.append({"agent": "system", "content": "Bad score! Reject!"})
        logger.info("", "Bad score! Reject!", Fore.RED)

    self.cnt_turn += 1
    return flatten_result, advice, flatten_plan, logs, self.success
It's clear that the env delegates to the rules. Let's dig deeper into those.
Rules
Let's look at TasksolvingRule, since the env delegates to it by calling its methods in step.
async def role_assign(
    self,
    task_description: str,
    agents: List[BaseAgent],
    cnt_turn: int,
    advice: str = "",
) -> List[BaseAgent]:
    """Assign roles to agents"""
    if self.role_assign_only_once and cnt_turn > 0:
        agents = [agents[AGENT_TYPES.SOLVER]] + agents[AGENT_TYPES.CRITIC]
    else:
        agents = await self.role_assigner.astep(
            role_assigner=agents[AGENT_TYPES.ROLE_ASSIGNMENT],
            group_members=[agents[AGENT_TYPES.SOLVER]] + agents[AGENT_TYPES.CRITIC],
            advice=advice,
            task_description=task_description,
        )
        if self.role_assign_only_once and cnt_turn == 0:
            agents[AGENT_TYPES.SOLVER] = agents[0]
            agents[AGENT_TYPES.CRITIC] = agents[1:]
    return agents
The default assigner lives in agentverse/environments/tasksolving_env/rules/role_assigner/role_description.py; it simply calls astep on the agent assigned for role assignment.
class BaseRoleAssigner(BaseModel):
    """
    The base class of role assignment class.
    """

    @abstractmethod
    async def astep(
        self,
        role_assigner: RoleAssignerAgent,
        group_members: List[CriticAgent],
        advice: str = "No advice yet.",
        task_description: str = "",
        *args,
        **kwargs,
    ) -> List[CriticAgent]:
        pass

    def reset(self):
        pass

@role_assigner_registry.register("role_description")
class DescriptionAssigner(BaseRoleAssigner):
    """
    Generates descriptions for each agent.
    """

    async def astep(
        self,
        role_assigner: RoleAssignerAgent,
        group_members: List[CriticAgent],
        advice: str = "No advice yet.",
        task_description: str = "",
        *args,
        **kwargs,
    ) -> List[CriticAgent]:
        assert task_description != ""
        assert len(group_members) > 0

        roles = await role_assigner.astep(advice, task_description, len(group_members))
        if len(roles.content) < len(group_members):
            raise ValueError(
                f"Number of roles ({len(roles.content)}) and number of group members ({len(group_members)}) do not match."
            )
        for role, member in zip(roles.content[: len(group_members)], group_members):
            description = role.strip().strip(".")
            member.role_description = description
            member.name = description
        return group_members

    def reset(self):
        pass