[elbe-devel] [PATCH 4/6] debinstaller: implement download of vmlinuz and initrd.gz

Torben Hohn torben.hohn at linutronix.de
Tue Aug 28 18:41:28 CEST 2018

The debinstaller module shall replace elbe-bootstrap.
It downloads the Debian installer Linux kernel and initrd.gz from
a Debian mirror. It does that in a secure manner, i.e. validating
Release.gpg and SHA256SUMS on the way.

filenames on the mirror:

Although this functionality is also provided by apt,
we implement it here in pure Python, because outside
of the initvm, we cannot rely on apt being available.

With this code in place, we can remove the fallback
code in pkgutils. We also will never see initvm build
failures due to new Debian releases requiring new
installer/elbe-bootstrap versions.

Signed-off-by: Torben Hohn <torben.hohn at linutronix.de>
 elbepack/commands/init.py |   4 +-
 elbepack/debinstaller.py  | 235 ++++++++++++++++++++++++++++++++++++++++++++++
 elbepack/pkgutils.py      |  30 +-----
 3 files changed, 240 insertions(+), 29 deletions(-)
 create mode 100644 elbepack/debinstaller.py

diff --git a/elbepack/commands/init.py b/elbepack/commands/init.py
index 7064cbc5..3f107b29 100644
--- a/elbepack/commands/init.py
+++ b/elbepack/commands/init.py
@@ -15,7 +15,7 @@ from optparse import OptionParser
 from elbepack.treeutils import etree
 from elbepack.validate import validate_xml
-from elbepack.pkgutils import copy_kinitrd, NoKinitrdException
+from elbepack.debinstaller import copy_kinitrd, NoKinitrdException
 from elbepack.xmldefaults import ElbeDefaults
 from elbepack.version import elbe_version
 from elbepack.templates import write_template, get_initvm_preseed
@@ -165,7 +165,7 @@ def run_command(argv):
         os.putenv("no_proxy", "localhost,")
-        copy_kinitrd(xml.node("/initvm"), out_path, defs, arch="amd64")
+        copy_kinitrd(xml.node("/initvm"), out_path)
     except NoKinitrdException as e:
         print("Failure to download kernel/initrd debian Package:")
diff --git a/elbepack/debinstaller.py b/elbepack/debinstaller.py
new file mode 100644
index 00000000..a49a86f0
--- /dev/null
+++ b/elbepack/debinstaller.py
@@ -0,0 +1,235 @@
+# ELBE - Debian Based Embedded Rootfilesystem Builder
+# Copyright (c) 2018 Torben Hohn <torben.hohn at linutronix.de>
+# SPDX-License-Identifier: GPL-3.0-or-later
+from __future__ import print_function
+import sys
+import os
+import re
+from urllib2 import urlopen
+from shutil import copyfileobj, copyfile
+from gpgme import Context
+from elbepack.filesystem import TmpdirFilesystem
+from elbepack.gpg import OverallStatus, check_signature
+from elbepack.shellhelper import CommandError, system
+from elbepack.hashes import HashValidator, HashValidationFailed
+class InvalidSignature(Exception):
+    pass
+class NoKinitrdException(Exception):
+    pass
+class ReleaseFile(HashValidator):
+    def __init__(self, base_url, fname, fname_list):
+        HashValidator.__init__(self, base_url)
+        header_re = re.compile(r'(\w+):(.*)')
+        hash_re   = re.compile(r' ([0-9a-f]+)\s+([0-9]+)\s+(\S+)')
+        current_header = ''
+        with open(fname, 'r') as fp:
+            for l in fp.readlines():
+                m = header_re.match(l)
+                if m:
+                    # line contains an rfc822 Header,
+                    # remember it.
+                    current_header = m.group(1)
+                    continue
+                m = hash_re.match(l)
+                if m:
+                    # line contains a hash entry.
+                    # check filename, whether we are interested in it
+                    if m.group(3) in fname_list:
+                        self.insert_fname_hash(current_header, m.group(3), m.group(1))
+class SHA256SUMSFile(HashValidator):
+    def __init__(self, base_url, fname, fname_list):
+        HashValidator.__init__(self, base_url)
+        hash_re   = re.compile(r'([0-9a-f]+)\s+(\S+)')
+        with open(fname, 'r') as fp:
+            for l in fp.readlines():
+                m = hash_re.match(l)
+                if m:
+                    # line contains a hash entry.
+                    # check filename, whether we are interested in it
+                    if m.group(2) in fname_list:
+                        self.insert_fname_hash("SHA256", m.group(2), m.group(1))
+def setup_apt_keyring(gpg_home, keyring_fname):
+    ring_path = os.path.join(gpg_home, keyring_fname)
+    if not os.path.isdir("/etc/apt/trusted.gpg.d"):
+        print("/etc/apt/trusted.gpg.d doesn't exist")
+        print("apt-get install debian-archive-keyring may "
+              "fix this problem")
+        sys.exit(20)
+    if os.path.exists("/etc/apt/trusted.gpg"):
+        system('cp /etc/apt/trusted.gpg "%s"' % ring_path)
+    gpg_options = '--keyring "%s" --no-auto-check-trustdb ' \
+                  '--trust-model always --no-default-keyring ' \
+                  '--homedir "%s"' % (ring_path, gpg_home)
+    trustkeys = os.listdir("/etc/apt/trusted.gpg.d")
+    for key in trustkeys:
+        print("Import %s: " % key)
+        try:
+            system('gpg %s --import "%s"' % (
+                gpg_options,
+                os.path.join("/etc/apt/trusted.gpg.d", key)))
+        except CommandError:
+            print('adding keyring "%s" to keyring "%s" failed' % (key, ring_path))
+def download(url, local_fname):
+    try:
+        rf = urlopen(url, None, 10)
+        with open(local_fname, "w") as wf:
+            copyfileobj(rf, wf)
+    finally:
+        rf.close()
+def download_release(tmp, base_url):
+    # setup gpg context, for verifying
+    # the Release.gpg signature.
+    os.environ['GNUPGHOME'] = tmp.fname('/')
+    ctx = Context()
+    # download the Release file to a tmp file,
+    # because we need it 2 times
+    download(base_url + "Release", tmp.fname('Release'))
+    # validate signature.
+    # open downloaded plaintext file, and
+    # use the urlopen object of the Release.gpg
+    # directly.
+    try:
+        sig = urlopen(base_url + 'Release.gpg', None, 10)
+        with tmp.open("Release", "r") as signed:
+            overall_status = OverallStatus()
+            # verify detached signature
+            sigs = ctx.verify(sig, signed, None)
+            for s in sigs:
+                status = check_signature(ctx, s)
+                overall_status.add(status)
+            if overall_status.to_exitcode():
+                raise InvalidSignature('Failed to verify Release file')
+    finally:
+        sig.close()
+def download_kinitrd(tmp, suite, mirror):
+    base_url = "%s/dists/%s/" % (
+        mirror.replace("LOCALMACHINE", "localhost"), suite)
+    installer_path = "main/installer-amd64/current/images/"
+    setup_apt_keyring(tmp.fname('/'), 'pubring.gpg')
+    # download release file and check
+    # signature
+    download_release(tmp, base_url)
+    # parse Release file, and remember hashvalues
+    # we are interested in
+    interesting = [installer_path + 'SHA256SUMS']
+    release_file = ReleaseFile(base_url, tmp.fname('Release'), interesting)
+    # now download and validate SHA256SUMS
+    release_file.download_and_validate_file(
+            installer_path + 'SHA256SUMS',
+            tmp.fname('SHA256SUMS'))
+    # now we have a valid SHA256SUMS file
+    # parse it
+    interesting = ['./cdrom/initrd.gz',
+                   './cdrom/vmlinuz',
+                   './netboot/debian-installer/amd64/initrd.gz',
+                   './netboot/debian-installer/amd64/linux']
+    sha256_sums = SHA256SUMSFile(
+            base_url + installer_path,
+            tmp.fname('SHA256SUMS'),
+            interesting)
+    # and then download the files we actually want
+    for p,ln in zip(interesting, ['initrd-cdrom.gz',
+                                  'linux-cdrom',
+                                  'initrd.gz',
+                                  'vmlinuz']):
+        sha256_sums.download_and_validate_file(
+                p,
+                tmp.fname(ln))
+def get_primary_mirror(prj):
+    if prj.has("mirror/primary_host"):
+        m = prj.node("mirror")
+        mirror = m.text("primary_proto") + "://"
+        mirror += m.text("primary_host") + "/"
+        mirror += m.text("primary_path")
+    else:
+        raise Exception("No mirror defined")
+    return mirror
+def copy_kinitrd(prj, target_dir):
+    suite =  prj.text("suite")
+    try:
+        if not prj.has("mirror/cdrom"):
+            mirror = get_primary_mirror(prj)
+            tmp = TmpdirFilesystem()
+            download_kinitrd(tmp, suite, mirror)
+            copyfile(tmp.fname("initrd-cdrom.gz"),
+                     os.path.join(target_dir, "initrd-cdrom.gz"))
+            copyfile(tmp.fname("initrd.gz"),
+                     os.path.join(target_dir, "initrd.gz"))
+            copyfile(tmp.fname("vmlinuz"),
+                     os.path.join(target_dir, "vmlinuz"))
+        else:
+            tmp = TmpdirFilesystem()
+            system('7z x -o%s "%s" initrd-cdrom.gz vmlinuz' %
+                   (tmp.fname('/'), prj.text("mirror/cdrom")))
+            copyfile(tmp.fname("initrd-cdrom.gz"),
+                     os.path.join(target_dir, "initrd-cdrom.gz"))
+            # initrd.gz needs to be cdrom version !
+            copyfile(tmp.fname("initrd-cdrom.gz"),
+                     os.path.join(target_dir, "initrd.gz"))
+            copyfile(tmp.fname("vmlinuz"),
+                     os.path.join(target_dir, "vmlinuz"))
+    except IOError as e:
+        raise NoKinitrdException('IoError %s' % e.message)
+    except InvalidSignature as e:
+        raise NoKinitrdException('InvalidSignature %s' % e.message)
+    except HashValidationFailed as e:
+        raise NoKinitrdException('HashValidationFailed %s' % e.message)
diff --git a/elbepack/pkgutils.py b/elbepack/pkgutils.py
index 12d401b5..799c8c71 100644
--- a/elbepack/pkgutils.py
+++ b/elbepack/pkgutils.py
@@ -7,6 +7,7 @@
 from __future__ import print_function
 # different module names in python 2 and 3
     import urllib.request
@@ -27,6 +28,7 @@ import hashlib
 from tempfile import mkdtemp
 from pkg_resources import parse_version as V
 from elbepack.shellhelper import CommandError, system
@@ -48,8 +50,6 @@ except ImportError as e:
     virtapt_imported = False
-class NoKinitrdException(Exception):
-    pass
 def get_sources_list(prj, defs):
@@ -108,7 +108,7 @@ def get_url(arch, suite, target_pkg, mirror, comp='main'):
         pack_url = "%s/dists/%s/%s/binary-%s/Packages" % (
             mirror.replace("LOCALMACHINE", "localhost"), suite, comp, arch)
-        packages = urllib2.urlopen(pack_url, None, 10)
+        packages = urlopen(pack_url, None, 10)
         packages = packages.readlines()
         packages = [x for x in packages if x.startswith("Filename")]
@@ -290,27 +290,3 @@ def extract_pkg(prj, target_dir, defs, package, arch="default",
                 system('ar p "%s" data.tar.xz | tar xJ -C "%s"' % (ppath,
         system('rm -f "%s"' % ppath)
-def copy_kinitrd(prj, target_dir, defs, arch="default"):
-    target_pkg = get_initrd_pkg(prj, defs)
-    try:
-        tmpdir = mkdtemp()
-        extract_pkg(prj, tmpdir, defs, target_pkg, arch)
-        # copy is done twice, because paths in elbe-bootstarp_1.0 and 0.9
-        # differ
-        initrd = os.path.join(tmpdir, 'var', 'lib', 'elbe', 'initrd')
-        if prj.has("mirror/cdrom"):
-            system('cp "%s" "%s"' % (os.path.join(initrd, 'initrd-cdrom.gz'),
-                                     os.path.join(target_dir, "initrd.gz")))
-        else:
-            system('cp "%s" "%s"' % (os.path.join(initrd, 'initrd.gz'),
-                                     os.path.join(target_dir, "initrd.gz")))
-        system('cp "%s" "%s"' % (os.path.join(initrd, 'vmlinuz'),
-                                 os.path.join(target_dir, "vmlinuz")))
-    finally:
-        system('rm -rf "%s"' % tmpdir)

More information about the elbe-devel mailing list