cocotb provides primitives for synchronization and communication between its coroutines. In this post, I will go through Event and Queue. They are similar to SystemVerilog events and queues.

Event

Event provides wait and set methods: one coroutine awaits wait() and another calls set() to wake it. This is the Event class docstring from the cocotb source, which includes a usage example.

class Event:
    r"""A way to signal an event across :class:`~cocotb.task.Task`\ s.

    :keyword:`await`\ ing the result of :meth:`wait()` will block the :keyword:`await`\ ing :class:`~cocotb.task.Task`
    until :meth:`set` is called.

    Args:
        name: Name for the Event.

    Usage:
        .. code-block:: python

            e = Event()


            async def task1():
                await e.wait()
                print("resuming!")


            cocotb.start_soon(task1())
            # do stuff
            e.set()
            await NullTrigger()  # allows task1 to execute
            # resuming!

    .. versionremoved:: 2.0

        Removed the undocumented *data* attribute and argument to :meth:`set`.
    """
    ...
    ...

If the event has already been set, wait returns a NullTrigger, so the awaiting task resumes immediately. Otherwise it returns a new _Event trigger that stays pending until set is called.

    def wait(self) -> Trigger:
        """Block the current Task until the Event is set.

        If the event has already been set, the trigger will fire immediately.

        To set the Event call :meth:`set`.
        To reset the Event (and enable the use of :meth:`wait` again),
        call :meth:`clear`.
        """
        if self._fired:
            return NullTrigger(name=f"{str(self)}.wait()")
        return _Event(self)

_prime_trigger appends the waiting trigger to _pending_events.

    def _prime_trigger(
        self, trigger: _Event, callback: Callable[[Trigger], None]
    ) -> None:
        self._pending_events.append(trigger)

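When the event is set, it marks itself as fired, then loops over _pending_events and calls each trigger's _callback. Note that this snippet still carries the deprecated data argument, so it predates the removal mentioned in the docstring above.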

    def set(self, data: Optional[Any] = None) -> None:
        """Set the Event and unblock all Tasks blocked on this Event."""
        self._fired = True
        if data is not None:
            warnings.warn(
                "The data field will be removed in a future release.",
                DeprecationWarning,
            )
        self._data = data

        pending_events, self._pending_events = self._pending_events, []
        for event in pending_events:
            event._callback(event)
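
The tuple swap just before the loop is worth a second look: the pending list is replaced with a fresh empty list before any callback runs, so a callback that registers a new waiter lands in the new list rather than in the one being iterated. Below is a minimal plain-Python sketch of that idiom only. `ToyNotifier`, `register` and `fire` are hypothetical names for illustration and are not part of cocotb.

```python
from typing import Callable, List


class ToyNotifier:
    """Hypothetical stand-in that mimics only the swap-then-iterate idiom."""

    def __init__(self) -> None:
        self._pending: List[Callable[[], None]] = []

    def register(self, callback: Callable[[], None]) -> None:
        self._pending.append(callback)

    def fire(self) -> int:
        # Detach the current waiters first; new registrations go to a fresh list.
        pending, self._pending = self._pending, []
        for callback in pending:
            callback()
        return len(pending)


notifier = ToyNotifier()
log: List[str] = []


def waiter() -> None:
    log.append("woken")
    # Registering during the callback must not be picked up by the current fire().
    notifier.register(lambda: log.append("woken again"))


notifier.register(waiter)
first = notifier.fire()   # runs only the original waiter
second = notifier.fire()  # runs the callback registered during the first fire()
```

Without the swap, appending to the list while iterating over it would let the same `fire()` call pick up waiters that registered after the event had already fired.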

_Event._prime stores the callback in _callback and then registers the trigger with its parent Event by calling the _prime_trigger method shown above.

class _Event(Trigger):
    """Unique instance used by the Event object.

    One created for each attempt to wait on the event so that the scheduler
    can maintain a unique mapping of triggers to tasks.
    """

    def __init__(self, parent: "Event") -> None:
        super().__init__()
        self._parent = parent

    def _prime(self, callback: Callable[[Trigger], None]) -> None:
        if self._primed:
            raise RuntimeError(
                "Event.wait() result can only be used by one task at a time"
            )
        self._callback = callback
        self._parent._prime_trigger(self, callback)
        return super()._prime(callback)
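
To see how wait, _prime and set fit together, here is a toy model that runs without a simulator. It is a sketch under loud assumptions, not cocotb's implementation: `ToyEvent` and `ToyWaiter` are hypothetical names, `None` plays the role of NullTrigger, and the lambda stands in for the callback that the scheduler would pass to `_prime`.

```python
from typing import Callable, List, Optional


class ToyWaiter:
    """Hypothetical stand-in for _Event: one instance per wait() call."""

    def __init__(self, parent: "ToyEvent") -> None:
        self._parent = parent
        self._callback: Optional[Callable[["ToyWaiter"], None]] = None

    def prime(self, callback: Callable[["ToyWaiter"], None]) -> None:
        if self._callback is not None:
            raise RuntimeError("waiter can only be used by one task at a time")
        # Remember who to resume, then register with the parent event.
        self._callback = callback
        self._parent._pending.append(self)


class ToyEvent:
    """Hypothetical stand-in for Event."""

    def __init__(self) -> None:
        self._fired = False
        self._pending: List[ToyWaiter] = []

    def wait(self) -> Optional[ToyWaiter]:
        # None plays the role of NullTrigger: there is nothing to wait for.
        return None if self._fired else ToyWaiter(self)

    def set(self) -> None:
        self._fired = True
        pending, self._pending = self._pending, []
        for waiter in pending:
            waiter._callback(waiter)


event = ToyEvent()
resumed: List[str] = []

trigger = event.wait()                            # not fired yet: get a waiter
trigger.prime(lambda trig: resumed.append("task1"))
event.set()                                       # runs the stored callback
late = event.wait()                               # already fired: fast path
```

After `event.set()`, `resumed` holds `"task1"`, and the later `wait()` takes the fast path because the event stays fired until it is cleared.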

Queue

Going through Event first is useful because AbstractQueue (which is extended by Queue) uses Event as a synchronization mechanism.

Queue is just a thin layer on top of AbstractQueue: it supplies the storage hooks _put, _get, _size and _repr, backed by a deque.

class Queue(AbstractQueue[T]):
    """A subclass of :class:`AbstractQueue`; retrieves oldest entries first (FIFO)."""

    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize)
        self._queue: Deque[T] = collections.deque()

    def _put(self, item: T) -> None:
        self._queue.append(item)

    def _get(self) -> T:
        return self._queue.popleft()

    def _size(self) -> int:
        return len(self._queue)

    def _repr(self) -> str:
        return repr(self._queue)
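
The hook design means the ordering policy lives entirely in the subclass. The sketch below illustrates this with a hypothetical, non-blocking base class (`ToyAbstractQueue`) and two hypothetical subclasses that differ only in `_get`. None of these names come from cocotb, and the base class deliberately leaves out the blocking put/get logic.

```python
import collections
from typing import Deque, Generic, List, TypeVar

T = TypeVar("T")


class ToyAbstractQueue(Generic[T]):
    """Hypothetical, non-blocking stand-in for the base class."""

    def put_nowait(self, item: T) -> None:
        self._put(item)

    def get_nowait(self) -> T:
        if self._size() == 0:
            raise IndexError("queue is empty")
        return self._get()

    # Storage hooks that each subclass must provide.
    def _put(self, item: T) -> None:
        raise NotImplementedError

    def _get(self) -> T:
        raise NotImplementedError

    def _size(self) -> int:
        raise NotImplementedError


class ToyFifo(ToyAbstractQueue[T]):
    """Oldest entry first, like the Queue class above."""

    def __init__(self) -> None:
        self._queue: Deque[T] = collections.deque()

    def _put(self, item: T) -> None:
        self._queue.append(item)

    def _get(self) -> T:
        return self._queue.popleft()

    def _size(self) -> int:
        return len(self._queue)


class ToyLifo(ToyFifo[T]):
    """Newest entry first: only the _get hook changes."""

    def _get(self) -> T:
        return self._queue.pop()


def drain(queue: ToyAbstractQueue[int], items: List[int]) -> List[int]:
    for item in items:
        queue.put_nowait(item)
    return [queue.get_nowait() for _ in items]


fifo_order = drain(ToyFifo(), [1, 2, 3])  # [1, 2, 3]
lifo_order = drain(ToyLifo(), [1, 2, 3])  # [3, 2, 1]
```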

The key functions are put, get and their nowait versions.

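get only waits while the queue is empty: it creates an Event, records it in _getters together with the current task, and awaits it. Once the queue has an item, it calls get_nowait, which calls _get (defined by Queue) and then wakes the next waiting putter, because a slot has just been freed. This is the usual producer/consumer handshake rather than a visitor pattern.

put mirrors this: while the queue is full, it creates an Event, records it in _putters, and awaits it. It then calls put_nowait, which pushes the item with _put and wakes the next waiting getter to pick it up.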

class AbstractQueue(Generic[T]):
    """A queue, useful for coordinating producer and consumer coroutines.

    If *maxsize* is less than or equal to 0, the queue size is infinite. If it
    is an integer greater than 0, then :meth:`put` will block when the queue
    reaches *maxsize*, until an item is removed by :meth:`get`.
    """

    def __init__(self, maxsize: int = 0) -> None:
        self._maxsize: int = maxsize
        self._getters: Deque[Tuple[Event, Task[Any]]] = collections.deque()
        self._putters: Deque[Tuple[Event, Task[Any]]] = collections.deque()
  
    def _wakeup_next(self, waiters: Deque[Tuple[Event, Task[Any]]]) -> None:
        while waiters:
            event, task = waiters.popleft()
            if not task.done():
                event.set()
                break

    async def put(self, item: T) -> None:
        """Put an *item* into the queue.

        If the queue is full, wait until a free
        slot is available before adding the item.
        """
        while self.full():
            event = Event(f"{type(self).__name__} put")
            self._putters.append(
                (event, cast(Task[Any], cocotb._scheduler_inst._current_task))
            )
            await event.wait()
        self.put_nowait(item)

    def put_nowait(self, item: T) -> None:
        """Put an *item* into the queue without blocking.

        If no free slot is immediately available, raise :exc:`~cocotb.queue.QueueFull`.
        """
        if self.full():
            raise QueueFull()
        self._put(item)
        self._wakeup_next(self._getters)

    async def get(self) -> T:
        """Remove and return an item from the queue.

        If the queue is empty, wait until an item is available.
        """
        while self.empty():
            event = Event(f"{type(self).__name__} get")
            self._getters.append(
                (event, cast(Task[Any], cocotb._scheduler_inst._current_task))
            )
            await event.wait()
        return self.get_nowait()

    def get_nowait(self) -> T:
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise
        :exc:`~cocotb.queue.QueueEmpty`.
        """
        if self.empty():
            raise QueueEmpty()
        item = self._get()
        self._wakeup_next(self._putters)
        return item
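
cocotb's Queue needs a running simulator, so the handshake is easier to try out with a stand-in. The sketch below uses Python's `asyncio.Queue`, which exposes the same put/get/put_nowait/get_nowait method names. Treating it as a behavioural analogue of the cocotb queue is an assumption made for illustration only. With `maxsize=1`, the producer has to wait for the consumer before each new put.

```python
import asyncio
from typing import List


async def demo() -> List[str]:
    log: List[str] = []
    # asyncio.Queue is a stand-in here, not the cocotb class.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    async def producer() -> None:
        for item in range(3):
            await queue.put(item)  # suspends while the queue is full
            log.append(f"put {item}")

    async def consumer() -> None:
        for _ in range(3):
            item = await queue.get()  # suspends while the queue is empty
            log.append(f"get {item}")

    await asyncio.gather(producer(), consumer())
    return log


log = asyncio.run(demo())
gets = [entry for entry in log if entry.startswith("get")]
```

The items come out in the order they went in, and the producer never gets more than one item ahead of the consumer, which is the backpressure that the `while self.full()` loop in put provides.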