[elbe-devel] [PATCH v2 2/6] commands: check-build: Add cdrom checker

Olivier Dion dion at linutronix.de
Tue Aug 4 18:07:07 CEST 2020


On Tue, 04 Aug 2020, Bastian Germann <bage at linutronix.de> wrote:
> Wow, that is a lot. Why don't you use python-apt's parser for all this
> stuff?

Correct me if I'm wrong, but I don't think python-apt is required on
the host side.  I'm not sure we want to use it anywhere beyond
rpcaptcache.  It's not well documented and kind of broken.

In any case, I want to see if there's a Debian tool that could do this
process for us.

>
> Am 03.08.20 um 18:40 schrieb Olivier Dion:
>> This checker will validate the bin-cdrom.iso and src-cdrom*.iso of a
>> build.
>> 
>> It starts by generating a binary table using the source.xml.  This
>> table is then incrementally validated by inspecting the bin-cdrom.iso.
>> Every time that a binary is validated, sources are added to the source
>> table.  The source table is not component aware yet.
>> 
>> Then, the source table is validated the same way, by incrementally
>> validating sources found in the src-cdrom*.iso.
>> 
>> Signed-off-by: Olivier Dion <dion at linutronix.de>
>> ---
>>  elbepack/commands/check-build.py | 290 +++++++++++++++++++++++++++++++
>>  elbepack/filesystem.py           |   7 +
>>  2 files changed, 297 insertions(+)
>> 
>> diff --git a/elbepack/commands/check-build.py b/elbepack/commands/check-build.py
>> index 2db8faf6..a39ac8ac 100644
>> --- a/elbepack/commands/check-build.py
>> +++ b/elbepack/commands/check-build.py
>> @@ -3,6 +3,7 @@
>>  #
>>  # SPDX-License-Identifier: GPL-3.0-or-later
>>  
>> +import glob
>>  import logging
>>  import optparse
>>  import os
>> @@ -95,3 +96,292 @@ class CheckBase(object):
>>  
>>      def fail(self, reason):
>>          raise CheckException(reason)
>> +
>> +@CheckBase.register("cdrom")
>> +class CheckCdroms(CheckBase):
>> +
>> +    """Check for cdroms integrity"""
>> +
>> +    def extract_cdrom(self, tgt, cdrom):
>> +        try:
>> +            do('7z x -o"%s" "%s"' % (tgt, cdrom))
>> +        except CommandError as E:
>> +            self.fail("Failed to extract cdrom %s:\n%s" % (cdrom, E))
>> +
>> +    def dpkg_get_infos(self, path, fmt):
>> +        """Get dpkg infos for .deb and .dsc file formats"""
>> +        try:
>> +            if path.endswith(".deb"):
>> +                cmd = 'dpkg -f "%s" %s' % (path, " ".join(fmt))
>> +            elif path.endswith(".dsc"):
>> +                cmd = 'grep -E "^(%s):" %s' % ("|".join(fmt), path)
>> +            return get_command_out(cmd).decode("utf-8")
>> +        except CommandError as E:
>> +            self.fail("Failed to get debian infos (%s) for %s:\n%s" %
>> +                      ('|'.join(fmt), path, E))
>> +
>> +    @staticmethod
>> +    def cmp_version(v1, v2):
>> +        return command_out("dpkg --compare-versions %s eq %s" %
>> +                           (v1, v2), output=DEVNULL)[0]
>> +
>> +    def do_src(self, sources, src_total):
>> +        """Check for sources in src-cdrom*"""
>> +
>> +        # pylint: disable=too-many-locals
>> +        # pylint: disable=too-many-branches
>> +
>> +        iso_it  = glob.iglob("src-cdrom*")
>> +        src_cnt = 0
>> +
>> +        # For every src-cdrom*, extract it to a temporary directory
>> +        # and find all *.dsc files
>> +        for cdrom in iso_it:
>> +            with TmpdirFilesystem() as tmp:
>> +                self.extract_cdrom(tmp, cdrom)
>> +                for _dir, _, files in os.walk(tmp):
>> +                    for _file in files:
>> +                        if not _file.endswith('.dsc'):
>> +                            continue
>> +
>> +                        infos = self.dpkg_get_infos(os.path.join(_dir, _file),
>> +                                                    ["Source", "Version"])
>> +                        src_name    = None
>> +                        src_version = None
>> +
>> +                        for info in infos.split('\n'):
>> +
>> +                            if info.startswith("Source:"):
>> +                                src_name = info.split('Source:')[1].strip(' ')
>> +
>> +                            # Same as for the binary version.  The
>> +                            # PGP's signature contains a version field
>> +                            elif info.startswith("Version:"):
>> +                                if not src_version:
>> +                                    src_version = info.split('Version:')[1].strip(' ')
>> +
>> +                        if src_name in sources:
>> +
>> +                            match = False
>> +
>> +                            for version in sources[src_name]:
>> +
>> +                                # Found a matching version; prune it
>> +                                if self.cmp_version(version, src_version) == 0:
>> +
>> +                                    logging.info("Validating source %s_%s",
>> +                                                 src_name, version)
>> +
>> +                                    sources[src_name].remove(version)
>> +                                    src_cnt += 1
>> +                                    match   = True
>> +
>> +                                    break
>> +
>> +                            # NOTE! - Because of the way the source table is
>> +                            # generated, it cannot contain the same version of
>> +                            # a source more than once (only different
>> +                            # versions).  However, this is entirely possible
>> +                            # on a cdrom because of multiple components.  Thus,
>> +                            # once the source table can handle per-component
>> +                            # sources, this case should emit an error instead of
>> +                            # a warning
>> +                            if not match:
>> +                                logging.warning("Can't find matching version for source %s_%s.\n"
>> +                                                "It might have already been validated",
>> +                                                src_name, src_version)
>> +                        else:
>> +                            logging.error("Extra source %s_%s found",
>> +                                          src_name, src_version)
>> +                            self.ret = 1
>> +
>> +        # List missing sources
>> +        for src_name in sources:
>> +            for src_version in sources[src_name]:
>> +                logging.error("Missing source %s_%s",
>> +                              src_name, src_version)
>> +
>> +        logging.info("Succesfully validated %d source packages out of %d",
>> +                     src_cnt, src_total)
>> +
>> +        if src_cnt != src_total:
>> +            self.ret = 1
>> +
>> +    def do_bin(self):
>> +        """Check for binaries in bin-cdrom*.
>> +
>> +        Return a tuple of the form ({ "source-name" : [versions ..] }, src_cnt).
>> +
>> +        Put another way, validating the binary cdroms gives us back a
>> +        dictionary where the keys are the source package names and the
>> +        values are lists of versions.  It also returns the total count
>> +        of sources in the dictionary.
>> +
>> +        """
>> +
>> +        # NOTE! - The binary table should be okay with the current
>> +        # bin-cdrom.iso.  However, the way that the source table is generated is
>> +        # not okay for now.  Indeed, it's not aware of components.  So for
>> +        # example, if two components have a binary in common, they will share
>> +        # the corresponding source in the source table.  The solution is not as
>> +        # trivial as having a reference counter or appending to a list, because
>> +        # a component could have multiple binaries that match to the same source
>> +        # package.  Thus, the only way is to make the source table component
>> +        # aware.
>> +
>> +        # pylint: disable=too-many-locals
>> +        # pylint: disable=too-many-branches
>> +        # pylint: disable=too-many-statements
>> +
>> +        # Every build has a source.xml where the list of binaries
>> +        # installed can be found
>> +        xml = etree("source.xml")
>> +
>> +        # Initial statistics for the build
>> +        bin_cnt   = 0
>> +        src_cnt   = 0
>> +        bin_total = 0
>> +
>> +        binaries = {}
>> +        sources  = {}
>> +
>> +        # Create a dictionary of the form {"bin-name": [versions ..]}
>> +        # from the source.xml.  We do this by iterating over all <pkg>
>> +        for tag in xml.all("./*/pkg"):
>> +
>> +            bin_pkg = tag.et.text
>> +
>> +            # Package already in the dictionary? Add its version.
>> +            # Otherwise, add a new entry into the dictionary
>> +            if bin_pkg in binaries:
>> +                binaries[bin_pkg].append(tag.et.attrib["version"])
>> +            else:
>> +                binaries[bin_pkg] = [tag.et.attrib["version"]]
>> +
>> +            bin_total += 1
>> +
>> +        # For every bin-cdrom, create a temporary directory where to
>> +        # extract it and find all *.deb files
>> +        #
>> +        # pylint: disable=too-many-nested-blocks
>> +        for cdrom in glob.glob("bin-cdrom*"):
>> +            with TmpdirFilesystem() as tmp:
>> +                self.extract_cdrom(tmp, cdrom)
>> +                for _dir, _, files in os.walk(tmp):
>> +                    for _file in files:
>> +                        if not _file.endswith(".deb"):
>> +                            continue
>> +
>> +                        # Extract informations from .deb
>> +                        deb_path = os.path.join(_dir, _file)
>> +                        infos    = self.dpkg_get_infos(deb_path, ["Package",
>> +                                                                  "Source",
>> +                                                                  "Version",
>> +                                                                  "Built-Using"])
>> +                        src_name    = None
>> +                        src_version = None
>> +                        bin_name    = None
>> +                        bin_version = None
>> +
>> +                        for line in infos.split('\n'):
>> +
>> +                            # Package: <PACKAGE>
>> +                            if line.startswith("Package:"):
>> +                                bin_name = line.split('Package:')[1].strip(' \t')
>> +
>> +                            # Version: <VERSION>
>> +                            #
>> +                            # Skip PGP's version.  The package version is
>> +                            # supposed to be before the PGP signature.  However,
>> +                            # the PGP signature will put a 'Version' field.
>> +                            # Thus, let's check if we already have found a
>> +                            # binary version and don't overwrite it
>> +                            elif line.startswith("Version:"):
>> +                                if not bin_version:
>> +                                    bin_version = line.split('Version:')[1].strip(' ')
>> +
>> +                            # Source: <SOURCE> [(VERSION)]
>> +                            #
>> +                            # This field is optional.  If it is not present, the
>> +                            # source package default to the bin package
>> +                            elif line.startswith("Source:"):
>> +                                src_infos = line.split('Source:')[1].strip(' ').split(' ')
>> +                                src_name = src_infos[0]
>> +                                if len(src_infos) > 1:
>> +                                    src_version = src_infos[1].strip("()")
>> +
>> +                            # Built-Using: <SRC (=VERSION)>...
>> +                            #
>> +                            # Sources listed in the Built-Using field are
>> +                            # separated by commas
>> +                            elif line.startswith("Built-Using:"):
>> +
>> +                                built_using = line.split("Built-Using:")[1].strip(' ').split(',')
>> +
>> +                                for src in built_using:
>> +
>> +                                    name, version = src.strip(' ').split(' ', 1)
>> +                                    version = version.strip("(= )")
>> +
>> +                                    # TODO - This is not component aware!
>> +                                    if name in sources:
>> +                                        if version not in sources[name]:
>> +                                            sources[name].add(version)
>> +                                            src_cnt += 1
>> +                                    else:
>> +                                        src_cnt += 1
>> +                                        sources[name] = {version}
>> +
>> +
>> +                        # No source was found
>> +                        if src_name is None:
>> +                            src_name    = bin_name
>> +                            src_version = bin_version
>> +
>> +                        # No source version was found
>> +                        elif src_version is None:
>> +                            src_version = bin_version
>> +
>> +                        # TODO - This is not component aware!
>> +                        #
>> +                        # Let's build a dictionary of sources of the form
>> +                        # {"source-name" : [versions ..]}. Same as the binary
>> +                        # dictionary before
>> +                        if src_name in sources:
>> +                            if src_version not in sources[src_name]:
>> +                                sources[src_name].add(src_version)
>> +                                src_cnt += 1
>> +                        else:
>> +                            sources[src_name] = {src_version}
>> +                            src_cnt += 1
>> +
>> +                        # Prune version of this binary
>> +                        bin_cnt += 1
>> +                        try:
>> +                            binaries[bin_name].remove(bin_version)
>> +                            logging.info("Validating binary %s_%s",
>> +                                         bin_name, bin_version)
>> +                            logging.info("Adding source %s_%s", src_name, src_version)
>> +                        except KeyError:
>> +                            logging.error("Foreign binary found %s_%s",
>> +                                          bin_name, bin_version)
>> +                            self.ret = 1
>> +
>> +        # List all missing binaries
>> +        for bin_name in binaries:
>> +            for bin_version in binaries[bin_name]:
>> +                logging.error("Missing binary %s_%s", bin_name, bin_version)
>> +
>> +        logging.info("Succesfully validated %d binary packages out of %d",
>> +                     bin_cnt, bin_total)
>> +
>> +        if bin_cnt != bin_total:
>> +            self.ret = 1
>> +
>> +        return sources, src_cnt
>> +
>> +    def run(self):
>> +        sources, src_cnt = self.do_bin()
>> +        self.do_src(sources, src_cnt)
>> +        return self.ret
>> +
>> diff --git a/elbepack/filesystem.py b/elbepack/filesystem.py
>> index 51697907..714aadd4 100644
>> --- a/elbepack/filesystem.py
>> +++ b/elbepack/filesystem.py
>> @@ -507,6 +507,13 @@ class TmpdirFilesystem (Filesystem):
>>      def delete(self):
>>          shutil.rmtree(self.path, True)
>>  
>> +    def __enter__(self):
>> +        return self.path
>> +
>> +    def __exit__(self, exec_type, exec_value, tb):
>> +        shutil.rmtree(self.path)
>> +        return False
>> +
>>  
>>  class ImgMountFilesystem(Filesystem):
>>      def __init__(self, mntpoint, dev):
>> 
-- 
Olivier Dion
Linutronix GmbH | Bahnhofstrasse 3 | D-88690 Uhldingen-Mühlhofen


More information about the elbe-devel mailing list