Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 49 additions & 49 deletions labgrid/driver/qemudriver.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""The QEMUDriver implements a driver to use a QEMU target"""
import atexit
import select
import shlex
import shutil
Expand Down Expand Up @@ -104,17 +103,6 @@ def __attrs_post_init__(self):
self._socket = None
self._clientsocket = None
self._forwarded_ports = {}
atexit.register(self._atexit)

def _atexit(self):
if not self._child:
return
self._child.terminate()
try:
self._child.communicate(timeout=1)
except subprocess.TimeoutExpired:
self._child.kill()
self._child.communicate(timeout=1)

def get_qemu_version(self, qemu_bin):
p = subprocess.run([qemu_bin, "-version"], stdout=subprocess.PIPE, encoding="utf-8")
Expand Down Expand Up @@ -236,36 +224,19 @@ def on_activate(self):
self._socket.bind(sockpath)
self._socket.listen(0)

self._cmd = self.get_qemu_base_args()

self._cmd.append("-S")
self._cmd.append("-qmp")
self._cmd.append("stdio")
qemu_cmd = self.get_qemu_base_args()

self._cmd.append("-chardev")
self._cmd.append(f"socket,id=serialsocket,path={sockpath}")
self._cmd.append("-serial")
self._cmd.append("chardev:serialsocket")
qemu_cmd.append("-S")
qemu_cmd.append("-qmp")
qemu_cmd.append("stdio")

def on_deactivate(self):
if self.status:
self.off()
if self._clientsocket:
self._clientsocket.close()
self._clientsocket = None
self._socket.close()
self._socket = None
shutil.rmtree(self._tempdir)
qemu_cmd.append("-chardev")
qemu_cmd.append(f"socket,id=serialsocket,path={sockpath}")
qemu_cmd.append("-serial")
qemu_cmd.append("chardev:serialsocket")

@step()
def on(self):
"""Start the QEMU subprocess, accept the unix socket connection and
afterwards start the emulator using a QMP Command"""
if self.status:
return
self.logger.debug("Starting with: %s", self._cmd)
self._child = subprocess.Popen(
self._cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.logger.debug("Starting with: %s", qemu_cmd)
self._child = subprocess.Popen(qemu_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

# prepare for timeout handing
self._clientsocket, address = self._socket.accept()
Expand All @@ -282,37 +253,64 @@ def on(self):
) from exc
raise

self.status = 1
def on_deactivate(self):
if self._child and self._child.poll() is None:
self.monitor_command("quit")
try:
self._child.communicate(timeout=1)
except subprocess.TimeoutExpired:
self._child.kill()
self._child.communicate(timeout=1)

self._child = None
self.qmp = None

if self._clientsocket:
self._clientsocket.close()
self._clientsocket = None

if self._socket:
self._socket.close()
self._socket = None
if self._tempdir:
shutil.rmtree(self._tempdir)
self._tempdir = None

@Driver.check_active
@step()
def on(self):
"""Starts the emulation using a QMP command"""
if self.status:
return

# Restore port forwards
for v in self._forwarded_ports.values():
self._add_port_forward(*v)

self.monitor_command("cont")
self.status = 1

@Driver.check_active
@step()
def off(self):
"""Stop the emulator using a monitor command and await the exitcode"""
"""Stops and resets the emulation using monitor commands"""
if not self.status:
return
self.monitor_command('quit')
if self._child.wait() != 0:
self._child.communicate()
raise IOError
self._child = None
self.monitor_command('stop')
self.monitor_command('system_reset')
self.status = 0

@Driver.check_active
@step()
def cycle(self):
"""Cycle the emulator by restarting it"""
self.off()
self.on()

@Driver.check_active
@step(result=True, args=['command', 'arguments'])
def monitor_command(self, command, arguments={}):
"""Execute a monitor_command via the QMP"""
if not self.status:
raise ExecutionError(
"Can't use monitor command on non-running target")
return self.qmp.execute(command, arguments)

def _add_port_forward(self, proto, local_address, local_port, remote_address, remote_port):
Expand All @@ -321,10 +319,12 @@ def _add_port_forward(self, proto, local_address, local_port, remote_address, re
{"command-line": f"hostfwd_add {proto}:{local_address}:{local_port}-{remote_address}:{remote_port}"},
)

@Driver.check_active
def add_port_forward(self, proto, local_address, local_port, remote_address, remote_port):
self._add_port_forward(proto, local_address, local_port, remote_address, remote_port)
self._forwarded_ports[(proto, local_address, local_port)] = (proto, local_address, local_port, remote_address, remote_port)

@Driver.check_active
def remove_port_forward(self, proto, local_address, local_port):
del self._forwarded_ports[(proto, local_address, local_port)]
self.monitor_command(
Expand Down
27 changes: 18 additions & 9 deletions tests/test_qemudriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def qemu_target(qemu_env):
return qemu_env.get_target()

@pytest.fixture
def qemu_driver(qemu_target):
def qemu_driver(qemu_target, qemu_mock):
q = QEMUDriver(
qemu_target,
"qemu",
Expand Down Expand Up @@ -60,28 +60,37 @@ def qemu_mock(mocker):
socket_mock = mocker.patch('socket.socket')
socket_mock.return_value.accept.return_value = mocker.MagicMock(), ''

version_mock = mocker.patch('subprocess.run')
version_mock.return_value.returncode = 0
version_mock.return_value.stdout = "QEMU emulator version 4.2.1"

@pytest.fixture
def qemu_version_mock(mocker):
run_mock = mocker.patch('subprocess.run')
run_mock.return_value.returncode = 0
run_mock.return_value.stdout = "QEMU emulator version 4.2.1"
def qemu_qmp_mock(mocker):
monitor_mock = mocker.patch('labgrid.driver.qemudriver.QMPMonitor')
monitor_mock.return_value.execute.return_value = {'return': {}}
return monitor_mock

def test_qemu_instance(qemu_target, qemu_driver):
def test_qemu_instance(qemu_driver):
assert (isinstance(qemu_driver, QEMUDriver))

def test_qemu_activate_deactivate(qemu_target, qemu_driver, qemu_version_mock):
def test_qemu_activate_deactivate(qemu_target, qemu_driver, qemu_qmp_mock):
qemu_target.activate(qemu_driver)

qemu_driver.monitor_command("info")
qemu_qmp_mock.assert_called_once()
qemu_qmp_mock.return_value.execute.assert_called_with("info", {})

qemu_target.deactivate(qemu_driver)

def test_qemu_on_off(qemu_target, qemu_driver, qemu_mock, qemu_version_mock):
def test_qemu_on_off(qemu_target, qemu_driver):
qemu_target.activate(qemu_driver)

qemu_driver.on()
qemu_driver.off()

qemu_target.deactivate(qemu_driver)

def test_qemu_read_write(qemu_target, qemu_driver, qemu_mock, qemu_version_mock):
def test_qemu_read_write(qemu_target, qemu_driver):
qemu_target.activate(qemu_driver)

qemu_driver.on()
Expand Down