Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 79 additions & 71 deletions chroot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,29 @@
import subprocess
from contextlib import contextmanager

from typing import Dict, Optional, Union, TypeVar, Generator, List, Any
from typing import Generator, TypeVar, Any

AnyPath = TypeVar('AnyPath', str, os.PathLike)
AnyPath = TypeVar("AnyPath", str, os.PathLike)

MNT_DEFAULT = {
# Mounts 'devpts' and 'proc' type mounts into the chroot
'switch': '-t', # use '-t' (type) switch with mount
'proc' : 'proc', # label/mount_type: mount_point
'devpts': 'dev/pts'}
# Mounts 'devpts' and 'proc' type mounts into the chroot
"switch": "-t", # use '-t' (type) switch with mount
"proc": "proc", # label/mount_type: mount_point
"devpts": "dev/pts",
}

MNT_FULL = {
# Bind mounts /dev, /sys, /proc & /run into the chroot
'switch': '-o', # use '-o (bind)' (option) switch with mount
'proc': 'proc', # label/host_mount: mount_point
'dev': 'dev',
'sys': 'sys',
'run': 'run'}
# Bind mounts /dev, /sys, /proc & /run into the chroot
"switch": "-o", # use '-o (bind)' (option) switch with mount
"proc": "proc", # label/host_mount: mount_point
"dev": "dev",
"sys": "sys",
"run": "run",
}


def debug(*s: Any) -> None:
if os.getenv('TKL_CHROOT_DEBUG', ''):
if os.getenv("TKL_CHROOT_DEBUG", ""):
print(*s)


Expand All @@ -44,16 +46,16 @@ class MountError(ChrootError):


def is_mounted(path: AnyPath) -> bool:
''' determines if a given path is currently mounted.
"""determines if a given path is currently mounted.

This method supports any path-like object (any object which implements the
os.PathLike interface, this includes `str`, `bytes` and path objects
provided by `pathlib` in the standard library.
'''
raw_path: Union[str, bytes] = os.fspath(path)
mode = 'rb' if isinstance(raw_path, bytes) else 'r'
sep = b' ' if isinstance(raw_path, bytes) else ' '
with open('/proc/mounts', mode) as fob:
"""
raw_path: str | bytes = os.fspath(path)
mode = "rb" if isinstance(raw_path, bytes) else "r"
sep = b" " if isinstance(raw_path, bytes) else " "
with open("/proc/mounts", mode) as fob:
for line in fob:
host, guest, *others = line.split(sep)
if guest == path:
Expand All @@ -63,11 +65,11 @@ def is_mounted(path: AnyPath) -> bool:

@contextmanager
def mount(
target: os.PathLike,
environ: Optional[Dict[str, str]] = None,
mnt_profile: Optional[Dict[str, str]] = None
) -> Generator['Chroot', None, None]:
'''magic mount context manager
target: os.PathLike,
environ: dict[str, str] | None = None,
mnt_profile: dict[str, str] | None = None,
) -> Generator["Chroot", None, None]:
"""magic mount context manager

Usage:

Expand All @@ -82,61 +84,63 @@ def mount(
Yields:
a `Chroot` object representing a mounted chroot at the given location

'''
"""
yield Chroot(target, environ, mnt_profile)


class MagicMounts:
'''MagicMounts: An object which manages mounting/unmounting a chroot.
"""MagicMounts: An object which manages mounting/unmounting a chroot.

You *probably* don't want to use this object directly but rather the `mount`
context manager, or the `Chroot` object.
'''
def __init__(self, mnt_profile: Dict[str, str], root: str = "/"):
"""

def __init__(self, mnt_profile: dict[str, str], root: str = "/"):
root = os.fspath(abspath(root))

self.profile = mnt_profile

self.path: Dict[str, str] = {}
self.mounted: Dict[str, str] = {}
self.path: dict[str, str] = {}
self.mounted: dict[str, bool] = {}
for k, v in self.profile.items():
if k != 'switch':
if k != "switch":
self.path[k] = join(root, v)
self.mounted[k] = False

self.mount()

def mount(self) -> None:
''' mount this chroot
"""mount this chroot

Raises:
MountError: An error occured while trying to mount chroot
'''
"""
for host_mnt, chr_path in self.path.items():
if is_mounted(chr_path):
continue
switch = self.profile['switch']
command = ['mount', switch]
if switch == '-o':
command.extend(['bind', host_mnt, chr_path])
elif switch == '-t':
command.extend([host_mnt, f'{host_mnt}-chroot', chr_path])
switch = self.profile["switch"]
command = ["mount", switch]
if switch == "-o":
command.extend(["bind", host_mnt, chr_path])
elif switch == "-t":
command.extend([host_mnt, f"{host_mnt}-chroot", chr_path])
else:
raise MountError(
f"Unknown switch passed to mount() method: '{switch}'.")
f"Unknown switch passed to mount() method: '{switch}'."
)
try:
subprocess.run(command, check=True)
self.mounted[host_mnt] = True
except subprocess.CalledProcessError as e:
raise MountError(*e.args) from e

def umount(self) -> None:
''' un-mount this chroot
"""un-mount this chroot

Raises:
MountError: An error occured while trying to un-mount chroot
'''
command = ['umount', '-f']
"""
command = ["umount", "-f"]
for mount in self.mounted.keys():
if self.mounted[mount]:
subprocess.run([*command, self.path[mount]])
Expand All @@ -147,50 +151,52 @@ def __del__(self) -> None:


class Chroot:
'''represents a chroot on your system that you can run commands inside.
"""represents a chroot on your system that you can run commands inside.
This class automatically attempts to mount the given chroot.

Example usage:

>>> foo = Chroot('/path/to/chroot', { 'ENVVAR': 'bar' })
>>> assert 'ENVVAR=bar' in foo.run(['env'], text=True).stdout
'''
def __init__(
self, newroot: AnyPath,
environ: Optional[Dict[str, str]] = None,
mnt_profile: Optional[Dict[str, str]] = None):
"""

def __init__(
self,
newroot: AnyPath,
environ: dict[str, str] | None = None,
mnt_profile: dict[str, str] | None = None,
):
if environ is None:
environ = {}
self.environ = {
'HOME': '/root',
'TERM': os.environ['TERM'],
'LC_ALL': 'C',
'PATH': "/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/bin:/usr/sbin"
"HOME": "/root",
"TERM": os.environ["TERM"],
"LC_ALL": "C",
"PATH": "/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/bin:/usr/sbin",
}
self.environ.update(environ)
self.profile = MNT_DEFAULT if not mnt_profile else mnt_profile

self.path: str = realpath(os.fspath(newroot))
self.magicmounts = MagicMounts(self.profile, self.path)

def _prepare_command(self, *commands: str) -> List[str]:
if '>' in commands or '<' in commands or '|' in commands:
raise ChrootError("Output redirects and pipes not supported in"
f"fab-chroot (command: `{commands}')")
def _prepare_command(self, *commands: str) -> list[str]:
if ">" in commands or "<" in commands or "|" in commands:
raise ChrootError(
"Output redirects and pipes not supported in"
f"fab-chroot (command: `{commands}')"
)
quoted_commands = []
for command in commands:
try:
quoted_commands.append(shlex.quote(command))
except TypeError as e:
raise ChrootError(f'failed to prepare command {command!r} for chroot') from e
return [
'chroot', self.path,
'sh', '-c',
' '.join(quoted_commands)
]

def system(self, command: Optional[str] = None) -> int:
raise ChrootError(
f"failed to prepare command {command!r} for chroot"
) from e
return ["chroot", self.path, "sh", "-c", " ".join(quoted_commands)]

def system(self, command: str | None = None) -> int:
"""execute system command in chroot

roughly analagous to `os.system` except within the context of a chroot
Expand All @@ -208,13 +214,15 @@ def system(self, command: Optional[str] = None) -> int:
FileNotFoundError: chroot program doesn't exist
"""

debug('chroot.system (args) => \x1b[34m', repr(command), '\x1b[0m')
command_chroot = ['chroot', self.path, '/bin/bash']
debug("chroot.system (args) => \x1b[34m", repr(command), "\x1b[0m")
command_chroot = ["chroot", self.path, "/bin/bash"]
if command:
command_chroot.extend(['-c', command])
command_chroot.extend(["-c", command])
return subprocess.run(command_chroot, env=self.environ).returncode

def run(self, command: str, *args: Any, **kwargs: Any) -> subprocess.CompletedProcess:
def run(
self, command: str, *args: Any, **kwargs: Any
) -> subprocess.CompletedProcess:
"""execute system command in chroot

roughly analagous to `subprocess.run` except within the context of a
Expand All @@ -239,7 +247,7 @@ def run(self, command: str, *args: Any, **kwargs: Any) -> subprocess.CompletedPr
CalledProcessError: check=True was passed in kwargs and
exitcode != 0
"""
debug('chroot.run (args) => \x1b[34m', repr(command), '\x1b[0m')
debug("chroot.run (args) => \x1b[34m", repr(command), "\x1b[0m")
cmd = self._prepare_command(*command)
debug('chroot.run (prepared cmd) => \x1b[33m', repr(cmd), '\x1b[0m')
debug("chroot.run (prepared cmd) => \x1b[33m", repr(cmd), "\x1b[0m")
return subprocess.run(cmd, env=self.environ, *args, **kwargs)
6 changes: 3 additions & 3 deletions debian/control
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ Priority: optional
Maintainer: Stefan Davis <stefan@turnkeylinux.org>
Build-Depends:
debhelper (>= 10),
python3-all (>= 3.5~),
python3-all (>= 3.11~),
dh-python
Standards-Version: 4.0.0
X-Python-Version: >= 3.5
X-Python-Version: >= 3.11

Package: turnkey-chroot
Architecture: any
Architecture: all
Depends:
${misc:Depends},
${python3:Depends},
Expand Down