[elbe-devel] [PATCH 2/6] commands: check-build: Add cdrom checker
Torben Hohn
torben.hohn at linutronix.de
Wed Jun 24 16:18:15 CEST 2020
On Tue, Jun 23, 2020 at 12:31:06PM -0400, Olivier Dion wrote:
> This checker will validate the bin-cdrom.iso and src-cdrom*.iso of a
> build.
>
> It starts by generating a binary table using the source.xml. This
> table is then incrementally validated by inspecting the bin-cdrom.iso.
> Every time that a binary is validated, sources are added to the source
> table. The source table is not component aware yet.
>
> Then, the source table is validated the same way, by incrementally
> validating sources found in the src-cdrom*.iso.
ok.
This looks good.
Do we have the components in the source cdrom already?
Or do we need to implement them?
The hashes of the dscs do not need to be validated, I think.
They should already have been validated by reprepro upon
insertion.
Please test that reprepro actually validates them.
It might also be possible to use debmirror and have it
validate a whole cdrom at once.
At the very least, the hash validation should be a separate test/step;
this test is already complex enough.
>
> Signed-off-by: Olivier Dion <dion at linutronix.de>
> ---
> elbepack/commands/check-build.py | 296 +++++++++++++++++++++++++++++++
> 1 file changed, 296 insertions(+)
>
> diff --git a/elbepack/commands/check-build.py b/elbepack/commands/check-build.py
> index 64da4d9c..f36f2abe 100644
> --- a/elbepack/commands/check-build.py
> +++ b/elbepack/commands/check-build.py
> @@ -3,6 +3,7 @@
> #
> # SPDX-License-Identifier: GPL-3.0-or-later
>
> +import glob
> import logging
> import multiprocessing.pool
> import optparse
> @@ -11,6 +12,10 @@ import tempfile
> import traceback
>
> from elbepack.log import elbe_logging
> +from elbepack.treeutils import etree
> +from elbepack.shellhelper import get_command_out, command_out, do, CommandError
> +
> +DEVNULL = open(os.devnull, "w")
>
> # TODO:py3 - Replace with tempfile.TempDirectory
> import shutil
> @@ -111,3 +116,294 @@ class CheckBase(object):
>
> def fail(self, reason):
> raise CheckException(reason)
> +
> +@CheckBase.register("skip-cdroms")
> +class CheckCdroms(CheckBase):
> +
> + """Check for cdroms integrity"""
> +
> + def extract_cdrom(self, tgt, cdrom):
> + try:
> + do('7z x -o"%s" "%s"' % (tgt, cdrom))
> + except CommandError as E:
> + self.fail("Failed to extract cdrom %s:\n%s" % (cdrom, E))
> +
> + def dpkg_get_infos(self, path, fmt):
> + """Get dpkg infos for .deb and .dsc file formats"""
> + try:
> + if path.endswith(".deb"):
> + cmd = 'dpkg -f "%s" %s' % (path, " ".join(fmt))
> + elif path.endswith(".dsc"):
> + cmd = 'grep -E "^(%s):" %s' % ("|".join(fmt), path)
> + return get_command_out(cmd).decode("utf-8")
> + except CommandError as E:
> + self.fail("Failed to get debian infos (%s) for %s:\n%s" %
> + ('|'.join(fmt), path, E))
> +
> + @staticmethod
> + def cmp_version(v1, v2):
> + return command_out("dpkg --compare-versions %s eq %s" %
> + (v1, v2), output=DEVNULL)[0]
> +
> + def do_src(self, sources, src_total):
> + """Check for sources in src-cdrom*"""
> +
> + # pylint: disable=too-many-locals
> + # pylint: disable=too-many-branches
> +
> + iso_it = glob.iglob("src-cdrom*")
> + src_cnt = 0
> +
> + # For every src-cdrom*, extract it to a temporary directory
> + # and find all *.dsc files
> + for cdrom in iso_it:
> + with TempDirectory() as tmp:
> + self.extract_cdrom(tmp, cdrom)
> + for _dir, _, files in os.walk(tmp):
> + for _file in files:
> + if not _file.endswith('.dsc'):
> + continue
> +
> + # TODO - Shall we verify compressed archives
> + # with sha256sum and compare with what's in
> + # *.dsc?
> +
> + infos = self.dpkg_get_infos(os.path.join(_dir, _file),
> + ["Source", "Version"])
> + src_name = None
> + src_version = None
> +
> + for info in infos.split('\n'):
> +
> + if info.startswith("Source:"):
> + src_name = info.split('Source:')[1].strip(' ')
> +
> + # Same as for the binary version. The
> + # PGP's signature contains a version field
> + elif info.startswith("Version:"):
> + if not src_version:
> + src_version = info.split('Version:')[1].strip(' ')
> +
> + if src_name in sources:
> +
> + match = False
> +
> + for version in sources[src_name]:
> +
> + # Found a matching version; prune it
> + if self.cmp_version(version, src_version) == 0:
> +
> + logging.info("Validating source %s_%s",
> + src_name, version)
> +
> + sources[src_name].remove(version)
> + src_cnt += 1
> + match = True
> +
> + break
> +
> + # NOTE! - Because the way the source table is
> + # generated, it's not possible to have multiple time
> + # the same version of a source (you have different
> + # versions only). However, this is totally possible
> + # for cdrom because of multiple components. Thus,
> + # whenever the source table can handle per component
> + # sources, this case should emit an error instead of
> + # a warning
> + if not match:
> + logging.warning("Can't find matching version for source %s_%s.\n"
> + "It might have already been validated",
> + src_name, src_version)
> + else:
> + logging.error("Extra source %s_%s found",
> + src_name, src_version)
> + self.ret = 1
> +
> + # List missing sources
> + for src_name in sources:
> + for src_version in sources[src_name]:
> + logging.error("Missing source %s_%s",
> + src_name, src_version)
> +
> + logging.info("Succesfully validated %d source packages out of %d",
> + src_cnt, src_total)
> +
> + if src_cnt != src_total:
> + self.ret = 1
> +
> + def do_bin(self):
> + """Check for binaries in bin-cdrom*.
> +
> + Return a tuple of the form ({ "source-name" : [versions ..] }, src_cnt).
> +
> + Put in others, validating binary cdroms will give us back a
> + dictionnary where the key are the source package name, and the
> + values of these keys are lists of versions. It will also
> + return a total count of source that is in the dictionnary.
> +
> + """
> +
> + # NOTE! - The binary table should be okay with the current
> + # bin-cdrom.iso. However, the way that the source table is genrated is
> + # not okay for now. Indeed, it's not aware of components. So for
> + # example, if two components have a binary in common, they will share
> + # the corresponding source in the source table. The solution is not as
> + # trivial as having a reference counter or appending to a list, because
> + # a component could have multiple binaries that match to the same source
> + # package. Thus, the only way is to make the source table component
> + # aware.
> +
> + # pylint: disable=too-many-locals
> + # pylint: disable=too-many-branches
> + # pylint: disable=too-many-statements
> +
> + # Every build has a source.xml where the list of binaries
> + # installed can be found
> + xml = etree("source.xml")
> +
> + # Initial statistics fo the build
> + bin_cnt = 0
> + src_cnt = 0
> + bin_total = 0
> +
> + binaries = {}
> + sources = {}
> +
> + # Create a dictionnary of the form {"bin-name": [versions ..]}
> + # from the source.xml. We do this by iterating over all <pkg>
> + for tag in xml.all("./*/pkg"):
> +
> + bin_pkg = tag.et.text
> +
> + # Package already in the dictionnary? Add its version.
> + # Otherwise, add a new entry into the dictionnary
> + if bin_pkg in binaries:
> + binaries[bin_pkg].append(tag.et.attrib["version"])
> + else:
> + binaries[bin_pkg] = [tag.et.attrib["version"]]
> +
> + bin_total += 1
> +
> + # For every bin-cdrom, create a temporary directory where to
> + # extract it and find all *.deb files
> + for cdrom in glob.glob("bin-cdrom*"):
> + with TempDirectory() as tmp:
> + self.extract_cdrom(tmp, cdrom)
> + for _dir, _, files in os.walk(tmp):
> + for _file in files:
> + if not _file.endswith(".deb"):
> + continue
> +
> + # Extract informations from .deb
> + deb_path = os.path.join(_dir, _file)
> + infos = self.dpkg_get_infos(deb_path, ["Package",
> + "Source",
> + "Version",
> + "Built-Using"])
> + src_name = None
> + src_version = None
> + bin_name = None
> + bin_version = None
> +
> + for line in infos.split('\n'):
> +
> + # Package: <PACKAGE>
> + if line.startswith("Package:"):
> + bin_name = line.split('Package:')[1].strip(' \t')
> +
> + # Version: <VERSION>
> + #
> + # Skip PGP's version. The package version is
> + # supposed to be before the PGP signature. However,
> + # the PGP signature will put a 'Version' field.
> + # Thus, let's check if we already have found a
> + # binary version and don't overwrite it
> + elif line.startswith("Version:"):
> + if not bin_version:
> + bin_version = line.split('Version:')[1].strip(' ')
> +
> + # Source: <SOURCE> [(VERSION)]
> + #
> + # This field is optional. If it is not present, the
> + # source package default to the bin package
> + elif line.startswith("Source:"):
> + src_infos = line.split('Source:')[1].strip(' ').split(' ')
> + src_name = src_infos[0]
> + if len(src_infos) > 1:
> + src_version = src_infos[1].strip("()")
> +
> + # Built-Using: <SRC (=VERSION)>...
> + #
> + # Sources list in the built-using field are
> + # seperated by a comma
> + elif line.startswith("Built-Using:"):
> +
> + built_using = line.split("Built-Using:")[1].strip(' ').split(',')
> +
> + for src in built_using:
> +
> + name, version = src.strip(' ').split(' ', 1)
> + version = version.strip("(= )")
> +
> + # TODO - This is not component aware!
> + if name in sources:
> + if version not in sources[name]:
> + sources[name].add(version)
> + src_cnt += 1
> + else:
> + src_cnt += 1
> + sources[name] = {version}
> +
> +
> + # No source was found
> + if src_name is None:
> + src_name = bin_name
> + src_version = bin_version
> +
> + # No source version was found
> + elif src_version is None:
> + src_version = bin_version
> +
> + # TODO - This is not component aware!
> + #
> + # Let's build a dictionnary of sources of the form
> + # {"source-name" : [versions ..]}. Same as the binary
> + # dictionnary before
> + if src_name in sources:
> + if src_version not in sources[src_name]:
> + sources[src_name].add(src_version)
> + src_cnt += 1
> + else:
> + sources[src_name] = {src_version}
> + src_cnt += 1
> +
> + # Prune version of this binary
> + bin_cnt += 1
> + try:
> + binaries[bin_name].remove(bin_version)
> + logging.info("Validating binary %s_%s",
> + bin_name, bin_version)
> + logging.info("Adding source %s_%s", src_name, src_version)
> + except KeyError:
> + logging.error("Foreign binary found %s_%s",
> + bin_name, bin_version)
> + self.ret = 1
> +
> + # List all missing binaries
> + for bin_name in binaries:
> + for bin_version in binaries[bin_name]:
> + logging.error("Missing binary %s_%s", bin_name, bin_version)
> +
> + logging.info("Succesfully validated %d binary packages out of %d",
> + bin_cnt, bin_total)
> +
> + if bin_cnt != bin_total:
> + self.ret = 1
> +
> + return sources, src_cnt
> +
> + def run(self):
> + sources, src_cnt = self.do_bin()
> + self.do_src(sources, src_cnt)
> + return self.ret
> +
> --
> 2.27.0
>
>
> _______________________________________________
> elbe-devel mailing list
> elbe-devel at linutronix.de
> https://lists.linutronix.de/mailman/listinfo/elbe-devel
--
Torben Hohn
Linutronix GmbH | Bahnhofstrasse 3 | D-88690 Uhldingen-Mühlhofen
Phone: +49 7556 25 999 18; Fax.: +49 7556 25 999 99
Hinweise zum Datenschutz finden Sie hier (Informations on data privacy
can be found here): https://linutronix.de/kontakt/Datenschutz.php
Linutronix GmbH | Firmensitz (Registered Office): Uhldingen-Mühlhofen |
Registergericht (Registration Court): Amtsgericht Freiburg i.Br., HRB700
806 | Geschäftsführer (Managing Directors): Heinz Egger, Thomas Gleixner
More information about the elbe-devel
mailing list