[elbe-devel] [PATCH 4/9] virtapt: massive refactoring to make it work again

Torben Hohn torben.hohn at linutronix.de
Wed Jun 26 11:50:23 CEST 2019


virtapt previously went through a big refactoring to provide the get_uri
function for the sdk. The sdk is implemented differently now, and the single
remaining user of virtapt is "elbe check_updates".

As a result, "elbe check_updates" did not work anymore.

Clean it up and make <raw-key> work.

- use TmpdirFilesystem (removes os.path.join() usage)
- make constructor take an ElbeXML object
- drop the old gpg code, and add code supporting <raw-key>
- do not rely on <sources_list> and <apt_prefs>, but generate
  them using the same code that a normal elbe build uses

Adjust check_updates (the sole user of virtapt) to the new constructor
signature.

Signed-off-by: Torben Hohn <torben.hohn at linutronix.de>
---
 elbepack/commands/check_updates.py |  16 +---
 elbepack/virtapt.py                | 149 +++++++++++++++++--------------------
 2 files changed, 71 insertions(+), 94 deletions(-)

diff --git a/elbepack/commands/check_updates.py b/elbepack/commands/check_updates.py
index 2e5da5a1d..2d75a385f 100644
--- a/elbepack/commands/check_updates.py
+++ b/elbepack/commands/check_updates.py
@@ -16,6 +16,7 @@ from elbepack.treeutils import etree
 from elbepack import virtapt
 from elbepack.validate import validate_xml
 from elbepack.xmldefaults import ElbeDefaults
+from elbepack.elbexml import ElbeXML
 
 
 def run_command(argv):
@@ -51,27 +52,18 @@ def run_command(argv):
 
     print("checking %s" % args[0])
 
-    xml = etree(args[0])
+    xml = ElbeXML(args[0])
 
-    if xml.has("project/buildtype"):
-        buildtype = xml.text("/project/buildtype")
-    else:
-        buildtype = "nodefaults"
-
-    defs = ElbeDefaults(buildtype)
-
-    arch = xml.text("project/buildimage/arch", default=defs, key="arch")
-    suite = xml.text("project/suite")
 
     apt_sources = xml.text("sources_list").replace("10.0.2.2", "localhost")
     apt_prefs = xml.text("apt_prefs")
 
     fullp = xml.node("fullpkgs")
 
-    v = virtapt.VirtApt(arch, suite, apt_sources, apt_prefs)
+    v = virtapt.VirtApt(xml)
 
     d = virtapt.apt_pkg.DepCache(v.cache)
-    d.read_pinfile(v.projectpath + "/etc/apt/preferences")
+    d.read_pinfile(v.basefs.path + "/etc/apt/preferences")
 
     for p in fullp:
         pname = p.et.text
diff --git a/elbepack/virtapt.py b/elbepack/virtapt.py
index c48bc9bee..7b836b1cf 100644
--- a/elbepack/virtapt.py
+++ b/elbepack/virtapt.py
@@ -11,8 +11,6 @@ from __future__ import print_function
 import os
 import sys
 
-from tempfile import mkdtemp
-
 from multiprocessing.managers import BaseManager
 
 # don't remove the apt import, it is really needed, due to some magic in
@@ -23,6 +21,9 @@ import apt_pkg
 
 
 from elbepack.shellhelper import CommandError, system
+from elbepack.filesystem import TmpdirFilesystem
+from elbepack.xmldefaults import ElbeDefaults
+from elbepack.rfs import create_apt_prefs
 
 
 def getdeps(pkg):
@@ -72,37 +73,51 @@ def lookup_uri(v, d, target_pkg):
     return target_pkg, uri, hashval
 
 
-
 class VirtApt(object):
-    def __init__(self, arch, suite, sources, prefs, keylist=None, noauth=False):
+    def __init__(self, xml):
+
+        self.xml = xml
+
+        if xml.has("project/buildtype"):
+            buildtype = xml.text("/project/buildtype")
+        else:
+            buildtype = "nodefaults"
 
-        # pylint: disable=too-many-arguments
+        arch = xml.text("project/buildimage/arch", key="arch")
+        suite = xml.text("project/suite")
 
-        self.projectpath = mkdtemp()
+        self.basefs = TmpdirFilesystem()
         self.initialize_dirs()
 
-        self.create_apt_sources_list(sources)
-        self.create_apt_prefs(prefs)
+        create_apt_prefs(self.xml, self.basefs)
+
+        mirror = self.xml.create_apt_sources_list(build_sources=True, initvm=False)
+        self.basefs.write_file("etc/apt/sources.list", 0o644, mirror)
+
         self.setup_gpg()
-        if keylist:
-            for k in keylist:
-                self.add_pubkey_url(k)
+        self.import_keys()
 
         apt_pkg.config.set("APT::Architecture", arch)
         apt_pkg.config.set("APT::Architectures", arch)
         apt_pkg.config.set("Acquire::http::Proxy::127.0.0.1", "DIRECT")
         apt_pkg.config.set("APT::Install-Recommends", "0")
-        apt_pkg.config.set("Dir::Etc", self.projectpath)
+        apt_pkg.config.set("Dir::Etc", self.basefs.fname('/'))
+        apt_pkg.config.set("Dir::Etc::Trusted",
+                           self.basefs.fname('/etc/apt/trusted.gpg'))
+        apt_pkg.config.set("Dir::Etc::TrustedParts",
+                           self.basefs.fname('/etc/apt/trusted.gpg.d'))
         apt_pkg.config.set("APT::Cache-Limit", "0")
         apt_pkg.config.set("APT::Cache-Start", "32505856")
         apt_pkg.config.set("APT::Cache-Grow", "2097152")
-        apt_pkg.config.set("Dir::State", os.path.join(self.projectpath, "state"))
-        apt_pkg.config.set("Dir::State::status", os.path.join(self.projectpath, "state/status"))
-        apt_pkg.config.set("Dir::Cache", os.path.join(self.projectpath, "cache"))
-        apt_pkg.config.set("Dir::Cache::archives", os.path.join(self.projectpath, "cache/archives"))
-        apt_pkg.config.set("Dir::Etc", os.path.join(self.projectpath, "etc/apt"))
-        apt_pkg.config.set("Dir::Log", os.path.join(self.projectpath, "log"))
-        if noauth:
+        apt_pkg.config.set("Dir::State", self.basefs.fname("state"))
+        apt_pkg.config.set("Dir::State::status",
+                           self.basefs.fname("state/status"))
+        apt_pkg.config.set("Dir::Cache", self.basefs.fname("cache"))
+        apt_pkg.config.set("Dir::Cache::archives",
+                           self.basefs.fname("cache/archives"))
+        apt_pkg.config.set("Dir::Etc", self.basefs.fname("etc/apt"))
+        apt_pkg.config.set("Dir::Log", self.basefs.fname("log"))
+        if self.xml.has('project/noauth'):
             apt_pkg.config.set("APT::Get::AllowUnauthenticated", "1")
             apt_pkg.config.set("Acquire::AllowInsecureRepositories", "1")
         else:
@@ -127,8 +142,27 @@ class VirtApt(object):
         except BaseException as e:
             print(e)
 
-    def __del__(self):
-        os.system('rm -rf "%s"' % self.projectpath)
+    def add_key(self, key):
+        cmd = 'echo "%s" > %s' % (key, self.basefs.fname("tmp/key.pub"))
+        clean = 'rm -f %s' % self.basefs.fname("tmp/key.pub")
+        system(cmd)
+        system('fakeroot apt-key --keyring "%s" add "%s"' %
+               (self.basefs.fname('/etc/apt/trusted.gpg'),
+                self.basefs.fname("tmp/key.pub")))
+        system(clean)
+
+    def import_keys(self):
+        if self.xml.has('project/mirror/url-list'):
+            # Should we use self.xml.prj.has("noauth")???
+            #
+            # If so, this is related to issue #220 -
+            # https://github.com/Linutronix/elbe/issues/220
+            #
+            # I could make a none global 'noauth' flag for mirrors
+            for url in self.xml.node('project/mirror/url-list'):
+                if url.has('raw-key'):
+                    key = "\n".join(line.strip(" \t") for line in url.text('raw-key').splitlines()[1:-1])
+                    self.add_key(key)
 
     def start(self):
         pass
@@ -155,24 +189,18 @@ class VirtApt(object):
             # mode is not set correctly
             os.system("chmod 777 " + newdir)
 
-    def touch(self, fname):
-        if os.path.exists(fname):
-            os.utime(fname, None)
-        else:
-            fp = open(fname, "w")
-            fp.close()
-
     def initialize_dirs(self):
-        self.mkdir_p(os.path.join(self.projectpath, "cache/archives/partial"))
-        self.mkdir_p(os.path.join(self.projectpath, "etc/apt/preferences.d"))
-        self.mkdir_p(os.path.join(self.projectpath, "etc/apt/trusted.gpg.d"))
-        self.mkdir_p(os.path.join(self.projectpath, "db"))
-        self.mkdir_p(os.path.join(self.projectpath, "log"))
-        self.mkdir_p(os.path.join(self.projectpath, "state/lists/partial"))
-        self.touch(os.path.join(self.projectpath, "state/status"))
+        self.basefs.mkdir_p("cache/archives/partial")
+        self.basefs.mkdir_p("etc/apt/preferences.d")
+        self.basefs.mkdir_p("etc/apt/trusted.gpg.d")
+        self.basefs.mkdir_p("db")
+        self.basefs.mkdir_p("log")
+        self.basefs.mkdir_p("state/lists/partial")
+        self.basefs.mkdir_p("tmp")
+        self.basefs.touch_file("state/status")
 
     def setup_gpg(self):
-        ring_path = os.path.join(self.projectpath, "etc/apt/trusted.gpg")
+        ring_path = self.basefs.fname("etc/apt/trusted.gpg")
         if not os.path.isdir("/etc/apt/trusted.gpg.d"):
             print("/etc/apt/trusted.gpg.d doesn't exist")
             print("apt-get install debian-archive-keyring may "
@@ -182,55 +210,12 @@ class VirtApt(object):
         if os.path.exists("/etc/apt/trusted.gpg"):
             system('cp /etc/apt/trusted.gpg "%s"' % ring_path)
 
-        gpg_options = '--keyring "%s" --no-auto-check-trustdb ' \
-                      '--trust-model always --no-default-keyring ' \
-                      '--homedir "%s"' % (ring_path, self.projectpath)
-
         trustkeys = os.listdir("/etc/apt/trusted.gpg.d")
         for key in trustkeys:
-            print("Import %s: " % key)
-            try:
-                system('gpg %s --import "%s"' % (
-                    gpg_options,
-                    os.path.join("/etc/apt/trusted.gpg.d", key)))
-            except CommandError:
-                print("adding elbe-pubkey to keyring failed")
-
-    def add_pubkey_url(self, url):
-        ring_path = os.path.join(self.projectpath, "etc/apt/trusted.gpg")
-        tmpkey_path = os.path.join(self.projectpath, "tmpkey.gpg")
-
-        gpg_options = '--keyring "%s" --no-auto-check-trustdb ' \
-                      '--trust-model always --no-default-keyring ' \
-                      '--homedir "%s"' % (ring_path, self.projectpath)
-
-        try:
-            system('wget -O "%s" "%s"' % (tmpkey_path, url))
-            system('gpg %s --import "%s"' % (
-                gpg_options,
-                tmpkey_path))
-        finally:
-            system('rm "%s"' % tmpkey_path, allow_fail=True)
-
-    def create_apt_sources_list(self, mirror):
-        filename = os.path.join(self.projectpath, "etc/apt/sources.list")
-
-        if os.path.exists(filename):
-            os.remove(filename)
-
-        file = open(filename, "w")
-        file.write(mirror)
-        file.close()
-
-    def create_apt_prefs(self, prefs):
-        filename = os.path.join(self.projectpath, "etc/apt/preferences")
-
-        if os.path.exists(filename):
-            os.remove(filename)
+            system('cp "/etc/apt/trusted.gpg.d/%s" "%s"' % (
+                   key,
+                   ring_path + '.d'))
 
-        file = open(filename, "w")
-        file.write(prefs)
-        file.close()
 
     def get_uri(self, target_pkg, incl_deps=False):
 
-- 
2.11.0




More information about the elbe-devel mailing list