[elbe-devel] [PATCH 2/6] commands: check-build: Add cdrom checker

Olivier Dion dion at linutronix.de
Tue Jun 23 18:31:06 CEST 2020


This checker will validate the bin-cdrom.iso and src-cdrom*.iso of a
build.

It starts by generating a binary table using the source.xml.  This
table is then incrementally validated by inspecting the bin-cdrom.iso.
Every time that a binary is validated, sources are added to the source
table.  The source table is not component aware yet.

Then, the source table is validated the same way, by incrementally
validating sources found in the src-cdrom*.iso.

Signed-off-by: Olivier Dion <dion at linutronix.de>
---
 elbepack/commands/check-build.py | 296 +++++++++++++++++++++++++++++++
 1 file changed, 296 insertions(+)

diff --git a/elbepack/commands/check-build.py b/elbepack/commands/check-build.py
index 64da4d9c..f36f2abe 100644
--- a/elbepack/commands/check-build.py
+++ b/elbepack/commands/check-build.py
@@ -3,6 +3,7 @@
 #
 # SPDX-License-Identifier: GPL-3.0-or-later
 
+import glob
 import logging
 import multiprocessing.pool
 import optparse
@@ -11,6 +12,10 @@ import tempfile
 import traceback
 
 from elbepack.log import elbe_logging
+from elbepack.treeutils import etree
+from elbepack.shellhelper import get_command_out, command_out, do, CommandError
+
+DEVNULL = open(os.devnull, "w")
 
 # TODO:py3 - Replace with tempfile.TempDirectory
 import shutil
@@ -111,3 +116,294 @@ class CheckBase(object):
 
     def fail(self, reason):
         raise CheckException(reason)
+
+ at CheckBase.register("skip-cdroms")
+class CheckCdroms(CheckBase):
+
+    """Check for cdroms integrity"""
+
+    def extract_cdrom(self, tgt, cdrom):
+        try:
+            do('7z x -o"%s" "%s"' % (tgt, cdrom))
+        except CommandError as E:
+            self.fail("Failed to extract cdrom %s:\n%s" % (cdrom, E))
+
+    def dpkg_get_infos(self, path, fmt):
+        """Get dpkg infos for .deb and .dsc file formats"""
+        try:
+            if path.endswith(".deb"):
+                cmd = 'dpkg -f "%s" %s' % (path, " ".join(fmt))
+            elif path.endswith(".dsc"):
+                cmd = 'grep -E "^(%s):" %s' % ("|".join(fmt), path)
+            return get_command_out(cmd).decode("utf-8")
+        except CommandError as E:
+            self.fail("Failed to get debian infos (%s) for %s:\n%s" %
+                      ('|'.join(fmt), path, E))
+
+    @staticmethod
+    def cmp_version(v1, v2):
+        return command_out("dpkg --compare-versions %s eq %s" %
+                           (v1, v2), output=DEVNULL)[0]
+
+    def do_src(self, sources, src_total):
+        """Check for sources in src-cdrom*"""
+
+        # pylint: disable=too-many-locals
+        # pylint: disable=too-many-branches
+
+        iso_it  = glob.iglob("src-cdrom*")
+        src_cnt = 0
+
+        # For every src-cdrom*, extract it to a temporary directory
+        # and find all *.dsc files
+        for cdrom in iso_it:
+            with TempDirectory() as tmp:
+                self.extract_cdrom(tmp, cdrom)
+                for _dir, _, files in os.walk(tmp):
+                    for _file in files:
+                        if not _file.endswith('.dsc'):
+                            continue
+
+                        # TODO - Shall we verify compressed archives
+                        # with sha256sum and compare with what's in
+                        # *.dsc?
+
+                        infos = self.dpkg_get_infos(os.path.join(_dir, _file),
+                                                    ["Source", "Version"])
+                        src_name    = None
+                        src_version = None
+
+                        for info in infos.split('\n'):
+
+                            if info.startswith("Source:"):
+                                src_name = info.split('Source:')[1].strip(' ')
+
+                            # Same as for the binary version.  The
+                            # PGP's signature contains a version field
+                            elif info.startswith("Version:"):
+                                if not src_version:
+                                    src_version = info.split('Version:')[1].strip(' ')
+
+                        if src_name in sources:
+
+                            match = False
+
+                            for version in sources[src_name]:
+
+                                # Found a matching version; prune it
+                                if self.cmp_version(version, src_version) == 0:
+
+                                    logging.info("Validating source %s_%s",
+                                                 src_name, version)
+
+                                    sources[src_name].remove(version)
+                                    src_cnt += 1
+                                    match   = True
+
+                                    break
+
+                            # NOTE! - Because the way the source table is
+                            # generated, it's not possible to have multiple time
+                            # the same version of a source (you have different
+                            # versions only).  However, this is totally possible
+                            # for cdrom because of multiple components.  Thus,
+                            # whenever the source table can handle per component
+                            # sources, this case should emit an error instead of
+                            # a warning
+                            if not match:
+                                logging.warning("Can't find matching version for source %s_%s.\n"
+                                                "It might have already been validated",
+                                                src_name, src_version)
+                        else:
+                            logging.error("Extra source %s_%s found",
+                                          src_name, src_version)
+                            self.ret = 1
+
+        # List missing sources
+        for src_name in sources:
+            for src_version in sources[src_name]:
+                logging.error("Missing source %s_%s",
+                              src_name, src_version)
+
+        logging.info("Succesfully validated %d source packages out of %d",
+                     src_cnt, src_total)
+
+        if src_cnt != src_total:
+            self.ret = 1
+
+    def do_bin(self):
+        """Check for binaries in bin-cdrom*.
+
+        Return a tuple of the form ({ "source-name" : [versions ..] }, src_cnt).
+
+        Put in others, validating binary cdroms will give us back a
+        dictionnary where the key are the source package name, and the
+        values of these keys are lists of versions.  It will also
+        return a total count of source that is in the dictionnary.
+
+        """
+
+        # NOTE! - The binary table should be okay with the current
+        # bin-cdrom.iso.  However, the way that the source table is genrated is
+        # not okay for now.  Indeed, it's not aware of components.  So for
+        # example, if two components have a binary in common, they will share
+        # the corresponding source in the source table.  The solution is not as
+        # trivial as having a reference counter or appending to a list, because
+        # a component could have multiple binaries that match to the same source
+        # package.  Thus, the only way is to make the source table component
+        # aware.
+
+        # pylint: disable=too-many-locals
+        # pylint: disable=too-many-branches
+        # pylint: disable=too-many-statements
+
+        # Every build has a source.xml where the list of binaries
+        # installed can be found
+        xml = etree("source.xml")
+
+        # Initial statistics fo the build
+        bin_cnt   = 0
+        src_cnt   = 0
+        bin_total = 0
+
+        binaries = {}
+        sources  = {}
+
+        # Create a dictionnary of the form {"bin-name": [versions ..]}
+        # from the source.xml.  We do this by iterating over all <pkg>
+        for tag in xml.all("./*/pkg"):
+
+            bin_pkg = tag.et.text
+
+            # Package already in the dictionnary? Add its version.
+            # Otherwise, add a new entry into the dictionnary
+            if bin_pkg in binaries:
+                binaries[bin_pkg].append(tag.et.attrib["version"])
+            else:
+                binaries[bin_pkg] = [tag.et.attrib["version"]]
+
+            bin_total += 1
+
+        # For every bin-cdrom, create a temporary directory where to
+        # extract it and find all *.deb files
+        for cdrom in glob.glob("bin-cdrom*"):
+            with TempDirectory() as tmp:
+                self.extract_cdrom(tmp, cdrom)
+                for _dir, _, files in os.walk(tmp):
+                    for _file in files:
+                        if not _file.endswith(".deb"):
+                            continue
+
+                        # Extract informations from .deb
+                        deb_path = os.path.join(_dir, _file)
+                        infos    = self.dpkg_get_infos(deb_path, ["Package",
+                                                                  "Source",
+                                                                  "Version",
+                                                                  "Built-Using"])
+                        src_name    = None
+                        src_version = None
+                        bin_name    = None
+                        bin_version = None
+
+                        for line in infos.split('\n'):
+
+                            # Package: <PACKAGE>
+                            if line.startswith("Package:"):
+                                bin_name = line.split('Package:')[1].strip(' \t')
+
+                            # Version: <VERSION>
+                            #
+                            # Skip PGP's version.  The package version is
+                            # supposed to be before the PGP signature.  However,
+                            # the PGP signature will put a 'Version' field.
+                            # Thus, let's check if we already have found a
+                            # binary version and don't overwrite it
+                            elif line.startswith("Version:"):
+                                if not bin_version:
+                                    bin_version = line.split('Version:')[1].strip(' ')
+
+                            # Source: <SOURCE> [(VERSION)]
+                            #
+                            # This field is optional.  If it is not present, the
+                            # source package default to the bin package
+                            elif line.startswith("Source:"):
+                                src_infos = line.split('Source:')[1].strip(' ').split(' ')
+                                src_name = src_infos[0]
+                                if len(src_infos) > 1:
+                                    src_version = src_infos[1].strip("()")
+
+                            # Built-Using: <SRC (=VERSION)>...
+                            #
+                            # Sources list in the built-using field are
+                            # seperated by a comma
+                            elif line.startswith("Built-Using:"):
+
+                                built_using = line.split("Built-Using:")[1].strip(' ').split(',')
+
+                                for src in built_using:
+
+                                    name, version = src.strip(' ').split(' ', 1)
+                                    version = version.strip("(= )")
+
+                                    # TODO - This is not component aware!
+                                    if name in sources:
+                                        if version not in sources[name]:
+                                            sources[name].add(version)
+                                            src_cnt += 1
+                                    else:
+                                        src_cnt += 1
+                                        sources[name] = {version}
+
+
+                        # No source was found
+                        if src_name is None:
+                            src_name    = bin_name
+                            src_version = bin_version
+
+                        # No source version was found
+                        elif src_version is None:
+                            src_version = bin_version
+
+                        # TODO - This is not component aware!
+                        #
+                        # Let's build a dictionnary of sources of the form
+                        # {"source-name" : [versions ..]}. Same as the binary
+                        # dictionnary before
+                        if src_name in sources:
+                            if src_version not in sources[src_name]:
+                                sources[src_name].add(src_version)
+                                src_cnt += 1
+                        else:
+                            sources[src_name] = {src_version}
+                            src_cnt += 1
+
+                        # Prune version of this binary
+                        bin_cnt += 1
+                        try:
+                            binaries[bin_name].remove(bin_version)
+                            logging.info("Validating binary %s_%s",
+                                         bin_name, bin_version)
+                            logging.info("Adding source %s_%s", src_name, src_version)
+                        except KeyError:
+                            logging.error("Foreign binary found %s_%s",
+                                          bin_name, bin_version)
+                            self.ret = 1
+
+        # List all missing binaries
+        for bin_name in binaries:
+            for bin_version in binaries[bin_name]:
+                logging.error("Missing binary %s_%s", bin_name, bin_version)
+
+        logging.info("Succesfully validated %d binary packages out of %d",
+                     bin_cnt, bin_total)
+
+        if bin_cnt != bin_total:
+            self.ret = 1
+
+        return sources, src_cnt
+
+    def run(self):
+        sources, src_cnt = self.do_bin()
+        self.do_src(sources, src_cnt)
+        return self.ret
+
-- 
2.27.0




More information about the elbe-devel mailing list