TL;DR

This is a deconstruction of OpenDevin, a user-to-agent development assistant framework.

I will jump around a lot because I have no idea what is going on, but this is the TL;DR.

OpenDevin uses the following abstractions to move data from the client to the agents and back, using a websocket, internal event streams, and subscribers to those streams.

SessionManager -> Session -> AgentSession -> AgentController -> Agent

Starting OpenDevin with Ollama

The simplest way to run it is with Docker, with Ollama running locally. Here is the command that worked for me (it assumes WORKSPACE_BASE is already set in your shell):

docker run -it --pull=always \
    -e SANDBOX_USER_ID=$(id -u) \
    -e WORKSPACE_MOUNT_PATH=$WORKSPACE_BASE \
    -v $WORKSPACE_BASE:/opt/workspace_base \
    -v /var/run/docker.sock:/var/run/docker.sock \
    -p 3000:3000 \
    --add-host host.docker.internal:host-gateway \
    -e LLM_API_KEY="ollama" \
    -e LLM_BASE_URL="http://host.docker.internal:11434" \
    ghcr.io/opendevin/opendevin:0.5

That’s it. It’s up on localhost:3000

Manual Setup

If you have some time to kill, here is a summary of the steps for a manual installation.

  • Install python3.11
  • Install the newest version of Node.js
  • Install poetry

apt install python3.11
pip install poetry

Then watch make purr:

make build

Agent

The story always starts with our hero, the agent. The agents live in agenthub/. Let’s look at agenthub/browsing_agent/__init__.py, where the agent is registered with Agent.register:

from opendevin.controller.agent import Agent

from .browsing_agent import BrowsingAgent

Agent.register('BrowsingAgent', BrowsingAgent)

The register function is defined in opendevin/controller/agent.py.

    @classmethod
    def register(cls, name: str, agent_cls: Type['Agent']):
        if name in cls._registry:
            raise AgentAlreadyRegisteredError(name)
        cls._registry[name] = agent_cls

You see, _registry is used in several places. For example, the server API uses it to return the list of registered agents to the frontend. We will circle back to this later.

@app.get('/api/options/agents')
async def get_agents():

    agents = Agent.list_agents()
    return agents
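
To make the registry idea concrete, here is a minimal, self-contained sketch of the pattern. Only register is copied from the source above; the bodies of get_cls and list_agents, the MiniAgent and EchoAgent names, and AgentNotRegisteredError are my assumptions for illustration, not OpenDevin's actual code.

```python
from typing import Type


class AgentAlreadyRegisteredError(Exception):
    """Raised when a name is registered twice."""


class AgentNotRegisteredError(Exception):
    """Hypothetical error for unknown agent names."""


class MiniAgent:
    """Simplified stand-in for the Agent base class."""

    # Class-level dict shared by every subclass: name -> agent class.
    _registry: dict[str, Type['MiniAgent']] = {}

    @classmethod
    def register(cls, name: str, agent_cls: Type['MiniAgent']) -> None:
        if name in cls._registry:
            raise AgentAlreadyRegisteredError(name)
        cls._registry[name] = agent_cls

    @classmethod
    def get_cls(cls, name: str) -> Type['MiniAgent']:
        # Assumed behaviour: look the class up by its registered name.
        if name not in cls._registry:
            raise AgentNotRegisteredError(name)
        return cls._registry[name]

    @classmethod
    def list_agents(cls) -> list[str]:
        # Assumed behaviour: return the registered names.
        return list(cls._registry.keys())


class EchoAgent(MiniAgent):
    """Hypothetical concrete agent used only for this demo."""


MiniAgent.register('EchoAgent', EchoAgent)
agent = MiniAgent.get_cls('EchoAgent')()  # look up the class, then instantiate it
```

The nice part of this design is that the server only needs agent names as strings: the frontend picks a name from the list, and the backend turns it back into a class.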

BrowsingAgent

Let’s start with one of the agents in agenthub. Agent provides the base class, so a concrete agent needs to implement some methods:

  • reset
  • step
  • search_memory

In this case, the browsing agent implements reset and step.

    def reset(self) -> None:
        super().reset()
        self.cost_accumulator = 0

step builds a prompt from a system template and a user prompt, then sends it to the LLM (with temperature 0). As the agent name and the prompt suggest, this agent inspects a web page through its accessibility tree and asks the LLM for the next browser action.

    def step(self, state: State) -> Action:

        goal = state.get_current_user_intent()
        messages = []

        system_msg = f"""\
        # Instructions
        Review the current state of the page and all other information to find the best
        possible next action to accomplish your goal. Your answer will be interpreted
        and executed by a program, make sure to follow the formatting instructions.

        # Goal:
        {goal}

        # Action Space
        {self.action_space.describe(with_long_description=False, with_examples=True)}
        """

        messages.append({'role': 'system', 'content': system_msg})
        prev_actions = ''
        cur_axtree_txt = ''
        error_prefix = ''
        last_obs = None
        for prev_action, obs in state.history:
            if isinstance(prev_action, BrowseInteractiveAction):
                prev_actions += f'{prev_action.browser_actions}\n'
                last_obs = obs

        if isinstance(last_obs, BrowserOutputObservation):
            if last_obs.error:
                # add error recovery prompt prefix
                error_prefix = f'Last action failed:\n{last_obs.last_browser_action}\nTry again with the current state of the page.\n'
            cur_axtree_txt = flatten_axtree_to_str(last_obs.axtree_object)

        prompt = f"""\
{error_prefix}

# Current Accessibility Tree:
{cur_axtree_txt}

# Previous Actions
{prev_actions}

Here is an example with chain of thought of a valid action when clicking on a button:
"
In order to accomplish my goal I need to click on the button with bid 12
```click("12")```
"
"""
        messages.append({'role': 'user', 'content': prompt})
        response = self.llm.completion(
            messages=messages,
            temperature=0.0,
        )
        self.log_cost(response)
        action_resp = response['choices'][0]['message']['content']
        logger.info(prompt)
        logger.info(action_resp)
        return parse_response(action_resp)

ADHD alert: the agent parses the LLM response to create either a MessageAction or a BrowseInteractiveAction.

def parse_response(response: str) -> Action:
    thought = response.split('```')[0].strip()
    action_str = response.split('```')[1].strip()
    if 'send_msg_to_user(' in action_str:
        tree = ast.parse(action_str)
        args = tree.body[0].value.args  # type: ignore
        return MessageAction(args[0].value)

    return BrowseInteractiveAction(browser_actions=action_str, thought=thought)

Agent session

Who calls reset or step? That question takes us to AgentSession and AgentController. AgentSession lives in opendevin/server/session/agent.py, where the agent and the controller are created in _create_controller:

    async def _create_controller(self, start_event: dict):
        """Creates an AgentController instance.

        Args:
            start_event: The start event data (optional).
        """
        if self.controller is not None:
            raise Exception('Controller already created')
        if self.runtime is None:
            raise Exception('Runtime must be initialized before the agent controller')
        args = {
            key: value
            for key, value in start_event.get('args', {}).items()
            if value != ''
        }  # remove empty values, prevent FE from sending empty strings
        agent_cls = args.get(ConfigType.AGENT, config.agent.name)
        model = args.get(ConfigType.LLM_MODEL, config.llm.model)
        api_key = args.get(ConfigType.LLM_API_KEY, config.llm.api_key)
        api_base = config.llm.base_url
        max_iterations = args.get(ConfigType.MAX_ITERATIONS, config.max_iterations)
        max_chars = args.get(ConfigType.MAX_CHARS, config.llm.max_chars)

        logger.info(f'Creating agent {agent_cls} using LLM {model}')
        llm = LLM(model=model, api_key=api_key, base_url=api_base)
        agent = Agent.get_cls(agent_cls)(llm)

        self.runtime.init_sandbox_plugins(agent.sandbox_plugins)

        self.controller = AgentController(
            sid=self.sid,
            event_stream=self.event_stream,
            agent=agent,
            max_iterations=int(max_iterations),
            max_chars=int(max_chars),
        )
        try:
            agent_state = State.restore_from_session(self.sid)
            self.controller.set_state(agent_state)
        except Exception as e:
            print('Error restoring state', e)

_create_controller is called from start:

    async def start(self, start_event: dict):
        """Starts the agent session.

        Args:
            start_event: The start event data (optional).
        """
        if self.controller or self.runtime:
            raise Exception(
                'Session already started. You need to close this session and start a new one.'
            )
        await self._create_runtime()
        await self._create_controller(start_event)

Session

The next step is finding who starts the AgentSession. Let’s follow that rabbit for a second. start is called from Session in opendevin/server/session/session.py:

    async def _initialize_agent(self, data: dict):
        await self.agent_session.event_stream.add_event(
            ChangeAgentStateAction(AgentState.LOADING), EventSource.USER
        )
        await self.agent_session.event_stream.add_event(
            AgentStateChangedObservation('', AgentState.LOADING), EventSource.AGENT
        )
        try:
            await self.agent_session.start(data)

Here is the deal: Session is a wrapper class around a websocket (ugh!), and it uses sid as the identifier for getting or creating sessions through SessionManager.

class Session:
    sid: str
    websocket: WebSocket | None
    last_active_ts: int = 0
    is_alive: bool = True
    agent_session: AgentSession

    def __init__(self, sid: str, ws: WebSocket | None):
        self.sid = sid
        self.websocket = ws
        self.last_active_ts = int(time.time())
        self.agent_session = AgentSession(sid)
        self.agent_session.event_stream.subscribe(
            EventStreamSubscriber.SERVER, self.on_event
        )

ADHD alert: there is one SessionManager singleton working as a factory for Sessions.

session_manager = SessionManager()

class SessionManager:
    _sessions: dict[str, Session] = {}
    cleanup_interval: int = 300
    session_timeout: int = 600

    def __init__(self):
        asyncio.create_task(self._cleanup_sessions())

    def add_or_restart_session(self, sid: str, ws_conn: WebSocket) -> Session:
        if sid in self._sessions:
            asyncio.create_task(self._sessions[sid].close())
        self._sessions[sid] = Session(sid=sid, ws=ws_conn)
        return self._sessions[sid]

    def get_session(self, sid: str) -> Session | None:
        if sid not in self._sessions:
            return None
        return self._sessions.get(sid)

Back to _initialize_agent: it is called from dispatch, which is called from loop_recv.

    async def loop_recv(self):
        try:
            if self.websocket is None:
                return
            while True:
                try:
                    data = await self.websocket.receive_json()
                except ValueError:
                    await self.send_error('Invalid JSON')
                    continue
                await self.dispatch(data)
        except WebSocketDisconnect:
            await self.close()
            logger.info('WebSocket disconnected, sid: %s', self.sid)
        except RuntimeError as e:
            await self.close()
            logger.exception('Error in loop_recv: %s', e)

ADHD alert: there is also a send method for the websocket in the same class.

    async def send(self, data: dict[str, object]) -> bool:
        try:
            if self.websocket is None or not self.is_alive:
                return False
            await self.websocket.send_json(data)
            await asyncio.sleep(0.001)  # This flushes the data to the client
            self.last_active_ts = int(time.time())
            return True
        except WebSocketDisconnect:
            self.is_alive = False
            return False

Server

Who calls loop_recv? It’s the main server in opendevin/server/listen.py. This is a typical websocket server implemented with FastAPI. The session is created there.

@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):

    await websocket.accept()

    session = None
    if websocket.query_params.get('token'):
        token = websocket.query_params.get('token')
        sid = get_sid_from_token(token)

        if sid == '':
            await websocket.send_json({'error': 'Invalid token', 'error_code': 401})
            await websocket.close()
            return
    else:
        sid = str(uuid.uuid4())
        token = sign_token({'sid': sid})

    session = session_manager.add_or_restart_session(sid, websocket)
    await websocket.send_json({'token': token, 'status': 'ok'})

    last_event_id = -1
    if websocket.query_params.get('last_event_id'):
        last_event_id = int(websocket.query_params.get('last_event_id'))
    for event in session.agent_session.event_stream.get_events(
        start_id=last_event_id + 1
    ):
        if isinstance(event, NullAction) or isinstance(event, NullObservation):
            continue
        if isinstance(event, ChangeAgentStateAction) or isinstance(
            event, AgentStateChangedObservation
        ):
            continue
        await websocket.send_json(event_to_dict(event))

    await session.loop_recv()
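
The middle of websocket_endpoint is a replay step: a reconnecting client passes last_event_id, and the server resends everything after it, minus a few event types the UI does not need. A minimal sketch of that filter follows. The Ev dataclass and the string kind labels are hypothetical simplifications of the real event classes.

```python
from dataclasses import dataclass


@dataclass
class Ev:
    """Hypothetical, flattened event: an id plus a kind label."""
    id: int
    kind: str


# Stand-ins for NullAction, NullObservation, ChangeAgentStateAction
# and AgentStateChangedObservation, which the endpoint skips.
SKIPPED_KINDS = {
    'null_action',
    'null_observation',
    'change_agent_state',
    'agent_state_changed',
}


def events_to_replay(events: list[Ev], last_event_id: int = -1) -> list[Ev]:
    """Return events newer than last_event_id, without the skipped kinds."""
    return [
        event
        for event in events
        if event.id >= last_event_id + 1 and event.kind not in SKIPPED_KINDS
    ]


history = [
    Ev(0, 'message'),
    Ev(1, 'change_agent_state'),
    Ev(2, 'run_command'),
    Ev(3, 'null_observation'),
    Ev(4, 'command_output'),
]
fresh_client = events_to_replay(history)                    # no last_event_id given
reconnecting = events_to_replay(history, last_event_id=2)   # already saw ids 0..2
```

With the default of -1, start_id becomes 0, so a brand-new client gets the whole (filtered) history.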

Jumping back to loop_recv: it calls receive_json() to receive each message from the client and hands the decoded message to dispatch.

dispatch either initializes the agent session (for an INIT action) or turns the message into an event and adds it to the agent session's event stream.

    async def dispatch(self, data: dict):
        action = data.get('action', '')
        if action == ActionType.INIT:
            await self._initialize_agent(data)
            return
        event = event_from_dict(data.copy())
        await self.agent_session.event_stream.add_event(event, EventSource.USER)
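
Since everything past this point hangs off the event stream, here is a tiny sketch of the publish/subscribe idea behind it, combined with the dispatch routing above. TinyEventStream, the 'initialize' action value, and the dict-shaped events are all my assumptions for illustration; the real EventStream and ActionType.INIT may look quite different.

```python
import asyncio
from typing import Awaitable, Callable

Callback = Callable[[dict], Awaitable[None]]


class TinyEventStream:
    """Hypothetical stand-in: stores events and notifies every subscriber."""

    def __init__(self) -> None:
        self.events: list[dict] = []
        self._subscribers: dict[str, Callback] = {}

    def subscribe(self, name: str, callback: Callback) -> None:
        self._subscribers[name] = callback

    async def add_event(self, event: dict, source: str) -> None:
        # Stamp the event with an id and its source, then fan it out.
        stamped = {**event, 'id': len(self.events), 'source': source}
        self.events.append(stamped)
        for callback in self._subscribers.values():
            await callback(stamped)


async def dispatch(stream: TinyEventStream, data: dict, log: list[str]) -> None:
    # 'initialize' is a made-up value standing in for ActionType.INIT.
    if data.get('action', '') == 'initialize':
        log.append('agent initialized')
        return
    await stream.add_event(data, source='user')


async def demo() -> list[str]:
    log: list[str] = []
    stream = TinyEventStream()

    async def server_on_event(event: dict) -> None:
        log.append(f"server saw {event['action']} #{event['id']}")

    async def controller_on_event(event: dict) -> None:
        log.append(f"controller saw {event['action']} #{event['id']}")

    stream.subscribe('SERVER', server_on_event)
    stream.subscribe('AGENT_CONTROLLER', controller_on_event)

    await dispatch(stream, {'action': 'initialize'}, log)
    await dispatch(stream, {'action': 'message', 'args': {'content': 'hi'}}, log)
    return log


log = asyncio.run(demo())
```

The point of the indirection is that Session and AgentController never call each other directly: both just subscribe to the same stream.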

Agent Controller

Now we have the data flow from the client (browser) to the server, to Session, to AgentSession, to Agent/AgentController. I need to go back to AgentController in opendevin/controller/agent_controller.py.

__init__ starts the asyncio task _start_step_loop, which is essentially a loop.

class AgentController:
    id: str
    agent: Agent
    max_iterations: int
    event_stream: EventStream
    state: State
    agent_task: Optional[asyncio.Task] = None
    delegate: 'AgentController | None' = None
    _pending_action: Action | None = None

    def __init__(
        self,
        agent: Agent,
        event_stream: EventStream,
        sid: str = 'default',
        max_iterations: int = MAX_ITERATIONS,
        max_chars: int = MAX_CHARS,
        inputs: dict | None = None,
    ):
        self.id = sid
        self.agent = agent
        self.state = State(inputs=inputs or {}, max_iterations=max_iterations)
        self.event_stream = event_stream
        self.event_stream.subscribe(
            EventStreamSubscriber.AGENT_CONTROLLER, self.on_event
        )
        self.max_iterations = max_iterations
        self.max_chars = max_chars
        self.agent_task = asyncio.create_task(self._start_step_loop())

_start_step_loop keeps calling _step until the task is cancelled or an unexpected error occurs. It has to call sleep between iterations to let other things on the event loop run (shout out to asyncio).

    async def _start_step_loop(self):
        while True:
            try:
                await self._step()
            except asyncio.CancelledError:
                logger.info('AgentController task was cancelled')
                break
            except Exception as e:
                traceback.print_exc()
                logger.error(f'Error while running the agent: {e}')
                logger.error(traceback.format_exc())
                await self.report_error(
                    'There was an unexpected error while running the agent', exception=e
                )
                await self.set_agent_state_to(AgentState.ERROR)
                break

            await asyncio.sleep(0.1)

_step checks the agent state. If the agent is not RUNNING, or an action is still pending, it sleeps so that other asyncio tasks can run. Otherwise it calls the agent's step.

    async def _step(self):
        if self.get_agent_state() != AgentState.RUNNING:
            logger.debug('waiting for agent to run...')
            await asyncio.sleep(1)
            return

        if self._pending_action:
            logger.debug('waiting for pending action: ' + str(self._pending_action))
            await asyncio.sleep(1)
            return

        logger.info(f'STEP {self.state.iteration}', extra={'msg_type': 'STEP'})
        if self.state.iteration >= self.max_iterations:
            await self.report_error('Agent reached maximum number of iterations')
            await self.set_agent_state_to(AgentState.ERROR)
            return

        if self.delegate is not None:
            delegate_done = await self.delegate._step()
            if delegate_done:
                outputs = self.delegate.state.outputs if self.delegate.state else {}
                obs: Observation = AgentDelegateObservation(content='', outputs=outputs)
                await self.event_stream.add_event(obs, EventSource.AGENT)
                self.delegate = None
                self.delegateAction = None
            return

        if self.state.num_of_chars > self.max_chars:
            raise MaxCharsExceedError(self.state.num_of_chars, self.max_chars)

        self.update_state_before_step()
        action: Action = NullAction()
        try:
            action = self.agent.step(self.state)
            if action is None:
                raise AgentNoActionError('No action was returned')
        except (AgentMalformedActionError, AgentNoActionError, LLMOutputError) as e:
            await self.report_error(str(e))
            return

        logger.info(action, extra={'msg_type': 'ACTION'})

        self.update_state_after_step()
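
The loop plus the state gate can be boiled down to a few lines of asyncio. This is a simplified sketch under my own assumptions: TinyController, its string states, and run_loop are hypothetical, the sleeps are shortened, and the loop exits by checking the state instead of using break as the original does.

```python
import asyncio


class TinyController:
    """Hypothetical, stripped-down controller: a gated polling loop."""

    def __init__(self, max_iterations: int) -> None:
        self.state = 'paused'
        self.iteration = 0
        self.max_iterations = max_iterations
        self.log: list[str] = []

    async def _step(self) -> None:
        if self.state != 'running':
            # Not ready: yield to the event loop and try again later.
            await asyncio.sleep(0.01)
            return
        if self.iteration >= self.max_iterations:
            self.log.append('max iterations reached')
            self.state = 'error'
            return
        self.log.append(f'step {self.iteration}')  # stands in for agent.step(state)
        self.iteration += 1

    async def run_loop(self) -> None:
        while self.state != 'error':
            await self._step()
            await asyncio.sleep(0)  # let other tasks run between steps


async def demo() -> tuple[list[str], list[str]]:
    controller = TinyController(max_iterations=2)
    task = asyncio.create_task(controller.run_loop())
    await asyncio.sleep(0.03)          # the loop idles while the state is 'paused'
    idle_log = list(controller.log)
    controller.state = 'running'       # flipping the state unblocks _step
    await task
    return idle_log, controller.log


idle_log, final_log = asyncio.run(demo())
```

Nothing happens while the state is 'paused'; once it flips to 'running', the loop steps until it hits the iteration cap.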

So, how is the state updated? There is on_event, which is subscribed to the event stream as AGENT_CONTROLLER.

        self.event_stream.subscribe(
            EventStreamSubscriber.AGENT_CONTROLLER, self.on_event
        )

on_event changes the agent state held by this controller (which wraps the agent). BTW, this is what unblocks _step: once the state becomes RUNNING, the loop stops idling and calls the agent.

    async def on_event(self, event: Event):
        if isinstance(event, ChangeAgentStateAction):
            await self.set_agent_state_to(event.agent_state)  # type: ignore
        elif isinstance(event, MessageAction):
            if event.source == EventSource.USER:
                await self.add_history(event, NullObservation(''))
                if self.get_agent_state() != AgentState.RUNNING:
                    await self.set_agent_state_to(AgentState.RUNNING)
            elif event.source == EventSource.AGENT and event.wait_for_response:
                await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
        elif isinstance(event, AgentDelegateAction):
            await self.start_delegate(event)
        elif isinstance(event, AddTaskAction):
            self.state.root_task.add_subtask(event.parent, event.goal, event.subtasks)
        elif isinstance(event, ModifyTaskAction):
            self.state.root_task.set_subtask_state(event.task_id, event.state)
        elif isinstance(event, AgentFinishAction):
            self.state.outputs = event.outputs  # type: ignore[attr-defined]
            await self.set_agent_state_to(AgentState.FINISHED)
        elif isinstance(event, Observation):
            if self._pending_action and self._pending_action.id == event.cause:
                await self.add_history(self._pending_action, event)
                self._pending_action = None
                logger.info(event, extra={'msg_type': 'OBSERVATION'})
            elif isinstance(event, CmdOutputObservation):
                await self.add_history(NullAction(), event)
                logger.info(event, extra={'msg_type': 'OBSERVATION'})

LLM API support

OpenDevin uses litellm as an abstraction over different LLM APIs.

        llm = LLM(model=model, api_key=api_key, base_url=api_base)

In opendevin/llm/llm.py, it uses functools.partial to bind the needed arguments once, so callers only have to pass the messages.

        self._completion = partial(
            litellm_completion,
            model=self.model_name,
            api_key=self.api_key,
            base_url=self.base_url,
            api_version=self.api_version,
            custom_llm_provider=custom_llm_provider,
            max_tokens=self.max_output_tokens,
            timeout=self.llm_timeout,
            temperature=llm_temperature,
            top_p=llm_top_p,
        )
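
If partial is new to you, here is a minimal sketch of what that binding buys. fake_completion is a made-up function standing in for litellm's completion so the example runs offline, and the model name is just an example value.

```python
from functools import partial


def fake_completion(
    messages: list[dict],
    model: str,
    temperature: float = 1.0,
    max_tokens: int = 256,
) -> dict:
    """Made-up stand-in for an LLM call: echoes back the settings it received."""
    return {
        'model': model,
        'temperature': temperature,
        'max_tokens': max_tokens,
        'n_messages': len(messages),
    }


# Bind the settings once, the way the LLM wrapper does in its constructor.
completion = partial(fake_completion, model='example-model', temperature=0.2)

messages = [{'role': 'user', 'content': 'hello'}]
default_call = completion(messages=messages)
# Keyword arguments given at call time override the bound ones.
override_call = completion(messages=messages, temperature=0.0)
```

That override behaviour is why the browsing agent can pass temperature=0.0 to self.llm.completion even though a default temperature is already bound.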