[elbe-devel] [PATCH 1/1] commands check-build: Create build checker

Olivier Dion dion at linutronix.de
Wed May 13 23:48:46 CEST 2020


The new command 'elbe check-build [options] <build-dir>' allows the
user to check the integrity of a project's build.

This command run on the host and can performs several tests
asynchronously.

As for now, only one test is available and it checks if the bin-cdrom*
and src-cdrom* are matching the source.xml.

To add more test, one can simply create a class that derived from
'CheckBase' and implement the 'run' method.  Then, the decorator
'@CheckBase.register(skip_if)' must be used on the class, where
skip_if is string that represent a flag passed to the checker to skip
the test.

For example:
----------------------------------------------------------------------
@CheckBase.register("skip-this")
class CheckThis(CheckBase):
      def run(self):
          pass
----------------------------------------------------------------------
this test could be skip if the flag '--skip-this' is provided.

Current working directory is changed to the build directory before
starting tests, thus tests can assume that their current working
directory correspond to the build directory.  Tests can also assume
that they have their own address space.

Tests that failed should set the 'self.fail' variable to True before
returning from the 'run' method.  If any exception is raised and not
catch inside 'run', 'self.fail' is set to True.

Signed-off-by: Olivier Dion <dion at linutronix.de>
---
 elbepack/commands/check-build.py | 334 +++++++++++++++++++++++++++++++
 1 file changed, 334 insertions(+)
 create mode 100644 elbepack/commands/check-build.py

diff --git a/elbepack/commands/check-build.py b/elbepack/commands/check-build.py
new file mode 100644
index 00000000..408c7b62
--- /dev/null
+++ b/elbepack/commands/check-build.py
@@ -0,0 +1,334 @@
+# ELBE - Debian Based Embedded Rootfilesystem Builder
+# Copyright (c) 2020 Olivier Dion <dion at linutronix.de>
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import multiprocessing
+import optparse
+import os
+import subprocess
+import sys
+import tempfile
+import traceback
+
+from elbepack.treeutils import etree
+from elbepack.shellhelper import command_out, system_out
+
+# TODO - Replace with subprocess.DEVNULL for Python3
+DEVNULL = open(os.devnull, "w")
+
+# TODO - Replace with tempfile.TempDirectory for Python3
+# This is a simple work around for Python2
+import shutil
+class TempDirectory(object):
+
+    def __init__(self):
+        self._dir = tempfile.mkdtemp(prefix='elbe')
+
+    def __enter__(self):
+        return self._dir
+
+    def __exit__(self, exec_type, exec_value, tb):
+        shutil.rmtree(self._dir)
+        return False
+
+
+def get_files_with_prefix(prefix, _dir="."):
+    # TODO - Change os.listdir for os.scandir in Python3
+    return [path
+            for path in os.listdir(_dir)
+            if path.startswith(prefix)]
+
+def dpkg_get_infos(path, fmt):
+    if path.endswith(".deb"):
+        cmd = 'dpkg -f "%s" %s' % (path, " ".join(fmt))
+    elif path.endswith(".dsc"):
+        cmd = 'grep -E "^(%s):" %s' % ("|".join(fmt), path)
+    return system_out(cmd)
+
+class CheckBase(object):
+
+    _tests = []
+
+    def __init__(self):
+        self.ret = 0
+
+    def __call__(self, r, w):
+        os.close(r)
+        try:
+            os.dup2(w, os.sys.stdout.fileno())
+            os.dup2(w, os.sys.stderr.fileno())
+            os.sys.stdout = os.fdopen(os.sys.stdout.fileno(), "w", 1)
+            os.sys.stderr = os.fdopen(os.sys.stderr.fileno(), "w", 1)
+            os.sys.__stdout__ = os.sys.stdout
+            os.sys.__stderr__ = os.sys.stderr
+            self.run()
+        # pylint: disable=bare-except
+        except:
+            print(traceback.format_exc())
+            self.ret = 1
+        finally:
+            os.close(w)
+            sys.exit(self.ret)
+
+    @classmethod
+    def register(cls, skip_if):
+        def _register(test):
+            cls._tests.append((test, skip_if))
+            return test
+        return _register
+
+    # pylint: disable=no-self-use
+    def run(self):
+        raise Exception("Check run method not implemented")
+
+ at CheckBase.register("skip-cdroms")
+class CheckCdroms(CheckBase):
+
+    """Check for cdroms integrity"""
+
+    @staticmethod
+    def extract_cdrom(tgt, cdrom):
+        print("Extracting cdrom %s " % cdrom)
+        command_out('7z x -o"%s" "%s"' % (tgt, cdrom), output=DEVNULL)
+
+    def do_bin(self):
+
+        # pylint: disable=too-many-locals
+        # pylint: disable=too-many-branches
+        # pylint: disable=too-many-statements
+
+        xml = etree("source.xml")
+        bin_lst = get_files_with_prefix("bin-cdrom")
+        bin_cnt = 0
+        bin_total = 0
+        src_total = 0
+
+        binaries = {}
+        sources = {}
+
+        # Create a dictionnary of the form { "bin-name": [versions ..] }
+        for tag in xml.all("./*/pkg"):
+            bin_pkg = tag.et.text
+
+            if bin_pkg in binaries:
+                binaries[bin_pkg].append(tag.et.attrib["version"])
+            else:
+                binaries[bin_pkg] = [tag.et.attrib["version"]]
+
+            bin_total += 1
+
+        for cdrom in bin_lst:
+            with TempDirectory() as tmp:
+
+                self.extract_cdrom(tmp, cdrom)
+
+                # For all *.deb files
+                for _dir, _, files in os.walk(tmp):
+
+                    for _file in files:
+
+                        if not _file.endswith(".deb"):
+                            continue
+
+                        # Extract the Package, Source and Version
+                        deb_path = os.path.join(_dir, _file)
+                        infos    = dpkg_get_infos(deb_path, ["Package",
+                                                             "Source",
+                                                             "Version"])
+                        # Let's extract the wanted infos
+                        src_name    = None
+                        src_version = None
+                        bin_name    = None
+                        bin_version = None
+                        for line in infos.split('\n'):
+
+                            if line.startswith("Package"):
+                                bin_name = line.split(' ')[1]
+
+                            # Skip GPG version
+                            elif line.startswith("Version") and not bin_version:
+                                bin_version = line.split(' ')[1]
+
+                            elif line.startswith("Source"):
+                                # "Source:", NAME, [VERSION]
+                                src_infos = line.split(' ')
+                                # Some sources have a version attached to them.
+                                # This override the version of the binary
+                                if len(src_infos) > 2:
+                                    src_version = src_infos[2].strip("()")
+
+                                src_name = src_infos[1]
+
+
+                        # No version for source was found.  Let's take the
+                        # binary version then.
+                        if not src_version:
+                            src_version = bin_version
+
+                        # No source where found; Let's use the binary name
+                        if src_name is None:
+                            src_name = bin_name
+
+                        # Let's build a dictionnary of sources of the form
+                        # { "source-name" : {versions ..} }
+
+                        if src_name in sources:
+                            sources[src_name].append(src_version)
+                        else:
+                            sources[src_name] = [src_version]
+
+                        # Prune version of this binary
+                        try:
+                            binaries[bin_name].remove(bin_version)
+                            bin_cnt += 1
+                            src_total += 1
+                        except KeyError:
+                            print("Unknown binary found %s_%s" % (bin_name, bin_version))
+
+        # List all missing binaries
+        for bin_name in binaries:
+            for bin_version in binaries[bin_name]:
+                print("Missing binary %s_%s" % (bin_name, bin_version))
+
+        print("Succesfully validated %d binary packages out of %d" % (bin_cnt, bin_total))
+
+        if bin_cnt != bin_total:
+            self.ret = 1
+
+        return sources, src_total
+
+    def do_src(self, sources, src_total):
+
+        # pylint: disable=too-many-locals
+        # pylint: disable=too-many-branches
+
+        iso_lst = get_files_with_prefix("src-cdrom")
+        src_cnt = 0
+
+        for cdrom in iso_lst:
+
+            with TempDirectory() as tmp:
+
+                self.extract_cdrom(tmp, cdrom)
+
+                for _dir, _, files in os.walk(tmp):
+
+                    for _file in files:
+
+                        if not _file.endswith('.dsc'):
+                            continue
+
+                        infos = dpkg_get_infos(os.path.join(_dir, _file),
+                                               ["Source", "Version"])
+                        src_name    = None
+                        src_version = None
+
+                        for info in infos.split('\n'):
+
+                            if info.startswith('Source'):
+                                src_name = info.split(' ')[1]
+
+                            # Skip GPG version
+                            elif info.startswith('Version') and not src_version:
+                                src_version = info.split(' ')[1]
+
+                        if src_name in sources:
+                            matches = []
+                            for version in sources[src_name]:
+                                ret = command_out("dpkg --compare-versions %s eq %s" %
+                                                  (version, src_version),
+                                                  output=DEVNULL)
+
+                                if ret[0] == 0:
+                                    matches.append(version)
+
+                            for match in matches:
+                                sources[src_name].remove(match)
+                                src_cnt += 1
+                        else:
+                            print("Extra source %s_%s found" % (src_name, src_version))
+                            self.ret = 1
+
+        for src_name in sources:
+            for src_version in sources[src_name]:
+                print("Missing source %s_%s" % (src_name, src_version))
+
+        print("Succesfully validated %d source packages out of %d" % (src_cnt, src_total))
+
+        if src_cnt != src_total:
+            self.ret = 1
+
+    def run(self):
+        sources, src_total = self.do_bin()
+        self.do_src(sources, src_total)
+
+def run_async_test(cls):
+    r, w = os.pipe()
+    t = multiprocessing.Process(target=cls(), args=(r, w))
+    t.daemon = False
+
+    print("Starting test : %s (%s)" % (cls.__name__, cls.__doc__))
+
+    t.start()
+    os.close(w)
+    return (t, r)
+
+def run_command(argv):
+
+    oparser = optparse.OptionParser(usage="usage: %prog check-build [options] <build-dir>")
+
+    # pylint: disable=protected-access
+    for _, skip_if in CheckBase._tests:
+        oparser.add_option("--%s" % skip_if, action="store_true",
+                           dest=skip_if, default=False,
+                           help="Skip test cdroms integrity")
+
+    (opt, args) = oparser.parse_args(argv)
+
+    if not args:
+        print("No build directory specified")
+        oparser.print_help()
+        sys.exit(20)
+
+    os.chdir(args[0])
+
+    opt       = vars(opt)
+    tests     = []
+    total_cnt = 0
+    fail_cnt  = 0
+
+    # pylint: disable=protected-access
+    for cls, skip_if in CheckBase._tests:
+
+        if not opt[skip_if]:
+            thread, pipe = run_async_test(cls)
+            tests.append((thread, pipe, cls))
+            total_cnt += 1
+
+    for test, pipe, cls in tests:
+
+        test.join()
+
+        if test.exitcode:
+            fail_cnt += 1
+
+        print("======================================================================")
+        print('Result of test %s (%s) - %s' % (cls.__name__, cls.__doc__,
+                                               "PASSED" if test.exitcode == 0 else "FAILED"))
+        print("======================================================================")
+
+        while True:
+            try:
+                buf = os.read(pipe, 80)
+                if not buf:
+                    break
+                os.write(os.sys.stdout.fileno(), buf)
+            except OSError:
+                break
+
+        print("======================================================================")
+
+    print("Passed %d tests ouf of %d" % (total_cnt - fail_cnt, total_cnt))
+
+    sys.exit(0 if fail_cnt == 0 else 1)
-- 
2.26.2




More information about the elbe-devel mailing list