Cocotb provides synchronization and communication primitives for cocotb coroutines. In this post, I will go through `Event` and `Queue`. They are similar to SystemVerilog events and queues.
Event
`Event` provides `wait` and `set` methods: one coroutine waits on the event and another triggers it. Here is the `Event` class docstring from the cocotb source, which includes a usage example.
    class Event:
        r"""A way to signal an event across :class:`~cocotb.task.Task`\ s.

        :keyword:`await`\ ing the result of :meth:`wait()` will block the :keyword:`await`\ ing :class:`~cocotb.task.Task`
        until :meth:`set` is called.

        Args:
            name: Name for the Event.

        Usage:
            .. code-block:: python

                e = Event()

                async def task1():
                    await e.wait()
                    print("resuming!")

                cocotb.start_soon(task1())
                # do stuff
                e.set()
                await NullTrigger()  # allows task1 to execute
                # resuming!

        .. versionremoved:: 2.0

            Removed the undocumented *data* attribute and argument to :meth:`set`.
        """
        ...
        ...
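The docstring example only runs inside a cocotb test under a simulator. As a rough stand-in, the sketch below uses Python's `asyncio.Event`, whose `wait` and `set` methods have the same shape. It is an analogy under that assumption, not cocotb code; `demo` and `waiter` are hypothetical names.

```python
import asyncio


async def demo():
    log = []
    e = asyncio.Event()

    async def waiter():
        log.append("waiting")
        await e.wait()  # blocks until e.set() is called
        log.append("resumed")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # yield once so waiter can start and block
    log.append("setting")
    e.set()
    await task  # let waiter finish
    return log


result = asyncio.run(demo())
print(result)
```

The waiter only continues after `set` is called, which is the same ordering the docstring example relies on.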
If the event has already been set, `wait` returns a `NullTrigger`, which fires immediately so the waiting task does not block. Otherwise it returns a new `_Event` trigger.
    def wait(self) -> Trigger:
        """Block the current Task until the Event is set.

        If the event has already been set, the trigger will fire immediately.

        To set the Event call :meth:`set`.
        To reset the Event (and enable the use of :meth:`wait` again),
        call :meth:`clear`.
        """
        if self._fired:
            return NullTrigger(name=f"{str(self)}.wait()")
        return _Event(self)
`_prime_trigger` appends the trigger to the `_pending_events` list.
    def _prime_trigger(
        self, trigger: _Event, callback: Callable[[Trigger], None]
    ) -> None:
        self._pending_events.append(trigger)
When the event is set, `set` swaps `_pending_events` for an empty list, loops over the old list, and calls each trigger's `_callback`.
    def set(self, data: Optional[Any] = None) -> None:
        """Set the Event and unblock all Tasks blocked on this Event."""
        self._fired = True
        if data is not None:
            warnings.warn(
                "The data field will be removed in a future release.",
                DeprecationWarning,
            )
        self._data = data

        pending_events, self._pending_events = self._pending_events, []
        for event in pending_events:
            event._callback(event)
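One plausible reason for swapping `_pending_events` with a fresh list before looping (the source does not state it) is that a callback may register a new waiter while the loop is still running. The sketch below illustrates that idiom in plain Python; `Notifier`, `register` and `fire` are hypothetical names, not cocotb API.

```python
class Notifier:
    """Hypothetical holder of pending callbacks, fired in batches."""

    def __init__(self):
        self._pending = []

    def register(self, callback):
        self._pending.append(callback)

    def fire(self):
        # Swap first: anything registered during the loop goes into the
        # new list and is only called on the next fire().
        pending, self._pending = self._pending, []
        for callback in pending:
            callback()


notifier = Notifier()
calls = []


def rearming_callback():
    calls.append("called")
    notifier.register(rearming_callback)  # re-register from inside the loop


notifier.register(rearming_callback)
notifier.fire()
calls_after_first_fire = len(calls)
pending_after_first_fire = len(notifier._pending)
notifier.fire()
print(calls_after_first_fire, pending_after_first_fire, len(calls))
```

Each `fire` calls the callback exactly once, even though the callback re-registers itself, because the loop iterates over the detached list.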
`_Event._prime` stores the callback it is given in `_callback`, then calls the parent's `_prime_trigger` (shown above) to register itself as a pending trigger.
    class _Event(Trigger):
        """Unique instance used by the Event object.

        One created for each attempt to wait on the event so that the scheduler
        can maintain a unique mapping of triggers to tasks.
        """

        def __init__(self, parent: "Event") -> None:
            super().__init__()
            self._parent = parent

        def _prime(self, callback: Callable[[Trigger], None]) -> None:
            if self._primed:
                raise RuntimeError(
                    "Event.wait() result can only be used by one task at a time"
                )
            self._callback = callback
            self._parent._prime_trigger(self, callback)
            return super()._prime(callback)
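To see how `wait`, `_prime`, `_prime_trigger` and `set` fit together, here is a simplified model in plain Python. It is a sketch, not cocotb's implementation: `MiniEvent`, `MiniTrigger` and `ImmediateTrigger` are hypothetical stand-ins, and a plain callback plays the role of the scheduler resuming a task.

```python
from typing import Callable, List, Optional


class ImmediateTrigger:
    """Hypothetical stand-in for NullTrigger: calls back as soon as primed."""

    def prime(self, callback: Callable[[object], None]) -> None:
        callback(self)


class MiniTrigger:
    """Hypothetical stand-in for _Event: one instance per wait() call."""

    def __init__(self, parent: "MiniEvent") -> None:
        self._parent = parent
        self._callback: Optional[Callable[[object], None]] = None

    def prime(self, callback: Callable[[object], None]) -> None:
        if self._callback is not None:
            raise RuntimeError("trigger can only be used by one waiter")
        self._callback = callback
        self._parent._prime_trigger(self)  # register with the parent event


class MiniEvent:
    """Hypothetical stand-in for Event."""

    def __init__(self) -> None:
        self._fired = False
        self._pending: List[MiniTrigger] = []

    def wait(self):
        # Already set: hand back a trigger that fires right away.
        if self._fired:
            return ImmediateTrigger()
        return MiniTrigger(self)

    def _prime_trigger(self, trigger: MiniTrigger) -> None:
        self._pending.append(trigger)

    def set(self) -> None:
        self._fired = True
        pending, self._pending = self._pending, []
        for trigger in pending:
            trigger._callback(trigger)


log = []
event = MiniEvent()
event.wait().prime(lambda t: log.append("first waiter woke"))
event.wait().prime(lambda t: log.append("second waiter woke"))
log.append("before set")
event.set()  # wakes both pending waiters, in registration order
event.wait().prime(lambda t: log.append("late waiter woke immediately"))
print(log)
```

Waiters registered before `set` are woken by it, while a waiter that arrives afterwards takes the immediate path.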
Queue
Going through `Event` first is useful because `AbstractQueue` (which `Queue` extends) uses `Event` as its synchronization mechanism.
`Queue` is just a thin layer on top of `AbstractQueue`: it implements the storage hooks `_put`, `_get`, `_size` and `_repr` using a `collections.deque`.
    class Queue(AbstractQueue[T]):
        """A subclass of :class:`AbstractQueue`; retrieves oldest entries first (FIFO)."""

        def __init__(self, maxsize: int = 0) -> None:
            super().__init__(maxsize)
            self._queue: Deque[T] = collections.deque()

        def _put(self, item: T) -> None:
            self._queue.append(item)

        def _get(self) -> T:
            return self._queue.popleft()

        def _size(self) -> int:
            return len(self._queue)

        def _repr(self) -> str:
            return repr(self._queue)
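Because the base class only talks to storage through these hooks, changing the ordering policy means overriding just a few small methods. The sketch below shows that split with a stripped-down, non-blocking base class; `MiniAbstractQueue`, `MiniFifo`, `MiniLifo` and `drain` are hypothetical names used for illustration and are not cocotb classes.

```python
import collections
from typing import Deque, Generic, List, TypeVar

T = TypeVar("T")


class MiniAbstractQueue(Generic[T]):
    """Hypothetical base: public API here, storage left to subclasses."""

    def put_nowait(self, item: T) -> None:
        self._put(item)

    def get_nowait(self) -> T:
        if self._size() == 0:
            raise IndexError("queue is empty")
        return self._get()

    # Storage hooks that each subclass must provide.
    def _put(self, item: T) -> None:
        raise NotImplementedError

    def _get(self) -> T:
        raise NotImplementedError

    def _size(self) -> int:
        raise NotImplementedError


class MiniFifo(MiniAbstractQueue[T]):
    """Oldest entry first, backed by a deque."""

    def __init__(self) -> None:
        self._queue: Deque[T] = collections.deque()

    def _put(self, item: T) -> None:
        self._queue.append(item)

    def _get(self) -> T:
        return self._queue.popleft()

    def _size(self) -> int:
        return len(self._queue)


class MiniLifo(MiniAbstractQueue[T]):
    """Newest entry first, backed by a list."""

    def __init__(self) -> None:
        self._items: List[T] = []

    def _put(self, item: T) -> None:
        self._items.append(item)

    def _get(self) -> T:
        return self._items.pop()

    def _size(self) -> int:
        return len(self._items)


def drain(queue: MiniAbstractQueue[int]) -> List[int]:
    """Put 1, 2, 3 into the queue and read everything back."""
    for value in (1, 2, 3):
        queue.put_nowait(value)
    return [queue.get_nowait() for _ in range(3)]


fifo_order = drain(MiniFifo())
lifo_order = drain(MiniLifo())
print(fifo_order, lifo_order)
```

The public methods are identical for both subclasses; only the hooks decide which item comes out next.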
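The key functions are `put`, `get` and their `_nowait` versions.

While the queue is empty, `get` creates an `Event`, registers it in `_getters` together with the current task, and waits on it. Once an item is available it calls `get_nowait`, which calls `_get` (defined by `Queue`) and then wakes the next waiting putter so it can add to the queue. This is a typical producer-consumer handshake.

`put` mirrors this. While the queue is full, it creates an `Event`, registers it in `_putters` and waits on it. It then calls `put_nowait`, which pushes the item and wakes the next waiting getter to pick it up.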
    class AbstractQueue(Generic[T]):
        """A queue, useful for coordinating producer and consumer coroutines.

        If *maxsize* is less than or equal to 0, the queue size is infinite. If it
        is an integer greater than 0, then :meth:`put` will block when the queue
        reaches *maxsize*, until an item is removed by :meth:`get`.
        """

        def __init__(self, maxsize: int = 0) -> None:
            self._maxsize: int = maxsize

            self._getters: Deque[Tuple[Event, Task[Any]]] = collections.deque()
            self._putters: Deque[Tuple[Event, Task[Any]]] = collections.deque()

        def _wakeup_next(self, waiters: Deque[Tuple[Event, Task[Any]]]) -> None:
            while waiters:
                event, task = waiters.popleft()
                if not task.done():
                    event.set()
                    break

        async def put(self, item: T) -> None:
            """Put an *item* into the queue.

            If the queue is full, wait until a free
            slot is available before adding the item.
            """
            while self.full():
                event = Event(f"{type(self).__name__} put")
                self._putters.append(
                    (event, cast(Task[Any], cocotb._scheduler_inst._current_task))
                )
                await event.wait()
            self.put_nowait(item)

        def put_nowait(self, item: T) -> None:
            """Put an *item* into the queue without blocking.

            If no free slot is immediately available, raise :exc:`~cocotb.queue.QueueFull`.
            """
            if self.full():
                raise QueueFull()
            self._put(item)
            self._wakeup_next(self._getters)

        async def get(self) -> T:
            """Remove and return an item from the queue.

            If the queue is empty, wait until an item is available.
            """
            while self.empty():
                event = Event(f"{type(self).__name__} get")
                self._getters.append(
                    (event, cast(Task[Any], cocotb._scheduler_inst._current_task))
                )
                await event.wait()
            return self.get_nowait()

        def get_nowait(self) -> T:
            """Remove and return an item from the queue.

            Return an item if one is immediately available, else raise
            :exc:`~cocotb.queue.QueueEmpty`.
            """
            if self.empty():
                raise QueueEmpty()
            item = self._get()
            self._wakeup_next(self._putters)
            return item
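The blocking behaviour is easier to see in a running example. Since cocotb code needs a simulator, the sketch below uses Python's `asyncio.Queue` as a stand-in, on the assumption that its `put`, `get`, `put_nowait` and `get_nowait` methods behave analogously for this purpose. It is not cocotb code, and `demo`, `producer` and `consumer` are hypothetical names.

```python
import asyncio


async def demo():
    q = asyncio.Queue(maxsize=1)  # a single slot, so put() must wait for get()
    received = []

    # put_nowait raises instead of blocking when the queue is full.
    q.put_nowait("first")
    try:
        q.put_nowait("overflow")
        overflowed = False
    except asyncio.QueueFull:
        overflowed = True
    received.append(q.get_nowait())

    async def producer():
        for item in range(3):
            await q.put(item)  # waits while the single slot is occupied

    async def consumer():
        for _ in range(3):
            received.append(await q.get())  # waits while the queue is empty

    await asyncio.gather(producer(), consumer())
    return overflowed, received


overflowed, received = asyncio.run(demo())
print(overflowed, received)
```

With a single slot, the producer and consumer are forced to alternate, and the items still arrive in the order they were put.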