Callback is one of the most used concepts in both software and hardware. In UVM, callback is its own thing (specific events registered with UVM), but is also used heavily in TLM.

For cocotb, we can use a similar approach by setting function callbacks to pass objects between components. This is very similar to UVM export/imp write method. That said, there is another pattern by just using cocotb Queue as TLM fifo, which can be a good way to connect components.

In this example, I am using _queue (vanilla python queue) to keep track of packets. In run, once we get something there we pop it and call the _on_rx callback. Note that the source defines connect used by the test to connect its source to a sink.

Packet = namedtuple("Packet", ["data"])

class Source:
    def __init__(self):
        self._queue : deque[Packet] = deque()
        self._running: bool= False
        self._on_rx : Callable[[Packet], None] = None

    def connect(self, sink):
        self._on_rx = sink._on_rx

    def issue_packet(self, packet :Packet):
        self._queue.append(packet)

    async def run(self):
        while self._running:
            # Get from the queue if not empty, or None
            packet = self._queue.popleft() if self._queue else None
            cocotb.log.info(f"Source run loop, queue size: {len(self._queue)}")

            # Process the packet
            if packet is not None:
                cocotb.log.info("Source queue is not empty, sending packet to sink")
                self._on_rx(packet)

            # wait for trigger to let simulation advance
            await Timer(1, unit="ns") # trigger event can be a Timer or clock edge

    async def start(self):
        self._running = True
        self.run_task = cocotb.start_soon(self.run())

    async def stop(self):
        self._running = False
        await self.run_task.kill()

The Sink just defines the _on_rx callback.

class Sink:
    def __init__(self):
        pass

    def _on_rx(self, packet :Packet):
        cocotb.log.info(f"Sink received: {packet.data}")

Finally, the test does the following 2 steps:

  • connect source to the sink using connect
  • push packet into the source queue using issue_packet
@cocotb.test()
async def test_connecting_models(dut):
    source = Source()
    sink = Sink()

    source.connect(sink)

    await source.start()

    source.issue_packet(Packet(data="Hello, World!"))
    await Timer(10, unit="ns")

    source.issue_packet(Packet(data="Hello, World2!"))
    await Timer(10, unit="ns")

    cocotb.log.info("Test completed")