TL;DR
This is a deconstruction of OpenDevin, a user-to-agent development assistant framework.
I will jump around a lot because I have no idea what is going on, but this is the TL;DR:
OpenDevin uses the following chain of abstractions to move data from the client to the agents and back, using a websocket plus internal event streams and subscribers to those streams.
SessionManager -> Session -> AgentSession -> AgentController -> Agent
Starting OpenDevin with Ollama
The simplest way to run it is with Docker, with Ollama running locally. Here is the command that worked for me:
docker run -it --pull=always -e SANDBOX_USER_ID=$(id -u) -e WORKSPACE_MOUNT_PATH=$WORKSPACE_BASE -v $WORKSPACE_BASE:/opt/workspace_base -v /var/run/docker.sock:/var/run/docker.sock -p 3000:3000 --add-host host.docker.internal:host-gateway -e LLM_API_KEY="ollama" -e LLM_BASE_URL="http://host.docker.internal:11434" ghcr.io/opendevin/opendevin:0.5
That’s it. It’s up on localhost:3000
Manual Setup
If you have some time to kill, here is a summary of the steps for a manual installation.
- Install python3.11
- Install the newest version of Node.js
- Install poetry
apt install python3.11
pip install poetry
Then watch make purr:
make build
Agent
The story always starts with our hero, the agent. The agents live in agenthub/. Let’s look at agenthub/browsing_agent/__init__.py, where the agent is registered with Agent.register:
from opendevin.controller.agent import Agent
from .browsing_agent import BrowsingAgent
Agent.register('BrowsingAgent', BrowsingAgent)
The register function is defined in opendevin/controller/agent.py.
@classmethod
def register(cls, name: str, agent_cls: Type['Agent']):
    if name in cls._registry:
        raise AgentAlreadyRegisteredError(name)
    cls._registry[name] = agent_cls
You see, _registry is used in several places. For example, the server API returns the list of registered agents to the frontend. We will circle back to this later.
@app.get('/api/options/agents')
async def get_agents():
    agents = Agent.list_agents()
    return agents
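To make the registry idea concrete, here is a minimal standalone sketch of the pattern. It is not OpenDevin's code: EchoAgent is a made-up agent, the error class is a stand-in, and get_cls here simply raises KeyError for unknown names, which is a simplification on my part.

```python
from typing import Dict, List, Type


class AgentAlreadyRegisteredError(Exception):
    """Stand-in for the real error: raised when a name is registered twice."""


class Agent:
    # Class-level registry shared by every subclass: name -> agent class.
    _registry: Dict[str, Type['Agent']] = {}

    @classmethod
    def register(cls, name: str, agent_cls: Type['Agent']) -> None:
        if name in cls._registry:
            raise AgentAlreadyRegisteredError(name)
        cls._registry[name] = agent_cls

    @classmethod
    def get_cls(cls, name: str) -> Type['Agent']:
        # Simplification: an unknown name raises a plain KeyError here.
        return cls._registry[name]

    @classmethod
    def list_agents(cls) -> List[str]:
        return list(cls._registry.keys())


class EchoAgent(Agent):  # hypothetical agent, for illustration only
    pass


Agent.register('EchoAgent', EchoAgent)
agent = Agent.get_cls('EchoAgent')()  # look up the class by name, then instantiate
```

The point of the pattern is that the server only needs a string (the agent name sent by the frontend) to find and build the right class.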
BrowsingAgent
Let’s start with one of the agents in agenthub. Agent provides the base class, so a concrete agent needs to implement some methods:
- reset
- step
- search_memory
In this case, the browsing agent implements reset and step.
def reset(self) -> None:
    super().reset()
    self.cost_accumulator = 0
step builds a prompt from a system template and a user prompt, then sends it to the LLM (with temperature 0). Judging by the agent name and the prompt, this agent examines a web page and picks the next browser action.
def step(self, state: State) -> Action:
    goal = state.get_current_user_intent()
    messages = []
    system_msg = f"""\
# Instructions
Review the current state of the page and all other information to find the best
possible next action to accomplish your goal. Your answer will be interpreted
and executed by a program, make sure to follow the formatting instructions.
# Goal:
{goal}
# Action Space
{self.action_space.describe(with_long_description=False, with_examples=True)}
"""
    messages.append({'role': 'system', 'content': system_msg})
    prev_actions = ''
    cur_axtree_txt = ''
    error_prefix = ''
    last_obs = None
    for prev_action, obs in state.history:
        if isinstance(prev_action, BrowseInteractiveAction):
            prev_actions += f'{prev_action.browser_actions}\n'
            last_obs = obs
    if isinstance(last_obs, BrowserOutputObservation):
        if last_obs.error:
            # add error recovery prompt prefix
            error_prefix = f'Last action failed:\n{last_obs.last_browser_action}\nTry again with the current state of the page.\n'
        cur_axtree_txt = flatten_axtree_to_str(last_obs.axtree_object)
    prompt = f"""\
{error_prefix}
# Current Accessibility Tree:
{cur_axtree_txt}
# Previous Actions
{prev_actions}
Here is an example with chain of thought of a valid action when clicking on a button:
"
In order to accomplish my goal I need to click on the button with bid 12
```click("12")```
"
"""
    messages.append({'role': 'user', 'content': prompt})
    response = self.llm.completion(
        messages=messages,
        temperature=0.0,
    )
    self.log_cost(response)
    action_resp = response['choices'][0]['message']['content']
    logger.info(prompt)
    logger.info(action_resp)
    return parse_response(action_resp)
ADHD alert: the agent parses the LLM response to create either a MessageAction or a BrowseInteractiveAction.
def parse_response(response: str) -> Action:
    thought = response.split('```')[0].strip()
    action_str = response.split('```')[1].strip()
    if 'send_msg_to_user(' in action_str:
        tree = ast.parse(action_str)
        args = tree.body[0].value.args  # type: ignore
        return MessageAction(args[0].value)
    return BrowseInteractiveAction(browser_actions=action_str, thought=thought)
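To see what this parsing does to a real-looking reply, here is a standalone sketch. The two dataclasses are simplified stand-ins I made up for the real action classes, and FENCE is only there so I don't have to type three backticks inside a code sample.

```python
import ast
from dataclasses import dataclass

FENCE = '`' * 3  # the triple-backtick delimiter the LLM wraps actions in


@dataclass
class MessageAction:  # simplified stand-in for the real class
    content: str


@dataclass
class BrowseInteractiveAction:  # simplified stand-in for the real class
    browser_actions: str
    thought: str = ''


def parse_response(response: str):
    parts = response.split(FENCE)
    thought = parts[0].strip()      # free text before the first fence
    action_str = parts[1].strip()   # code between the first pair of fences
    if 'send_msg_to_user(' in action_str:
        # Parse the call as Python and pull out its first positional argument.
        tree = ast.parse(action_str)
        args = tree.body[0].value.args
        return MessageAction(args[0].value)
    return BrowseInteractiveAction(browser_actions=action_str, thought=thought)


click = parse_response(f'I need to click the button with bid 12\n{FENCE}click("12"){FENCE}\n')
message = parse_response(f'All done\n{FENCE}send_msg_to_user("finished"){FENCE}')
```

Note that a reply without any fence would make parts[1] raise an IndexError, so this approach relies on the model following the formatting instructions in the prompt.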
Agent session
Who calls reset or step? This takes us to AgentSession and AgentController. AgentSession lives in opendevin/server/session/agent.py, where the agent and the controller are created in _create_controller:
async def _create_controller(self, start_event: dict):
    """Creates an AgentController instance.

    Args:
        start_event: The start event data (optional).
    """
    if self.controller is not None:
        raise Exception('Controller already created')
    if self.runtime is None:
        raise Exception('Runtime must be initialized before the agent controller')
    args = {
        key: value
        for key, value in start_event.get('args', {}).items()
        if value != ''
    }  # remove empty values, prevent FE from sending empty strings
    agent_cls = args.get(ConfigType.AGENT, config.agent.name)
    model = args.get(ConfigType.LLM_MODEL, config.llm.model)
    api_key = args.get(ConfigType.LLM_API_KEY, config.llm.api_key)
    api_base = config.llm.base_url
    max_iterations = args.get(ConfigType.MAX_ITERATIONS, config.max_iterations)
    max_chars = args.get(ConfigType.MAX_CHARS, config.llm.max_chars)
    logger.info(f'Creating agent {agent_cls} using LLM {model}')
    llm = LLM(model=model, api_key=api_key, base_url=api_base)
    agent = Agent.get_cls(agent_cls)(llm)
    self.runtime.init_sandbox_plugins(agent.sandbox_plugins)
    self.controller = AgentController(
        sid=self.sid,
        event_stream=self.event_stream,
        agent=agent,
        max_iterations=int(max_iterations),
        max_chars=int(max_chars),
    )
    try:
        agent_state = State.restore_from_session(self.sid)
        self.controller.set_state(agent_state)
    except Exception as e:
        print('Error restoring state', e)
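The "drop empty strings, then fall back to config" trick at the top of _create_controller is worth isolating. Below is a small sketch of just that step. resolve_settings, the key names, and the default values are all hypothetical; the real code uses ConfigType members and the config object.

```python
def resolve_settings(start_event: dict, defaults: dict) -> dict:
    # Remove empty values so an empty string from the frontend
    # does not override a server-side default.
    args = {
        key: value
        for key, value in start_event.get('args', {}).items()
        if value != ''
    }
    # For every known setting, prefer the client value, else the default.
    return {key: args.get(key, default) for key, default in defaults.items()}


# Hypothetical defaults and event payload, for illustration only.
defaults = {'AGENT': 'BrowsingAgent', 'LLM_MODEL': 'default-model', 'MAX_ITERATIONS': 100}
start_event = {
    'action': 'initialize',
    'args': {'AGENT': '', 'LLM_MODEL': 'ollama/llama3', 'MAX_ITERATIONS': '5'},
}
settings = resolve_settings(start_event, defaults)
max_iterations = int(settings['MAX_ITERATIONS'])  # values may arrive as strings
```

The int() conversion at the end mirrors the int(max_iterations) call in the real code, which suggests the frontend may send numbers as strings.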
_create_controller is called in start:
async def start(self, start_event: dict):
    """Starts the agent session.

    Args:
        start_event: The start event data (optional).
    """
    if self.controller or self.runtime:
        raise Exception(
            'Session already started. You need to close this session and start a new one.'
        )
    await self._create_runtime()
    await self._create_controller(start_event)
Session
The next step is finding out who starts the AgentSession. Let’s follow that rabbit for a second. start is called from Session in opendevin/server/session/session.py:
async def _initialize_agent(self, data: dict):
    await self.agent_session.event_stream.add_event(
        ChangeAgentStateAction(AgentState.LOADING), EventSource.USER
    )
    await self.agent_session.event_stream.add_event(
        AgentStateChangedObservation('', AgentState.LOADING), EventSource.AGENT
    )
    try:
        await self.agent_session.start(data)
Here is the deal: Session is a wrapper class around the websocket (ugh!) that uses sid as the identifier for getting or creating a session through SessionManager.
class Session:
    sid: str
    websocket: WebSocket | None
    last_active_ts: int = 0
    is_alive: bool = True
    agent_session: AgentSession

    def __init__(self, sid: str, ws: WebSocket | None):
        self.sid = sid
        self.websocket = ws
        self.last_active_ts = int(time.time())
        self.agent_session = AgentSession(sid)
        self.agent_session.event_stream.subscribe(
            EventStreamSubscriber.SERVER, self.on_event
        )
ADHD alert: there is a single SessionManager instance that works as a factory for Session objects.
session_manager = SessionManager()

class SessionManager:
    _sessions: dict[str, Session] = {}
    cleanup_interval: int = 300
    session_timeout: int = 600

    def __init__(self):
        asyncio.create_task(self._cleanup_sessions())

    def add_or_restart_session(self, sid: str, ws_conn: WebSocket) -> Session:
        if sid in self._sessions:
            asyncio.create_task(self._sessions[sid].close())
        self._sessions[sid] = Session(sid=sid, ws=ws_conn)
        return self._sessions[sid]

    def get_session(self, sid: str) -> Session | None:
        if sid not in self._sessions:
            return None
        return self._sessions.get(sid)
Back to _initialize_agent: it is called from dispatch, which in turn is called from loop_recv.
async def loop_recv(self):
    try:
        if self.websocket is None:
            return
        while True:
            try:
                data = await self.websocket.receive_json()
            except ValueError:
                await self.send_error('Invalid JSON')
                continue
            await self.dispatch(data)
    except WebSocketDisconnect:
        await self.close()
        logger.info('WebSocket disconnected, sid: %s', self.sid)
    except RuntimeError as e:
        await self.close()
        logger.exception('Error in loop_recv: %s', e)
ADHD alert: there is also a send method for the websocket in the same class.
async def send(self, data: dict[str, object]) -> bool:
    try:
        if self.websocket is None or not self.is_alive:
            return False
        await self.websocket.send_json(data)
        await asyncio.sleep(0.001)  # This flushes the data to the client
        self.last_active_ts = int(time.time())
        return True
    except WebSocketDisconnect:
        self.is_alive = False
        return False
Server
Who calls loop_recv? It’s the main server in opendevin/server/listen.py. This is a typical websocket server implemented with FastAPI, and the session is created there.
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    session = None
    if websocket.query_params.get('token'):
        token = websocket.query_params.get('token')
        sid = get_sid_from_token(token)
        if sid == '':
            await websocket.send_json({'error': 'Invalid token', 'error_code': 401})
            await websocket.close()
            return
    else:
        sid = str(uuid.uuid4())
        token = sign_token({'sid': sid})
    session = session_manager.add_or_restart_session(sid, websocket)
    await websocket.send_json({'token': token, 'status': 'ok'})
    last_event_id = -1
    if websocket.query_params.get('last_event_id'):
        last_event_id = int(websocket.query_params.get('last_event_id'))
    for event in session.agent_session.event_stream.get_events(
        start_id=last_event_id + 1
    ):
        if isinstance(event, NullAction) or isinstance(event, NullObservation):
            continue
        if isinstance(event, ChangeAgentStateAction) or isinstance(
            event, AgentStateChangedObservation
        ):
            continue
        await websocket.send_json(event_to_dict(event))
    await session.loop_recv()
Jumping back to loop_recv: receive_json() is called there to receive data from the client, and each message is handed to dispatch.
dispatch initializes the agent session when the action is INIT. For anything else, it creates an event from the payload and adds it to the agent session’s event stream.
async def dispatch(self, data: dict):
    action = data.get('action', '')
    if action == ActionType.INIT:
        await self._initialize_agent(data)
        return
    event = event_from_dict(data.copy())
    await self.agent_session.event_stream.add_event(event, EventSource.USER)
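Since everything funnels into event_stream.add_event, it helps to picture the stream as a tiny publish/subscribe hub. I have not read the real EventStream implementation, so the sketch below is only my guess at the general shape: events are plain dicts, ids are list positions, and the subscriber names are made up.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Callback = Callable[[dict], Awaitable[None]]


class EventStream:  # simplified guess, not OpenDevin's implementation
    def __init__(self) -> None:
        self._subscribers: Dict[str, Callback] = {}
        self._events: List[dict] = []

    def subscribe(self, subscriber_id: str, callback: Callback) -> None:
        self._subscribers[subscriber_id] = callback

    async def add_event(self, event: dict, source: str) -> None:
        # Stamp the event with an id and its source, store it, then fan out.
        event = {**event, 'id': len(self._events), 'source': source}
        self._events.append(event)
        for callback in self._subscribers.values():
            await callback(event)


received = []


async def server_on_event(event: dict) -> None:
    received.append(('server', event['id'], event['source']))


async def controller_on_event(event: dict) -> None:
    received.append(('agent_controller', event['id'], event['source']))


async def main() -> None:
    stream = EventStream()
    stream.subscribe('server', server_on_event)
    stream.subscribe('agent_controller', controller_on_event)
    await stream.add_event({'action': 'message', 'content': 'hi'}, 'user')


asyncio.run(main())
```

With this picture, Session (subscribed as SERVER) and AgentController (subscribed as AGENT_CONTROLLER) both see every event, and each decides what to do with it in its own on_event.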
Agent Controller
Now we have the data flow from the client (browser) to the server, to Session, to AgentSession, to Agent/AgentController. I need to go back to AgentController in opendevin/controller/agent_controller.py. Its __init__ starts an asyncio task, _start_step_loop, which is essentially a loop.
class AgentController:
    id: str
    agent: Agent
    max_iterations: int
    event_stream: EventStream
    state: State
    agent_task: Optional[asyncio.Task] = None
    delegate: 'AgentController | None' = None
    _pending_action: Action | None = None

    def __init__(
        self,
        agent: Agent,
        event_stream: EventStream,
        sid: str = 'default',
        max_iterations: int = MAX_ITERATIONS,
        max_chars: int = MAX_CHARS,
        inputs: dict | None = None,
    ):
        self.id = sid
        self.agent = agent
        self.state = State(inputs=inputs or {}, max_iterations=max_iterations)
        self.event_stream = event_stream
        self.event_stream.subscribe(
            EventStreamSubscriber.AGENT_CONTROLLER, self.on_event
        )
        self.max_iterations = max_iterations
        self.max_chars = max_chars
        self.agent_task = asyncio.create_task(self._start_step_loop())
_start_step_loop keeps calling _step until the task is cancelled or an error occurs. It has to call sleep between steps to let other things on the event loop run (shout out to asyncio).
async def _start_step_loop(self):
    while True:
        try:
            await self._step()
        except asyncio.CancelledError:
            logger.info('AgentController task was cancelled')
            break
        except Exception as e:
            traceback.print_exc()
            logger.error(f'Error while running the agent: {e}')
            logger.error(traceback.format_exc())
            await self.report_error(
                'There was an unexpected error while running the agent', exception=e
            )
            await self.set_agent_state_to(AgentState.ERROR)
            break
        await asyncio.sleep(0.1)
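Here is the same loop shape boiled down to something you can run. It is a sketch under my own assumptions: step_loop, the log list, and the failing step function are invented for the demo, and the sleep interval is shortened so it finishes quickly.

```python
import asyncio

log = []
counter = {'steps': 0}


async def step_loop(step, interval: float = 0.01) -> None:
    while True:
        try:
            await step()
        except asyncio.CancelledError:
            # The task was cancelled while a step was in progress.
            log.append('cancelled')
            break
        except Exception as exc:
            # Any other failure ends the loop; the real code also reports it.
            log.append(f'error: {exc}')
            break
        # Yield control so other tasks on the event loop get a turn.
        await asyncio.sleep(interval)


async def step() -> None:
    counter['steps'] += 1
    if counter['steps'] == 3:
        raise RuntimeError('boom')  # simulate the agent blowing up on step 3


asyncio.run(step_loop(step))
```

Without the sleep, a step that never awaits anything would hog the event loop and the websocket handlers would starve, which is presumably why the real loop pauses between steps.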
_step checks the agent state. If the agent is not running, or an action is still pending, it sleeps to give the rest of the asyncio event loop a turn and returns. If the agent is running, it calls the agent’s step.
async def _step(self):
    if self.get_agent_state() != AgentState.RUNNING:
        logger.debug('waiting for agent to run...')
        await asyncio.sleep(1)
        return
    if self._pending_action:
        logger.debug('waiting for pending action: ' + str(self._pending_action))
        await asyncio.sleep(1)
        return
    logger.info(f'STEP {self.state.iteration}', extra={'msg_type': 'STEP'})
    if self.state.iteration >= self.max_iterations:
        await self.report_error('Agent reached maximum number of iterations')
        await self.set_agent_state_to(AgentState.ERROR)
        return
    if self.delegate is not None:
        delegate_done = await self.delegate._step()
        if delegate_done:
            outputs = self.delegate.state.outputs if self.delegate.state else {}
            obs: Observation = AgentDelegateObservation(content='', outputs=outputs)
            await self.event_stream.add_event(obs, EventSource.AGENT)
            self.delegate = None
            self.delegateAction = None
        return
    if self.state.num_of_chars > self.max_chars:
        raise MaxCharsExceedError(self.state.num_of_chars, self.max_chars)
    self.update_state_before_step()
    action: Action = NullAction()
    try:
        action = self.agent.step(self.state)
        if action is None:
            raise AgentNoActionError('No action was returned')
    except (AgentMalformedActionError, AgentNoActionError, LLMOutputError) as e:
        await self.report_error(str(e))
        return
    logger.info(action, extra={'msg_type': 'ACTION'})
    self.update_state_after_step()
So, how is the state updated? There is on_event, which is subscribed to the event stream as AGENT_CONTROLLER.
self.event_stream.subscribe(
    EventStreamSubscriber.AGENT_CONTROLLER, self.on_event
)
on_event changes the agent state held by the controller that wraps the agent. BTW, this is what gets _step going: once the state becomes RUNNING, the next _step call stops sleeping and calls the agent’s step.
async def on_event(self, event: Event):
    if isinstance(event, ChangeAgentStateAction):
        await self.set_agent_state_to(event.agent_state)  # type: ignore
    elif isinstance(event, MessageAction):
        if event.source == EventSource.USER:
            await self.add_history(event, NullObservation(''))
            if self.get_agent_state() != AgentState.RUNNING:
                await self.set_agent_state_to(AgentState.RUNNING)
        elif event.source == EventSource.AGENT and event.wait_for_response:
            await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
    elif isinstance(event, AgentDelegateAction):
        await self.start_delegate(event)
    elif isinstance(event, AddTaskAction):
        self.state.root_task.add_subtask(event.parent, event.goal, event.subtasks)
    elif isinstance(event, ModifyTaskAction):
        self.state.root_task.set_subtask_state(event.task_id, event.state)
    elif isinstance(event, AgentFinishAction):
        self.state.outputs = event.outputs  # type: ignore[attr-defined]
        await self.set_agent_state_to(AgentState.FINISHED)
    elif isinstance(event, Observation):
        if self._pending_action and self._pending_action.id == event.cause:
            await self.add_history(self._pending_action, event)
            self._pending_action = None
            logger.info(event, extra={'msg_type': 'OBSERVATION'})
        elif isinstance(event, CmdOutputObservation):
            await self.add_history(NullAction(), event)
            logger.info(event, extra={'msg_type': 'OBSERVATION'})
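The pending-action handshake is the subtle part: _step refuses to run while an action is pending, and only an observation whose cause matches the pending action's id clears it. The sketch below isolates that idea. The excerpt above does not show where _pending_action gets assigned, so the dispatch method here is my assumption, and all class names are stand-ins.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Action:  # hypothetical stand-in, not OpenDevin's Action
    id: int
    command: str


@dataclass
class Observation:  # hypothetical stand-in
    cause: int  # id of the action that produced this observation
    content: str


class MiniController:
    def __init__(self) -> None:
        self.pending_action: Optional[Action] = None
        self.history: List[Tuple[Action, Observation]] = []

    def dispatch(self, action: Action) -> None:
        # Assumption: the action is remembered as pending when it is sent out.
        self.pending_action = action

    def can_step(self) -> bool:
        # Mirrors the early return in _step while an action is pending.
        return self.pending_action is None

    def on_observation(self, obs: Observation) -> None:
        # Only the observation caused by the pending action clears it.
        if self.pending_action and self.pending_action.id == obs.cause:
            self.history.append((self.pending_action, obs))
            self.pending_action = None


controller = MiniController()
controller.dispatch(Action(id=7, command='ls'))
before = controller.can_step()
controller.on_observation(Observation(cause=3, content='unrelated'))
after_unrelated = controller.can_step()
controller.on_observation(Observation(cause=7, content='README.md'))
after_match = controller.can_step()
```

This pairing is also what builds the (action, observation) history that agents like the browsing agent iterate over in step.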
LLM API support
OpenDevin uses litellm as an abstraction over different LLM APIs.
llm = LLM(model=model, api_key=api_key, base_url=api_base)
In opendevin/llm/llm.py, it uses partial to bind the needed arguments.
self._completion = partial(
    litellm_completion,
    model=self.model_name,
    api_key=self.api_key,
    base_url=self.base_url,
    api_version=self.api_version,
    custom_llm_provider=custom_llm_provider,
    max_tokens=self.max_output_tokens,
    timeout=self.llm_timeout,
    temperature=llm_temperature,
    top_p=llm_top_p,
)
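If functools.partial is new to you, here is what the binding buys. This sketch does not call litellm at all: fake_completion is a made-up function that just echoes its arguments, and the model name is only an example.

```python
from functools import partial


def fake_completion(messages, model, temperature=1.0, max_tokens=256):
    # Made-up stand-in for a completion call: echoes what it received.
    return {
        'model': model,
        'temperature': temperature,
        'max_tokens': max_tokens,
        'n_messages': len(messages),
    }


# Bind the settings once, the way the LLM wrapper does in its constructor.
completion = partial(fake_completion, model='ollama/llama3', temperature=0.5)

# Callers pass only what changes per request. A keyword given at call time
# overrides the value that was bound in the partial.
result = completion(messages=[{'role': 'user', 'content': 'hi'}], temperature=0.0)
```

That override behaviour is what lets the browsing agent pass temperature=0.0 to self.llm.completion while the rest of the settings stay bound, assuming the wrapper forwards keyword arguments to the partial.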