AgentVerse is yet another agent framework, but this one is interesting because:

  • It has a game-like UI, implemented with Phaser, which is an HTML5 game development framework (live and learn)
  • It has the closest thing to a LlamaIndex agent I could find (see ToolAgent)

AgentVerse provides three top-level classes and CLI programs:

  • Simulation CLI
  • Simulation GUI
  • Task Solving

I will look deeper into TaskSolving because it looks the most relevant at the moment. Let’s start with agentverse_command/main_tasksolving_cli.py, which is really simple: it parses the task name and tasks directory, creates a TaskSolving object, and calls run() on it.

# from agentverse.agentverse import AgentVerse
from agentverse.tasksolving import TaskSolving

....

def cli_main():
    agentversepipeline = TaskSolving.from_task(args.task, args.tasks_dir)
    agentversepipeline.run()

if __name__ == "__main__":
    cli_main()
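
The CLI is a thin wrapper, so presumably the same thing works programmatically. A sketch, assuming the task name and tasks directory match the repo layout mentioned below (the three return values of run() come from the snippet in the next section):

from agentverse.tasksolving import TaskSolving

# Hypothetical programmatic equivalent of the CLI; the task name and
# tasks_dir values are my guesses based on the repo layout.
pipeline = TaskSolving.from_task("tasksolving/brainstorming", "agentverse/tasks")
plan, result, logs = pipeline.run()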

Run, Forrest, run

run just steps the env in a loop until it’s done:

    def run(self):
        """Run the environment from scratch until it is done."""
        self.environment.reset()
        self.logs = []
        advice = "No advice yet."
        previous_plan = "No solution yet."
        while not self.environment.is_done():
            result, advice, previous_plan, logs, success = asyncio.run(
                self.environment.step(advice, previous_plan)
            )
            self.logs += logs
        self.environment.report_metrics()
        self.save_result(previous_plan, result, self.environment.get_spend())
        return previous_plan, result, self.logs

is_done returns True once the environment succeeds (or the max turn count is hit):

    def is_done(self):
        """Check if the environment is done"""
        return self.cnt_turn >= self.max_turn or self.success

They also have reporting and cost calculation, report_metrics and get_spend, which iterate over the agents and sum up each agent’s spend:

    def get_spend(self):
        total_spent = sum([agent.get_spend() for (_, agent) in self.iter_agents()])
        return total_spent

    def report_metrics(self) -> None:
        logger.info("", "Agent spend:", Fore.GREEN)
        for role, agent in self.iter_agents():
            name = agent.name.split(":")[0]
            logger.info(
                "",
                f"Agent (Role: {role}) {name}: {agent.get_spend_formatted()}",
                Fore.GREEN,
            )
        logger.info("", f"Total spent: ${self.get_spend():.6f}", Fore.GREEN)

We will talk more about the environment later.

TaskSolving

So, what does TaskSolving do? By default, it loads the brainstorming task from agentverse/tasks/tasksolving/brainstorming/config.yaml by calling prepare_task_config.

It also builds the agents and the environment based on that config.yaml:

    @classmethod
    def from_task(cls, task: str, tasks_dir: str):
        """Build an AgentVerse from a task name.
        The task name should correspond to a directory in `tasks` directory.
        Then this method will load the configuration from the yaml file in that directory.
        """
        # Prepare the config of the task
        task_config = prepare_task_config(task, tasks_dir)

        # Build the environment
        env_config = task_config["environment"]

        # Build agents for all pipeline (task)
        agents = {}
        for i, agent_config in enumerate(task_config["agents"]):
            if agent_config.get("agent_type", "") == "critic":
                agent = load_agent(agent_config)
                agents[AGENT_TYPES.CRITIC] = [
                    copy.deepcopy(agent)
                    for _ in range(task_config.get("cnt_agents", 1) - 1)
                ]
            else:
                agent_type = AGENT_TYPES.from_string(agent_config.get("agent_type", ""))
                agents[agent_type] = load_agent(agent_config)

        env_config["agents"] = agents

        env_config["task_description"] = task_config.get("task_description", "")
        env_config["max_rounds"] = task_config.get("max_rounds", 3)

        environment: BasicEnvironment = load_environment(env_config)

        return cls(environment=environment, task=task)
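
Putting this together with prepare_task_config (next section), the parsed task_config presumably has roughly the following shape. This is a sketch reconstructed from the keys the code reads; "critic" is the only agent_type string visible above, so the other type names and all values here are assumptions:

# Hypothetical shape of task_config after parsing; only the keys are
# grounded in the code above, the values and most type names are guesses.
task_config = {
    "task_description": "brainstorm ideas for ...",
    "cnt_agents": 3,  # from_task makes cnt_agents - 1 copies of the critic
    "max_rounds": 3,
    "agents": [
        {"agent_type": "role_assigner", "llm": {"llm_type": "gpt-4"}, "memory": {}},
        {"agent_type": "solver", "llm": {"llm_type": "gpt-4"}, "memory": {}},
        {"agent_type": "critic", "llm": {"llm_type": "gpt-4"}, "memory": {}},
        {"agent_type": "evaluator", "llm": {"llm_type": "gpt-4"}, "memory": {}},
    ],
    "environment": {"env_type": "task-basic", "rule": {}},
}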

Let’s dig deeper into these loading functions in agentverse/initialization.py.

prepare_task_config

prepare_task_config loads the task’s config.yaml and sets up some other infra based on that yaml:

  • Memory
  • LLM
  • tools

def prepare_task_config(task, tasks_dir):
    """Read the yaml config of the given task in `tasks` directory."""
    all_task_dir = tasks_dir
    task_path = os.path.join(all_task_dir, task)
    ...
    task_config = yaml.safe_load(open(config_path))

    for i, agent_configs in enumerate(task_config["agents"]):
        agent_configs["memory"] = load_memory(agent_configs.get("memory", {}))
        if agent_configs.get("tool_memory", None) is not None:
            agent_configs["tool_memory"] = load_memory(agent_configs["tool_memory"])

        llm = load_llm(agent_configs.get("llm", "text-davinci-003"))
        agent_configs["llm"] = llm

        ...

        agent_configs["tools"] = load_tools(agent_configs.get("tools", []))

    return task_config

load_llm

Each framework implements its own LLM wrapper, and this one is no different. load_llm just looks the LLM up in llm_registry:

def load_llm(llm_config: Dict):
    llm_type = llm_config.pop("llm_type", "text-davinci-003")

    return llm_registry.build(llm_type, **llm_config)
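
This registry pattern shows up all over the codebase (llm_registry, agent_registry, env_registry, role_assigner_registry, ...). The registry class itself isn’t quoted in this post, but given the register/build calls, here is a minimal sketch of how such a registry typically works (my assumption, not AgentVerse’s actual class):

# Minimal registry sketch: register(name) returns a decorator that records
# the class under that name; build(name, **kwargs) instantiates it.
class Registry:
    def __init__(self):
        self.entries = {}

    def register(self, name):
        def decorator(cls):
            self.entries[name] = cls
            return cls
        return decorator

    def build(self, name, **kwargs):
        if name not in self.entries:
            raise KeyError(f"unknown type: {name}")
        return self.entries[name](**kwargs)

llm_registry = Registry()

Stacking several @llm_registry.register(...) decorators, as OpenAIChat does below, just records the same class under multiple names.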

What populates llm_registry? Let’s start with BaseLLM, which defines the common API implemented by each LLM class:

class BaseLLM(BaseModel):
    args: BaseModelArgs = Field(default_factory=BaseModelArgs)
    max_retry: int = Field(default=3)
    client_args: Optional[Dict] = Field(default={})
    is_azure: bool = Field(default=False)

    @abstractmethod
    def get_spend(self) -> float:
        """
        Number of USD spent
        """
        return -1.0

    @abstractmethod
    def generate_response(self, **kwargs) -> LLMResult:
        pass

    @abstractmethod
    def agenerate_response(self, **kwargs) -> LLMResult:
        pass

class BaseChatModel(BaseLLM):
    pass

class BaseCompletionModel(BaseLLM):
    pass

Currently, they support OpenAI, Azure, and local vLLM. They pick up env vars and initialize the client objects:

    api_key = None
    base_url = None
    model_name = None
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL")
    AZURE_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY")
    AZURE_API_BASE = os.environ.get("AZURE_OPENAI_API_BASE")
    VLLM_BASE_URL = os.environ.get("VLLM_BASE_URL")
    VLLM_API_KEY = os.environ.get("VLLM_API_KEY", "EMPTY")

    if not OPENAI_API_KEY and not AZURE_API_KEY:
        logger.warn(
            "OpenAI API key is not set. Please set an environment variable OPENAI_API_KEY or "
            "AZURE_OPENAI_API_KEY."
        )
    elif OPENAI_API_KEY:
        DEFAULT_CLIENT = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
        DEFAULT_CLIENT_ASYNC = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
        api_key = OPENAI_API_KEY
        base_url = OPENAI_BASE_URL
    elif AZURE_API_KEY:
        DEFAULT_CLIENT = AzureOpenAI(
            api_key=AZURE_API_KEY,
            azure_endpoint=AZURE_API_BASE,
            api_version="2024-02-15-preview",
        )
        DEFAULT_CLIENT_ASYNC = AsyncAzureOpenAI(
            api_key=AZURE_API_KEY,
            azure_endpoint=AZURE_API_BASE,
        )
        api_key = AZURE_API_KEY
        base_url = AZURE_API_BASE
    if VLLM_BASE_URL:
        if model_name := get_llm_server_modelname(VLLM_BASE_URL, VLLM_API_KEY, logger):
            # model_name = /mnt/llama/hf_models/TheBloke_Llama-2-70B-Chat-GPTQ
            # transform to TheBloke/Llama-2-70B-Chat-GPTQ
            hf_model_name = model_name.split("/")[-1].replace("_", "/")
            LOCAL_LLMS.append(model_name)
            LOCAL_LLMS_MAPPING[model_name] = {
                "hf_model_name": hf_model_name,
                "base_url": VLLM_BASE_URL,
                "api_key": VLLM_API_KEY if VLLM_API_KEY else "EMPTY",
            }
            logger.info(f"Using vLLM model: {hf_model_name}")
    if hf_model_name := get_llm_server_modelname(
        "http://localhost:5000", logger=logger
    ):
        # meta-llama/Llama-2-7b-chat-hf
        # transform to llama-2-7b-chat-hf
        short_model_name = model_name.split("/")[-1].lower()
        LOCAL_LLMS.append(short_model_name)
        LOCAL_LLMS_MAPPING[short_model_name] = {
            "hf_model_name": hf_model_name,
            "base_url": "http://localhost:5000/v1",
            "api_key": "EMPTY",
        }

        logger.info(f"Using FSChat model: {model_name}")

Then, they register the classes under model names:

# To support your own local LLMs, register it here and add it into LOCAL_LLMS.
@llm_registry.register("gpt-35-turbo")
@llm_registry.register("gpt-3.5-turbo")
@llm_registry.register("gpt-4")
@llm_registry.register("vllm")
@llm_registry.register("local")
class OpenAIChat(BaseChatModel):
    args: OpenAIChatArgs = Field(default_factory=OpenAIChatArgs)
    client_args: Optional[Dict] = Field(
        default={"api_key": api_key, "base_url": base_url}
    )
    is_azure: bool = Field(default=False)

    total_prompt_tokens: int = 0
    total_completion_tokens: int = 0
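
So resolving a model name is a dict lookup plus construction. Something like this should produce an OpenAIChat instance (max_retry is a real BaseLLM field; that it can be passed through the config like this is my assumption about how the kwargs flow into the pydantic model):

# "gpt-3.5-turbo" is registered to OpenAIChat above; the remaining
# config keys become constructor kwargs.
llm = load_llm({"llm_type": "gpt-3.5-turbo", "max_retry": 5})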

load_tools

The tools are defined in agentverse/tasks/tasksolving/tool_using/tools_simplified.json, which uses BMTools. (TODO)

def load_tools(tool_config: List[Dict]):
    if len(tool_config) == 0:
        return []
    all_tools_list = []
    for tool in tool_config:
        _, config = load_single_tools(tool["tool_name"], tool["tool_url"])
        all_tools_list += import_all_apis(config)
    return all_tools_list

An example of agents using tools is the vacation task:

  • agentverse/tasks/tasksolving/tool_using/vacation/config.yaml
  • agentverse/tasks/tasksolving/tool_using/tools_simplified.json
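
I haven’t dug into tools_simplified.json itself (hence the TODO above), but since load_tools only reads tool_name and tool_url from each entry, a tool config entry presumably looks roughly like this. The name and URL are made up; the URL just follows the usual BMTools serving convention:

# Assumed shape of a tool config entry, based on the keys load_tools reads.
tool_config = [
    {"tool_name": "weather", "tool_url": "http://127.0.0.1:8079/tools/weather/"},
]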

load_environment and load_agent

Both functions just build the env or the agent from env_registry and agent_registry. I guess we have to look at what gets registered there.

def load_environment(env_config: Dict) -> BaseEnvironment:
    env_type = env_config.pop("env_type", "basic")
    return env_registry.build(env_type, **env_config)

def load_agent(agent_config: Dict) -> BaseAgent:
    agent_type = agent_config.pop("agent_type", "conversation")
    agent = agent_registry.build(agent_type, **agent_config)
    return agent

Agents

BaseAgent is defined in agentverse/agents/base.py with the common API:

class BaseAgent(BaseModel):
    name: str
    llm: BaseLLM
    output_parser: OutputParser
    prepend_prompt_template: str = Field(default="")
    append_prompt_template: str = Field(default="")
    prompt_template: str = Field(default="")
    role_description: str = Field(default="")
    memory: BaseMemory = Field(default_factory=ChatHistoryMemory)
    memory_manipulator: BaseMemoryManipulator = Field(
        default_factory=BaseMemoryManipulator
    )
    max_retry: int = Field(default=3)
    receiver: Set[str] = Field(default=set({"all"}))
    async_mode: bool = Field(default=True)


    @abstractmethod
    def step(self, env_description: str = "") -> Message:
        """Get one step response"""
        pass

    @abstractmethod
    def astep(self, env_description: str = "") -> Message:
        """Asynchronous version of step"""
        pass

Specific agents extend BaseAgent and register themselves with agent_registry:

@agent_registry.register("executor")
class ExecutorAgent(BaseAgent):
    max_history: int = 5

    def step(
        self, task_description: str, solution: str, tools: List[dict] = [], **kwargs
    ) -> ExecutorMessage:
        pass

    async def astep(
        self, task_description: str, solution: str, tools: List[dict] = [], **kwargs
    ) -> ExecutorMessage:
        logger.debug("", self.name, Fore.MAGENTA)
        prepend_prompt, append_prompt, prompt_token = self.get_all_prompts(
            task_description=task_description,
            solution=solution,
            agent_name=self.name,
            **kwargs,
        )

There is a specific type of agent called ToolAgent that calls a tool to get an observation. This is very similar to the LlamaIndex ReAct agent; see _call_tool and _acall_tool below. It also prepares the prompt by listing all available tools.

agentverse/agents/simulation_agent/tool.py

@agent_registry.register("tool")
class ToolAgent(BaseAgent):
    tools: List[BaseTool] = Field(default=[])
    tool_memory: BaseMemory = Field(default_factory=ChatHistoryMemory)
    verbose: bool = Field(default=False)

    def step(self, env_description: str = "") -> Message:
        parsed_response = None
        tool_observation = [self.tool_memory.to_string()]
        while True:
            prompt = self._fill_prompt_template(env_description, tool_observation)

            for i in range(self.max_retry):
                try:
                    response = self.llm.generate_response(prompt)
                    parsed_response = self.output_parser.parse(respon...


    def _call_tool(self, response: NamedTuple) -> str:
        """Call a tool and return the output"""
        name_to_tool = {tool.name: tool for tool in self.tools}
        if response.tool not in name_to_tool:
            raise ToolNotExistError(response.tool)
        tool = name_to_tool[response.tool]
        observation = tool.run(response.tool_input, verbose=self.verbose)
        return observation

    async def _acall_tool(self, response: NamedTuple) -> str:
        """Call a tool and return the output"""
        name_to_tool = {tool.name: tool for tool in self.tools}
        if response.tool not in name_to_tool:
            raise ToolNotExistError(response.tool)
        tool = name_to_tool[response.tool]
        observation = await tool.arun(response.tool_input, verbose=self.verbose)
        return observation

    def _fill_prompt_template(
        self, env_description: str = "", tool_observation: List[str] = []
    ) -> str:
        """Fill the placeholders in the prompt template

        In the tool agent, these placeholders are supported:
        - ${agent_name}: the name of the agent
        - ${env_description}: the description of the environment
        - ${role_description}: the description of the role of the agent
        - ${chat_history}: the chat history of the agent
        - ${tools}: the list of tools and their usage
        - ${tool_names}: the list of tool names
        - ${tool_observations}: the observation of the tool in this turn
        """
        tools = "\n".join([f"> {tool.name}: {tool.description}" for tool in self.tools])
        tools = tools.replace("{{", "{").replace("}}", "}")
        tool_names = ", ".join([tool.name for tool in self.tools])
        input_arguments = {
            "agent_name": self.name,
            "env_description": env_description,
            "role_description": self.role_description,
            "chat_history": self.memory.to_string(add_sender_prefix=True),
            "tools": tools,
            "tool_names": tool_names,
            "tool_observation": "\n".join(tool_observation),
        }
        return Template(self.prompt_template).safe_substitute(input_arguments)
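
Note that the docstring advertises ${tool_observations} while input_arguments actually fills tool_observation (singular); safe_substitute hides mismatches like that by leaving unknown placeholders untouched instead of raising. Based on the supported placeholders, a ToolAgent prompt_template could look roughly like this sketch of mine (not one of the shipped task configs):

from string import Template

# Hypothetical ToolAgent prompt template using the documented placeholders.
prompt_template = """You are ${agent_name}. ${role_description}
Environment: ${env_description}

You can use the following tools (${tool_names}):
${tools}

Chat history:
${chat_history}

Tool observations this turn:
${tool_observation}"""

# safe_substitute leaves placeholders it has no value for untouched.
print(Template(prompt_template).safe_substitute(agent_name="researcher"))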

Environment

As usual, there is a BaseEnvironment with an API for child envs to implement:

class BaseEnvironment(BaseModel):
    """
    Base class for environment.

    Args:
        agents: List of agents
        rule: Rule for the environment
        max_turns: Maximum number of turns
        cnt_turn: Current turn number
        last_messages: Messages from last turn
        rule_params: Variables set by the rule
    """

    agents: List[BaseAgent]
    rule: BaseRule
    max_turns: int = 10
    cnt_turn: int = 0
    last_messages: List[Message] = []
    rule_params: Dict = {}

    @abstractmethod
    async def step(self) -> List[Message]:
        """Run one step of the environment"""
        pass

    @abstractmethod
    def reset(self) -> None:
        """Reset the environment"""
        pass

    def report_metrics(self) -> None:
        """Report useful metrics"""
        total_spent = sum([agent.get_spend() for agent in self.agents])
        logger.info(f"Total spent: ${total_spent}")
        pass

    def is_done(self) -> bool:
        """Check if the environment is done"""
        return self.cnt_turn >= self.max_turns

For example, the env for TaskSolving is BasicEnvironment:

from agentverse.environments.tasksolving_env.rules import TasksolvingRule

@EnvironmentRegistry.register("task-basic")
class BasicEnvironment(BaseEnvironment):
    rule: TasksolvingRule
    agents: Dict[Enum, Union[BaseAgent, List[BaseAgent]]] = None

    task_description: str

    cnt_turn: int = 0
    max_turn: int = 10
    success: bool = False

    def __init__(self, **kwargs):
        rule_config = kwargs.pop("rule", {})
        role_assigner_config = rule_config.pop(
            "role_assigner", {"type": "role_description"}
        )
        decision_maker_config = rule_config.pop("decision_maker", {"type": "vertical"})
        executor_config = rule_config.pop("executor", {"type": "none"})
        evaluator_config = rule_config.pop("evaluator", {"type": "basic"})
        rule = TasksolvingRule(
            role_assigner_config=role_assigner_config,
            decision_maker_config=decision_maker_config,
            executor_config=executor_config,
            evaluator_config=evaluator_config,
        )
        super().__init__(rule=rule, **kwargs)
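
Reading the defaults out of __init__, the rule part of an environment config spells out to this (a sketch that just restates the default types popped above; a task’s config.yaml can override any of them):

# Default rule config implied by BasicEnvironment.__init__.
env_config = {
    "env_type": "task-basic",
    "rule": {
        "role_assigner": {"type": "role_description"},
        "decision_maker": {"type": "vertical"},
        "executor": {"type": "none"},
        "evaluator": {"type": "basic"},
    },
}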

Now, this is the most important part: how the agents talk, driven by TasksolvingRule in agentverse/environments/tasksolving_env/rules. The stages implemented in the env step are:

  • role assignment (expert recruitment)
  • decision making (producing a plan)
  • execution
  • evaluation

step is called multiple times until the score passes the threshold (it’s set to 8, but a comment admits it’s arbitrary):

    async def step(
        self, advice: str = "No advice yet.", previous_plan: str = "No solution yet."
    ) -> List[Message]:
        logs = []  # the excerpt skips some setup; logs must start as a list for the appends below
        # ================== EXPERT RECRUITMENT ==================
        agents = await self.rule.role_assign(
            self.task_description, self.agents, self.cnt_turn, advice
        )
        description = "\n".join([agent.role_description for agent in agents])

        # ================== EXPERT RECRUITMENT ==================

        # ================== DECISION MAKING ==================
        plan: List[SolverMessage] = await self.rule.decision_making(
            self.task_description, self.agents, previous_plan, advice
        )
        flatten_plan = "\n".join([p.content for p in plan])
        # ================== DECISION MAKING ==================

        # ================== EXECUTION ==================
        result: List[ExecutorMessage] = await self.rule.execute(
            self.task_description, self.agents, plan
        )
        flatten_result = "\n".join([r.content for r in result])

        # ================== EXECUTION ==================

        # ================== EVALUATION ==================
        score, advice = await self.rule.evaluate(
            self.task_description, self.agents, plan, result
        )

        if score is not None and (
            (isinstance(score, bool) and score is True)
            or (isinstance(score, (list, tuple)) and all([s >= 8 for s in score]))
        ):
            # TODO: 8 is an arbitrary threshold
            logs.append({"agent": "system", "content": "Good score! Accept!"})
            logger.info(
                "", f"Good score! Accept! Final Result:\n{flatten_plan}", Fore.GREEN
            )
            self.success = True
        else:
            logs.append({"agent": "system", "content": "Bad score! Reject!"})
            logger.info("", "Bad score! Reject!", Fore.RED)
        self.cnt_turn += 1
        return flatten_result, advice, flatten_plan, logs, self.success

It’s clear that the env delegates to the rules. Let’s dig deeper into those.

Rules

Let’s look at TasksolvingRule, since the env delegates to it by calling its methods from step:

    async def role_assign(
        self,
        task_description: str,
        agents: List[BaseAgent],
        cnt_turn: int,
        advice: str = "",
    ) -> List[BaseAgent]:
        """Assign roles to agents"""
        if self.role_assign_only_once and cnt_turn > 0:
            agents = [agents[AGENT_TYPES.SOLVER]] + agents[AGENT_TYPES.CRITIC]
        else:
            agents = await self.role_assigner.astep(
                role_assigner=agents[AGENT_TYPES.ROLE_ASSIGNMENT],
                group_members=[agents[AGENT_TYPES.SOLVER]] + agents[AGENT_TYPES.CRITIC],
                advice=advice,
                task_description=task_description,
            )
            if self.role_assign_only_once and cnt_turn == 0:
                agents[AGENT_TYPES.SOLVER] = agents[0]
                agents[AGENT_TYPES.CRITIC] = agents[1:]
        return agents

Next, agentverse/environments/tasksolving_env/rules/role_assigner/role_description.py: DescriptionAssigner simply calls astep on the RoleAssignerAgent and writes the generated role descriptions onto the group members:

class BaseRoleAssigner(BaseModel):
    """
    The base class of role assignment class.
    """

    @abstractmethod
    async def astep(
        self,
        role_assigner: RoleAssignerAgent,
        group_members: List[CriticAgent],
        advice: str = "No advice yet.",
        task_description: str = "",
        *args,
        **kwargs,
    ) -> List[CriticAgent]:
        pass

    def reset(self):
        pass

@role_assigner_registry.register("role_description")
class DescriptionAssigner(BaseRoleAssigner):
    """
    Generates descriptions for each agent.
    """

    async def astep(
        self,
        role_assigner: RoleAssignerAgent,
        group_members: List[CriticAgent],
        advice: str = "No advice yet.",
        task_description: str = "",
        *args,
        **kwargs,
    ) -> List[CriticAgent]:
        assert task_description != ""
        assert len(group_members) > 0

        roles = await role_assigner.astep(advice, task_description, len(group_members))
        if len(roles.content) < len(group_members):
            raise ValueError(
                f"Number of roles ({len(roles.content)}) and number of group members ({len(group_members)}) do not match."
            )
        for role, member in zip(roles.content[: len(group_members)], group_members):
            description = role.strip().strip(".")
            member.role_description = description
            member.name = description

        return group_members

    def reset(self):
        pass