[elbe-devel] [PATCH v3 3/7] commands: check-build: Add cdrom checker

Torben Hohn torben.hohn at linutronix.de
Tue Aug 25 12:43:08 CEST 2020


On Thu, Aug 20, 2020 at 12:02:11PM -0400, Olivier Dion wrote:
> This checker will validate the bin-cdrom.iso and src-cdrom*.iso of a
> build.
> 
> It starts by generating a binary table using the source.xml.  This
> table is then incrementally validated by inspecting the bin-cdrom.iso.
> Every time that a binary is validated, sources are added to the source
> table.  The source table is not component aware yet.
> 
> Then, the source table is validated the same way, by incrementally
> validating sources found in the src-cdrom*.iso.
> 
> Signed-off-by: Olivier Dion <dion at linutronix.de>

Reviewed-by: Torben Hohn <torben.hohn at linutronix.de>

> ---
>  elbepack/commands/check-build.py | 283 +++++++++++++++++++++++++++++++
>  1 file changed, 283 insertions(+)
> 
> diff --git a/elbepack/commands/check-build.py b/elbepack/commands/check-build.py
> index a5d7656b..928d2bd8 100644
> --- a/elbepack/commands/check-build.py
> +++ b/elbepack/commands/check-build.py
> @@ -3,6 +3,7 @@
>  #
>  # SPDX-License-Identifier: GPL-3.0-or-later
>  
> +import glob
>  import logging
>  import optparse
>  import os
> @@ -94,3 +95,285 @@ class CheckBase(object):
>  
>      def fail(self, reason):
>          raise CheckException(reason)
> +
> + at CheckBase.register("cdrom")
> +class CheckCdroms(CheckBase):
> +
> +    """Check for cdroms integrity"""
> +
> +    def extract_cdrom(self, tgt, cdrom):
> +        try:
> +            do('7z x -o"%s" "%s"' % (tgt, cdrom))
> +        except CommandError as E:
> +            self.fail("Failed to extract cdrom %s:\n%s" % (cdrom, E))
> +
> +    def dpkg_get_infos(self, path, fmt):
> +        """Get dpkg infos for .deb and .dsc file formats"""
> +        try:
> +            if path.endswith(".deb"):
> +                cmd = 'dpkg -f "%s" %s' % (path, " ".join(fmt))
> +            elif path.endswith(".dsc"):
> +                cmd = 'grep -E "^(%s):" %s' % ("|".join(fmt), path)
> +            return get_command_out(cmd).decode("utf-8")
> +        except CommandError as E:
> +            self.fail("Failed to get debian infos (%s) for %s:\n%s" %
> +                      ('|'.join(fmt), path, E))
> +
> +    @staticmethod
> +    def cmp_version(v1, v2):
> +        return command_out("dpkg --compare-versions %s eq %s" %
> +                           (v1, v2), output=DEVNULL)[0]
> +
> +    def do_src(self, sources, src_total):
> +        """Check for sources in src-cdrom*"""
> +
> +        # pylint: disable=too-many-locals
> +        # pylint: disable=too-many-branches
> +
> +        iso_it  = glob.iglob("src-cdrom*")
> +        src_cnt = 0
> +
> +        # For every src-cdrom*, extract it to a temporary directory
> +        # and find all *.dsc files
> +        for cdrom in iso_it:
> +            with TmpdirFilesystem() as tmp:
> +                self.extract_cdrom(tmp.path, cdrom)
> +                for _, realpath in tmp.walk_files():
> +                    if not realpath.endswith('.dsc'):
> +                        continue
> +
> +                    infos = self.dpkg_get_infos(realpath,
> +                                                ["Source", "Version"])
> +                    src_name    = None
> +                    src_version = None
> +
> +                    for info in infos.split('\n'):
> +
> +                        if info.startswith("Source:"):
> +                            src_name = info.split('Source:')[1].strip(' ')
> +
> +                        # Same as for the binary version.  The
> +                        # PGP's signature contains a version field
> +                        elif info.startswith("Version:"):
> +                            if not src_version:
> +                                src_version = info.split('Version:')[1].strip(' ')
> +
> +                    if src_name in sources:
> +
> +                        match = False
> +
> +                        for version in sources[src_name]:
> +
> +                            # Found a matching version; prune it
> +                            if self.cmp_version(version, src_version) == 0:
> +
> +                                logging.info("Validating source %s_%s",
> +                                             src_name, version)
> +
> +                                sources[src_name].remove(version)
> +                                src_cnt += 1
> +                                match   = True
> +
> +                                break
> +
> +                        # NOTE! - Because the way the source table is
> +                        # generated, it's not possible to have multiple time
> +                        # the same version of a source (you have different
> +                        # versions only).  However, this is totally possible
> +                        # for cdrom because of multiple components.  Thus,
> +                        # whenever the source table can handle per component
> +                        # sources, this case should emit an error instead of
> +                        # a warning
> +                        if not match:
> +                            logging.warning("Can't find matching version for source %s_%s.\n"
> +                                            "It might have already been validated",
> +                                            src_name, src_version)
> +                    else:
> +                        logging.error("Extra source %s_%s found",
> +                                      src_name, src_version)
> +                        self.ret = 1
> +
> +        # List missing sources
> +        for src_name in sources:
> +            for src_version in sources[src_name]:
> +                logging.error("Missing source %s_%s",
> +                              src_name, src_version)
> +
> +        logging.info("Succesfully validated %d source packages out of %d",
> +                     src_cnt, src_total)
> +
> +        if src_cnt != src_total:
> +            self.ret = 1
> +
> +    def do_bin(self):
> +        """Check for binaries in bin-cdrom*.
> +
> +        Return a tuple of the form ({ "source-name" : [versions ..] }, src_cnt).
> +
> +        Put in other words, validating binary cdroms will give us back a
> +        dictionnary where the keys are the source package's name, and the values
> +        of these keys are lists of versions.  It will also return a total count
> +        of source that is in the dictionnary.
> +        """
> +
> +        # NOTE! - The binary table should be okay with the current
> +        # bin-cdrom.iso.  However, the way that the source table is genrated is
> +        # not okay for now.  Indeed, it's not aware of components.  So for
> +        # example, if two components have a binary in common, they will share
> +        # the corresponding source in the source table.  The solution is not as
> +        # trivial as having a reference counter or appending to a list, because
> +        # a component could have multiple binaries that match to the same source
> +        # package.  Thus, the only way is to make the source table component
> +        # aware.
> +
> +        # pylint: disable=too-many-locals
> +        # pylint: disable=too-many-branches
> +        # pylint: disable=too-many-statements
> +
> +        # Every build has a source.xml where the list of binaries
> +        # installed can be found
> +        xml = etree("source.xml")
> +
> +        # Initial statistics fo the build
> +        bin_cnt   = 0
> +        src_cnt   = 0
> +        bin_total = 0
> +
> +        binaries = {}
> +        sources  = {}
> +
> +        # Create a dictionnary of the form {"bin-name": [versions ..]}
> +        # from the source.xml.  We do this by iterating over all <pkg>
> +        for tag in xml.all("./*/pkg"):
> +
> +            bin_pkg = tag.et.text
> +
> +            # Package already in the dictionnary? Add its version.
> +            # Otherwise, add a new entry into the dictionnary
> +            if bin_pkg in binaries:
> +                binaries[bin_pkg].append(tag.et.attrib["version"])
> +            else:
> +                binaries[bin_pkg] = [tag.et.attrib["version"]]
> +
> +            bin_total += 1
> +
> +        # For every bin-cdrom, create a temporary directory where to
> +        # extract it and find all *.deb files
> +        for cdrom in glob.glob("bin-cdrom*"):
> +            with TmpdirFilesystem() as tmp:
> +                self.extract_cdrom(tmp.path, cdrom)
> +                for _, realpath in tmp.walk_files():
> +                    if not realpath.endswith('.deb'):
> +                        continue
> +
> +                    # Extract informations from .deb
> +                    infos    = self.dpkg_get_infos(realpath, ["Package",
> +                                                              "Source",
> +                                                              "Version",
> +                                                              "Built-Using"])
> +                    src_name    = None
> +                    src_version = None
> +                    bin_name    = None
> +                    bin_version = None
> +
> +                    for line in infos.split('\n'):
> +
> +                        # Package: <PACKAGE>
> +                        if line.startswith("Package:"):
> +                            bin_name = line.split('Package:')[1].strip(' \t')
> +
> +                        # Version: <VERSION>
> +                        #
> +                        # Skip PGP's version.  The package version is
> +                        # supposed to be before the PGP signature.  However,
> +                        # the PGP signature will put a 'Version' field.
> +                        # Thus, let's check if we already have found a
> +                        # binary version and don't overwrite it
> +                        elif line.startswith("Version:"):
> +                            if not bin_version:
> +                                bin_version = line.split('Version:')[1].strip(' ')
> +
> +                        # Source: <SOURCE> [(VERSION)]
> +                        #
> +                        # This field is optional.  If it is not present, the
> +                        # source package default to the bin package
> +                        elif line.startswith("Source:"):
> +                            src_infos = line.split('Source:')[1].strip(' ').split(' ')
> +                            src_name = src_infos[0]
> +                            if len(src_infos) > 1:
> +                                src_version = src_infos[1].strip("()")
> +
> +                        # Built-Using: <SRC (=VERSION)>...
> +                        #
> +                        # Sources list in the built-using field are
> +                        # seperated by a comma
> +                        elif line.startswith("Built-Using:"):
> +
> +                            built_using = line.split("Built-Using:")[1].strip(' ').split(',')
> +
> +                            for src in built_using:
> +
> +                                name, version = src.strip(' ').split(' ', 1)
> +                                version = version.strip("(= )")
> +
> +                                # TODO - This is not component aware!
> +                                if name in sources:
> +                                    if version not in sources[name]:
> +                                        sources[name].add(version)
> +                                        src_cnt += 1
> +                                else:
> +                                    src_cnt += 1
> +                                    sources[name] = {version}
> +
> +
> +                    # No source was found
> +                    if src_name is None:
> +                        src_name    = bin_name
> +                        src_version = bin_version
> +
> +                    # No source version was found
> +                    elif src_version is None:
> +                        src_version = bin_version
> +
> +                    # TODO - This is not component aware!
> +                    #
> +                    # Let's build a dictionnary of sources of the form
> +                    # {"source-name" : [versions ..]}. Same as the binary
> +                    # dictionnary before
> +                    if src_name in sources:
> +                        if src_version not in sources[src_name]:
> +                            sources[src_name].add(src_version)
> +                            src_cnt += 1
> +                    else:
> +                        sources[src_name] = {src_version}
> +                        src_cnt += 1
> +
> +                    # Prune version of this binary
> +                    bin_cnt += 1
> +                    try:
> +                        binaries[bin_name].remove(bin_version)
> +                        logging.info("Validating binary %s_%s",
> +                                     bin_name, bin_version)
> +                        logging.info("Adding source %s_%s", src_name, src_version)
> +                    except KeyError:
> +                        logging.error("Foreign binary found %s_%s",
> +                                      bin_name, bin_version)
> +                        self.ret = 1
> +
> +        # List all missing binaries
> +        for bin_name in binaries:
> +            for bin_version in binaries[bin_name]:
> +                logging.error("Missing binary %s_%s", bin_name, bin_version)
> +
> +        logging.info("Succesfully validated %d binary packages out of %d",
> +                     bin_cnt, bin_total)
> +
> +        if bin_cnt != bin_total:
> +            self.ret = 1
> +
> +        return sources, src_cnt
> +
> +    def run(self):
> +        sources, src_cnt = self.do_bin()
> +        self.do_src(sources, src_cnt)
> +        return self.ret
> -- 
> 2.28.0
> 
> _______________________________________________
> elbe-devel mailing list
> elbe-devel at linutronix.de
> https://lists.linutronix.de/mailman/listinfo/elbe-devel

-- 
Torben Hohn
Linutronix GmbH | Bahnhofstrasse 3 | D-88690 Uhldingen-Mühlhofen
Phone: +49 7556 25 999 18; Fax.: +49 7556 25 999 99

Hinweise zum Datenschutz finden Sie hier (Informations on data privacy 
can be found here): https://linutronix.de/kontakt/Datenschutz.php

Linutronix GmbH | Firmensitz (Registered Office): Uhldingen-Mühlhofen | 
Registergericht (Registration Court): Amtsgericht Freiburg i.Br., HRB700 
806 | Geschäftsführer (Managing Directors): Heinz Egger, Thomas Gleixner


More information about the elbe-devel mailing list