[elbe-devel] [PATCH 2/4] elbevalidate: add package to validate elbe images

Thomas Weißschuh thomas.weissschuh at linutronix.de
Thu May 23 09:30:42 CEST 2024


elbevalidate is a framework to introspect and validate images generated
by elbe. The framework is meant to be used in conjunction with a
unit-testing library such as pytest.
Images are not modified during testing, so tests are idempotent.

For now this package is experimental and only used for the
implementation of elbe's own unit tests.
In the future the goal is to stabilize the framework and expose it to
users of elbe.
Or the framework could be split into its own project to be used without
elbe.

Signed-off-by: Thomas Weißschuh <thomas.weissschuh at linutronix.de>
---
 debian/control                           |  15 ++-
 debian/python3-elbe-elbevalidate.install |   1 +
 elbevalidate/__init__.py                 | 154 +++++++++++++++++++++++++
 elbevalidate/constants.py                |  11 ++
 elbevalidate/path.py                     | 187 +++++++++++++++++++++++++++++++
 elbevalidate/pytest.py                   |  40 +++++++
 elbevalidate/test_elbevalidate.py        | 123 ++++++++++++++++++++
 setup.cfg                                |   2 +-
 setup.py                                 |   1 +
 9 files changed, 532 insertions(+), 2 deletions(-)

diff --git a/debian/control b/debian/control
index 8fb48c0e98df..97267c5b8561 100644
--- a/debian/control
+++ b/debian/control
@@ -27,11 +27,13 @@ Build-Depends: debhelper-compat (= 13),
   python3-lxml,
   python3-apt,
   python3-gpg,
+  python3-guestfs,
   python3-libvirt,
   python3-passlib,
   python3-pytest,
   python3-sphinx,
-  python3-sphinx-rtd-theme
+  python3-sphinx-rtd-theme,
+  linux-image-amd64 [amd64]
 Standards-Version: 3.9.6
 Rules-Requires-Root: no
 Homepage: http://elbe-rfs.org
@@ -208,3 +210,14 @@ Description: update daemon for embedded systems
  'elbe gen_update' command from the 'python3-elbe-buildenv' package) is placed
  in this directory the update will be applied.
  Downgrades are possible by the integrated SOAP interface.
+
+Package: python3-elbe-elbevalidate
+Architecture: all
+Depends: ${misc:Depends}, ${python3:Depends},
+  python3,
+  python3-elbe-bin (= ${binary:Version}),
+  python3-elbe-control (= ${binary:Version}),
+  python3-guestfs,
+  python3-pytest
+Description: Image validation framework
+ Python library to introspect and validate generated elbe images
diff --git a/debian/python3-elbe-elbevalidate.install b/debian/python3-elbe-elbevalidate.install
new file mode 100644
index 000000000000..fb8623c87d39
--- /dev/null
+++ b/debian/python3-elbe-elbevalidate.install
@@ -0,0 +1 @@
+usr/lib/python3.*/*-packages/elbevalidate/*.py
diff --git a/elbevalidate/__init__.py b/elbevalidate/__init__.py
new file mode 100644
index 000000000000..c43bfb50c47e
--- /dev/null
+++ b/elbevalidate/__init__.py
@@ -0,0 +1,154 @@
+import abc
+import collections
+import contextlib
+import dataclasses
+import functools
+import os
+import typing
+
+import guestfs
+
+from elbevalidate.constants import GPTPartitionType, PartitionLabel
+from elbevalidate.path import Path as ImagePath
+
+
+class BlockDevice(abc.ABC):
+    @property
+    @abc.abstractmethod
+    def size(self) -> int:
+        pass
+
+    @abc.abstractmethod
+    def blkid(self) -> dict:
+        pass
+
+    @abc.abstractmethod
+    def files(self) -> typing.ContextManager[ImagePath]:
+        pass
+
+
+def _blkid(instance):
+    d = instance._gfs.blkid(instance._gfs_blockdev)
+    for tag in ['DEVNAME', 'DISKSEQ']:
+        d.pop(tag, None)
+    return d
+
+
+@dataclasses.dataclass
+class Partition(BlockDevice):
+    _parent: BlockDevice = dataclasses.field(repr=False)
+    number: int
+    type: str
+    start: int
+    _size: int
+
+    def __post_init__(self):
+        self._gfs_blockdev = self._parent._gfs_blockdev + str(self.number)
+
+    @property
+    def _gfs(self):
+        return self._parent._gfs
+
+    def blkid(self) -> dict:
+        return _blkid(self)
+
+    @property
+    def size(self) -> int:
+        return self._size
+
+    @contextlib.contextmanager
+    def files(self) -> collections.abc.Generator[ImagePath, None, None]:
+        mountpoint = '/'
+        self._gfs.mount_ro(self._gfs_blockdev, mountpoint)
+        try:
+            yield ImagePath(mountpoint, device=self._parent, guestfs=self._gfs)
+        finally:
+            self._gfs.umount(mountpoint)
+
+
+@dataclasses.dataclass
+class PartitionTable:
+    label: PartitionLabel
+    sector_size: int
+    _partitions: list[Partition]
+
+    def __len__(self):
+        return len(self._partitions)
+
+    def __getitem__(self, key):
+        return self._partitions[key]
+
+
+class Image(BlockDevice):
+    def __init__(self, gfs):
+        self._gfs = gfs
+        self._gfs_blockdev = '/dev/sda'
+
+    @classmethod
+    @contextlib.contextmanager
+    def from_file(cls, image) -> collections.abc.Generator[typing.Self, None, None]:
+        gfs = guestfs.GuestFS(python_return_dict=True)
+        instance = cls(gfs)
+
+        with contextlib.closing(gfs):
+            gfs.add_drive_opts(os.fspath(image), readonly=True)
+            gfs.launch()
+
+            yield instance
+
+    def blkid(self) -> dict:
+        return _blkid(self)
+
+    @functools.cached_property
+    def size(self) -> int:
+        return self._gfs.blockdev_getsize64(self._gfs_blockdev)
+
+    def _get_part_type(self, parttype, partnum):
+        if parttype == PartitionLabel.DOS:
+            return '{:x}'.format(self._gfs.part_get_mbr_id(self._gfs_blockdev, partnum))
+        elif parttype == PartitionLabel.GPT:
+            return GPTPartitionType(self._gfs.part_get_gpt_type(self._gfs_blockdev, partnum))
+        else:
+            raise ValueError(parttype)
+
+    @functools.cached_property
+    def partitions(self) -> PartitionTable:
+        parttype = self._gfs.part_get_parttype(self._gfs_blockdev)
+        gfs_parts = self._gfs.part_list(self._gfs_blockdev)
+
+        partitions = [
+                Partition(_parent=self,
+                          number=p['part_num'], start=p['part_start'], _size=p['part_size'],
+                          type=self._get_part_type(parttype, p['part_num']))
+                for p in gfs_parts
+        ]
+
+        return PartitionTable(
+            label=PartitionLabel(parttype),
+            sector_size=self._gfs.blockdev_getss(self._gfs_blockdev),
+            _partitions=partitions,
+        )
+
+    @contextlib.contextmanager
+    def files(self) -> collections.abc.Generator[ImagePath, None, None]:
+        roots = self._gfs.inspect_os()
+        if len(roots) != 1:
+            raise ValueError(roots)
+
+        root = roots[0]
+
+        try:
+            mountpoints = self._gfs.inspect_get_mountpoints(root)
+            # Keys are mountpoints, values devices; mount shortest paths first.
+            for mountpoint, device in sorted(mountpoints.items(),
+                                             key=lambda k: len(k[0])):
+                self._gfs.mount_ro(device, mountpoint)
+            yield ImagePath('/', device=self, guestfs=self._gfs)
+
+        finally:
+            self._gfs.umount_all()
+
+
+# This is a module-level API in the stdlib, so we do the same here.
+def statvfs(path: ImagePath):
+    return path._statvfs()
diff --git a/elbevalidate/constants.py b/elbevalidate/constants.py
new file mode 100644
index 000000000000..925db0c06ec2
--- /dev/null
+++ b/elbevalidate/constants.py
@@ -0,0 +1,11 @@
+import enum
+
+
+class PartitionLabel(enum.StrEnum):
+    DOS = 'msdos'
+    GPT = 'gpt'
+
+
+class GPTPartitionType(str):
+    EFI_SYSTEM_PARTITION = 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
+    LINUX_FILESYSTEM = '0FC63DAF-8483-4772-8E79-3D69D8477DE4'
diff --git a/elbevalidate/path.py b/elbevalidate/path.py
new file mode 100644
index 000000000000..936ae3fffeb3
--- /dev/null
+++ b/elbevalidate/path.py
@@ -0,0 +1,187 @@
+"""
+pathlib.Path-like classes that operate on files within a libguestfs context.
+"""
+
+import contextlib
+import io
+import os
+import pathlib
+
+
+class _PurePath:
+    """
+    Reference to a path inside an image.
+    API is the same as of pathlib.
+
+    Pure variant that only provides path-manipulation methods.
+    """
+
+    def __init__(self, *pathsegments, device, guestfs, root=None):
+        self.device = device
+        self.root = root or self
+        self._p = pathlib.PurePosixPath(*pathsegments)
+        self._guestfs = guestfs
+
+    def joinpath(self, *pathsegments):
+        return type(self)(
+                self._p.joinpath(*pathsegments),
+                device=self.device,
+                guestfs=self._guestfs,
+                root=self.root,
+        )
+
+    def __truediv__(self, key):
+        try:
+            return self.joinpath(key)
+        except TypeError:
+            return NotImplemented
+
+    def __str__(self):
+        return str(self._p)
+
+    @property
+    def _path(self):
+        return str(self._p)
+
+    def __repr__(self):
+        return f'{self.__class__.__name__}({str(self)})'
+
+
+@contextlib.contextmanager
+def _guestfs_ctx():
+    """
+    Map libguestfs exceptions to the matching standard Python exceptions.
+    """
+    _exception_mapping = [
+        ('Not a directory', NotADirectoryError),
+        ('No such file or directory', FileNotFoundError),
+    ]
+
+    try:
+        yield
+    except RuntimeError as e:
+        if len(e.args) != 1 or not isinstance(e.args[0], str):
+            raise
+
+        msg = e.args[0]
+
+        for s, t in _exception_mapping:
+            if msg.endswith(': ' + s):
+                raise t(msg) from e
+
+        raise
+
+
+class Path(_PurePath):
+    """
+    Reference to a path inside an image.
+    API is the same as of pathlib.
+
+    Normal variant containing IO functionality.
+    """
+
+    def iterdir(self):
+        with _guestfs_ctx():
+            for entry in self._guestfs.ls(self._path):
+                yield self / entry
+
+    def read_bytes(self):
+        with _guestfs_ctx():
+            return self._guestfs.read_file(self._path)
+
+    def open(self, mode='r', buffering=-1,
+             encoding=None, errors=None, newline=None):
+        buf = io.BytesIO(self.read_bytes())
+
+        if mode in ('', 'r'):
+            return io.TextIOWrapper(buf, encoding=encoding,
+                                    errors=errors, newline=newline)
+        elif mode in ('b', 'rb'):
+            return buf
+        else:
+            raise ValueError(f'Invalid mode {mode}')
+
+    def read_text(self, encoding=None, errors=None):
+        with self.open(encoding=encoding, errors=errors) as f:
+            return f.read()
+
+    @staticmethod
+    def _convert_stat(gstat):
+        return os.stat_result((
+            gstat['st_mode'], gstat['st_ino'],
+            gstat['st_dev'], gstat['st_nlink'],
+            gstat['st_uid'], gstat['st_gid'], gstat['st_size'],
+            gstat['st_atime_sec'], gstat['st_mtime_sec'],
+            gstat['st_ctime_sec'],
+        ))
+
+    def stat(self):
+        with _guestfs_ctx():
+            return self._convert_stat(self._guestfs.statns(self._path))
+
+    def lstat(self):
+        with _guestfs_ctx():
+            return self._convert_stat(self._guestfs.lstatns(self._path))
+
+    def readlink(self):
+        with _guestfs_ctx():
+            # Keep root so owner() on the result reads the image's /etc/passwd.
+            return type(self)(
+                self._guestfs.readlink(self._path),
+                device=self.device, guestfs=self._guestfs, root=self.root,
+            )
+
+    def exists(self):
+        with _guestfs_ctx():
+            return self._guestfs.exists(self._path)
+
+    def is_dir(self):
+        with _guestfs_ctx():
+            return self._guestfs.is_dir(self._path)
+
+    def is_file(self):
+        with _guestfs_ctx():
+            return self._guestfs.is_file(self._path)
+
+    def is_mount(self):
+        raise NotImplementedError()
+
+    def is_symlink(self):
+        with _guestfs_ctx():
+            return self._guestfs.is_symlink(self._path)
+
+    def is_socket(self):
+        with _guestfs_ctx():
+            return self._guestfs.is_socket(self._path)
+
+    def is_fifo(self):
+        with _guestfs_ctx():
+            return self._guestfs.is_fifo(self._path)
+
+    def is_block_device(self):
+        with _guestfs_ctx():
+            return self._guestfs.is_blockdev(self._path)
+
+    def is_char_device(self):
+        with _guestfs_ctx():
+            return self._guestfs.is_chardev(self._path)
+
+    def owner(self):
+        uid = str(self.stat().st_uid)
+        passwd = self.root.joinpath('etc', 'passwd').read_text()
+        for line in passwd.splitlines():
+            fields = line.split(':')  # name:password:uid:...
+            if len(fields) > 2 and fields[2] == uid:  # skip blank/malformed lines
+                return fields[0]
+
+        raise KeyError(uid)
+
+    def _statvfs(self):
+        with _guestfs_ctx():
+            stat = self._guestfs.statvfs(self._path)
+
+        return os.statvfs_result([
+            stat['bsize'], stat['frsize'], stat['blocks'], stat['bfree'], stat['bavail'],
+            stat['files'], stat['ffree'], stat['favail'],
+            stat['fsid'], stat['flag'], stat['namemax'],
+        ])
diff --git a/elbevalidate/pytest.py b/elbevalidate/pytest.py
new file mode 100644
index 000000000000..23b9302afd7b
--- /dev/null
+++ b/elbevalidate/pytest.py
@@ -0,0 +1,40 @@
+import os
+import pathlib
+
+import pytest
+
+
+class _ElbeValidationPlugin:
+    def pytest_addoption(self, parser):
+        group = parser.getgroup('elbevalidate')
+        group.addoption(
+            '--elbe-build-dir',
+            dest='elbe_build_dir',
+        )
+
+    @staticmethod
+    def _elbe_build_dir(config):
+        bd = config.option.elbe_build_dir
+        if bd is None:
+            raise ValueError('--elbe-build-dir was not specified')
+        return bd
+
+    @pytest.fixture(name='build_dir')
+    def build_dir_fixture(self, request):
+        return pathlib.Path(self._elbe_build_dir(request.config))
+
+    def pytest_report_header(self, config):
+        bd = self._elbe_build_dir(config)
+        return ['elbe build dir: ' + bd]
+
+
+def run_with_pytest(test_script: os.PathLike, build_dir: os.PathLike):
+    """
+    Run a Python source file through pytest.
+
+    :param test_script: Script to run.
+    :param build_dir: ELBE build directory to validate.
+                      Available to tests as fixture `build_dir` of :class:`pathlib.Path`.
+    :returns: The exit code reported by :func:`pytest.main`.
+    """
+    args = ['--elbe-build-dir', os.fspath(build_dir), os.fspath(test_script)]
+    return pytest.main(args, [_ElbeValidationPlugin()])
diff --git a/elbevalidate/test_elbevalidate.py b/elbevalidate/test_elbevalidate.py
new file mode 100644
index 000000000000..2bc9545e4c47
--- /dev/null
+++ b/elbevalidate/test_elbevalidate.py
@@ -0,0 +1,123 @@
+import pathlib
+import struct
+import subprocess
+import tempfile
+import textwrap
+
+import elbevalidate
+
+
+def _round_to(n, g):
+    return n + (g - (n % g))  # NOTE: adds a full g when n is already a multiple of g
+
+
+def _make_disk(path, parts):
+    """ Create a basic MBR partition table. """
+
+    if len(parts) > 4:
+        raise ValueError(parts)
+
+    data_offset = 2 * 1024 * 1024  # 2MiB
+
+    header = bytearray(512)
+    current_data = data_offset
+
+    for i, part in enumerate(parts):
+        partbytes = bytearray(16)
+
+        rounded_size = _round_to(len(part), 512)
+
+        partbytes[0x04] = 0x83
+        partbytes[0x08:0x0C] = struct.pack('<I', current_data // 512)
+        partbytes[0x0C:0x10] = struct.pack('<I', rounded_size // 512)
+
+        part_start = 0x01BE + 16 * i
+        header[part_start:part_start + 16] = partbytes
+
+        current_data += rounded_size
+
+    header[0x01FE] = 0x55
+    header[0x01FF] = 0xAA
+
+    with path.open('wb') as f:
+        f.write(header)
+        f.write(b'\x00' * (data_offset - len(header)))
+
+        for part in parts:
+            rounded_size = _round_to(len(part), 512)
+            f.write(part)
+            f.write(bytearray(rounded_size - len(part)))
+
+
+def _make_partition(path):
+    assert path.is_dir()
+
+    with tempfile.NamedTemporaryFile() as t:
+        subprocess.run(
+            ['mksquashfs', path, t.name, '-noappend'],
+            check=True, capture_output=True,
+        )
+
+        return pathlib.Path(t.name).read_bytes()
+
+
+def test_elbevalidate(tmp_path):
+    part1_dir = tmp_path / 'part1'
+    part1_dir.mkdir()
+
+    part2_dir = tmp_path / 'part2'
+    part2_dir.mkdir()
+
+    part1_dir.joinpath('foo').write_text('foo')
+    part2_dir.joinpath('bar').write_text('bar')
+
+    etc = part1_dir / 'etc'
+    etc.mkdir()
+
+    etc.joinpath('fstab').write_text(textwrap.dedent("""
+        /dev/sda1   /       squashfs    defaults    0   0
+        /dev/sda2   /data   squashfs    defaults    0   0
+    """))
+
+    bin_ = part1_dir / 'bin'
+    bin_.mkdir()
+
+    data = part1_dir / 'data'
+    data.mkdir()
+
+    disk_file = tmp_path / 'disk.img'
+    part1 = _make_partition(part1_dir)
+    part2 = _make_partition(part2_dir)
+    _make_disk(disk_file, [part1, part2])
+
+    with elbevalidate.Image.from_file(disk_file) as image:
+
+        assert image.size == disk_file.stat().st_size
+
+        image_blkid = image.blkid()
+        assert image_blkid['PTTYPE'] == 'dos'
+
+        assert len(image.partitions) == 2
+
+        part0 = image.partitions[0]
+        assert part0.type == '83'
+        assert part0.size == len(part1) + 512
+
+        part0_blkid = part0.blkid()
+        assert part0_blkid['TYPE'] == 'squashfs'
+        assert 'DEVNAME' not in part0_blkid
+
+        with part0.files() as root:
+            assert root.joinpath('foo').exists()
+            assert not root.joinpath('bar').exists()
+
+            statvfs = elbevalidate.statvfs(root)
+            assert statvfs.f_blocks == 1
+            assert statvfs.f_files == 6
+            assert statvfs.f_bfree == 0
+            assert statvfs.f_ffree == 0
+
+        with image.files() as root:
+            assert root.joinpath('foo').exists()
+            assert not root.joinpath('bar').exists()
+            assert root.joinpath('data', 'bar').exists()
diff --git a/setup.cfg b/setup.cfg
index 282351e1f25b..24875da04911 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -5,4 +5,4 @@ extend-exclude =
 	elbe-project-*,
 	debian/,
 	initvm/
-application-import-names = elbepack
+application-import-names = elbepack elbevalidate
diff --git a/setup.py b/setup.py
index 3c10fc96b8ce..08ed390f601a 100644
--- a/setup.py
+++ b/setup.py
@@ -54,6 +54,7 @@ setup(name='elbe',
                 'elbepack.daemons',
                 'elbepack.daemons.soap',
                 'elbepack.schema',
+                'elbevalidate',
                 ],
       package_data={'elbepack': ['makofiles/*.mako',
                                  'init/*.mako',

-- 
2.45.1



More information about the elbe-devel mailing list