From 0559386ca5bc4d369efa7d81ac980930cc0c8032 Mon Sep 17 00:00:00 2001 From: Bastian Krause Date: Thu, 13 Nov 2025 18:05:42 +0100 Subject: [PATCH 1/3] tests: qemudriver: restructure mocks Since `qemu_version_mock` is required for all tests using `qemu_mock`, move its functionality into `qemu_mock`. Since `qemu_driver` will fixture will soon need `qemu_mock` always, add it as its dependency now, thus minimizing the direct fixtures required by the tests. Signed-off-by: Bastian Krause --- tests/test_qemudriver.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/tests/test_qemudriver.py b/tests/test_qemudriver.py index bfbe08edb..3edd10607 100644 --- a/tests/test_qemudriver.py +++ b/tests/test_qemudriver.py @@ -27,7 +27,7 @@ def qemu_target(qemu_env): return qemu_env.get_target() @pytest.fixture -def qemu_driver(qemu_target): +def qemu_driver(qemu_target, qemu_mock): q = QEMUDriver( qemu_target, "qemu", @@ -60,20 +60,18 @@ def qemu_mock(mocker): socket_mock = mocker.patch('socket.socket') socket_mock.return_value.accept.return_value = mocker.MagicMock(), '' -@pytest.fixture -def qemu_version_mock(mocker): - run_mock = mocker.patch('subprocess.run') - run_mock.return_value.returncode = 0 - run_mock.return_value.stdout = "QEMU emulator version 4.2.1" + version_mock = mocker.patch('subprocess.run') + version_mock.return_value.returncode = 0 + version_mock.return_value.stdout = "QEMU emulator version 4.2.1" -def test_qemu_instance(qemu_target, qemu_driver): +def test_qemu_instance(qemu_driver): assert (isinstance(qemu_driver, QEMUDriver)) -def test_qemu_activate_deactivate(qemu_target, qemu_driver, qemu_version_mock): +def test_qemu_activate_deactivate(qemu_target, qemu_driver): qemu_target.activate(qemu_driver) qemu_target.deactivate(qemu_driver) -def test_qemu_on_off(qemu_target, qemu_driver, qemu_mock, qemu_version_mock): +def test_qemu_on_off(qemu_target, qemu_driver): qemu_target.activate(qemu_driver) qemu_driver.on() @@ -81,7 +79,7 @@ def test_qemu_on_off(qemu_target, qemu_driver, qemu_mock, qemu_version_mock): qemu_target.deactivate(qemu_driver) -def test_qemu_read_write(qemu_target, qemu_driver, qemu_mock, qemu_version_mock): +def test_qemu_read_write(qemu_target, qemu_driver): qemu_target.activate(qemu_driver) qemu_driver.on() From 08c1fd5993223be8166dd23eacf58a8c76196f8e Mon Sep 17 00:00:00 2001 From: Joschka Seydell Date: Wed, 10 Sep 2025 22:14:16 +0200 Subject: [PATCH 2/3] labgrid/qemudriver: tie QEMU and QMP monitor start/stop to on_activate()/on_deactivate() Until now the QEMU process and QMP monitor start was tied to the on()/off() methods. This feels unnatural, preventing the user from interacting with the QEMU process via monitor commands before the emulation starts and meant starting a new process on each power cycle. Rework the driver to start QEMU and the QMP monitor in on_activate(), allowing interaction via monitor_command after activation. The on() and off() methods interact only via QMP now. All methods relying on a started QEMU and QMP monitor instance are decorated with @Driver.check_active now. The atexit handling is no longer required since the target's atexit handler already calls the driver's on_deactivate(). Signed-off-by: Joschka Seydell Signed-off-by: Bastian Krause --- labgrid/driver/qemudriver.py | 80 ++++++++++++++++++------------------ tests/test_qemudriver.py | 13 +++++- 2 files changed, 52 insertions(+), 41 deletions(-) diff --git a/labgrid/driver/qemudriver.py b/labgrid/driver/qemudriver.py index a9dc0829d..63c4cca41 100644 --- a/labgrid/driver/qemudriver.py +++ b/labgrid/driver/qemudriver.py @@ -1,5 +1,4 @@ """The QEMUDriver implements a driver to use a QEMU target""" -import atexit import select import shlex import shutil @@ -104,17 +103,6 @@ def __attrs_post_init__(self): self._socket = None self._clientsocket = None self._forwarded_ports = {} - atexit.register(self._atexit) - - def _atexit(self): - if not self._child: - return - self._child.terminate() - try: - self._child.communicate(timeout=1) - except subprocess.TimeoutExpired: - self._child.kill() - self._child.communicate(timeout=1) def get_qemu_version(self, qemu_bin): p = subprocess.run([qemu_bin, "-version"], stdout=subprocess.PIPE, encoding="utf-8") @@ -247,25 +235,8 @@ def on_activate(self): self._cmd.append("-serial") self._cmd.append("chardev:serialsocket") - def on_deactivate(self): - if self.status: - self.off() - if self._clientsocket: - self._clientsocket.close() - self._clientsocket = None - self._socket.close() - self._socket = None - shutil.rmtree(self._tempdir) - - @step() - def on(self): - """Start the QEMU subprocess, accept the unix socket connection and - afterwards start the emulator using a QMP Command""" - if self.status: - return self.logger.debug("Starting with: %s", self._cmd) - self._child = subprocess.Popen( - self._cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + self._child = subprocess.Popen(self._cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) # prepare for timeout handing self._clientsocket, address = self._socket.accept() @@ -282,37 +253,64 @@ def on(self): ) from exc raise - self.status = 1 + def on_deactivate(self): + if self._child and self._child.poll() is None: + self.monitor_command("quit") + try: + self._child.communicate(timeout=1) + except subprocess.TimeoutExpired: + self._child.kill() + self._child.communicate(timeout=1) + + self._child = None + self.qmp = None + + if self._clientsocket: + self._clientsocket.close() + self._clientsocket = None + + if self._socket: + self._socket.close() + self._socket = None + if self._tempdir: + shutil.rmtree(self._tempdir) + self._tempdir = None + + @Driver.check_active + @step() + def on(self): + """Starts the emulation using a QMP command""" + if self.status: + return # Restore port forwards for v in self._forwarded_ports.values(): self._add_port_forward(*v) self.monitor_command("cont") + self.status = 1 + @Driver.check_active @step() def off(self): - """Stop the emulator using a monitor command and await the exitcode""" + """Stops and resets the emulation using monitor commands""" if not self.status: return - self.monitor_command('quit') - if self._child.wait() != 0: - self._child.communicate() - raise IOError - self._child = None + self.monitor_command('stop') + self.monitor_command('system_reset') self.status = 0 + @Driver.check_active + @step() def cycle(self): """Cycle the emulator by restarting it""" self.off() self.on() + @Driver.check_active @step(result=True, args=['command', 'arguments']) def monitor_command(self, command, arguments={}): """Execute a monitor_command via the QMP""" - if not self.status: - raise ExecutionError( - "Can't use monitor command on non-running target") return self.qmp.execute(command, arguments) def _add_port_forward(self, proto, local_address, local_port, remote_address, remote_port): @@ -321,10 +319,12 @@ def _add_port_forward(self, proto, local_address, local_port, remote_address, re {"command-line": f"hostfwd_add {proto}:{local_address}:{local_port}-{remote_address}:{remote_port}"}, ) + @Driver.check_active def add_port_forward(self, proto, local_address, local_port, remote_address, remote_port): self._add_port_forward(proto, local_address, local_port, remote_address, remote_port) self._forwarded_ports[(proto, local_address, local_port)] = (proto, local_address, local_port, remote_address, remote_port) + @Driver.check_active def remove_port_forward(self, proto, local_address, local_port): del self._forwarded_ports[(proto, local_address, local_port)] self.monitor_command( diff --git a/tests/test_qemudriver.py b/tests/test_qemudriver.py index 3edd10607..3bee63bd0 100644 --- a/tests/test_qemudriver.py +++ b/tests/test_qemudriver.py @@ -64,11 +64,22 @@ def qemu_mock(mocker): version_mock.return_value.returncode = 0 version_mock.return_value.stdout = "QEMU emulator version 4.2.1" +@pytest.fixture +def qemu_qmp_mock(mocker): + monitor_mock = mocker.patch('labgrid.driver.qemudriver.QMPMonitor') + monitor_mock.return_value.execute.return_value = {'return': {}} + return monitor_mock + def test_qemu_instance(qemu_driver): assert (isinstance(qemu_driver, QEMUDriver)) -def test_qemu_activate_deactivate(qemu_target, qemu_driver): +def test_qemu_activate_deactivate(qemu_target, qemu_driver, qemu_qmp_mock): qemu_target.activate(qemu_driver) + + qemu_driver.monitor_command("info") + qemu_qmp_mock.assert_called_once() + qemu_qmp_mock.return_value.execute.assert_called_with("info", {}) + qemu_target.deactivate(qemu_driver) def test_qemu_on_off(qemu_target, qemu_driver): From b9b44895a83910805c7ddcc3bbe6d6645abb5b73 Mon Sep 17 00:00:00 2001 From: Bastian Krause Date: Fri, 14 Nov 2025 17:24:36 +0100 Subject: [PATCH 3/3] driver/qemudriver: turn private _cmd member into local variable Now that the QEMU invocation is not only prepared, but also started in on_activate(), there is no need to keep the command around. Turn it into a local variable instead. Signed-off-by: Bastian Krause --- labgrid/driver/qemudriver.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/labgrid/driver/qemudriver.py b/labgrid/driver/qemudriver.py index 63c4cca41..47a740b42 100644 --- a/labgrid/driver/qemudriver.py +++ b/labgrid/driver/qemudriver.py @@ -224,19 +224,19 @@ def on_activate(self): self._socket.bind(sockpath) self._socket.listen(0) - self._cmd = self.get_qemu_base_args() + qemu_cmd = self.get_qemu_base_args() - self._cmd.append("-S") - self._cmd.append("-qmp") - self._cmd.append("stdio") + qemu_cmd.append("-S") + qemu_cmd.append("-qmp") + qemu_cmd.append("stdio") - self._cmd.append("-chardev") - self._cmd.append(f"socket,id=serialsocket,path={sockpath}") - self._cmd.append("-serial") - self._cmd.append("chardev:serialsocket") + qemu_cmd.append("-chardev") + qemu_cmd.append(f"socket,id=serialsocket,path={sockpath}") + qemu_cmd.append("-serial") + qemu_cmd.append("chardev:serialsocket") - self.logger.debug("Starting with: %s", self._cmd) - self._child = subprocess.Popen(self._cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + self.logger.debug("Starting with: %s", qemu_cmd) + self._child = subprocess.Popen(qemu_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) # prepare for timeout handing self._clientsocket, address = self._socket.accept()